Skip to main content

post_cortex_memory/pipeline/
graph.rs

1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Entity-graph work queue.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use post_cortex_core::core::context_update::ContextUpdate;
10use tokio::sync::mpsc;
11use tracing::{debug, trace, warn};
12use uuid::Uuid;
13
14use super::PipelineError;
15use crate::memory_system::ConversationMemorySystem;
16
17/// One unit of graph work.
18///
19/// The single `ApplyUpdate` variant matches the canonical entity-graph
20/// flow: each [`ContextUpdate`] carries its own typed entities + relations
21/// and the worker applies them atomically via
22/// [`ConversationMemorySystem::apply_entity_graph_update_now`]. Per-entity
23/// or per-relation deltas can be added as new variants if a use case
24/// emerges, but no path through the codebase currently emits them — the
25/// canonical write at `MemoryServiceImpl::update_context` always submits a
26/// full update.
27#[derive(Debug, Clone)]
28pub enum GraphWorkItem {
29    /// Apply a full [`ContextUpdate`]'s entity + relation set to the
30    /// session graph.
31    ApplyUpdate {
32        /// Session that owns the entities / relations.
33        session_id: Uuid,
34        /// The context-update payload to merge into the entity graph.
35        update: ContextUpdate,
36    },
37}
38
39impl GraphWorkItem {
40    /// Convenience: build an `ApplyUpdate` work item.
41    #[must_use]
42    pub fn apply_update(session_id: Uuid, update: ContextUpdate) -> Self {
43        Self::ApplyUpdate { session_id, update }
44    }
45}
46
47/// Bounded MPSC queue feeding the entity-graph worker task.
48pub struct GraphQueue {
49    sender: mpsc::Sender<GraphWorkItem>,
50    backlog: Arc<AtomicUsize>,
51}
52
53impl GraphQueue {
54    /// Spawn the graph worker on the current Tokio runtime and return the sending handle.
55    ///
56    /// `system` lets the worker reach the canonical
57    /// [`ConversationMemorySystem::apply_entity_graph_update_now`] helper
58    /// — extracting that body into a public method is how the worker
59    /// avoids reimplementing the load → mutate → CAS → persist flow.
60    #[must_use]
61    pub fn start(
62        capacity: usize,
63        backlog: Arc<AtomicUsize>,
64        system: Arc<ConversationMemorySystem>,
65    ) -> Self {
66        let (tx, mut rx) = mpsc::channel::<GraphWorkItem>(capacity);
67        let worker_backlog = Arc::clone(&backlog);
68
69        tokio::spawn(async move {
70            debug!("graph pipeline worker started");
71            while let Some(item) = rx.recv().await {
72                match &item {
73                    GraphWorkItem::ApplyUpdate { session_id, update } => {
74                        trace!(
75                            session_id = %session_id,
76                            update_id = %update.id,
77                            entities = update.creates_entities.len(),
78                            relations = update.creates_relationships.len(),
79                            "graph pipeline: applying update"
80                        );
81                        if let Err(e) = system
82                            .apply_entity_graph_update_now(*session_id, update.clone())
83                            .await
84                        {
85                            warn!(
86                                session_id = %session_id,
87                                error = %e,
88                                "graph pipeline: apply failed (non-fatal)"
89                            );
90                        }
91                    }
92                }
93                worker_backlog.fetch_sub(1, Ordering::Relaxed);
94            }
95            warn!("graph pipeline worker exiting (channel closed)");
96        });
97
98        Self {
99            sender: tx,
100            backlog,
101        }
102    }
103
104    /// Non-blocking submit; returns [`PipelineError::Backpressure`] if the queue is full.
105    pub fn submit(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
106        match self.sender.try_send(item) {
107            Ok(()) => {
108                self.backlog.fetch_add(1, Ordering::Relaxed);
109                Ok(())
110            }
111            Err(mpsc::error::TrySendError::Full(_)) => {
112                Err(PipelineError::Backpressure { queue: "graph" })
113            }
114            Err(mpsc::error::TrySendError::Closed(_)) => {
115                Err(PipelineError::WorkerShutdown { queue: "graph" })
116            }
117        }
118    }
119}