// zeph_memory/document/pipeline.rs

use qdrant_client::qdrant::PointStruct;
use serde_json::json;
use uuid::Uuid;

use super::{Document, DocumentError, DocumentLoader, TextSplitter};
use crate::QdrantOps;
/// Document ingestion pipeline: splits a [`Document`] into chunks, embeds
/// each chunk via a caller-supplied embedding function, and upserts the
/// resulting points into a Qdrant collection.
pub struct IngestionPipeline {
    // Chunking strategy applied to every incoming document.
    splitter: TextSplitter,
    // Qdrant client wrapper used for the upsert calls.
    qdrant: QdrantOps,
    // Name of the target Qdrant collection.
    collection: String,
    // Produces an embedding future for one chunk of text; boxed so any
    // provider (or a test stub) can be plugged in.
    embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>,
}
17
18impl IngestionPipeline {
19 pub fn new(
20 splitter: TextSplitter,
21 qdrant: QdrantOps,
22 collection: impl Into<String>,
23 embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>,
24 ) -> Self {
25 Self {
26 splitter,
27 qdrant,
28 collection: collection.into(),
29 embed_fn,
30 }
31 }
32
33 pub async fn ingest(&self, document: Document) -> Result<usize, DocumentError> {
39 let chunks = self.splitter.split(&document);
40 if chunks.is_empty() {
41 return Ok(0);
42 }
43
44 let mut points = Vec::with_capacity(chunks.len());
45 for chunk in &chunks {
46 let vector = (self.embed_fn)(&chunk.content).await?;
47 let payload = QdrantOps::json_to_payload(json!({
48 "source": chunk.metadata.source,
49 "content_type": chunk.metadata.content_type,
50 "chunk_index": chunk.chunk_index,
51 "content": chunk.content,
52 }))
53 .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Json(e)))?;
54
55 points.push(PointStruct::new(
56 Uuid::new_v4().to_string(),
57 vector,
58 payload,
59 ));
60 }
61
62 let count = points.len();
63 self.qdrant
64 .upsert(&self.collection, points)
65 .await
66 .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Qdrant(e)))?;
67
68 Ok(count)
69 }
70
71 pub async fn load_and_ingest(
75 &self,
76 loader: &(dyn DocumentLoader + '_),
77 path: &std::path::Path,
78 ) -> Result<usize, DocumentError> {
79 let documents = loader.load(path).await?;
80 let mut total = 0;
81 for doc in documents {
82 total += self.ingest(doc).await?;
83 }
84 Ok(total)
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::splitter::SplitterConfig;
    use crate::document::types::DocumentMetadata;
    use std::collections::HashMap;

    /// Builds a plain-text document with fixed test metadata.
    fn make_document(content: &str) -> Document {
        let metadata = DocumentMetadata {
            source: "test".to_string(),
            content_type: "text/plain".to_string(),
            extra: HashMap::new(),
        };
        Document {
            content: content.to_string(),
            metadata,
        }
    }

    /// Embedding stub that always succeeds with a 4-dimensional zero vector.
    fn noop_embed() -> Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync> {
        Box::new(|_text: &str| Box::pin(async move { Ok(vec![0.0f32; 4]) }))
    }

    /// Embedding stub that always fails with a mock provider error.
    fn error_embed() -> Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync> {
        Box::new(|_text: &str| {
            let failure = async move {
                Err(zeph_llm::error::LlmError::Other("mock embed error".into()))
            };
            Box::pin(failure)
        })
    }

    #[tokio::test]
    async fn ingest_empty_document_returns_zero() {
        // Unreachable Qdrant endpoint: an empty document produces no chunks,
        // so no network call should ever be made.
        let splitter = TextSplitter::new(SplitterConfig::default());
        let qdrant = crate::QdrantOps::new("http://127.0.0.1:1").unwrap();
        let pipeline = IngestionPipeline::new(splitter, qdrant, "col", noop_embed());

        let ingested = pipeline.ingest(make_document("")).await.unwrap();
        assert_eq!(ingested, 0);
    }

    #[tokio::test]
    async fn ingest_document_embedding_error_propagates() {
        // Embedding fails before any storage is attempted, so the
        // unreachable Qdrant endpoint is never contacted.
        let splitter = TextSplitter::new(SplitterConfig::default());
        let qdrant = crate::QdrantOps::new("http://127.0.0.1:1").unwrap();
        let pipeline = IngestionPipeline::new(splitter, qdrant, "col", error_embed());

        let outcome = pipeline
            .ingest(make_document("hello world, this is test content for embedding"))
            .await;
        assert!(outcome.is_err(), "expected error from embedding failure");
    }
}
142}