entelix-rag 0.5.4

entelix RAG primitives — Document with provenance + lineage, DocumentLoader / TextSplitter / Chunker traits. Algorithmic primitives for retrieval-augmented generation pipelines; concrete source connectors ship as companion crates.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
//! `ContextualChunker` — Anthropic Contextual Retrieval (2024-09).
//!
//! Each chunk's content is prefixed with a 50-100 token contextual
//! summary the model generates by reading the parent document plus
//! the chunk in question. Retrieval accuracy improves ~30% on the
//! benchmarks Anthropic published; the cost is one model call per
//! chunk. Production deployments amortise that cost via prompt
//! caching — the parent document carries
//! [`entelix_core::ir::CacheControl`] on its content part, so every
//! chunk after the first hits the cache and pays only the
//! per-chunk delta.
//!
//! ## Failure handling
//!
//! Network blips and rate-limit hiccups are first-class operational
//! states for an LLM-backed chunker. The
//! [`FailurePolicy`](Self::with_failure_policy) knob picks
//! per-chunk semantics:
//!
//! - [`FailurePolicy::KeepOriginal`] (default) — failed chunks
//!   pass through with their original content; the lineage
//!   `chunker_chain` records the chunker name without a prefix
//!   so audit consumers know the chunker was attempted but
//!   couldn't enrich. Most production use cases want this — one
//!   model glitch shouldn't drop a document's whole indexing
//!   pass.
//! - [`FailurePolicy::Skip`] — failed chunks are dropped from the
//!   output. Use when retrieval relies on the contextual prefix
//!   being present.
//! - [`FailurePolicy::Abort`] — first failure surfaces as
//!   `Err(...)` from `process`; the [`crate::IngestionPipeline`]
//!   records a `chunk` stage error and the document doesn't
//!   index. Strictest mode for compliance-bound flows where a
//!   partial result is worse than no result.

use std::sync::Arc;

use async_trait::async_trait;
use entelix_core::ir::{CacheControl, ContentPart, Message, Role};
use entelix_core::{ExecutionContext, Result};
use entelix_runnable::Runnable;

use crate::chunker::Chunker;
use crate::document::Document;

/// Default operator-facing instruction prepended to every model
/// call. Verbatim from Anthropic's published Contextual Retrieval
/// recipe — lifts the model into the right framing without
/// requiring per-corpus tuning.
pub const CONTEXTUAL_CHUNKER_DEFAULT_INSTRUCTION: &str = "\
You are an expert assistant that produces a short standalone context for a chunk extracted \
from a document. The context will be prepended to the chunk to improve retrieval accuracy. \
Reply with one or two sentences (50-100 tokens) describing how this chunk relates to the \
overall document. Do not echo the chunk content — produce only the contextual summary.";

/// Stable identifier surfaced on every transformed chunk's
/// [`crate::Lineage::chunker_chain`] field.
const CHUNKER_NAME: &str = "contextual";

/// Per-chunk failure policy — picks how the chunker reacts when
/// the underlying model call fails on one chunk. See module docs
/// for the trade-off matrix.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum FailurePolicy {
    /// Failed chunks pass through unchanged. The default —
    /// matches the partial-success contract of the rest of the
    /// ingestion pipeline.
    #[default]
    KeepOriginal,
    /// Failed chunks are dropped from the output.
    Skip,
    /// First failure aborts; `process` returns the originating
    /// error.
    Abort,
}

/// Builder for [`ContextualChunker`]. Construct via
/// [`ContextualChunker::builder`]; chain config setters; finalise
/// with [`Self::build`].
pub struct ContextualChunkerBuilder<M> {
    model: Arc<M>,
    instruction: String,
    cache_control: Option<CacheControl>,
    failure_policy: FailurePolicy,
}

impl<M> ContextualChunkerBuilder<M>
where
    M: Runnable<Vec<Message>, Message> + 'static,
{
    /// Override the operator-facing instruction. The default is
    /// [`CONTEXTUAL_CHUNKER_DEFAULT_INSTRUCTION`].
    #[must_use]
    pub fn with_instruction(mut self, instruction: impl Into<String>) -> Self {
        self.instruction = instruction.into();
        self
    }

    /// Stamp [`CacheControl`] onto the parent-document content
    /// part of every model call. With caching enabled, the second
    /// chunk onwards hits the prompt cache for the parent doc and
    /// pays only per-chunk delta tokens. Recommended for any
    /// production deployment ingesting docs longer than the
    /// vendor's per-prompt cache threshold (Anthropic ≥ 1024
    /// tokens, OpenAI ≥ 1024 tokens for GPT-4o-class models).
    ///
    /// `None` (the default) leaves caching unconfigured — useful
    /// for operators whose vendor doesn't support prompt caching
    /// or whose documents are too short to amortise the cache
    /// write cost.
    #[must_use]
    pub const fn with_cache_control(mut self, cache: CacheControl) -> Self {
        self.cache_control = Some(cache);
        self
    }

    /// Override the per-chunk [`FailurePolicy`]. Default
    /// [`FailurePolicy::KeepOriginal`].
    #[must_use]
    pub const fn with_failure_policy(mut self, policy: FailurePolicy) -> Self {
        self.failure_policy = policy;
        self
    }

    /// Finalise into a runnable chunker.
    #[must_use]
    pub fn build(self) -> ContextualChunker<M> {
        ContextualChunker {
            model: self.model,
            instruction: Arc::from(self.instruction),
            cache_control: self.cache_control,
            failure_policy: self.failure_policy,
        }
    }
}

/// Anthropic Contextual Retrieval chunker. Each chunk's content is
/// rewritten as `<contextual prefix>\n\n<original chunk>` where
/// `<contextual prefix>` is a model-generated 50-100 token summary
/// of how the chunk relates to its parent document.
///
/// Construct via [`Self::builder`]; clone is cheap (`Arc`-shared
/// model + instruction).
pub struct ContextualChunker<M> {
    model: Arc<M>,
    instruction: Arc<str>,
    cache_control: Option<CacheControl>,
    failure_policy: FailurePolicy,
}

impl<M> ContextualChunker<M>
where
    M: Runnable<Vec<Message>, Message> + 'static,
{
    /// Start a builder bound to the supplied model. The model is
    /// any `Runnable<Vec<Message>, Message>` — `ChatModel`,
    /// layered chat model, mock-for-tests stub.
    #[must_use]
    pub fn builder(model: Arc<M>) -> ContextualChunkerBuilder<M> {
        ContextualChunkerBuilder {
            model,
            instruction: CONTEXTUAL_CHUNKER_DEFAULT_INSTRUCTION.to_owned(),
            cache_control: None,
            failure_policy: FailurePolicy::default(),
        }
    }

    /// Build the user message that frames one chunk's contextual
    /// generation. Three text parts: the operator-facing
    /// instruction, the parent document content (with optional
    /// `cache_control`), and the chunk in question. Single-message
    /// shape so any `Runnable<Vec<Message>, Message>` impl can
    /// execute without recipe-side wiring.
    fn build_prompt(&self, parent_content: &str, chunk_content: &str) -> Vec<Message> {
        let parent_text = format!("<document>\n{parent_content}\n</document>");
        let parent_part = self.cache_control.map_or_else(
            || ContentPart::text(parent_text.clone()),
            |cache| ContentPart::Text {
                text: parent_text.clone(),
                cache_control: Some(cache),
                provider_echoes: Vec::new(),
            },
        );
        let user = Message::new(
            Role::User,
            vec![
                ContentPart::text(self.instruction.to_string()),
                parent_part,
                ContentPart::text(format!("<chunk>\n{chunk_content}\n</chunk>")),
            ],
        );
        vec![user]
    }

    /// Borrow the configured failure policy.
    #[must_use]
    pub const fn failure_policy(&self) -> FailurePolicy {
        self.failure_policy
    }
}

impl<M> Clone for ContextualChunker<M> {
    fn clone(&self) -> Self {
        Self {
            model: Arc::clone(&self.model),
            instruction: Arc::clone(&self.instruction),
            cache_control: self.cache_control,
            failure_policy: self.failure_policy,
        }
    }
}

impl<M> std::fmt::Debug for ContextualChunker<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContextualChunker")
            .field("failure_policy", &self.failure_policy)
            .field("cache_control", &self.cache_control.is_some())
            .finish_non_exhaustive()
    }
}

#[async_trait]
impl<M> Chunker for ContextualChunker<M>
where
    M: Runnable<Vec<Message>, Message> + 'static,
{
    fn name(&self) -> &'static str {
        CHUNKER_NAME
    }

    async fn process(
        &self,
        chunks: Vec<Document>,
        ctx: &ExecutionContext,
    ) -> Result<Vec<Document>> {
        let mut out = Vec::with_capacity(chunks.len());
        for mut chunk in chunks {
            if ctx.is_cancelled() {
                return Err(entelix_core::Error::Cancelled);
            }
            // The chunker operates over post-split chunks — by the
            // time the chunker runs, only chunk-shaped Documents
            // remain in memory; the original parent body has been
            // released by the splitter. The prompt therefore frames
            // the chunk as both `<document>` context and `<chunk>`
            // payload. Operators wanting the *full* parent body in
            // the prompt cache (the highest-leverage Contextual
            // Retrieval shape Anthropic published) stamp the parent
            // text into chunk metadata at split time and supply a
            // bespoke chunker that reads it from there. Default
            // shape works without parent-body propagation; the
            // bespoke shape reaches the upper bound of accuracy.
            let prompt = self.build_prompt(&chunk.content, &chunk.content);
            let outcome = self.model.invoke(prompt, ctx).await;
            match outcome {
                Ok(reply) => {
                    let prefix = extract_text(&reply);
                    if !prefix.is_empty() {
                        chunk.content = format!("{prefix}\n\n{}", chunk.content);
                    }
                    if let Some(lineage) = chunk.lineage.as_mut() {
                        lineage.push_chunker(CHUNKER_NAME);
                    }
                    out.push(chunk);
                }
                Err(err) => match self.failure_policy {
                    FailurePolicy::KeepOriginal => {
                        if let Some(lineage) = chunk.lineage.as_mut() {
                            lineage.push_chunker(CHUNKER_NAME);
                        }
                        out.push(chunk);
                    }
                    FailurePolicy::Skip => {
                        // Drop the chunk; do not record the
                        // chunker on lineage since the chunk did
                        // not undergo the transform.
                    }
                    FailurePolicy::Abort => return Err(err),
                },
            }
        }
        Ok(out)
    }
}

/// Pull the assistant's reply text out of the [`Message`] the model
/// returned. Concatenates every [`ContentPart::Text`] part with
/// blank-line separators; non-text parts (tool-use, image-output)
/// are skipped — a contextual chunker that emits tool calls is a
/// misconfiguration we silently ignore rather than fail on.
fn extract_text(message: &Message) -> String {
    let mut buf = String::new();
    for part in &message.content {
        if let ContentPart::Text { text, .. } = part {
            if !buf.is_empty() {
                buf.push_str("\n\n");
            }
            buf.push_str(text);
        }
    }
    buf.trim().to_owned()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::document::{Lineage, Source};
    use entelix_memory::Namespace;
    use std::sync::Mutex;

    fn ns() -> Namespace {
        Namespace::new(entelix_core::TenantId::new("acme"))
    }

    fn chunk_with_content(content: &str, idx: u32) -> Document {
        let parent = Document::root("doc", "<parent>", Source::now("test://", "test"), ns());
        let lineage = Lineage::from_split(parent.id.clone(), idx, 3, "test-splitter");
        parent.child(content, lineage)
    }

    /// Scripted model that pops the next reply per invocation.
    /// Every Ok yields one assistant text; every Err pops a
    /// scripted error. Empty queue panics.
    struct ScriptedModel {
        script: Mutex<Vec<Result<String>>>,
    }

    impl ScriptedModel {
        fn new(script: Vec<Result<String>>) -> Self {
            Self {
                script: Mutex::new(script.into_iter().rev().collect()),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for ScriptedModel {
        async fn invoke(&self, _input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            let next = self
                .script
                .lock()
                .unwrap()
                .pop()
                .expect("ScriptedModel exhausted");
            next.map(|text| Message::new(Role::Assistant, vec![ContentPart::text(text)]))
        }
    }

    #[tokio::test]
    async fn empty_input_produces_empty_output() {
        let model = Arc::new(ScriptedModel::new(vec![]));
        let chunker = ContextualChunker::builder(model).build();
        let out = chunker
            .process(Vec::new(), &ExecutionContext::new())
            .await
            .unwrap();
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn happy_path_prepends_contextual_prefix_and_records_lineage() {
        let model = Arc::new(ScriptedModel::new(vec![
            Ok("This chunk explains the alpha case.".into()),
            Ok("This chunk covers the beta path.".into()),
        ]));
        let chunker = ContextualChunker::builder(model).build();
        let chunks = vec![
            chunk_with_content("alpha body", 0),
            chunk_with_content("beta body", 1),
        ];
        let out = chunker
            .process(chunks, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(out.len(), 2);
        assert!(
            out[0]
                .content
                .starts_with("This chunk explains the alpha case."),
            "prefix prepended: {:?}",
            out[0].content
        );
        assert!(out[0].content.ends_with("alpha body"));
        for chunk in &out {
            let chain = &chunk.lineage.as_ref().unwrap().chunker_chain;
            assert_eq!(chain.len(), 1);
            assert_eq!(chain[0], CHUNKER_NAME);
        }
    }

    #[tokio::test]
    async fn failure_policy_keep_original_passes_through_unmodified_content() {
        // Mid-batch failure: chunk 1 succeeds, chunk 2 errors,
        // chunk 3 succeeds. KeepOriginal default → all three land
        // in the output, chunk 2 with its original content.
        let model = Arc::new(ScriptedModel::new(vec![
            Ok("alpha context.".into()),
            Err(entelix_core::Error::provider_http(503, "transient")),
            Ok("gamma context.".into()),
        ]));
        let chunker = ContextualChunker::builder(model).build();
        let chunks = vec![
            chunk_with_content("alpha body", 0),
            chunk_with_content("beta body", 1),
            chunk_with_content("gamma body", 2),
        ];
        let out = chunker
            .process(chunks, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(out.len(), 3);
        assert!(out[0].content.starts_with("alpha context."));
        assert_eq!(
            out[1].content, "beta body",
            "failed chunk passes through with original content"
        );
        // Lineage still records the chunker attempt — audit
        // consumers know the chunker ran (even though it didn't
        // enrich) for that chunk.
        assert_eq!(
            out[1].lineage.as_ref().unwrap().chunker_chain,
            vec![CHUNKER_NAME.to_owned()]
        );
        assert!(out[2].content.starts_with("gamma context."));
    }

    #[tokio::test]
    async fn failure_policy_skip_drops_failed_chunks() {
        let model = Arc::new(ScriptedModel::new(vec![
            Ok("alpha context.".into()),
            Err(entelix_core::Error::provider_http(503, "transient")),
            Ok("gamma context.".into()),
        ]));
        let chunker = ContextualChunker::builder(model)
            .with_failure_policy(FailurePolicy::Skip)
            .build();
        let chunks = vec![
            chunk_with_content("alpha body", 0),
            chunk_with_content("beta body", 1),
            chunk_with_content("gamma body", 2),
        ];
        let out = chunker
            .process(chunks, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(out.len(), 2, "failed chunk dropped");
        assert!(out[0].content.starts_with("alpha context."));
        assert!(out[1].content.starts_with("gamma context."));
    }

    #[tokio::test]
    async fn failure_policy_abort_returns_first_error() {
        let model = Arc::new(ScriptedModel::new(vec![
            Ok("alpha context.".into()),
            Err(entelix_core::Error::provider_http(503, "transient")),
        ]));
        let chunker = ContextualChunker::builder(model)
            .with_failure_policy(FailurePolicy::Abort)
            .build();
        let chunks = vec![
            chunk_with_content("alpha body", 0),
            chunk_with_content("beta body", 1),
            chunk_with_content("gamma body", 2),
        ];
        let err = chunker
            .process(chunks, &ExecutionContext::new())
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            entelix_core::Error::Provider {
                kind: entelix_core::ProviderErrorKind::Http(503),
                ..
            }
        ));
    }

    #[tokio::test]
    async fn cache_control_attached_when_configured() {
        // Verifies the parent-content message part carries the
        // configured cache_control. The chunker doesn't directly
        // observe what the model receives, so we inspect the prompt
        // it would build via the public-shape helper.
        let model = Arc::new(ScriptedModel::new(vec![Ok("ok".into())]));
        let chunker = ContextualChunker::builder(model)
            .with_cache_control(CacheControl::one_hour())
            .build();
        let prompt = chunker.build_prompt("parent body", "chunk body");
        // First message is user; its content[1] is the parent part.
        let parent_part = &prompt[0].content[1];
        match parent_part {
            ContentPart::Text { cache_control, .. } => {
                assert!(
                    cache_control.is_some(),
                    "cache_control stamped on parent part"
                );
            }
            _ => panic!("parent part must be Text"),
        }
    }

    #[tokio::test]
    async fn cancellation_short_circuits_between_chunks() {
        let model = Arc::new(ScriptedModel::new(vec![Ok("alpha context.".into())]));
        let chunker = ContextualChunker::builder(model).build();
        let token = entelix_core::cancellation::CancellationToken::new();
        let ctx = ExecutionContext::with_cancellation(token.clone());
        token.cancel();
        let err = chunker
            .process(vec![chunk_with_content("alpha", 0)], &ctx)
            .await
            .unwrap_err();
        assert!(matches!(err, entelix_core::Error::Cancelled));
    }
}