use crate::capture::CapturedFrame;
use crate::config::OsPipeConfig;
use crate::error::Result;
use crate::graph::KnowledgeGraph;
use crate::pipeline::dedup::FrameDeduplicator;
use crate::safety::{SafetyDecision, SafetyGate};
use crate::search::enhanced::EnhancedSearch;
use crate::storage::embedding::EmbeddingEngine;
use crate::storage::vector_store::{SearchResult, VectorStore};
use uuid::Uuid;
/// Outcome of pushing a single frame through [`IngestionPipeline::ingest`].
///
/// `PartialEq` is derived (not `Eq`, because of the `f32` similarity) so that
/// outcomes can be compared directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq)]
pub enum IngestResult {
    /// The frame passed every stage and was inserted into the vector store
    /// under `id` (the frame's own id).
    Stored {
        id: Uuid,
    },
    /// The frame was skipped because the deduplicator matched it against a
    /// previously stored frame.
    Deduplicated {
        /// Id of the previously stored frame it matched.
        similar_to: Uuid,
        /// Similarity score reported by the deduplicator for that match.
        similarity: f32,
    },
    /// The safety gate rejected the frame; nothing was embedded or stored.
    Denied {
        /// Reason supplied by the safety gate.
        reason: String,
    },
}
/// Running counters maintained by [`IngestionPipeline`].
///
/// All counters start at zero (`Default`) and only ever increase. `PartialEq`
/// and `Eq` are derived so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PipelineStats {
    /// Frames successfully inserted into the vector store.
    pub total_ingested: u64,
    /// Frames skipped because the deduplicator matched an earlier frame.
    pub total_deduplicated: u64,
    /// Frames rejected outright by the safety gate.
    pub total_denied: u64,
    /// Frames whose text the safety gate redacted. This is counted before the
    /// dedup check, so a redacted frame that is then deduplicated is still
    /// counted here.
    pub total_redacted: u64,
}
/// Turns captured frames into searchable, safety-checked embeddings.
///
/// Each frame passes through a safety gate, is embedded, is checked against
/// recent frames for near-duplicates, and is then persisted to a vector store.
/// A knowledge graph and an enhanced search front-end can optionally be
/// attached.
pub struct IngestionPipeline {
    /// Produces the embedding for frame text and for search queries.
    embedding_engine: EmbeddingEngine,
    /// Persistence and similarity search for stored frames.
    vector_store: VectorStore,
    /// Screens frame text; may allow it, redact it, or deny it.
    safety_gate: SafetyGate,
    /// Detects frames whose embeddings closely match previously stored ones.
    dedup: FrameDeduplicator,
    /// Counters updated as frames are processed.
    stats: PipelineStats,
    /// Optional graph that receives entities extracted from stored frames.
    knowledge_graph: Option<KnowledgeGraph>,
    /// Optional search front-end; when present, `search` is routed through it.
    enhanced_search: Option<EnhancedSearch>,
}
impl IngestionPipeline {
pub fn new(config: OsPipeConfig) -> Result<Self> {
let embedding_engine = EmbeddingEngine::new(config.storage.embedding_dim);
let vector_store = VectorStore::new(config.storage.clone())?;
let safety_gate = SafetyGate::new(config.safety.clone());
let dedup = FrameDeduplicator::new(config.storage.dedup_threshold, 100);
Ok(Self {
embedding_engine,
vector_store,
safety_gate,
dedup,
stats: PipelineStats::default(),
knowledge_graph: None,
enhanced_search: None,
})
}
pub fn with_graph(mut self, kg: KnowledgeGraph) -> Self {
self.knowledge_graph = Some(kg);
self
}
pub fn with_enhanced_search(mut self, es: EnhancedSearch) -> Self {
self.enhanced_search = Some(es);
self
}
pub fn ingest(&mut self, frame: CapturedFrame) -> Result<IngestResult> {
let text = frame.text_content().to_string();
let safe_text = match self.safety_gate.check(&text) {
SafetyDecision::Allow => text,
SafetyDecision::AllowRedacted(redacted) => {
self.stats.total_redacted += 1;
redacted
}
SafetyDecision::Deny { reason } => {
self.stats.total_denied += 1;
return Ok(IngestResult::Denied { reason });
}
};
let embedding = self.embedding_engine.embed(&safe_text);
if let Some((similar_id, similarity)) = self.dedup.is_duplicate(&embedding) {
self.stats.total_deduplicated += 1;
return Ok(IngestResult::Deduplicated {
similar_to: similar_id,
similarity,
});
}
let mut store_frame = frame;
if safe_text != store_frame.text_content() {
store_frame.content = match &store_frame.content {
crate::capture::FrameContent::OcrText(_) => {
crate::capture::FrameContent::OcrText(safe_text)
}
crate::capture::FrameContent::Transcription(_) => {
crate::capture::FrameContent::Transcription(safe_text)
}
crate::capture::FrameContent::UiEvent(_) => {
crate::capture::FrameContent::UiEvent(safe_text)
}
};
}
self.vector_store.insert(&store_frame, &embedding)?;
let id = store_frame.id;
self.dedup.add(id, embedding);
self.stats.total_ingested += 1;
if let Some(ref mut kg) = self.knowledge_graph {
let frame_id_str = id.to_string();
let _ = kg.ingest_frame_entities(&frame_id_str, store_frame.text_content());
}
Ok(IngestResult::Stored { id })
}
pub fn ingest_batch(&mut self, frames: Vec<CapturedFrame>) -> Result<Vec<IngestResult>> {
let mut results = Vec::with_capacity(frames.len());
for frame in frames {
results.push(self.ingest(frame)?);
}
Ok(results)
}
pub fn stats(&self) -> &PipelineStats {
&self.stats
}
pub fn vector_store(&self) -> &VectorStore {
&self.vector_store
}
pub fn embedding_engine(&self) -> &EmbeddingEngine {
&self.embedding_engine
}
pub fn knowledge_graph(&self) -> Option<&KnowledgeGraph> {
self.knowledge_graph.as_ref()
}
pub fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
let embedding = self.embedding_engine.embed(query);
if let Some(ref es) = self.enhanced_search {
es.search(query, &embedding, &self.vector_store, k)
} else {
self.vector_store.search(&embedding, k)
}
}
}