post_cortex_memory/pipeline/embedding.rs
1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Embedding work queue.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::mpsc;
10use tracing::{debug, trace, warn};
11use uuid::Uuid;
12
13use super::PipelineError;
14use crate::memory_system::ConversationMemorySystem;
15
16/// One unit of embedding work.
17///
18/// `session_id` is the canonical identifier; the worker re-derives the
19/// "latest update to vectorise" from the session's hot context (matching
20/// the legacy `spawn_background_vectorization` path). `entry_id` + `text`
21/// are kept for forward compatibility with per-entry vectorisation
22/// (`ContentVectorizer::vectorize_one`-style API, not yet implemented).
23#[derive(Debug, Clone)]
24pub struct EmbeddingWorkItem {
25 /// Session that owns the content.
26 pub session_id: Uuid,
27 /// Context-update entry to vectorise.
28 pub entry_id: Uuid,
29 /// Raw text to embed.
30 pub text: String,
31}
32
33/// Handle to the bounded MPSC channel feeding the embedding worker.
34pub struct EmbeddingQueue {
35 sender: mpsc::Sender<EmbeddingWorkItem>,
36 backlog: Arc<AtomicUsize>,
37}
38
39impl EmbeddingQueue {
40 /// Spawn the embedding worker on the current Tokio runtime and
41 /// return the sending handle.
42 ///
43 /// `system` gives the worker access to the canonical
44 /// [`ConversationMemorySystem::vectorize_latest_update_now`] helper.
45 /// When the `embeddings` feature is disabled the worker still runs
46 /// but reports each item as a no-op so callers see deterministic
47 /// backlog drain regardless of build features.
48 #[must_use]
49 pub fn start(
50 capacity: usize,
51 backlog: Arc<AtomicUsize>,
52 system: Arc<ConversationMemorySystem>,
53 ) -> Self {
54 let (tx, mut rx) = mpsc::channel::<EmbeddingWorkItem>(capacity);
55 let worker_backlog = Arc::clone(&backlog);
56
57 tokio::spawn(async move {
58 debug!("embedding pipeline worker started");
59 while let Some(item) = rx.recv().await {
60 trace!(
61 session_id = %item.session_id,
62 entry_id = %item.entry_id,
63 bytes = item.text.len(),
64 "embedding pipeline: processing item"
65 );
66
67 #[cfg(feature = "embeddings")]
68 {
69 match system.vectorize_latest_update_now(item.session_id).await {
70 Ok(0) => {
71 trace!(
72 session_id = %item.session_id,
73 "embedding pipeline: nothing to vectorise (auto-disabled or no fresh update)"
74 );
75 }
76 Ok(count) => {
77 debug!(
78 session_id = %item.session_id,
79 vectorised = count,
80 "embedding pipeline: vectorisation complete"
81 );
82 }
83 Err(e) => warn!(
84 session_id = %item.session_id,
85 error = %e,
86 "embedding pipeline: vectorise failed (non-fatal)"
87 ),
88 }
89 }
90
91 #[cfg(not(feature = "embeddings"))]
92 {
93 let _ = &system; // silence unused warning when feature is off
94 }
95
96 worker_backlog.fetch_sub(1, Ordering::Relaxed);
97 }
98 warn!("embedding pipeline worker exiting (channel closed)");
99 });
100
101 Self {
102 sender: tx,
103 backlog,
104 }
105 }
106
107 /// Non-blocking submit; returns `Backpressure` if the queue is full.
108 pub fn submit(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
109 match self.sender.try_send(item) {
110 Ok(()) => {
111 self.backlog.fetch_add(1, Ordering::Relaxed);
112 Ok(())
113 }
114 Err(mpsc::error::TrySendError::Full(_)) => {
115 Err(PipelineError::Backpressure { queue: "embedding" })
116 }
117 Err(mpsc::error::TrySendError::Closed(_)) => {
118 Err(PipelineError::WorkerShutdown { queue: "embedding" })
119 }
120 }
121 }
122}