use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use thiserror::Error;
use crate::memory_system::ConversationMemorySystem;
pub mod embedding;
pub mod graph;
pub mod summary;
pub use embedding::{EmbeddingQueue, EmbeddingWorkItem};
pub use graph::{GraphQueue, GraphWorkItem};
pub use summary::{SummaryQueue, SummaryWorkItem};
/// Sizing knobs for [`Pipeline::start`].
///
/// Each value is forwarded unchanged as the first argument to the matching
/// queue's `start` constructor.
#[derive(Clone, Debug)]
pub struct PipelineConfig {
    /// Capacity handed to `EmbeddingQueue::start`.
    pub embedding_capacity: usize,
    /// Capacity handed to `GraphQueue::start`.
    pub graph_capacity: usize,
    /// Capacity handed to `SummaryQueue::start`.
    pub summary_capacity: usize,
}
impl Default for PipelineConfig {
    /// Default queue capacities: 1024 embedding items, 4096 graph items and
    /// 256 summary items.
    fn default() -> Self {
        let embedding_capacity = 1024;
        let graph_capacity = 4096;
        let summary_capacity = 256;
        Self {
            embedding_capacity,
            graph_capacity,
            summary_capacity,
        }
    }
}
/// Failure modes when handing work to one of the pipeline queues.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// The named queue reported itself full, so the item was not accepted.
    #[error("pipeline backpressure: {queue} queue is full")]
    Backpressure {
        /// Label of the queue that rejected the item (presumably "embedding",
        /// "graph" or "summary" — set by the queue modules; confirm there).
        queue: &'static str,
    },
    /// The named queue's worker has shut down.
    #[error("pipeline {queue} worker shut down")]
    WorkerShutdown {
        /// Label of the queue whose worker is gone.
        queue: &'static str,
    },
}
/// Owns the embedding, graph and summary work queues together with the
/// backlog counter they all share.
///
/// Field order is kept deliberately: Rust drops fields in declaration order,
/// so the queues are torn down embedding → graph → summary.
pub struct Pipeline {
    /// Queue accepting [`EmbeddingWorkItem`]s.
    embedding: EmbeddingQueue,
    /// Queue accepting [`GraphWorkItem`]s.
    graph: GraphQueue,
    /// Queue accepting [`SummaryWorkItem`]s.
    summary: SummaryQueue,
    /// Counter shared (via `Arc`) with all three queues; exposed through
    /// [`Pipeline::backlog`].
    backlog: Arc<AtomicUsize>,
}
impl Pipeline {
    /// Starts the embedding, graph and summary queues and bundles them into a
    /// [`Pipeline`].
    ///
    /// A single backlog counter, initialised to zero, is shared by all three
    /// queues. The embedding and graph queues also receive a clone of `system`;
    /// the summary queue does not. Capacities come from `config`.
    ///
    /// NOTE(review): the tests in this module call this inside `#[tokio::test]`;
    /// confirm whether the queue constructors require a running Tokio runtime.
    #[must_use]
    pub fn start(config: PipelineConfig, system: Arc<ConversationMemorySystem>) -> Self {
        let counter = Arc::new(AtomicUsize::new(0));
        // Struct-literal fields are evaluated in the order written, so the
        // queues are started embedding → graph → summary.
        Self {
            embedding: EmbeddingQueue::start(
                config.embedding_capacity,
                Arc::clone(&counter),
                Arc::clone(&system),
            ),
            graph: GraphQueue::start(
                config.graph_capacity,
                Arc::clone(&counter),
                Arc::clone(&system),
            ),
            summary: SummaryQueue::start(config.summary_capacity, Arc::clone(&counter)),
            backlog: counter,
        }
    }

    /// Current value of the backlog counter shared by all three queues.
    ///
    /// The queues own the increments and decrements. The tests in this module
    /// expect it to return to 0 once submitted work has been processed. The load
    /// uses `Ordering::Relaxed`, so treat the result as a point-in-time gauge,
    /// not a synchronisation point.
    #[must_use]
    pub fn backlog(&self) -> usize {
        AtomicUsize::load(&self.backlog, Ordering::Relaxed)
    }

    /// Hands `item` to the embedding queue.
    ///
    /// # Errors
    ///
    /// Propagates the queue's [`PipelineError`]. The tests in this module
    /// expect `Backpressure` once the queue is at capacity.
    pub fn submit_embedding(&self, item: EmbeddingWorkItem) -> Result<(), PipelineError> {
        let queue = &self.embedding;
        queue.submit(item)
    }

    /// Hands `item` to the graph queue.
    ///
    /// # Errors
    ///
    /// Propagates the queue's [`PipelineError`].
    pub fn submit_graph(&self, item: GraphWorkItem) -> Result<(), PipelineError> {
        let queue = &self.graph;
        queue.submit(item)
    }

    /// Hands `item` to the summary queue.
    ///
    /// # Errors
    ///
    /// Propagates the queue's [`PipelineError`].
    pub fn submit_summary(&self, item: SummaryWorkItem) -> Result<(), PipelineError> {
        let queue = &self.summary;
        queue.submit(item)
    }

    /// Borrow of the underlying embedding queue.
    #[must_use]
    pub fn embedding_queue(&self) -> &EmbeddingQueue {
        let Self { embedding, .. } = self;
        embedding
    }

    /// Borrow of the underlying graph queue.
    #[must_use]
    pub fn graph_queue(&self) -> &GraphQueue {
        let Self { graph, .. } = self;
        graph
    }

    /// Borrow of the underlying summary queue.
    #[must_use]
    pub fn summary_queue(&self) -> &SummaryQueue {
        let Self { summary, .. } = self;
        summary
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_system::{ConversationMemorySystem, SystemConfig};
    use post_cortex_core::core::context_update::{ContextUpdate, UpdateContent, UpdateType};
    use std::time::Duration;
    use uuid::Uuid;

    /// How long `pipeline_accepts_work_across_queues` waits for the backlog to
    /// reach zero before failing.
    const DRAIN_TIMEOUT: Duration = Duration::from_secs(5);

    /// Interval between backlog polls while waiting for the workers to drain.
    const DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(5);

    /// Per-test data directory that is deleted when the guard is dropped.
    ///
    /// A guard, rather than a trailing `remove_dir_all`, means the directory is
    /// also cleaned up when an assertion fails and the test unwinds.
    struct TestDir(String);

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best-effort: ignore the result so that a cleanup failure cannot
            // hide the real test outcome or cause a double panic while unwinding.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a `ConversationMemorySystem` rooted in a fresh directory named
    /// after `label` plus the current time in nanoseconds.
    ///
    /// Returns the directory guard alongside the system. Bind the guard to a
    /// named variable (e.g. `_dir`, not `_`) so that it lives for the whole test.
    async fn ephemeral_system(label: &str) -> (TestDir, Arc<ConversationMemorySystem>) {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_nanos();
        let dir = TestDir(format!("./test_data_pipeline_{}_{}", label, nanos));
        std::fs::create_dir_all(&dir.0).expect("failed to create test data directory");
        let config = SystemConfig {
            data_directory: dir.0.clone(),
            ..Default::default()
        };
        // If construction panics, `dir` is dropped and the directory removed.
        let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
        (dir, system)
    }

    /// Minimal `ContextUpdate` with placeholder text and no entities.
    fn empty_context_update() -> ContextUpdate {
        ContextUpdate {
            id: Uuid::new_v4(),
            timestamp: chrono::Utc::now(),
            update_type: UpdateType::ConceptDefined,
            content: UpdateContent {
                title: "test".into(),
                description: "test".into(),
                details: vec![],
                examples: vec![],
                implications: vec![],
            },
            related_code: None,
            parent_update: None,
            user_marked_important: false,
            creates_entities: vec![],
            creates_relationships: vec![],
            references_entities: vec![],
            typed_entities: vec![],
        }
    }

    #[tokio::test]
    async fn pipeline_starts_and_reports_zero_backlog() {
        let (_dir, system) = ephemeral_system("zero").await;
        let pipeline = Pipeline::start(PipelineConfig::default(), system);
        assert_eq!(pipeline.backlog(), 0);
    }

    #[tokio::test]
    async fn pipeline_accepts_work_across_queues() {
        let (_dir, system) = ephemeral_system("accept").await;
        let pipeline = Pipeline::start(PipelineConfig::default(), system);
        pipeline
            .submit_embedding(EmbeddingWorkItem {
                session_id: Uuid::new_v4(),
                entry_id: Uuid::new_v4(),
                text: "hello world".into(),
            })
            .expect("embedding submit should succeed");
        pipeline
            .submit_graph(GraphWorkItem::ApplyUpdate {
                session_id: Uuid::new_v4(),
                update: empty_context_update(),
            })
            .expect("graph submit should succeed");
        pipeline
            .submit_summary(SummaryWorkItem {
                session_id: Uuid::new_v4(),
            })
            .expect("summary submit should succeed");

        // Poll up to a deadline instead of one fixed sleep. Worker latency
        // depends on machine load, so a single short wait is timing-dependent.
        let deadline = tokio::time::Instant::now() + DRAIN_TIMEOUT;
        while pipeline.backlog() != 0 && tokio::time::Instant::now() < deadline {
            tokio::time::sleep(DRAIN_POLL_INTERVAL).await;
        }
        assert_eq!(pipeline.backlog(), 0, "backlog should drain to 0");
    }

    #[tokio::test]
    async fn pipeline_backpressure_when_full() {
        let (_dir, system) = ephemeral_system("backpressure").await;
        let pipeline = Pipeline::start(
            PipelineConfig {
                embedding_capacity: 1,
                graph_capacity: 1,
                summary_capacity: 1,
            },
            system,
        );
        // A burst of 100 submissions into a capacity-1 queue must be rejected at
        // least once. `any` stops at the first rejection. Previously the test
        // passed even if no rejection ever happened.
        let saw_backpressure = (0..100).any(|_| {
            matches!(
                pipeline.submit_embedding(EmbeddingWorkItem {
                    session_id: Uuid::new_v4(),
                    entry_id: Uuid::new_v4(),
                    text: "x".into(),
                }),
                Err(PipelineError::Backpressure { .. })
            )
        });
        assert!(
            saw_backpressure,
            "expected a Backpressure error from a capacity-1 embedding queue within 100 submissions"
        );
    }
}