Skip to main content

zeph_memory/document/
pipeline.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use qdrant_client::qdrant::PointStruct;
5use serde_json::json;
6use uuid::Uuid;
7
8use super::{Document, DocumentError, DocumentLoader, TextSplitter};
9use crate::QdrantOps;
10
11pub struct IngestionPipeline {
12    splitter: TextSplitter,
13    qdrant: QdrantOps,
14    collection: String,
15    embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>,
16}
17
18impl IngestionPipeline {
19    pub fn new(
20        splitter: TextSplitter,
21        qdrant: QdrantOps,
22        collection: impl Into<String>,
23        embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>,
24    ) -> Self {
25        Self {
26            splitter,
27            qdrant,
28            collection: collection.into(),
29            embed_fn,
30        }
31    }
32
33    /// Ingest a document: split -> embed -> store in Qdrant. Returns chunk count.
34    ///
35    /// # Errors
36    ///
37    /// Returns an error if embedding or Qdrant storage fails.
38    pub async fn ingest(&self, document: Document) -> Result<usize, DocumentError> {
39        let chunks = self.splitter.split(&document);
40        if chunks.is_empty() {
41            return Ok(0);
42        }
43
44        let mut points = Vec::with_capacity(chunks.len());
45        for chunk in &chunks {
46            let vector = (self.embed_fn)(&chunk.content).await?;
47            let payload = QdrantOps::json_to_payload(json!({
48                "source": chunk.metadata.source,
49                "content_type": chunk.metadata.content_type,
50                "chunk_index": chunk.chunk_index,
51                "content": chunk.content,
52            }))
53            .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Json(e)))?;
54
55            points.push(PointStruct::new(
56                Uuid::new_v4().to_string(),
57                vector,
58                payload,
59            ));
60        }
61
62        let count = points.len();
63        self.qdrant
64            .upsert(&self.collection, points)
65            .await
66            .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Qdrant(e)))?;
67
68        Ok(count)
69    }
70
71    /// # Errors
72    ///
73    /// Returns an error if loading, embedding, or storage fails.
74    pub async fn load_and_ingest(
75        &self,
76        loader: &(dyn DocumentLoader + '_),
77        path: &std::path::Path,
78    ) -> Result<usize, DocumentError> {
79        let documents = loader.load(path).await?;
80        let mut total = 0;
81        for doc in documents {
82            total += self.ingest(doc).await?;
83        }
84        Ok(total)
85    }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::splitter::SplitterConfig;
    use crate::document::types::DocumentMetadata;
    use std::collections::HashMap;

    /// Boxed embedding callback in the shape `IngestionPipeline::new` accepts.
    type EmbedFn = Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>;

    /// Builds a plain-text document with fixed metadata and the given body.
    fn make_document(content: &str) -> Document {
        let metadata = DocumentMetadata {
            source: String::from("test"),
            content_type: String::from("text/plain"),
            extra: HashMap::new(),
        };
        Document {
            content: content.to_owned(),
            metadata,
        }
    }

    /// Embedding stub that always succeeds with a four-element zero vector.
    fn noop_embed() -> EmbedFn {
        Box::new(|_: &str| Box::pin(async move { Ok(vec![0.0f32; 4]) }))
    }

    /// Embedding stub that always fails.
    fn error_embed() -> EmbedFn {
        Box::new(|_: &str| {
            Box::pin(
                async move { Err(zeph_llm::error::LlmError::Other("mock embed error".into())) },
            )
        })
    }

    /// Pipeline with a default splitter and the given embedding stub.
    ///
    /// The Qdrant URL is deliberately invalid: neither test below is expected to
    /// reach the upsert call.
    fn pipeline_with(embed_fn: EmbedFn) -> IngestionPipeline {
        let qdrant = crate::QdrantOps::new("http://127.0.0.1:1").unwrap();
        let splitter = TextSplitter::new(SplitterConfig::default());
        IngestionPipeline::new(splitter, qdrant, "col", embed_fn)
    }

    #[tokio::test]
    async fn ingest_empty_document_returns_zero() {
        // An empty document must short-circuit before Qdrant is contacted.
        let pipeline = pipeline_with(noop_embed());

        let stored = pipeline.ingest(make_document("")).await.unwrap();

        assert_eq!(stored, 0);
    }

    #[tokio::test]
    async fn ingest_document_embedding_error_propagates() {
        // A failing embedding must surface as an error without reaching Qdrant.
        let pipeline = pipeline_with(error_embed());

        let outcome = pipeline
            .ingest(make_document(
                "hello world, this is test content for embedding",
            ))
            .await;

        assert!(outcome.is_err(), "expected error from embedding failure");
    }
}