use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
use uuid::Uuid;
use super::PipelineError;
use crate::memory_system::ConversationMemorySystem;
/// A single unit of work handed to the embedding pipeline worker.
///
/// In the worker visible in this file, only `session_id` is passed to the
/// vectorisation call; `entry_id` and the byte length of `text` are used for
/// trace logging.
#[derive(Debug, Clone)]
pub struct EmbeddingWorkItem {
/// Session whose latest update the worker asks the memory system to vectorise.
pub session_id: Uuid,
/// Identifier of the entry that triggered this work item (logged by the worker).
pub entry_id: Uuid,
/// Text associated with the entry; the worker logs its length in bytes.
pub text: String,
}
/// Producer-side handle to the background embedding worker.
///
/// Created by `EmbeddingQueue::start`, which also spawns the worker task that
/// drains the channel.
pub struct EmbeddingQueue {
// Sending half of the bounded channel consumed by the worker task.
sender: mpsc::Sender<EmbeddingWorkItem>,
// Shared counter of queued-but-unfinished items: incremented for each
// accepted submission and decremented by the worker after each item.
backlog: Arc<AtomicUsize>,
}
impl EmbeddingQueue {
#[must_use]
pub fn start(
capacity: usize,
backlog: Arc<AtomicUsize>,
system: Arc<ConversationMemorySystem>,
) -> Self {
let (tx, mut rx) = mpsc::channel::<EmbeddingWorkItem>(capacity);
let worker_backlog = Arc::clone(&backlog);
tokio::spawn(async move {
debug!("embedding pipeline worker started");
while let Some(item) = rx.recv().await {
trace!(
session_id = %item.session_id,
entry_id = %item.entry_id,
bytes = item.text.len(),
"embedding pipeline: processing item"
);
#[cfg(feature = "embeddings")]
{
match system.vectorize_latest_update_now(item.session_id).await {
Ok(0) => {
trace!(
session_id = %item.session_id,
"embedding pipeline: nothing to vectorise (auto-disabled or no fresh update)"
);
}
Ok(count) => {
debug!(
session_id = %item.session_id,
vectorised = count,
"embedding pipeline: vectorisation complete"
);
}
Err(e) => warn!(
session_id = %item.session_id,
error = %e,
"embedding pipeline: vectorise failed (non-fatal)"
),
}
}
#[cfg(not(feature = "embeddings"))]
{
let _ = &system; }
worker_backlog.fetch_sub(1, Ordering::Relaxed);
}
warn!("embedding pipeline worker exiting (channel closed)");
});
Self {
sender: tx,
backlog,
}
}
pub fn submit(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
match self.sender.try_send(item) {
Ok(()) => {
self.backlog.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::TrySendError::Full(_)) => Err(PipelineError::Backpressure {
queue: "embedding",
}),
Err(mpsc::error::TrySendError::Closed(_)) => Err(PipelineError::WorkerShutdown {
queue: "embedding",
}),
}
}
}