post_cortex_memory/pipeline/graph.rs
1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Entity-graph work queue.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use post_cortex_core::core::context_update::ContextUpdate;
10use tokio::sync::mpsc;
11use tracing::{debug, trace, warn};
12use uuid::Uuid;
13
14use super::PipelineError;
15use crate::memory_system::ConversationMemorySystem;
16
17/// One unit of graph work.
18///
19/// The single `ApplyUpdate` variant matches the canonical entity-graph
20/// flow: each [`ContextUpdate`] carries its own typed entities + relations
21/// and the worker applies them atomically via
22/// [`ConversationMemorySystem::apply_entity_graph_update_now`]. Per-entity
23/// or per-relation deltas can be added as new variants if a use case
24/// emerges, but no path through the codebase currently emits them — the
25/// canonical write at `MemoryServiceImpl::update_context` always submits a
26/// full update.
27#[derive(Debug, Clone)]
28pub enum GraphWorkItem {
29 /// Apply a full [`ContextUpdate`]'s entity + relation set to the
30 /// session graph.
31 ApplyUpdate {
32 /// Session that owns the entities / relations.
33 session_id: Uuid,
34 /// The context-update payload to merge into the entity graph.
35 update: ContextUpdate,
36 },
37}
38
39impl GraphWorkItem {
40 /// Convenience: build an `ApplyUpdate` work item.
41 #[must_use]
42 pub fn apply_update(session_id: Uuid, update: ContextUpdate) -> Self {
43 Self::ApplyUpdate { session_id, update }
44 }
45}
46
47/// Bounded MPSC queue feeding the entity-graph worker task.
48pub struct GraphQueue {
49 sender: mpsc::Sender<GraphWorkItem>,
50 backlog: Arc<AtomicUsize>,
51}
52
53impl GraphQueue {
54 /// Spawn the graph worker on the current Tokio runtime and return the sending handle.
55 ///
56 /// `system` lets the worker reach the canonical
57 /// [`ConversationMemorySystem::apply_entity_graph_update_now`] helper
58 /// — extracting that body into a public method is how the worker
59 /// avoids reimplementing the load → mutate → CAS → persist flow.
60 #[must_use]
61 pub fn start(
62 capacity: usize,
63 backlog: Arc<AtomicUsize>,
64 system: Arc<ConversationMemorySystem>,
65 ) -> Self {
66 let (tx, mut rx) = mpsc::channel::<GraphWorkItem>(capacity);
67 let worker_backlog = Arc::clone(&backlog);
68
69 tokio::spawn(async move {
70 debug!("graph pipeline worker started");
71 while let Some(item) = rx.recv().await {
72 match &item {
73 GraphWorkItem::ApplyUpdate { session_id, update } => {
74 trace!(
75 session_id = %session_id,
76 update_id = %update.id,
77 entities = update.creates_entities.len(),
78 relations = update.creates_relationships.len(),
79 "graph pipeline: applying update"
80 );
81 if let Err(e) = system
82 .apply_entity_graph_update_now(*session_id, update.clone())
83 .await
84 {
85 warn!(
86 session_id = %session_id,
87 error = %e,
88 "graph pipeline: apply failed (non-fatal)"
89 );
90 }
91 }
92 }
93 worker_backlog.fetch_sub(1, Ordering::Relaxed);
94 }
95 warn!("graph pipeline worker exiting (channel closed)");
96 });
97
98 Self {
99 sender: tx,
100 backlog,
101 }
102 }
103
104 /// Non-blocking submit; returns [`PipelineError::Backpressure`] if the queue is full.
105 pub fn submit(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
106 match self.sender.try_send(item) {
107 Ok(()) => {
108 self.backlog.fetch_add(1, Ordering::Relaxed);
109 Ok(())
110 }
111 Err(mpsc::error::TrySendError::Full(_)) => {
112 Err(PipelineError::Backpressure { queue: "graph" })
113 }
114 Err(mpsc::error::TrySendError::Closed(_)) => {
115 Err(PipelineError::WorkerShutdown { queue: "graph" })
116 }
117 }
118 }
119}