Skip to main content

post_cortex_memory/pipeline/
summary.rs

1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Summary-refresh work queue.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::mpsc;
10use tracing::{debug, trace, warn};
11use uuid::Uuid;
12
13use super::PipelineError;
14
15/// Refresh the structured summary view for a session.
16#[derive(Debug, Clone)]
17pub struct SummaryWorkItem {
18    /// Session whose summary should be regenerated.
19    pub session_id: Uuid,
20}
21
22/// Bounded MPSC queue feeding the summary-refresh worker task.
23pub struct SummaryQueue {
24    sender: mpsc::Sender<SummaryWorkItem>,
25    backlog: Arc<AtomicUsize>,
26}
27
28impl SummaryQueue {
29    /// Spawn the summary worker on the current Tokio runtime and return the sending handle.
30    #[must_use]
31    pub fn start(capacity: usize, backlog: Arc<AtomicUsize>) -> Self {
32        let (tx, mut rx) = mpsc::channel::<SummaryWorkItem>(capacity);
33        let worker_backlog = Arc::clone(&backlog);
34
35        tokio::spawn(async move {
36            debug!("summary pipeline worker started");
37            while let Some(item) = rx.recv().await {
38                trace!(session_id = %item.session_id, "summary pipeline: processing item");
39                // Phase 5 stub. Phase 7 wires this to call into
40                // post_cortex_core::summary::SummaryGenerator after
41                // re-loading the session's active state.
42                worker_backlog.fetch_sub(1, Ordering::Relaxed);
43            }
44            warn!("summary pipeline worker exiting (channel closed)");
45        });
46
47        Self {
48            sender: tx,
49            backlog,
50        }
51    }
52
53    /// Non-blocking submit; returns [`PipelineError::Backpressure`] if the queue is full.
54    pub fn submit(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
55        match self.sender.try_send(item) {
56            Ok(()) => {
57                self.backlog.fetch_add(1, Ordering::Relaxed);
58                Ok(())
59            }
60            Err(mpsc::error::TrySendError::Full(_)) => {
61                Err(PipelineError::Backpressure { queue: "summary" })
62            }
63            Err(mpsc::error::TrySendError::Closed(_)) => {
64                Err(PipelineError::WorkerShutdown { queue: "summary" })
65            }
66        }
67    }
68}