use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use post_cortex_core::core::context_update::ContextUpdate;
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
use uuid::Uuid;
use super::PipelineError;
use crate::memory_system::ConversationMemorySystem;
#[derive(Debug, Clone)]
pub enum GraphWorkItem {
ApplyUpdate {
session_id: Uuid,
update: ContextUpdate,
},
}
impl GraphWorkItem {
#[must_use]
pub fn apply_update(session_id: Uuid, update: ContextUpdate) -> Self {
Self::ApplyUpdate { session_id, update }
}
}
pub struct GraphQueue {
sender: mpsc::Sender<GraphWorkItem>,
backlog: Arc<AtomicUsize>,
}
impl GraphQueue {
#[must_use]
pub fn start(
capacity: usize,
backlog: Arc<AtomicUsize>,
system: Arc<ConversationMemorySystem>,
) -> Self {
let (tx, mut rx) = mpsc::channel::<GraphWorkItem>(capacity);
let worker_backlog = Arc::clone(&backlog);
tokio::spawn(async move {
debug!("graph pipeline worker started");
while let Some(item) = rx.recv().await {
match &item {
GraphWorkItem::ApplyUpdate { session_id, update } => {
trace!(
session_id = %session_id,
update_id = %update.id,
entities = update.creates_entities.len(),
relations = update.creates_relationships.len(),
"graph pipeline: applying update"
);
if let Err(e) = system
.apply_entity_graph_update_now(*session_id, update.clone())
.await
{
warn!(
session_id = %session_id,
error = %e,
"graph pipeline: apply failed (non-fatal)"
);
}
}
}
worker_backlog.fetch_sub(1, Ordering::Relaxed);
}
warn!("graph pipeline worker exiting (channel closed)");
});
Self {
sender: tx,
backlog,
}
}
pub fn submit(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
match self.sender.try_send(item) {
Ok(()) => {
self.backlog.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::TrySendError::Full(_)) => {
Err(PipelineError::Backpressure { queue: "graph" })
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Err(PipelineError::WorkerShutdown { queue: "graph" })
}
}
}
}