1use std::sync::Arc;
33
34use entelix_core::{Error, ExecutionContext, Result};
35use entelix_memory::{
36 Document as RetrievedDocument, DocumentId as RetrievedDocumentId, Embedder, Namespace,
37 VectorStore,
38};
39use futures::StreamExt;
40use serde::{Deserialize, Serialize};
41
42use crate::chunker::Chunker;
43use crate::document::Document;
44use crate::loader::DocumentLoader;
45use crate::splitter::TextSplitter;
46
/// Metadata key under which the pipeline writes each stored chunk's
/// provenance object (`source`, `lineage`, `namespace`).
///
/// Any value a chunk's own metadata already holds under this key is
/// overwritten when the chunk is converted for the vector store.
pub const PROVENANCE_METADATA_KEY: &str = "entelix";
53
/// Summary of one [`IngestionPipeline::run`] invocation.
///
/// Per-item failures do not abort a run: they are collected in
/// [`errors`](Self::errors) while the counters track what succeeded.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[non_exhaustive]
pub struct IngestReport {
    /// Number of documents the loader yielded successfully.
    pub documents_loaded: u64,
    /// Number of chunks successfully written to the vector store.
    pub chunks_indexed: u64,
    /// Number of `embed_batch` calls that succeeded (at most one per document).
    pub embedding_calls: u64,
    /// Failures recorded during the run, in the order they were encountered.
    pub errors: Vec<IngestError>,
}

impl IngestReport {
    /// Returns `true` when the run recorded no errors at all.
    #[must_use]
    pub const fn is_clean(&self) -> bool {
        self.errors.is_empty()
    }
}
86
/// A single failure recorded in [`IngestReport::errors`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct IngestError {
    /// Pipeline stage that failed: `"load"`, `"chunk"`, `"embed"`, or `"store"`.
    pub stage: String,
    /// Id of the source document being processed, or `"<unknown>"` when the
    /// loader yielded an error instead of a document.
    pub document_id: String,
    /// Human-readable failure description — usually the underlying error's
    /// `Display` output.
    pub message: String,
}
107
108impl IngestError {
109 fn from_error(stage: impl Into<String>, document_id: impl Into<String>, err: &Error) -> Self {
110 Self {
111 stage: stage.into(),
112 document_id: document_id.into(),
113 message: err.to_string(),
114 }
115 }
116}
117
/// Builder for [`IngestionPipeline`], obtained from [`IngestionPipeline::builder`].
pub struct IngestionPipelineBuilder<L, S, E: ?Sized, V: ?Sized> {
    /// Source of documents.
    loader: L,
    /// Splits each loaded document into chunks.
    splitter: S,
    /// Embeds chunk text before storage.
    embedder: Arc<E>,
    /// Destination vector store.
    store: Arc<V>,
    /// Post-split chunk transformers, run in insertion order.
    chunkers: Vec<Arc<dyn Chunker>>,
    /// Namespace every indexed chunk is written into.
    namespace: Namespace,
}
130
131impl<L, S, E, V> IngestionPipelineBuilder<L, S, E, V>
132where
133 L: DocumentLoader,
134 S: TextSplitter,
135 E: Embedder + ?Sized,
136 V: VectorStore + ?Sized,
137{
138 #[must_use]
142 pub fn add_chunker(mut self, chunker: Arc<dyn Chunker>) -> Self {
143 self.chunkers.push(chunker);
144 self
145 }
146
147 #[must_use]
149 pub fn build(self) -> IngestionPipeline<L, S, E, V> {
150 IngestionPipeline {
151 loader: self.loader,
152 splitter: self.splitter,
153 embedder: self.embedder,
154 store: self.store,
155 chunkers: self.chunkers,
156 namespace: self.namespace,
157 }
158 }
159}
160
/// Load → split → chunk → embed → store pipeline.
///
/// Construct with [`IngestionPipeline::builder`] and execute with
/// [`IngestionPipeline::run`].
pub struct IngestionPipeline<L, S, E: ?Sized, V: ?Sized> {
    /// Source of documents.
    loader: L,
    /// Splits each loaded document into chunks.
    splitter: S,
    /// Embeds chunk text; called once per document with all of its chunks.
    embedder: Arc<E>,
    /// Destination vector store.
    store: Arc<V>,
    /// Post-split chunk transformers, run in order.
    chunkers: Vec<Arc<dyn Chunker>>,
    /// Namespace every indexed chunk is written into.
    namespace: Namespace,
}
172
173impl<L, S, E, V> IngestionPipeline<L, S, E, V>
174where
175 L: DocumentLoader,
176 S: TextSplitter,
177 E: Embedder + ?Sized,
178 V: VectorStore + ?Sized,
179{
180 #[must_use]
186 pub fn builder(
187 loader: L,
188 splitter: S,
189 embedder: Arc<E>,
190 store: Arc<V>,
191 namespace: Namespace,
192 ) -> IngestionPipelineBuilder<L, S, E, V> {
193 IngestionPipelineBuilder {
194 loader,
195 splitter,
196 embedder,
197 store,
198 chunkers: Vec::new(),
199 namespace,
200 }
201 }
202
203 pub async fn run(&self, ctx: &ExecutionContext) -> Result<IngestReport> {
208 let mut report = IngestReport::default();
209 let mut stream = self.loader.load(ctx).await?;
210 while let Some(item) = stream.next().await {
211 if ctx.is_cancelled() {
212 return Err(Error::Cancelled);
213 }
214 match item {
215 Err(err) => {
216 report
217 .errors
218 .push(IngestError::from_error("load", "<unknown>", &err));
219 }
220 Ok(document) => {
221 report.documents_loaded = report.documents_loaded.saturating_add(1);
222 self.process_document(document, ctx, &mut report).await;
223 }
224 }
225 }
226 Ok(report)
227 }
228
229 async fn process_document(
233 &self,
234 document: Document,
235 ctx: &ExecutionContext,
236 report: &mut IngestReport,
237 ) {
238 let document_id = document.id.as_str().to_owned();
239 let mut chunks = self.splitter.split(&document);
240 if chunks.is_empty() {
241 return;
242 }
243
244 for chunker in &self.chunkers {
247 match chunker.process(chunks, ctx).await {
248 Ok(transformed) => chunks = transformed,
249 Err(err) => {
250 report
251 .errors
252 .push(IngestError::from_error("chunk", &document_id, &err));
253 return;
254 }
255 }
256 }
257
258 if chunks.is_empty() {
259 return;
260 }
261
262 let texts: Vec<String> = chunks.iter().map(|c| c.content.clone()).collect();
265 let embeddings = match self.embedder.embed_batch(&texts, ctx).await {
266 Ok(v) => v,
267 Err(err) => {
268 report
269 .errors
270 .push(IngestError::from_error("embed", &document_id, &err));
271 return;
272 }
273 };
274 report.embedding_calls = report.embedding_calls.saturating_add(1);
275 if embeddings.len() != chunks.len() {
276 report.errors.push(IngestError {
277 stage: "embed".to_owned(),
278 document_id: document_id.clone(),
279 message: format!(
280 "embedder returned {} vectors for {} chunks",
281 embeddings.len(),
282 chunks.len()
283 ),
284 });
285 return;
286 }
287
288 let items: Vec<(RetrievedDocument, Vec<f32>)> = chunks
291 .into_iter()
292 .zip(embeddings)
293 .map(|(chunk, emb)| (to_retrieved(chunk), emb.vector))
294 .collect();
295
296 if let Err(err) = self
297 .store
298 .add_batch(ctx, &self.namespace, items.clone())
299 .await
300 {
301 report
302 .errors
303 .push(IngestError::from_error("store", &document_id, &err));
304 return;
305 }
306 let count = items.len() as u64;
307 report.chunks_indexed = report.chunks_indexed.saturating_add(count);
308 }
309}
310
311fn to_retrieved(chunk: Document) -> RetrievedDocument {
318 let provenance = serde_json::json!({
319 "source": chunk.source,
320 "lineage": chunk.lineage,
321 "namespace": chunk.namespace.render(),
322 });
323 let mut metadata = match chunk.metadata {
324 serde_json::Value::Object(map) => map,
325 serde_json::Value::Null => serde_json::Map::new(),
326 other => {
327 let mut map = serde_json::Map::new();
328 map.insert("value".to_owned(), other);
329 map
330 }
331 };
332 metadata.insert(PROVENANCE_METADATA_KEY.to_owned(), provenance);
333 RetrievedDocument::new(chunk.content)
334 .with_doc_id(RetrievedDocumentId::from(chunk.id.as_str()))
335 .with_metadata(serde_json::Value::Object(metadata))
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::Source;
    use crate::splitter::RecursiveCharacterSplitter;
    use async_trait::async_trait;
    use entelix_memory::{Embedding, EmbeddingUsage, InMemoryVectorStore};
    use std::sync::Mutex;

    /// Namespace used for every document and store lookup in these tests.
    fn ns() -> Namespace {
        Namespace::new(entelix_core::TenantId::new("acme"))
    }

    /// Loader that yields its documents on the first `load` call; later calls
    /// yield an empty stream because the list is taken out of the mutex.
    struct StubLoader {
        documents: Mutex<Vec<Document>>,
    }

    impl StubLoader {
        fn new(documents: Vec<Document>) -> Self {
            Self {
                documents: Mutex::new(documents),
            }
        }
    }

    #[async_trait]
    impl DocumentLoader for StubLoader {
        fn name(&self) -> &'static str {
            "stub"
        }

        async fn load<'a>(
            &'a self,
            _ctx: &'a ExecutionContext,
        ) -> Result<crate::loader::DocumentStream<'a>> {
            let docs = std::mem::take(&mut *self.documents.lock().unwrap());
            Ok(Box::pin(futures::stream::iter(docs.into_iter().map(Ok))))
        }
    }

    /// Loader whose stream yields one good document followed by one error item.
    struct PartialLoader;

    #[async_trait]
    impl DocumentLoader for PartialLoader {
        fn name(&self) -> &'static str {
            "partial"
        }

        async fn load<'a>(
            &'a self,
            _ctx: &'a ExecutionContext,
        ) -> Result<crate::loader::DocumentStream<'a>> {
            let ok = Document::root("d1", "alpha", Source::now("test://", "test"), ns());
            let stream =
                futures::stream::iter(vec![Ok(ok), Err(Error::invalid_request("bad item"))]);
            Ok(Box::pin(stream))
        }
    }

    /// Deterministic embedder: a zero vector whose first component is the
    /// text's byte length.
    struct StubEmbedder {
        dimension: usize,
    }

    #[async_trait]
    impl Embedder for StubEmbedder {
        fn dimension(&self) -> usize {
            self.dimension
        }

        async fn embed(&self, text: &str, _ctx: &ExecutionContext) -> Result<Embedding> {
            let mut v = vec![0.0_f32; self.dimension];
            // Test-only: text lengths are tiny, so precision loss is irrelevant.
            #[allow(clippy::cast_precision_loss)]
            if let Some(first) = v.first_mut() {
                *first = text.len() as f32;
            }
            Ok(Embedding {
                vector: v,
                usage: Some(EmbeddingUsage::new(1)),
            })
        }
    }

    #[tokio::test]
    async fn empty_loader_produces_zero_indexed_clean_report() {
        let loader = StubLoader::new(vec![]);
        let pipeline = IngestionPipeline::builder(
            loader,
            RecursiveCharacterSplitter::new(),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::new(InMemoryVectorStore::new(4)),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 0);
        assert_eq!(report.chunks_indexed, 0);
        assert_eq!(report.embedding_calls, 0);
        assert!(report.is_clean());
    }

    #[tokio::test]
    async fn single_document_flows_load_split_embed_store() {
        let doc = Document::root(
            "doc-1",
            "alpha\n\nbeta\n\ngamma",
            Source::now("test://doc-1", "test"),
            ns(),
        );
        let loader = StubLoader::new(vec![doc]);
        let store = Arc::new(InMemoryVectorStore::new(4));
        let pipeline = IngestionPipeline::builder(
            loader,
            RecursiveCharacterSplitter::new()
                .with_chunk_size(10)
                .with_chunk_overlap(0),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 1);
        assert_eq!(report.embedding_calls, 1, "one batch per document");
        assert!(report.chunks_indexed >= 3);
        assert!(report.is_clean());

        // Probe along the first axis, where StubEmbedder puts all its signal.
        let mut probe_vec = vec![0.0_f32; 4];
        probe_vec[0] = 5.0;
        let hits = store
            .search(&ExecutionContext::new(), &ns(), &probe_vec, 10)
            .await
            .unwrap();
        assert!(!hits.is_empty(), "store must contain the indexed chunks");
        let metadata = &hits[0].metadata;
        assert!(
            metadata.get(PROVENANCE_METADATA_KEY).is_some(),
            "provenance metadata stamped under reserved key"
        );
        let provenance = &metadata[PROVENANCE_METADATA_KEY];
        assert!(provenance.get("source").is_some());
        assert!(provenance.get("lineage").is_some());
        assert!(provenance.get("namespace").is_some());
    }

    #[tokio::test]
    async fn partial_loader_failure_recorded_but_run_completes() {
        let store = Arc::new(InMemoryVectorStore::new(4));
        let pipeline = IngestionPipeline::builder(
            PartialLoader,
            RecursiveCharacterSplitter::new(),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert_eq!(report.documents_loaded, 1, "successful item still indexed");
        assert_eq!(report.errors.len(), 1, "loader-side error recorded");
        assert_eq!(report.errors[0].stage, "load");
        assert!(!report.is_clean());
        assert!(report.chunks_indexed >= 1);
    }

    /// Chunker that keeps only even-indexed chunks and records itself in each
    /// surviving chunk's lineage.
    struct EvenIndexChunker;

    #[async_trait]
    impl Chunker for EvenIndexChunker {
        fn name(&self) -> &'static str {
            "even-only"
        }

        async fn process(
            &self,
            chunks: Vec<Document>,
            _ctx: &ExecutionContext,
        ) -> Result<Vec<Document>> {
            let kept: Vec<Document> = chunks
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| idx % 2 == 0)
                .map(|(_, mut chunk)| {
                    if let Some(lineage) = chunk.lineage.as_mut() {
                        lineage.push_chunker("even-only");
                    }
                    chunk
                })
                .collect();
            Ok(kept)
        }
    }

    #[tokio::test]
    async fn chunker_chain_filters_chunks_before_storage() {
        let doc = Document::root(
            "doc",
            "alpha\n\nbeta\n\ngamma\n\ndelta\n\nepsilon",
            Source::now("test://", "test"),
            ns(),
        );
        let store = Arc::new(InMemoryVectorStore::new(4));
        // Split once outside the pipeline to learn how many raw chunks to expect.
        let raw_chunks = RecursiveCharacterSplitter::new()
            .with_chunk_size(7)
            .with_chunk_overlap(0)
            .split(&doc);
        let raw_count = raw_chunks.len();
        assert!(
            raw_count >= 3,
            "splitter must produce at least three chunks for the test to be meaningful: got {raw_count}"
        );
        let pipeline = IngestionPipeline::builder(
            StubLoader::new(vec![doc]),
            RecursiveCharacterSplitter::new()
                .with_chunk_size(7)
                .with_chunk_overlap(0),
            Arc::new(StubEmbedder { dimension: 4 }),
            Arc::clone(&store),
            ns(),
        )
        .add_chunker(Arc::new(EvenIndexChunker))
        .build();
        let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
        assert!(report.is_clean());
        // Indices 0, 2, 4, ... survive, i.e. ceil(raw_count / 2) chunks.
        let kept = raw_count.div_ceil(2);
        let kept_u64 = kept as u64;
        assert_eq!(
            report.chunks_indexed, kept_u64,
            "chunker dropped odd indices; expected {kept} of {raw_count} indexed"
        );

        let mut probe = vec![0.0_f32; 4];
        probe[0] = 5.0;
        let hits = store
            .search(&ExecutionContext::new(), &ns(), &probe, 100)
            .await
            .unwrap();
        // Without this the lineage loop below would pass vacuously on an empty result.
        assert!(!hits.is_empty(), "store must contain the chunker's output");
        for hit in hits {
            let chain = &hit.metadata[PROVENANCE_METADATA_KEY]["lineage"]["chunker_chain"];
            let arr = chain.as_array().unwrap();
            assert_eq!(arr.len(), 1);
            assert_eq!(arr[0].as_str(), Some("even-only"));
        }
    }
}