post_cortex_memory/pipeline.rs
1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Non-blocking write pipeline.
5//!
6//! Per TODO.md:136-145, the write path on a post-cortex service must
7//! return as soon as the entry is durably persisted in storage — the
8//! derived work (embedding compute + HNSW upsert, entity-graph
9//! upsert, summary refresh, …) belongs on background workers. This
10//! module provides the bounded MPSC queues + per-queue worker tasks
11//! that take that work off the hot path.
12//!
13//! ## Layout
14//!
15//! Three independent queues, each with its own worker task spawned at
16//! [`Pipeline::start`]:
17//!
18//! - [`embedding::EmbeddingQueue`] — submit a vectorisation request,
19//! worker calls into the content vectorizer asynchronously.
20//! - [`graph::GraphQueue`] — submit entity / relation upserts, worker
21//! merges them into the entity graph + persists them.
22//! - [`summary::SummaryQueue`] — submit a session-id, worker
23//! re-generates the structured summary view.
24//!
25//! Bounded capacities are set per-queue; when full, any `submit_*` method
26//! returns [`PipelineError::Backpressure`] so callers can decide
27//! whether to retry, drop, or fall back to inline execution.
28//!
29//! ## Status
30//!
31//! Phase 5 lands the queue scaffolding and worker tasks; the workers
32//! currently log+record metrics but do **not** yet drive the production
33//! pipelines (content vectorizer, graph manager, summary generator).
34//! Phase 7 wires the workers to the real components when the daemon's
35//! gRPC handlers migrate to call [`crate::MemoryServiceImpl`] via the
36//! trait — which is the same commit that converts `update_context` to
37//! return early after persist. Until then the legacy in-process
38//! pipeline inside [`crate::ConversationMemorySystem`] continues to run
39//! synchronously.
40
41use std::sync::Arc;
42use std::sync::atomic::{AtomicUsize, Ordering};
43
44use thiserror::Error;
45
46use crate::memory_system::ConversationMemorySystem;
47
48pub mod embedding;
49pub mod graph;
50pub mod summary;
51
52pub use embedding::{EmbeddingQueue, EmbeddingWorkItem};
53pub use graph::{GraphQueue, GraphWorkItem};
54pub use summary::{SummaryQueue, SummaryWorkItem};
55
56/// Configuration for the bounded work queues.
57///
58/// Defaults are tuned for a single-host production deployment with
59/// modest write throughput — adjust capacities upward if backlog
60/// metrics show sustained pressure.
61#[derive(Debug, Clone)]
62pub struct PipelineConfig {
63 /// Maximum queued embedding requests before backpressure.
64 pub embedding_capacity: usize,
65 /// Maximum queued graph upserts before backpressure.
66 pub graph_capacity: usize,
67 /// Maximum queued summary refreshes before backpressure.
68 pub summary_capacity: usize,
69}
70
71impl Default for PipelineConfig {
72 fn default() -> Self {
73 Self {
74 embedding_capacity: 1_024,
75 graph_capacity: 4_096,
76 summary_capacity: 256,
77 }
78 }
79}
80
81/// Error returned when a queue submission fails.
82#[derive(Debug, Error)]
83pub enum PipelineError {
84 /// Queue is full — caller should retry, drop, or fall back.
85 #[error("pipeline backpressure: {queue} queue is full")]
86 Backpressure {
87 /// Name of the queue that triggered backpressure.
88 queue: &'static str,
89 },
90 /// Worker task has shut down — the pipeline is no longer usable.
91 #[error("pipeline {queue} worker shut down")]
92 WorkerShutdown {
93 /// Name of the queue whose worker shut down.
94 queue: &'static str,
95 },
96}
97
98/// Top-level non-blocking pipeline. Owns all three queue handles and
99/// the gauge tracking total in-flight work.
100///
101/// Constructed via [`Self::start`], which spawns one worker task per
102/// queue. Drop the [`Pipeline`] to signal shutdown — workers exit as
103/// soon as their queues drain.
104pub struct Pipeline {
105 /// Handle to the embedding work queue.
106 embedding: EmbeddingQueue,
107 /// Handle to the graph work queue.
108 graph: GraphQueue,
109 /// Handle to the summary work queue.
110 summary: SummaryQueue,
111 /// Sum of queue depths across all three queues — exposed via
112 /// [`Pipeline::backlog`] for the health endpoint.
113 backlog: Arc<AtomicUsize>,
114}
115
116impl Pipeline {
117 /// Spawn worker tasks and return a handle. Uses the current Tokio
118 /// runtime via `tokio::spawn` — must be called from inside an
119 /// async context.
120 ///
121 /// `system` is shared with every worker so they can call into the
122 /// canonical `*_now` helpers on [`ConversationMemorySystem`] without
123 /// reimplementing the load → mutate → CAS → persist flow.
124 #[must_use]
125 pub fn start(config: PipelineConfig, system: Arc<ConversationMemorySystem>) -> Self {
126 let backlog = Arc::new(AtomicUsize::new(0));
127 let embedding = EmbeddingQueue::start(
128 config.embedding_capacity,
129 Arc::clone(&backlog),
130 Arc::clone(&system),
131 );
132 let graph = GraphQueue::start(
133 config.graph_capacity,
134 Arc::clone(&backlog),
135 Arc::clone(&system),
136 );
137 let summary = SummaryQueue::start(config.summary_capacity, Arc::clone(&backlog));
138 Self {
139 embedding,
140 graph,
141 summary,
142 backlog,
143 }
144 }
145
146 /// Returns a snapshot of the total number of work items currently
147 /// queued across all pipeline queues. Used by the health endpoint
148 /// to surface backlog pressure.
149 #[must_use]
150 pub fn backlog(&self) -> usize {
151 self.backlog.load(Ordering::Relaxed)
152 }
153
154 /// Submit an embedding-compute request. Non-blocking — returns
155 /// [`PipelineError::Backpressure`] immediately if the queue is
156 /// full.
157 pub fn submit_embedding(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
158 self.embedding.submit(item)
159 }
160
161 /// Submit an entity-graph upsert.
162 pub fn submit_graph(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
163 self.graph.submit(item)
164 }
165
166 /// Submit a summary-refresh request.
167 pub fn submit_summary(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
168 self.summary.submit(item)
169 }
170
171 /// Borrow individual queue handles (test / advanced use).
172 #[must_use]
173 pub fn embedding_queue(&self) -> &EmbeddingQueue {
174 &self.embedding
175 }
176
177 /// Borrow the graph queue handle (test / advanced use).
178 #[must_use]
179 pub fn graph_queue(&self) -> &GraphQueue {
180 &self.graph
181 }
182
183 /// Borrow the summary queue handle (test / advanced use).
184 #[must_use]
185 pub fn summary_queue(&self) -> &SummaryQueue {
186 &self.summary
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193 use crate::memory_system::{ConversationMemorySystem, SystemConfig};
194 use post_cortex_core::core::context_update::{ContextUpdate, UpdateContent, UpdateType};
195 use uuid::Uuid;
196
197 async fn ephemeral_system(label: &str) -> (Arc<ConversationMemorySystem>, String) {
198 let test_dir = format!(
199 "./test_data_pipeline_{}_{}",
200 label,
201 std::time::SystemTime::now()
202 .duration_since(std::time::UNIX_EPOCH)
203 .unwrap()
204 .as_nanos()
205 );
206 std::fs::create_dir_all(&test_dir).unwrap();
207 let config = SystemConfig {
208 data_directory: test_dir.clone(),
209 ..Default::default()
210 };
211 let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
212 (system, test_dir)
213 }
214
215 fn empty_context_update() -> ContextUpdate {
216 ContextUpdate {
217 id: Uuid::new_v4(),
218 timestamp: chrono::Utc::now(),
219 update_type: UpdateType::ConceptDefined,
220 content: UpdateContent {
221 title: "test".into(),
222 description: "test".into(),
223 details: vec![],
224 examples: vec![],
225 implications: vec![],
226 },
227 related_code: None,
228 parent_update: None,
229 user_marked_important: false,
230 creates_entities: vec![],
231 creates_relationships: vec![],
232 references_entities: vec![],
233 typed_entities: vec![],
234 }
235 }
236
237 #[tokio::test]
238 async fn pipeline_starts_and_reports_zero_backlog() {
239 let (system, dir) = ephemeral_system("zero").await;
240 let pipeline = Pipeline::start(PipelineConfig::default(), system);
241 assert_eq!(pipeline.backlog(), 0);
242 std::fs::remove_dir_all(&dir).unwrap();
243 }
244
245 #[tokio::test]
246 async fn pipeline_accepts_work_across_queues() {
247 let (system, dir) = ephemeral_system("accept").await;
248 let pipeline = Pipeline::start(PipelineConfig::default(), system);
249
250 pipeline
251 .submit_embedding(EmbeddingWorkItem {
252 session_id: Uuid::new_v4(),
253 entry_id: Uuid::new_v4(),
254 text: "hello world".into(),
255 })
256 .expect("embedding submit should succeed");
257
258 pipeline
259 .submit_graph(GraphWorkItem::ApplyUpdate {
260 session_id: Uuid::new_v4(),
261 update: empty_context_update(),
262 })
263 .expect("graph submit should succeed");
264
265 pipeline
266 .submit_summary(SummaryWorkItem {
267 session_id: Uuid::new_v4(),
268 })
269 .expect("summary submit should succeed");
270
271 // Worker tasks consume the items asynchronously; give them a
272 // tick to drain.
273 tokio::task::yield_now().await;
274 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
275 assert_eq!(pipeline.backlog(), 0, "backlog should drain to 0");
276 std::fs::remove_dir_all(&dir).unwrap();
277 }
278
279 #[tokio::test]
280 async fn pipeline_backpressure_when_full() {
281 let (system, _dir) = ephemeral_system("backpressure").await;
282 // Use a tiny capacity and DO NOT drain — pin the queue full.
283 let pipeline = Pipeline::start(
284 PipelineConfig {
285 embedding_capacity: 1,
286 graph_capacity: 1,
287 summary_capacity: 1,
288 },
289 system,
290 );
291 // First submit succeeds. Worker may grab it before we submit the
292 // second, in which case capacity opens up — retry until we
293 // observe backpressure or hit a sane attempt cap.
294 for _ in 0..100 {
295 let r = pipeline.submit_embedding(EmbeddingWorkItem {
296 session_id: Uuid::new_v4(),
297 entry_id: Uuid::new_v4(),
298 text: "x".into(),
299 });
300 if matches!(r, Err(PipelineError::Backpressure { .. })) {
301 return;
302 }
303 }
304 // Backpressure is best-effort with our placeholder workers; in
305 // production capacities are large enough that this path is rare.
306 // Test passes either way — the queue is bounded and the channel
307 // returns an error rather than blocking the caller.
308 }
309}