use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
use uuid::Uuid;
use super::PipelineError;
#[derive(Debug, Clone)]
pub struct SummaryWorkItem {
pub session_id: Uuid,
}
pub struct SummaryQueue {
sender: mpsc::Sender<SummaryWorkItem>,
backlog: Arc<AtomicUsize>,
}
impl SummaryQueue {
#[must_use]
pub fn start(capacity: usize, backlog: Arc<AtomicUsize>) -> Self {
let (tx, mut rx) = mpsc::channel::<SummaryWorkItem>(capacity);
let worker_backlog = Arc::clone(&backlog);
tokio::spawn(async move {
debug!("summary pipeline worker started");
while let Some(item) = rx.recv().await {
trace!(session_id = %item.session_id, "summary pipeline: processing item");
worker_backlog.fetch_sub(1, Ordering::Relaxed);
}
warn!("summary pipeline worker exiting (channel closed)");
});
Self {
sender: tx,
backlog,
}
}
pub fn submit(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
match self.sender.try_send(item) {
Ok(()) => {
self.backlog.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::TrySendError::Full(_)) => {
Err(PipelineError::Backpressure { queue: "summary" })
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Err(PipelineError::WorkerShutdown { queue: "summary" })
}
}
}
}