use std::sync::Arc;
use entelix_core::{Error, ExecutionContext, Result};
use entelix_memory::{
Document as RetrievedDocument, DocumentId as RetrievedDocumentId, Embedder, Namespace,
VectorStore,
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use crate::chunker::Chunker;
use crate::document::Document;
use crate::loader::DocumentLoader;
use crate::splitter::TextSplitter;
/// Reserved metadata key under which the pipeline stores each indexed chunk's
/// provenance object (`source`, `lineage`, `namespace`). Any value already
/// present under this key in a chunk's metadata is replaced at indexing time.
pub const PROVENANCE_METADATA_KEY: &str = "entelix";
/// Counters and per-item failures accumulated over one `IngestionPipeline::run` call.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[non_exhaustive]
pub struct IngestReport {
/// Number of items the loader stream yielded as `Ok`.
pub documents_loaded: u64,
/// Number of chunks passed to the vector store in `add_batch` calls that returned `Ok`.
pub chunks_indexed: u64,
/// Number of `embed_batch` calls that returned `Ok`.
pub embedding_calls: u64,
/// Failures recorded for individual items; recording one does not abort the run.
pub errors: Vec<IngestError>,
}
impl IngestReport {
/// Returns `true` when no per-item failure was recorded in `errors`.
#[must_use]
pub const fn is_clean(&self) -> bool {
self.errors.is_empty()
}
}
/// One recorded failure, flattened to strings so it can be serialized with the report.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct IngestError {
/// Pipeline stage that failed; this file emits `"load"`, `"chunk"`, `"embed"` and `"store"`.
pub stage: String,
/// Id of the document being processed, or `"<unknown>"` for loader-stream failures.
pub document_id: String,
/// Human-readable description of the failure.
pub message: String,
}
impl IngestError {
    /// Builds a report entry for `stage` / `document_id`, using the `Display`
    /// rendering of `err` as the message.
    fn from_error(stage: impl Into<String>, document_id: impl Into<String>, err: &Error) -> Self {
        let stage: String = stage.into();
        let document_id: String = document_id.into();
        let message = err.to_string();
        Self {
            stage,
            document_id,
            message,
        }
    }
}
/// Builder for `IngestionPipeline`: holds the same components as the pipeline
/// and lets callers append chunkers before calling `build`.
pub struct IngestionPipelineBuilder<L, S, E: ?Sized, V: ?Sized> {
// Source of documents.
loader: L,
// Splits each loaded document into chunks.
splitter: S,
// Produces one embedding per chunk.
embedder: Arc<E>,
// Destination for (document, vector) pairs.
store: Arc<V>,
// Chunk post-processors, applied in insertion order.
chunkers: Vec<Arc<dyn Chunker>>,
// Namespace passed to the store on every write.
namespace: Namespace,
}
impl<L, S, E, V> IngestionPipelineBuilder<L, S, E, V>
where
    L: DocumentLoader,
    S: TextSplitter,
    E: Embedder + ?Sized,
    V: VectorStore + ?Sized,
{
    /// Appends `chunker` to the end of the chunker chain and hands the
    /// builder back for further chaining.
    #[must_use]
    pub fn add_chunker(self, chunker: Arc<dyn Chunker>) -> Self {
        let mut builder = self;
        builder.chunkers.push(chunker);
        builder
    }

    /// Consumes the builder and produces a pipeline holding the same
    /// loader, splitter, embedder, store, chunker chain and namespace.
    #[must_use]
    pub fn build(self) -> IngestionPipeline<L, S, E, V> {
        let Self {
            loader,
            splitter,
            embedder,
            store,
            chunkers,
            namespace,
        } = self;
        IngestionPipeline {
            loader,
            splitter,
            embedder,
            store,
            chunkers,
            namespace,
        }
    }
}
/// Load → split → chunk → embed → store pipeline over a single namespace.
pub struct IngestionPipeline<L, S, E: ?Sized, V: ?Sized> {
// Source of documents.
loader: L,
// Splits each loaded document into chunks.
splitter: S,
// Produces one embedding per chunk via `embed_batch`.
embedder: Arc<E>,
// Destination for (document, vector) pairs via `add_batch`.
store: Arc<V>,
// Chunk post-processors, applied in order after splitting.
chunkers: Vec<Arc<dyn Chunker>>,
// Namespace passed to the store on every write.
namespace: Namespace,
}
impl<L, S, E, V> IngestionPipeline<L, S, E, V>
where
    L: DocumentLoader,
    S: TextSplitter,
    E: Embedder + ?Sized,
    V: VectorStore + ?Sized,
{
    /// Starts a builder from the required components; the chunker chain
    /// starts empty and can be extended with `add_chunker`.
    #[must_use]
    pub fn builder(
        loader: L,
        splitter: S,
        embedder: Arc<E>,
        store: Arc<V>,
        namespace: Namespace,
    ) -> IngestionPipelineBuilder<L, S, E, V> {
        IngestionPipelineBuilder {
            loader,
            splitter,
            embedder,
            store,
            chunkers: Vec::new(),
            namespace,
        }
    }

    /// Drains the loader stream, processing each document in turn, and
    /// returns the accumulated report.
    ///
    /// Per-item failures (a bad stream item, or a chunk/embed/store failure
    /// for one document) are recorded in the report and do not stop the run.
    ///
    /// # Errors
    ///
    /// Returns the loader's error if opening the stream fails, and
    /// `Error::Cancelled` if `ctx` reports cancellation when an item is
    /// received from the stream. In both cases the partial report is dropped.
    pub async fn run(&self, ctx: &ExecutionContext) -> Result<IngestReport> {
        let mut report = IngestReport::default();
        let mut stream = self.loader.load(ctx).await?;
        while let Some(item) = stream.next().await {
            // Checked once per received item, before any work is done on it.
            if ctx.is_cancelled() {
                return Err(Error::Cancelled);
            }
            match item {
                Err(err) => {
                    // The stream item carries no document, hence no id to report.
                    report
                        .errors
                        .push(IngestError::from_error("load", "<unknown>", &err));
                }
                Ok(document) => {
                    report.documents_loaded = report.documents_loaded.saturating_add(1);
                    self.process_document(document, ctx, &mut report).await;
                }
            }
        }
        Ok(report)
    }

    /// Splits, post-processes, embeds and stores a single document.
    ///
    /// Any failure is appended to `report.errors` under the matching stage
    /// name and ends processing of this document only. A document that
    /// yields no chunks (after splitting or after the chunker chain) is
    /// skipped without recording an error.
    async fn process_document(
        &self,
        document: Document,
        ctx: &ExecutionContext,
        report: &mut IngestReport,
    ) {
        let document_id = document.id.as_str().to_owned();
        let mut chunks = self.splitter.split(&document);
        if chunks.is_empty() {
            return;
        }

        // Each chunker consumes the previous stage's output.
        for chunker in &self.chunkers {
            match chunker.process(chunks, ctx).await {
                Ok(transformed) => chunks = transformed,
                Err(err) => {
                    report
                        .errors
                        .push(IngestError::from_error("chunk", &document_id, &err));
                    return;
                }
            }
        }
        if chunks.is_empty() {
            return;
        }

        // One embedding batch per document.
        let texts: Vec<String> = chunks.iter().map(|c| c.content.clone()).collect();
        let embeddings = match self.embedder.embed_batch(&texts, ctx).await {
            Ok(v) => v,
            Err(err) => {
                report
                    .errors
                    .push(IngestError::from_error("embed", &document_id, &err));
                return;
            }
        };
        report.embedding_calls = report.embedding_calls.saturating_add(1);

        // `zip` would silently truncate on a length mismatch, so reject it explicitly.
        if embeddings.len() != chunks.len() {
            report.errors.push(IngestError {
                stage: "embed".to_owned(),
                document_id: document_id.clone(),
                message: format!(
                    "embedder returned {} vectors for {} chunks",
                    embeddings.len(),
                    chunks.len()
                ),
            });
            return;
        }

        let items: Vec<(RetrievedDocument, Vec<f32>)> = chunks
            .into_iter()
            .zip(embeddings)
            .map(|(chunk, emb)| (to_retrieved(chunk), emb.vector))
            .collect();

        // Capture the count first so `items` can be moved into the store
        // instead of deep-cloning every document and vector.
        let count = items.len() as u64;
        if let Err(err) = self.store.add_batch(ctx, &self.namespace, items).await {
            report
                .errors
                .push(IngestError::from_error("store", &document_id, &err));
            return;
        }
        report.chunks_indexed = report.chunks_indexed.saturating_add(count);
    }
}
fn to_retrieved(chunk: Document) -> RetrievedDocument {
let provenance = serde_json::json!({
"source": chunk.source,
"lineage": chunk.lineage,
"namespace": chunk.namespace.render(),
});
let mut metadata = match chunk.metadata {
serde_json::Value::Object(map) => map,
serde_json::Value::Null => serde_json::Map::new(),
other => {
let mut map = serde_json::Map::new();
map.insert("value".to_owned(), other);
map
}
};
metadata.insert(PROVENANCE_METADATA_KEY.to_owned(), provenance);
RetrievedDocument::new(chunk.content)
.with_doc_id(RetrievedDocumentId::from(chunk.id.as_str()))
.with_metadata(serde_json::Value::Object(metadata))
}
// Tests for the pipeline, driven by stub loaders and a stub embedder against
// the in-memory vector store.
#[cfg(test)]
mod tests {
use super::*;
use crate::document::Source;
use crate::splitter::RecursiveCharacterSplitter;
use async_trait::async_trait;
use entelix_memory::{Embedding, EmbeddingUsage, InMemoryVectorStore};
use std::sync::Mutex;
// Namespace for tenant "acme", shared by every test.
fn ns() -> Namespace {
Namespace::new(entelix_core::TenantId::new("acme"))
}
// Loader that yields a fixed list of documents. The list sits behind a
// `Mutex` so `load(&self)` can take it out; a second `load` yields nothing.
struct StubLoader {
documents: Mutex<Vec<Document>>,
}
impl StubLoader {
fn new(documents: Vec<Document>) -> Self {
Self {
documents: Mutex::new(documents),
}
}
}
#[async_trait]
impl DocumentLoader for StubLoader {
fn name(&self) -> &'static str {
"stub"
}
async fn load<'a>(
&'a self,
_ctx: &'a ExecutionContext,
) -> Result<crate::loader::DocumentStream<'a>> {
// Drain the stored documents, leaving an empty list behind.
let docs = std::mem::take(&mut *self.documents.lock().unwrap());
Ok(Box::pin(futures::stream::iter(docs.into_iter().map(Ok))))
}
}
// Loader whose stream yields one good document followed by one error.
struct PartialLoader;
#[async_trait]
impl DocumentLoader for PartialLoader {
fn name(&self) -> &'static str {
"partial"
}
async fn load<'a>(
&'a self,
_ctx: &'a ExecutionContext,
) -> Result<crate::loader::DocumentStream<'a>> {
let ok = Document::root("d1", "alpha", Source::now("test://", "test"), ns());
let stream =
futures::stream::iter(vec![Ok(ok), Err(Error::invalid_request("bad item"))]);
Ok(Box::pin(stream))
}
}
// Embedder returning a `dimension`-long zero vector whose first component
// is the input's byte length. Only `embed` is implemented here; the
// pipeline calls `embed_batch`, presumably a default trait method built on
// `embed` — confirm against the `Embedder` trait.
struct StubEmbedder {
dimension: usize,
}
#[async_trait]
impl Embedder for StubEmbedder {
fn dimension(&self) -> usize {
self.dimension
}
async fn embed(&self, text: &str, _ctx: &ExecutionContext) -> Result<Embedding> {
let mut v = vec![0.0_f32; self.dimension];
#[allow(clippy::cast_precision_loss)]
if let Some(first) = v.first_mut() {
*first = text.len() as f32;
}
Ok(Embedding {
vector: v,
usage: Some(EmbeddingUsage::new(1)),
})
}
}
// An empty loader must produce an all-zero, error-free report.
#[tokio::test]
async fn empty_loader_produces_zero_indexed_clean_report() {
let loader = StubLoader::new(vec![]);
let pipeline = IngestionPipeline::builder(
loader,
RecursiveCharacterSplitter::new(),
Arc::new(StubEmbedder { dimension: 4 }),
Arc::new(InMemoryVectorStore::new(4)),
ns(),
)
.build();
let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
assert_eq!(report.documents_loaded, 0);
assert_eq!(report.chunks_indexed, 0);
assert_eq!(report.embedding_calls, 0);
assert!(report.is_clean());
}
// End-to-end happy path: one document is split, embedded in a single batch,
// stored, and retrievable with provenance metadata attached.
#[tokio::test]
async fn single_document_flows_load_split_embed_store() {
let doc = Document::root(
"doc-1",
"alpha\n\nbeta\n\ngamma",
Source::now("test://doc-1", "test"),
ns(),
);
let loader = StubLoader::new(vec![doc]);
let store = Arc::new(InMemoryVectorStore::new(4));
let pipeline = IngestionPipeline::builder(
loader,
RecursiveCharacterSplitter::new()
.with_chunk_size(10)
.with_chunk_overlap(0),
Arc::new(StubEmbedder { dimension: 4 }),
Arc::clone(&store),
ns(),
)
.build();
let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
assert_eq!(report.documents_loaded, 1);
assert_eq!(report.embedding_calls, 1, "one batch per document");
assert!(report.chunks_indexed >= 3);
assert!(report.is_clean());
// Probe shaped like the stub embedder's output (non-zero first component only).
let mut probe_vec = vec![0.0_f32; 4];
probe_vec[0] = 5.0;
let hits = store
.search(&ExecutionContext::new(), &ns(), &probe_vec, 10)
.await
.unwrap();
assert!(!hits.is_empty(), "store must contain the indexed chunks");
let metadata = &hits[0].metadata;
assert!(
metadata.get(PROVENANCE_METADATA_KEY).is_some(),
"provenance metadata stamped under reserved key"
);
let provenance = &metadata[PROVENANCE_METADATA_KEY];
assert!(provenance.get("source").is_some());
assert!(provenance.get("lineage").is_some());
assert!(provenance.get("namespace").is_some());
}
// A failing stream item is recorded under the "load" stage while the good
// item is still indexed and the run returns `Ok`.
#[tokio::test]
async fn partial_loader_failure_recorded_but_run_completes() {
let store = Arc::new(InMemoryVectorStore::new(4));
let pipeline = IngestionPipeline::builder(
PartialLoader,
RecursiveCharacterSplitter::new(),
Arc::new(StubEmbedder { dimension: 4 }),
Arc::clone(&store),
ns(),
)
.build();
let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
assert_eq!(report.documents_loaded, 1, "successful item still indexed");
assert_eq!(report.errors.len(), 1, "loader-side error recorded");
assert_eq!(report.errors[0].stage, "load");
assert!(!report.is_clean());
assert!(report.chunks_indexed >= 1);
}
// Chunker that keeps only even-indexed chunks and records its name in each
// surviving chunk's lineage (when the chunk has one).
struct EvenIndexChunker;
#[async_trait]
impl Chunker for EvenIndexChunker {
fn name(&self) -> &'static str {
"even-only"
}
async fn process(
&self,
chunks: Vec<Document>,
_ctx: &ExecutionContext,
) -> Result<Vec<Document>> {
let kept: Vec<Document> = chunks
.into_iter()
.enumerate()
.filter(|(idx, _)| idx % 2 == 0)
.map(|(_, mut chunk)| {
if let Some(lineage) = chunk.lineage.as_mut() {
lineage.push_chunker("even-only");
}
chunk
})
.collect();
Ok(kept)
}
}
// The chunker chain runs before storage: only the chunks it keeps are
// indexed, and each stored chunk's lineage names the chunker.
#[tokio::test]
async fn chunker_chain_filters_chunks_before_storage() {
let doc = Document::root(
"doc",
"alpha\n\nbeta\n\ngamma\n\ndelta\n\nepsilon",
Source::now("test://", "test"),
ns(),
);
let store = Arc::new(InMemoryVectorStore::new(4));
// Split once outside the pipeline to learn how many raw chunks to expect.
let raw_chunks = RecursiveCharacterSplitter::new()
.with_chunk_size(7)
.with_chunk_overlap(0)
.split(&doc);
let raw_count = raw_chunks.len();
assert!(
raw_count >= 3,
"splitter must produce at least three chunks for the test to be meaningful: got {raw_count}"
);
let pipeline = IngestionPipeline::builder(
StubLoader::new(vec![doc]),
RecursiveCharacterSplitter::new()
.with_chunk_size(7)
.with_chunk_overlap(0),
Arc::new(StubEmbedder { dimension: 4 }),
Arc::clone(&store),
ns(),
)
.add_chunker(Arc::new(EvenIndexChunker))
.build();
let report = pipeline.run(&ExecutionContext::new()).await.unwrap();
assert!(report.is_clean());
// Even indices 0, 2, 4, … of `raw_count` items: ceil(raw_count / 2) survivors.
let kept = raw_count.div_ceil(2); let kept_u64 = kept as u64;
assert_eq!(
report.chunks_indexed, kept_u64,
"chunker dropped odd indices; expected {kept} of {raw_count} indexed"
);
let mut probe = vec![0.0_f32; 4];
probe[0] = 5.0;
let hits = store
.search(&ExecutionContext::new(), &ns(), &probe, 100)
.await
.unwrap();
// Every stored chunk must carry exactly one chunker-chain entry, "even-only".
for hit in hits {
let chain = &hit.metadata[PROVENANCE_METADATA_KEY]["lineage"]["chunker_chain"];
let arr = chain.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert_eq!(arr[0].as_str(), Some("even-only"));
}
}
}