post_cortex_memory/pipeline/
summary.rs1use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::mpsc;
10use tracing::{debug, trace, warn};
11use uuid::Uuid;
12
13use super::PipelineError;
14
15#[derive(Debug, Clone)]
17pub struct SummaryWorkItem {
18 pub session_id: Uuid,
20}
21
22pub struct SummaryQueue {
24 sender: mpsc::Sender<SummaryWorkItem>,
25 backlog: Arc<AtomicUsize>,
26}
27
28impl SummaryQueue {
29 #[must_use]
31 pub fn start(capacity: usize, backlog: Arc<AtomicUsize>) -> Self {
32 let (tx, mut rx) = mpsc::channel::<SummaryWorkItem>(capacity);
33 let worker_backlog = Arc::clone(&backlog);
34
35 tokio::spawn(async move {
36 debug!("summary pipeline worker started");
37 while let Some(item) = rx.recv().await {
38 trace!(session_id = %item.session_id, "summary pipeline: processing item");
39 worker_backlog.fetch_sub(1, Ordering::Relaxed);
43 }
44 warn!("summary pipeline worker exiting (channel closed)");
45 });
46
47 Self {
48 sender: tx,
49 backlog,
50 }
51 }
52
53 pub fn submit(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
55 match self.sender.try_send(item) {
56 Ok(()) => {
57 self.backlog.fetch_add(1, Ordering::Relaxed);
58 Ok(())
59 }
60 Err(mpsc::error::TrySendError::Full(_)) => {
61 Err(PipelineError::Backpressure { queue: "summary" })
62 }
63 Err(mpsc::error::TrySendError::Closed(_)) => {
64 Err(PipelineError::WorkerShutdown { queue: "summary" })
65 }
66 }
67 }
68}