Skip to main content

entelix_rag/
pipeline.rs

//! `IngestionPipeline` — typed composition of every RAG primitive
//! into one runnable end-to-end ingestion path.
//!
//! Wires [`DocumentLoader`] → [`TextSplitter`] → optional
//! [`Chunker`] chain → [`Embedder`] → [`VectorStore`], with one
//! [`Namespace`] supplying the multi-tenant boundary. A single
//! [`IngestionPipeline::run`] call drains the source, processes every
//! document the loader produces, and returns an [`IngestReport`]
//! summarising what landed in the index and what failed.
//!
//! ## Why a typed composition (not `Vec<Arc<dyn …>>` plumbing)
//!
//! Each single-occurrence pipeline component is a generic parameter
//! (`L: DocumentLoader, S: TextSplitter, E: Embedder, V:
//! VectorStore`), so monomorphisation over concrete component types
//! produces an inlined hot path — the chunk loop never pays a vtable
//! dispatch per invocation. Only the [`Chunker`] chain is
//! `Vec<Arc<dyn Chunker>>`, because chains vary in length and content
//! at runtime.
//!
//! ## Partial-success contract
//!
//! Per-document failures don't abort the pipeline. The report
//! collects them as [`IngestError`] entries and `run()` keeps
//! draining. Failed loader items (`Err` entries in the loader stream),
//! the chunker passes, and the embedder and store calls all
//! contribute. Operators decide from the report whether a failure
//! count is actionable — `run()` itself only returns `Err` for
//! *structural* failures (loader open rejection, cancellation,
//! deadline).
31
32use std::sync::Arc;
33
34use entelix_core::{Error, ExecutionContext, Result};
35use entelix_memory::{
36    Document as RetrievedDocument, DocumentId as RetrievedDocumentId, Embedder, Namespace,
37    VectorStore,
38};
39use futures::StreamExt;
40use serde::{Deserialize, Serialize};
41
42use crate::chunker::Chunker;
43use crate::document::Document;
44use crate::loader::DocumentLoader;
45use crate::splitter::TextSplitter;
46
/// Name of the reserved top-level `metadata` key under which the
/// pipeline stores each chunk's provenance object (`source`,
/// `lineage`, `namespace`). The `entelix` name keeps it apart from
/// operator-defined metadata fields; operators should not use this
/// key themselves, because the pipeline overwrites it. Retrieval-side
/// consumers read provenance back out of this nested object.
pub const PROVENANCE_METADATA_KEY: &str = "entelix";
53
54/// Outcome counters and per-document failure list a single
55/// [`IngestionPipeline::run`] produces.
56#[derive(Clone, Debug, Default, Serialize, Deserialize)]
57#[non_exhaustive]
58pub struct IngestReport {
59    /// Documents the loader yielded successfully.
60    pub documents_loaded: u64,
61    /// Documents that reached the embedder + store stages
62    /// successfully (one entry per chunk produced by splitting).
63    pub chunks_indexed: u64,
64    /// Embedding API calls the pipeline made — useful for cost
65    /// reconciliation against vendor-side dashboards. One entry per
66    /// `embed_batch` invocation, regardless of batch size.
67    pub embedding_calls: u64,
68    /// Per-document errors. The pipeline does NOT abort on these —
69    /// the report accumulates them and `run` drains the rest of
70    /// the source. Operators decide whether a non-empty list is
71    /// actionable.
72    pub errors: Vec<IngestError>,
73}
74
75impl IngestReport {
76    /// Whether the pipeline indexed every document the loader
77    /// produced without per-document errors. `true` does NOT mean
78    /// the loader hit zero documents — pair with
79    /// [`Self::documents_loaded`] for the "found anything"
80    /// distinction.
81    #[must_use]
82    pub const fn is_clean(&self) -> bool {
83        self.errors.is_empty()
84    }
85}
86
87/// One per-document failure recorded during ingestion. Carries the
88/// originating document id (when known) and a stage label
89/// identifying which pipeline phase failed.
90#[derive(Clone, Debug, Serialize, Deserialize)]
91#[non_exhaustive]
92pub struct IngestError {
93    /// Stage label. One of `"load"`, `"chunk"`, `"embed"`,
94    /// `"store"`. Carried as `String` (rather than `&'static str`)
95    /// so persisted reports reconstruct via serde without forcing
96    /// every stage label to be a string literal at deserialise
97    /// time.
98    pub stage: String,
99    /// Document id of the item that failed, when the failure
100    /// happened after an id was stamped. `"<unknown>"` for
101    /// loader-side failures that never produced a document.
102    pub document_id: String,
103    /// LLM-/operator-facing message. Renders the originating
104    /// `entelix_core::Error` through its `Display` impl.
105    pub message: String,
106}
107
108impl IngestError {
109    fn from_error(stage: impl Into<String>, document_id: impl Into<String>, err: &Error) -> Self {
110        Self {
111            stage: stage.into(),
112            document_id: document_id.into(),
113            message: err.to_string(),
114        }
115    }
116}
117
118/// Builder for [`IngestionPipeline`]. Required components
119/// (loader / splitter / embedder / store) come in via
120/// [`IngestionPipeline::builder`]; optional [`Chunker`] entries
121/// accumulate via [`Self::add_chunker`].
122pub struct IngestionPipelineBuilder<L, S, E: ?Sized, V: ?Sized> {
123    loader: L,
124    splitter: S,
125    embedder: Arc<E>,
126    store: Arc<V>,
127    chunkers: Vec<Arc<dyn Chunker>>,
128    namespace: Namespace,
129}
130
131impl<L, S, E, V> IngestionPipelineBuilder<L, S, E, V>
132where
133    L: DocumentLoader,
134    S: TextSplitter,
135    E: Embedder + ?Sized,
136    V: VectorStore + ?Sized,
137{
138    /// Append a [`Chunker`] to the chain. Multiple chunkers run in
139    /// registration order — `add_chunker(contextual).add_chunker(hyde)`
140    /// runs `contextual` first, then `hyde` over its output.
141    #[must_use]
142    pub fn add_chunker(mut self, chunker: Arc<dyn Chunker>) -> Self {
143        self.chunkers.push(chunker);
144        self
145    }
146
147    /// Finalise into a runnable pipeline.
148    #[must_use]
149    pub fn build(self) -> IngestionPipeline<L, S, E, V> {
150        IngestionPipeline {
151            loader: self.loader,
152            splitter: self.splitter,
153            embedder: self.embedder,
154            store: self.store,
155            chunkers: self.chunkers,
156            namespace: self.namespace,
157        }
158    }
159}
160
161/// End-to-end RAG ingestion pipeline. Construct via
162/// [`Self::builder`]; finalise with
163/// [`IngestionPipelineBuilder::build`]; drive with [`Self::run`].
164pub struct IngestionPipeline<L, S, E: ?Sized, V: ?Sized> {
165    loader: L,
166    splitter: S,
167    embedder: Arc<E>,
168    store: Arc<V>,
169    chunkers: Vec<Arc<dyn Chunker>>,
170    namespace: Namespace,
171}
172
173impl<L, S, E, V> IngestionPipeline<L, S, E, V>
174where
175    L: DocumentLoader,
176    S: TextSplitter,
177    E: Embedder + ?Sized,
178    V: VectorStore + ?Sized,
179{
180    /// Start a builder bound to the supplied components and
181    /// [`Namespace`]. Embedder and store are `Arc<E>` / `Arc<V>`
182    /// because production deployments share single instances
183    /// across many pipelines (one connection pool, one tenant
184    /// scope).
185    #[must_use]
186    pub fn builder(
187        loader: L,
188        splitter: S,
189        embedder: Arc<E>,
190        store: Arc<V>,
191        namespace: Namespace,
192    ) -> IngestionPipelineBuilder<L, S, E, V> {
193        IngestionPipelineBuilder {
194            loader,
195            splitter,
196            embedder,
197            store,
198            chunkers: Vec::new(),
199            namespace,
200        }
201    }
202
203    /// Drain the loader, process every document, return the
204    /// outcome report. Cancellation polls between every loader
205    /// item — a long-running ingestion bails within one chunk
206    /// boundary of `ctx.cancel()`.
207    pub async fn run(&self, ctx: &ExecutionContext) -> Result<IngestReport> {
208        let mut report = IngestReport::default();
209        let mut stream = self.loader.load(ctx).await?;
210        while let Some(item) = stream.next().await {
211            if ctx.is_cancelled() {
212                return Err(Error::Cancelled);
213            }
214            match item {
215                Err(err) => {
216                    report
217                        .errors
218                        .push(IngestError::from_error("load", "<unknown>", &err));
219                }
220                Ok(document) => {
221                    report.documents_loaded = report.documents_loaded.saturating_add(1);
222                    self.process_document(document, ctx, &mut report).await;
223                }
224            }
225        }
226        Ok(report)
227    }
228
229    /// Per-document pipeline: split → chunker chain → embed →
230    /// store. Errors are recorded on the report; the pipeline
231    /// continues with the next document.
232    async fn process_document(
233        &self,
234        document: Document,
235        ctx: &ExecutionContext,
236        report: &mut IngestReport,
237    ) {
238        let document_id = document.id.as_str().to_owned();
239        let mut chunks = self.splitter.split(&document);
240        if chunks.is_empty() {
241            return;
242        }
243
244        // Chunker chain — each chunker runs over the whole vector,
245        // batching its work where supported.
246        for chunker in &self.chunkers {
247            match chunker.process(chunks, ctx).await {
248                Ok(transformed) => chunks = transformed,
249                Err(err) => {
250                    report
251                        .errors
252                        .push(IngestError::from_error("chunk", &document_id, &err));
253                    return;
254                }
255            }
256        }
257
258        if chunks.is_empty() {
259            return;
260        }
261
262        // Embedder pass — one batch per document so the cost meter
263        // sees one call regardless of chunk count.
264        let texts: Vec<String> = chunks.iter().map(|c| c.content.clone()).collect();
265        let embeddings = match self.embedder.embed_batch(&texts, ctx).await {
266            Ok(v) => v,
267            Err(err) => {
268                report
269                    .errors
270                    .push(IngestError::from_error("embed", &document_id, &err));
271                return;
272            }
273        };
274        report.embedding_calls = report.embedding_calls.saturating_add(1);
275        if embeddings.len() != chunks.len() {
276            report.errors.push(IngestError {
277                stage: "embed".to_owned(),
278                document_id: document_id.clone(),
279                message: format!(
280                    "embedder returned {} vectors for {} chunks",
281                    embeddings.len(),
282                    chunks.len()
283                ),
284            });
285            return;
286        }
287
288        // Bridge to the retrieval-side `entelix_memory::Document`
289        // shape — provenance lands in `metadata.entelix`.
290        let items: Vec<(RetrievedDocument, Vec<f32>)> = chunks
291            .into_iter()
292            .zip(embeddings)
293            .map(|(chunk, emb)| (to_retrieved(chunk), emb.vector))
294            .collect();
295
296        if let Err(err) = self
297            .store
298            .add_batch(ctx, &self.namespace, items.clone())
299            .await
300        {
301            report
302                .errors
303                .push(IngestError::from_error("store", &document_id, &err));
304            return;
305        }
306        let count = items.len() as u64;
307        report.chunks_indexed = report.chunks_indexed.saturating_add(count);
308    }
309}
310
311/// Convert an ingestion-shape [`Document`] into the retrieval-shape
312/// [`entelix_memory::Document`] the vector store stores. Provenance
313/// (source + lineage + tenant) lives under the
314/// [`PROVENANCE_METADATA_KEY`] reserved key on the persisted
315/// `metadata` so retrieval consumers reach back to origin without a
316/// second round-trip.
317fn to_retrieved(chunk: Document) -> RetrievedDocument {
318    let provenance = serde_json::json!({
319        "source": chunk.source,
320        "lineage": chunk.lineage,
321        "namespace": chunk.namespace.render(),
322    });
323    let mut metadata = match chunk.metadata {
324        serde_json::Value::Object(map) => map,
325        serde_json::Value::Null => serde_json::Map::new(),
326        other => {
327            let mut map = serde_json::Map::new();
328            map.insert("value".to_owned(), other);
329            map
330        }
331    };
332    metadata.insert(PROVENANCE_METADATA_KEY.to_owned(), provenance);
333    RetrievedDocument::new(chunk.content)
334        .with_doc_id(RetrievedDocumentId::from(chunk.id.as_str()))
335        .with_metadata(serde_json::Value::Object(metadata))
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::Source;
    use crate::splitter::RecursiveCharacterSplitter;
    use async_trait::async_trait;
    use entelix_memory::{Embedding, EmbeddingUsage, InMemoryVectorStore};
    use std::sync::Mutex;

    fn ns() -> Namespace {
        Namespace::new(entelix_core::TenantId::new("acme"))
    }

    /// In-process loader yielding pre-canned documents.
    struct StubLoader {
        documents: Mutex<Vec<Document>>,
    }

    impl StubLoader {
        fn new(documents: Vec<Document>) -> Self {
            Self {
                documents: Mutex::new(documents),
            }
        }
    }

    #[async_trait]
    impl DocumentLoader for StubLoader {
        fn name(&self) -> &'static str {
            "stub"
        }

        async fn load<'a>(
            &'a self,
            _ctx: &'a ExecutionContext,
        ) -> Result<crate::loader::DocumentStream<'a>> {
            let docs = std::mem::take(&mut *self.documents.lock().unwrap());
            Ok(Box::pin(futures::stream::iter(docs.into_iter().map(Ok))))
        }
    }

    /// Loader that yields one document and one error — verifies
    /// partial-success contract.
    struct PartialLoader;

    #[async_trait]
    impl DocumentLoader for PartialLoader {
        fn name(&self) -> &'static str {
            "partial"
        }

        async fn load<'a>(
            &'a self,
            _ctx: &'a ExecutionContext,
        ) -> Result<crate::loader::DocumentStream<'a>> {
            let ok = Document::root("d1", "alpha", Source::now("test://", "test"), ns());
            let stream =
                futures::stream::iter(vec![Ok(ok), Err(Error::invalid_request("bad item"))]);
            Ok(Box::pin(stream))
        }
    }

    /// Deterministic fixed-dimension embedder that returns `[byte_count, 0, 0, …]`.
    struct StubEmbedder {
        dimension: usize,
    }

    #[async_trait]
    impl Embedder for StubEmbedder {
        fn dimension(&self) -> usize {
            self.dimension
        }

        async fn embed(&self, text: &str, _ctx: &ExecutionContext) -> Result<Embedding> {
            let mut v = vec![0.0_f32; self.dimension];
            // Test fixtures are a few bytes long, so the usize → f32
            // cast is exact here.
            #[allow(clippy::cast_precision_loss)]
            if let Some(first) = v.first_mut() {
                *first = text.len() as f32;
            }
            Ok(Embedding {
                vector: v,
                usage: Some(EmbeddingUsage::new(1)),
            })
        }
    }

    #[tokio::test]
    async fn empty_loader_produces_zero_indexed_clean_report() {
        let loader = StubLoader::new(vec![]);
        let pipeline = IngestionPipeline::builder(
            loader,
            RecursiveCharacterSplitter::new(),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::new(InMemoryVectorStore::new(4)),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 0);
        assert_eq!(report.chunks_indexed, 0);
        assert_eq!(report.embedding_calls, 0);
        assert!(report.is_clean());
    }

    #[tokio::test]
    async fn single_document_flows_load_split_embed_store() {
        let doc = Document::root(
            "doc-1",
            "alpha\n\nbeta\n\ngamma",
            Source::now("test://doc-1", "test"),
            ns(),
        );
        let loader = StubLoader::new(vec![doc]);
        let store = Arc::new(InMemoryVectorStore::new(4));
        let pipeline = IngestionPipeline::builder(
            loader,
            RecursiveCharacterSplitter::new()
                .with_chunk_size(10)
                .with_chunk_overlap(0),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 1);
        assert_eq!(report.embedding_calls, 1, "one batch per document");
        assert!(report.chunks_indexed >= 3);
        assert!(report.is_clean());

        // Round-trip through retrieval — store carries the chunks
        // and provenance landed under the reserved metadata key.
        let mut probe_vec = vec![0.0_f32; 4];
        probe_vec[0] = 5.0;
        let hits = store
            .search(&ExecutionContext::new(), &ns(), &probe_vec, 10)
            .await
            .unwrap();
        assert!(!hits.is_empty(), "store must contain the indexed chunks");
        let metadata = &hits[0].metadata;
        assert!(
            metadata.get(PROVENANCE_METADATA_KEY).is_some(),
            "provenance metadata stamped under reserved key"
        );
        let provenance = &metadata[PROVENANCE_METADATA_KEY];
        assert!(provenance.get("source").is_some());
        assert!(provenance.get("lineage").is_some());
        assert!(provenance.get("namespace").is_some());
    }

    #[tokio::test]
    async fn partial_loader_failure_recorded_but_run_completes() {
        let store = Arc::new(InMemoryVectorStore::new(4));
        let pipeline = IngestionPipeline::builder(
            PartialLoader,
            RecursiveCharacterSplitter::new(),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 1, "successful item still indexed");
        assert_eq!(report.errors.len(), 1, "loader-side error recorded");
        assert_eq!(report.errors[0].stage, "load");
        assert!(!report.is_clean());
        assert!(report.chunks_indexed >= 1);
    }

    /// Chunker that drops every chunk except those at even
    /// indices — verifies the chunker chain runs and feeds the
    /// next stage.
    struct EvenIndexChunker;

    #[async_trait]
    impl Chunker for EvenIndexChunker {
        fn name(&self) -> &'static str {
            "even-only"
        }

        async fn process(
            &self,
            chunks: Vec<Document>,
            _ctx: &ExecutionContext,
        ) -> Result<Vec<Document>> {
            // Tag every kept chunk's lineage so the pipeline records
            // the chunker chain entry per design contract.
            let kept: Vec<Document> = chunks
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| idx % 2 == 0)
                .map(|(_, mut chunk)| {
                    if let Some(lineage) = chunk.lineage.as_mut() {
                        lineage.push_chunker("even-only");
                    }
                    chunk
                })
                .collect();
            Ok(kept)
        }
    }

    #[tokio::test]
    async fn chunker_chain_filters_chunks_before_storage() {
        // The chunker drops every odd-indexed chunk; the embedder +
        // store therefore see strictly fewer chunks than the
        // splitter emitted. Provenance ("even-only") survives on
        // every retained chunk's lineage.
        let doc = Document::root(
            "doc",
            "alpha\n\nbeta\n\ngamma\n\ndelta\n\nepsilon",
            Source::now("test://", "test"),
            ns(),
        );
        let store = Arc::new(InMemoryVectorStore::new(4));
        let raw_chunks = RecursiveCharacterSplitter::new()
            .with_chunk_size(7)
            .with_chunk_overlap(0)
            .split(&doc);
        let raw_count = raw_chunks.len();
        assert!(
            raw_count >= 3,
            "splitter must produce at least three chunks for the test to be meaningful: got {raw_count}"
        );
        let pipeline = IngestionPipeline::builder(
            StubLoader::new(vec![doc]),
            RecursiveCharacterSplitter::new()
                .with_chunk_size(7)
                .with_chunk_overlap(0),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .add_chunker(Arc::new(EvenIndexChunker))
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert!(report.is_clean());
        let kept = raw_count.div_ceil(2); // even-index count
        let kept_u64 = u64::try_from(kept).unwrap();
        assert_eq!(
            report.chunks_indexed, kept_u64,
            "chunker dropped odd indices; expected {kept} of {raw_count} indexed"
        );

        // Inspect the store — provenance carries the chunker chain.
        let mut probe = vec![0.0_f32; 4];
        probe[0] = 5.0;
        let hits = store
            .search(&ExecutionContext::new(), &ns(), &probe, 100)
            .await
            .unwrap();
        // Guard against a vacuous pass: the loop below asserts nothing
        // when the search comes back empty.
        assert!(!hits.is_empty(), "store must contain the retained chunks");
        for hit in hits {
            let chain = &hit.metadata[PROVENANCE_METADATA_KEY]["lineage"]["chunker_chain"];
            let arr = chain.as_array().unwrap();
            assert_eq!(arr.len(), 1);
            assert_eq!(arr[0].as_str(), Some("even-only"));
        }
    }

    /// Chunker that always fails — verifies chunker errors are
    /// recorded against the document without aborting the run.
    struct FailingChunker;

    #[async_trait]
    impl Chunker for FailingChunker {
        fn name(&self) -> &'static str {
            "failing"
        }

        async fn process(
            &self,
            _chunks: Vec<Document>,
            _ctx: &ExecutionContext,
        ) -> Result<Vec<Document>> {
            Err(Error::invalid_request("chunker refused"))
        }
    }

    #[tokio::test]
    async fn chunker_failure_recorded_and_skips_embed_and_store() {
        let doc = Document::root(
            "doc-x",
            "alpha\n\nbeta",
            Source::now("test://doc-x", "test"),
            ns(),
        );
        let pipeline = IngestionPipeline::builder(
            StubLoader::new(vec![doc]),
            RecursiveCharacterSplitter::new(),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::new(InMemoryVectorStore::new(4)),
            ns(),
        )
        .add_chunker(Arc::new(FailingChunker))
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 1);
        assert_eq!(report.chunks_indexed, 0, "nothing reaches the store");
        assert_eq!(report.embedding_calls, 0, "embedder never invoked");
        assert_eq!(report.errors.len(), 1);
        assert_eq!(report.errors[0].stage, "chunk");
        assert_ne!(report.errors[0].document_id, "<unknown>");
        assert!(!report.is_clean());
    }
}