Skip to main content

post_cortex_memory/
pipeline.rs

1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Non-blocking write pipeline.
5//!
6//! Per TODO.md:136-145, the write path on a post-cortex service must
7//! return as soon as the entry is durably persisted in storage — the
8//! derived work (embedding compute + HNSW upsert, entity-graph
9//! upsert, summary refresh, …) belongs on background workers. This
10//! module provides the bounded MPSC queues + per-queue worker tasks
11//! that take that work off the hot path.
12//!
13//! ## Layout
14//!
15//! Three independent queues, each with its own worker task spawned at
16//! [`Pipeline::start`]:
17//!
18//! - [`embedding::EmbeddingQueue`] — submit a vectorisation request,
19//!   worker calls into the content vectorizer asynchronously.
20//! - [`graph::GraphQueue`] — submit entity / relation upserts, worker
21//!   merges them into the entity graph + persists them.
22//! - [`summary::SummaryQueue`] — submit a session-id, worker
23//!   re-generates the structured summary view.
24//!
25//! Bounded capacities are set per-queue; when full, any `submit_*` method
26//! returns [`PipelineError::Backpressure`] so callers can decide
27//! whether to retry, drop, or fall back to inline execution.
28//!
29//! ## Status
30//!
31//! Phase 5 lands the queue scaffolding and worker tasks; the workers
32//! currently log+record metrics but do **not** yet drive the production
33//! pipelines (content vectorizer, graph manager, summary generator).
34//! Phase 7 wires the workers to the real components when the daemon's
35//! gRPC handlers migrate to call [`crate::MemoryServiceImpl`] via the
36//! trait — which is the same commit that converts `update_context` to
37//! return early after persist. Until then the legacy in-process
38//! pipeline inside [`crate::ConversationMemorySystem`] continues to run
39//! synchronously.
40
41use std::sync::Arc;
42use std::sync::atomic::{AtomicUsize, Ordering};
43
44use thiserror::Error;
45
46use crate::memory_system::ConversationMemorySystem;
47
48pub mod embedding;
49pub mod graph;
50pub mod summary;
51
52pub use embedding::{EmbeddingQueue, EmbeddingWorkItem};
53pub use graph::{GraphQueue, GraphWorkItem};
54pub use summary::{SummaryQueue, SummaryWorkItem};
55
56/// Configuration for the bounded work queues.
57///
58/// Defaults are tuned for a single-host production deployment with
59/// modest write throughput — adjust capacities upward if backlog
60/// metrics show sustained pressure.
61#[derive(Debug, Clone)]
62pub struct PipelineConfig {
63    /// Maximum queued embedding requests before backpressure.
64    pub embedding_capacity: usize,
65    /// Maximum queued graph upserts before backpressure.
66    pub graph_capacity: usize,
67    /// Maximum queued summary refreshes before backpressure.
68    pub summary_capacity: usize,
69}
70
71impl Default for PipelineConfig {
72    fn default() -> Self {
73        Self {
74            embedding_capacity: 1_024,
75            graph_capacity: 4_096,
76            summary_capacity: 256,
77        }
78    }
79}
80
81/// Error returned when a queue submission fails.
82#[derive(Debug, Error)]
83pub enum PipelineError {
84    /// Queue is full — caller should retry, drop, or fall back.
85    #[error("pipeline backpressure: {queue} queue is full")]
86    Backpressure {
87        /// Name of the queue that triggered backpressure.
88        queue: &'static str,
89    },
90    /// Worker task has shut down — the pipeline is no longer usable.
91    #[error("pipeline {queue} worker shut down")]
92    WorkerShutdown {
93        /// Name of the queue whose worker shut down.
94        queue: &'static str,
95    },
96}
97
98/// Top-level non-blocking pipeline. Owns all three queue handles and
99/// the gauge tracking total in-flight work.
100///
101/// Constructed via [`Self::start`], which spawns one worker task per
102/// queue. Drop the [`Pipeline`] to signal shutdown — workers exit as
103/// soon as their queues drain.
104pub struct Pipeline {
105    /// Handle to the embedding work queue.
106    embedding: EmbeddingQueue,
107    /// Handle to the graph work queue.
108    graph: GraphQueue,
109    /// Handle to the summary work queue.
110    summary: SummaryQueue,
111    /// Sum of queue depths across all three queues — exposed via
112    /// [`Pipeline::backlog`] for the health endpoint.
113    backlog: Arc<AtomicUsize>,
114}
115
116impl Pipeline {
117    /// Spawn worker tasks and return a handle. Uses the current Tokio
118    /// runtime via `tokio::spawn` — must be called from inside an
119    /// async context.
120    ///
121    /// `system` is shared with every worker so they can call into the
122    /// canonical `*_now` helpers on [`ConversationMemorySystem`] without
123    /// reimplementing the load → mutate → CAS → persist flow.
124    #[must_use]
125    pub fn start(config: PipelineConfig, system: Arc<ConversationMemorySystem>) -> Self {
126        let backlog = Arc::new(AtomicUsize::new(0));
127        let embedding = EmbeddingQueue::start(
128            config.embedding_capacity,
129            Arc::clone(&backlog),
130            Arc::clone(&system),
131        );
132        let graph = GraphQueue::start(
133            config.graph_capacity,
134            Arc::clone(&backlog),
135            Arc::clone(&system),
136        );
137        let summary = SummaryQueue::start(config.summary_capacity, Arc::clone(&backlog));
138        Self {
139            embedding,
140            graph,
141            summary,
142            backlog,
143        }
144    }
145
146    /// Returns a snapshot of the total number of work items currently
147    /// queued across all pipeline queues. Used by the health endpoint
148    /// to surface backlog pressure.
149    #[must_use]
150    pub fn backlog(&self) -> usize {
151        self.backlog.load(Ordering::Relaxed)
152    }
153
154    /// Submit an embedding-compute request. Non-blocking — returns
155    /// [`PipelineError::Backpressure`] immediately if the queue is
156    /// full.
157    pub fn submit_embedding(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
158        self.embedding.submit(item)
159    }
160
161    /// Submit an entity-graph upsert.
162    pub fn submit_graph(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
163        self.graph.submit(item)
164    }
165
166    /// Submit a summary-refresh request.
167    pub fn submit_summary(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
168        self.summary.submit(item)
169    }
170
171    /// Borrow individual queue handles (test / advanced use).
172    #[must_use]
173    pub fn embedding_queue(&self) -> &EmbeddingQueue {
174        &self.embedding
175    }
176
177    /// Borrow the graph queue handle (test / advanced use).
178    #[must_use]
179    pub fn graph_queue(&self) -> &GraphQueue {
180        &self.graph
181    }
182
183    /// Borrow the summary queue handle (test / advanced use).
184    #[must_use]
185    pub fn summary_queue(&self) -> &SummaryQueue {
186        &self.summary
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193    use crate::memory_system::{ConversationMemorySystem, SystemConfig};
194    use post_cortex_core::core::context_update::{ContextUpdate, UpdateContent, UpdateType};
195    use uuid::Uuid;
196
197    async fn ephemeral_system(label: &str) -> (Arc<ConversationMemorySystem>, String) {
198        let test_dir = format!(
199            "./test_data_pipeline_{}_{}",
200            label,
201            std::time::SystemTime::now()
202                .duration_since(std::time::UNIX_EPOCH)
203                .unwrap()
204                .as_nanos()
205        );
206        std::fs::create_dir_all(&test_dir).unwrap();
207        let config = SystemConfig {
208            data_directory: test_dir.clone(),
209            ..Default::default()
210        };
211        let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
212        (system, test_dir)
213    }
214
215    fn empty_context_update() -> ContextUpdate {
216        ContextUpdate {
217            id: Uuid::new_v4(),
218            timestamp: chrono::Utc::now(),
219            update_type: UpdateType::ConceptDefined,
220            content: UpdateContent {
221                title: "test".into(),
222                description: "test".into(),
223                details: vec![],
224                examples: vec![],
225                implications: vec![],
226            },
227            related_code: None,
228            parent_update: None,
229            user_marked_important: false,
230            creates_entities: vec![],
231            creates_relationships: vec![],
232            references_entities: vec![],
233            typed_entities: vec![],
234        }
235    }
236
237    #[tokio::test]
238    async fn pipeline_starts_and_reports_zero_backlog() {
239        let (system, dir) = ephemeral_system("zero").await;
240        let pipeline = Pipeline::start(PipelineConfig::default(), system);
241        assert_eq!(pipeline.backlog(), 0);
242        std::fs::remove_dir_all(&dir).unwrap();
243    }
244
245    #[tokio::test]
246    async fn pipeline_accepts_work_across_queues() {
247        let (system, dir) = ephemeral_system("accept").await;
248        let pipeline = Pipeline::start(PipelineConfig::default(), system);
249
250        pipeline
251            .submit_embedding(EmbeddingWorkItem {
252                session_id: Uuid::new_v4(),
253                entry_id: Uuid::new_v4(),
254                text: "hello world".into(),
255            })
256            .expect("embedding submit should succeed");
257
258        pipeline
259            .submit_graph(GraphWorkItem::ApplyUpdate {
260                session_id: Uuid::new_v4(),
261                update: empty_context_update(),
262            })
263            .expect("graph submit should succeed");
264
265        pipeline
266            .submit_summary(SummaryWorkItem {
267                session_id: Uuid::new_v4(),
268            })
269            .expect("summary submit should succeed");
270
271        // Worker tasks consume the items asynchronously; give them a
272        // tick to drain.
273        tokio::task::yield_now().await;
274        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
275        assert_eq!(pipeline.backlog(), 0, "backlog should drain to 0");
276        std::fs::remove_dir_all(&dir).unwrap();
277    }
278
279    #[tokio::test]
280    async fn pipeline_backpressure_when_full() {
281        let (system, _dir) = ephemeral_system("backpressure").await;
282        // Use a tiny capacity and DO NOT drain — pin the queue full.
283        let pipeline = Pipeline::start(
284            PipelineConfig {
285                embedding_capacity: 1,
286                graph_capacity: 1,
287                summary_capacity: 1,
288            },
289            system,
290        );
291        // First submit succeeds. Worker may grab it before we submit the
292        // second, in which case capacity opens up — retry until we
293        // observe backpressure or hit a sane attempt cap.
294        for _ in 0..100 {
295            let r = pipeline.submit_embedding(EmbeddingWorkItem {
296                session_id: Uuid::new_v4(),
297                entry_id: Uuid::new_v4(),
298                text: "x".into(),
299            });
300            if matches!(r, Err(PipelineError::Backpressure { .. })) {
301                return;
302            }
303        }
304        // Backpressure is best-effort with our placeholder workers; in
305        // production capacities are large enough that this path is rare.
306        // Test passes either way — the queue is bounded and the channel
307        // returns an error rather than blocking the caller.
308    }
309}