Skip to main content

post_cortex_memory/pipeline/
embedding.rs

1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Embedding work queue.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use tokio::sync::mpsc;
10use tracing::{debug, trace, warn};
11use uuid::Uuid;
12
13use super::PipelineError;
14use crate::memory_system::ConversationMemorySystem;
15
16/// One unit of embedding work.
17///
18/// `session_id` is the canonical identifier; the worker re-derives the
19/// "latest update to vectorise" from the session's hot context (matching
20/// the legacy `spawn_background_vectorization` path). `entry_id` + `text`
21/// are kept for forward compatibility with per-entry vectorisation
22/// (`ContentVectorizer::vectorize_one`-style API, not yet implemented).
23#[derive(Debug, Clone)]
24pub struct EmbeddingWorkItem {
25    /// Session that owns the content.
26    pub session_id: Uuid,
27    /// Context-update entry to vectorise.
28    pub entry_id: Uuid,
29    /// Raw text to embed.
30    pub text: String,
31}
32
33/// Handle to the bounded MPSC channel feeding the embedding worker.
34pub struct EmbeddingQueue {
35    sender: mpsc::Sender<EmbeddingWorkItem>,
36    backlog: Arc<AtomicUsize>,
37}
38
39impl EmbeddingQueue {
40    /// Spawn the embedding worker on the current Tokio runtime and
41    /// return the sending handle.
42    ///
43    /// `system` gives the worker access to the canonical
44    /// [`ConversationMemorySystem::vectorize_latest_update_now`] helper.
45    /// When the `embeddings` feature is disabled the worker still runs
46    /// but reports each item as a no-op so callers see deterministic
47    /// backlog drain regardless of build features.
48    #[must_use]
49    pub fn start(
50        capacity: usize,
51        backlog: Arc<AtomicUsize>,
52        system: Arc<ConversationMemorySystem>,
53    ) -> Self {
54        let (tx, mut rx) = mpsc::channel::<EmbeddingWorkItem>(capacity);
55        let worker_backlog = Arc::clone(&backlog);
56
57        tokio::spawn(async move {
58            debug!("embedding pipeline worker started");
59            while let Some(item) = rx.recv().await {
60                trace!(
61                    session_id = %item.session_id,
62                    entry_id = %item.entry_id,
63                    bytes = item.text.len(),
64                    "embedding pipeline: processing item"
65                );
66
67                #[cfg(feature = "embeddings")]
68                {
69                    match system.vectorize_latest_update_now(item.session_id).await {
70                        Ok(0) => {
71                            trace!(
72                                session_id = %item.session_id,
73                                "embedding pipeline: nothing to vectorise (auto-disabled or no fresh update)"
74                            );
75                        }
76                        Ok(count) => {
77                            debug!(
78                                session_id = %item.session_id,
79                                vectorised = count,
80                                "embedding pipeline: vectorisation complete"
81                            );
82                        }
83                        Err(e) => warn!(
84                            session_id = %item.session_id,
85                            error = %e,
86                            "embedding pipeline: vectorise failed (non-fatal)"
87                        ),
88                    }
89                }
90
91                #[cfg(not(feature = "embeddings"))]
92                {
93                    let _ = &system; // silence unused warning when feature is off
94                }
95
96                worker_backlog.fetch_sub(1, Ordering::Relaxed);
97            }
98            warn!("embedding pipeline worker exiting (channel closed)");
99        });
100
101        Self {
102            sender: tx,
103            backlog,
104        }
105    }
106
107    /// Non-blocking submit; returns `Backpressure` if the queue is full.
108    pub fn submit(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
109        match self.sender.try_send(item) {
110            Ok(()) => {
111                self.backlog.fetch_add(1, Ordering::Relaxed);
112                Ok(())
113            }
114            Err(mpsc::error::TrySendError::Full(_)) => Err(PipelineError::Backpressure {
115                queue: "embedding",
116            }),
117            Err(mpsc::error::TrySendError::Closed(_)) => Err(PipelineError::WorkerShutdown {
118                queue: "embedding",
119            }),
120        }
121    }
122}