Skip to main content

ai_memory/multistep_ingest/
executor.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Form 3 — pipeline executor.
5//!
//! Threads the deterministic helpers through their stages first
//! (serially today; they are independent of one another, so running
//! them in parallel is a possible future optimisation), then dispatches
//! the LLM stages with
8//! the shared-prefix prompt assembled in [`super::cache`]. Trust slots
9//! are resolved against the in-flight stage outputs and rendered into
10//! the LLM prompt verbatim, so the explicit-trust contract holds end-
11//! to-end.
12
13use std::sync::Arc;
14
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17
18use super::cache::{
19    CacheKey, EXPLICIT_TRUST_INSTRUCTION, PromptCacheTelemetry, build_shared_prefix,
20};
21#[cfg(test)]
22use super::helpers::HelperParams;
23use super::helpers::{HelperContext, HelperOutput, MemoryHandle, run_helper_with};
24use super::pipeline::{HelperOutputRef, Pipeline, Stage};
25
/// Fallback limit, in characters, on how much of `content` is inlined
/// into any single Form 3 multistep-ingest LLM stage (issue #782
/// PERF-11).
///
/// Applied whenever the executor has no explicit cap configured. The
/// value mirrors the synthesis-prompt cap from Cluster B (PERF-7);
/// operators override it per-namespace via
/// [`crate::models::GovernancePolicy::multistep_max_content_chars`].
pub const DEFAULT_MULTISTEP_MAX_CONTENT_CHARS: usize = 1_500;
32
/// Per-stage trace entry produced by the executor.
///
/// Serialised with an internal `stage_type` tag whose value is the
/// snake-case variant name (`helper` or `llm_call`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "stage_type", rename_all = "snake_case")]
pub enum StageOutcome {
    /// A deterministic helper stage.
    Helper {
        /// Stage index in the pipeline.
        index: usize,
        /// Snake-case helper discriminator (matches
        /// `HelperKind::as_str`).
        helper: String,
        /// Helper's one-line summary (operator-facing).
        summary: String,
        /// Structured payload threaded into downstream LLM stages.
        payload: Value,
        /// v0.7.0 polish (issue #782 PERF-11) — byte length of the
        /// `content` string handed to the run. Helper stages receive
        /// content by **borrow**, so this number is the size of the
        /// same backing string for every helper in the run — operators
        /// inspecting the trace can use it to confirm that the
        /// content-clone-per-stage regression has not returned.
        content_bytes: usize,
    },
    /// An LLM call stage.
    LlmCall {
        /// Stage index in the pipeline.
        index: usize,
        /// Stage label from the descriptor.
        label: String,
        /// Prompt string sent to the LLM (shared prefix + trust slots
        /// + per-stage body). Included verbatim so test assertions can
        /// check for the explicit-trust phrase.
        prompt: String,
        /// Prompt-cache key derived from the shared prefix.
        cache_key: String,
        /// LLM response — parsed as JSON when the response was JSON,
        /// or wrapped in `{"raw": "..."}` when the LLM returned text.
        response: Value,
        /// v0.7.0 polish (issue #782 PERF-11) — byte length of the
        /// content view actually inlined into the LLM prompt **after**
        /// the `multistep_max_content_chars` cap was applied. When the
        /// content was truncated this length includes the appended
        /// `[...truncated N chars]` marker. Lets operators observe
        /// truncation events without diffing the raw prompt strings.
        content_bytes: usize,
        /// v0.7.0 polish (issue #782 PERF-11) — `true` when the
        /// content was truncated to fit the cap.
        content_truncated: bool,
    },
}
82
/// Full execution trace returned by [`IngestExecutor::run`].
///
/// Serialisable so callers can hand the whole trace to an operator or
/// tool surface as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionTrace {
    /// Pipeline variant tag (`"two_phase"` / `"four_step"`).
    pub variant: String,
    /// Stage-by-stage outcomes in execution order: every helper stage
    /// first, then every LLM stage. Each entry carries its own
    /// pipeline `index`, so the original declaration order can be
    /// recovered.
    pub stages: Vec<StageOutcome>,
    /// Distinct cache keys observed across LLM stages. Form 3's
    /// acceptance criterion is that this set has length 1 (or 0 when
    /// the pipeline has no LLM stages).
    pub distinct_cache_keys: Vec<String>,
    /// `true` when every LLM stage shared the same cache key.
    pub prompt_cache_consistent: bool,
    /// Final structured output emitted by the last LLM stage, OR the
    /// payload of the last helper stage if the pipeline had no LLM
    /// stages. Falls back to an empty JSON object when neither exists.
    pub final_output: Value,
    /// v0.7.0 polish (issue #782 PERF-11) — per-stage content-bytes
    /// histogram. Indexed by stage execution order (matches
    /// `stages[i]`). Helpers report the borrowed-slice length; LLM
    /// stages report the post-truncation length. Operators threading
    /// the trace into Prometheus/Statsd can publish this as a
    /// histogram with one bucket per stage label.
    pub bytes_per_stage: Vec<usize>,
}
107
/// Everything that can go wrong while the executor walks a pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutorError {
    /// An LLM stage's trust slot could not be resolved to a helper
    /// output: the referenced stage index is out of range, or the
    /// stage at that index is not a helper stage.
    InvalidTrustSlot {
        /// Stage index the trust slot pointed at.
        stage_index: usize,
        /// Label of the trust slot that failed to resolve.
        label: String,
    },
    /// The LLM dispatch reported a failure; carries its message.
    LlmDispatch(String),
    /// The pipeline descriptor contained no stages at all.
    EmptyPipeline,
}

impl std::fmt::Display for ExecutorError {
    /// Renders the one-line, operator-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmptyPipeline => f.write_str("pipeline has no stages"),
            Self::LlmDispatch(reason) => write!(f, "llm dispatch failed: {reason}"),
            Self::InvalidTrustSlot {
                stage_index: slot,
                label: name,
            } => write!(f, "invalid trust slot: stage_index={slot} (label={name})"),
        }
    }
}

impl std::error::Error for ExecutorError {}
139
/// LLM dispatch trait — abstracted so tests can wire a deterministic
/// mock while production binds to `OllamaClient::generate` via
/// [`OllamaDispatch`].
///
/// Implementations must be `Send + Sync`; the executor holds its
/// dispatch behind an `Arc` and calls it through `&self`.
pub trait LlmDispatch: Send + Sync {
    /// Dispatch a single LLM call. The prompt carries the full
    /// shared-prefix + trust slots + stage body; the executor records
    /// the cache key itself before calling this, so implementations
    /// only need to produce the response text.
    ///
    /// # Errors
    ///
    /// Returns `Err(String)` when the underlying LLM call fails — the
    /// executor maps that into [`ExecutorError::LlmDispatch`].
    fn dispatch(&self, prompt: &str) -> Result<String, String>;
}
154
155/// Production binding to the project's `OllamaClient::generate`. Wraps
156/// the existing breaker / timeout discipline.
157pub struct OllamaDispatch {
158    client: Arc<crate::llm::OllamaClient>,
159}
160
161impl OllamaDispatch {
162    /// Construct a production dispatch around an existing `OllamaClient`.
163    #[must_use]
164    pub fn new(client: Arc<crate::llm::OllamaClient>) -> Self {
165        Self { client }
166    }
167}
168
169impl LlmDispatch for OllamaDispatch {
170    fn dispatch(&self, prompt: &str) -> Result<String, String> {
171        self.client
172            .generate(prompt, None)
173            .map_err(|e| e.to_string())
174    }
175}
176
177/// Deterministic mock dispatch used by the test suite and the
178/// cookbook demo. Pops canned responses off a queue; returns
179/// `Err("mock: queue exhausted")` once empty so tests catch over-call
180/// bugs.
181pub struct MockLlmDispatch {
182    responses: std::sync::Mutex<Vec<Result<String, String>>>,
183}
184
185impl MockLlmDispatch {
186    /// Construct a mock dispatch with a canned response queue.
187    #[must_use]
188    pub fn new(responses: Vec<Result<String, String>>) -> Self {
189        Self {
190            responses: std::sync::Mutex::new(responses),
191        }
192    }
193}
194
195impl LlmDispatch for MockLlmDispatch {
196    fn dispatch(&self, _prompt: &str) -> Result<String, String> {
197        let mut q = self.responses.lock().expect("mutex not poisoned in tests");
198        if q.is_empty() {
199            return Err("mock: queue exhausted".to_string());
200        }
201        q.remove(0)
202    }
203}
204
/// The orchestrator. Walks a [`Pipeline`] start-to-finish, runs every
/// helper stage up front (serially, in declaration order — the helpers
/// are independent, so parallelising them is a possible future
/// optimisation), threads their outputs into LLM stages through
/// explicit-trust slots, and returns an [`ExecutionTrace`].
///
/// Generic over the dispatch so tests can substitute a mock; `?Sized`
/// allows `IngestExecutor<dyn LlmDispatch>`.
pub struct IngestExecutor<D: LlmDispatch + ?Sized> {
    /// Backend that performs the actual LLM calls.
    dispatch: Arc<D>,
    /// Prompt-cache telemetry; one cache key is recorded per LLM stage
    /// dispatched through this executor.
    telemetry: Arc<PromptCacheTelemetry>,
    /// v0.7.0 polish (issue #782 PERF-11) — per-LLM-stage content cap.
    /// `None` defers to [`DEFAULT_MULTISTEP_MAX_CONTENT_CHARS`].
    /// Operators set this via [`Self::with_max_content_chars`] after
    /// resolving the per-namespace
    /// [`crate::models::GovernancePolicy::multistep_max_content_chars`].
    max_content_chars: Option<usize>,
    /// v0.7.0 polish (issue #782 PERF-11) — debug-build test seam
    /// recording `content.as_ptr() as usize` for every helper
    /// invocation. Used by the borrow-not-clone acceptance test
    /// (`tests/form_3_multistep_ingest.rs::
    /// multistep_phase_1_helpers_receive_content_borrow_not_clone`)
    /// to prove that the content string is threaded by reference,
    /// not duplicated per helper. The field exists in every build;
    /// only the recording is gated on `debug_assertions`, so release
    /// builds never write to it.
    helper_content_ptrs: Arc<std::sync::Mutex<Vec<usize>>>,
}
228
229impl<D: LlmDispatch + ?Sized> IngestExecutor<D> {
230    /// Construct an executor around a dispatch implementation.
231    #[must_use]
232    pub fn new(dispatch: Arc<D>) -> Self {
233        Self {
234            dispatch,
235            telemetry: Arc::new(PromptCacheTelemetry::new()),
236            max_content_chars: None,
237            helper_content_ptrs: Arc::new(std::sync::Mutex::new(Vec::new())),
238        }
239    }
240
241    /// Builder-style setter for the per-LLM-stage content cap (issue
242    /// #782 PERF-11). Callers resolve the namespace policy via
243    /// [`crate::models::GovernancePolicy::effective_multistep_max_content_chars`]
244    /// and thread the value here before calling [`Self::run`].
245    #[must_use]
246    pub fn with_max_content_chars(mut self, cap: usize) -> Self {
247        self.max_content_chars = Some(cap);
248        self
249    }
250
251    /// Telemetry handle. Used by the MCP tool surface to surface the
252    /// per-run cache-key trace.
253    #[must_use]
254    pub fn telemetry(&self) -> Arc<PromptCacheTelemetry> {
255        Arc::clone(&self.telemetry)
256    }
257
258    /// v0.7.0 polish (issue #782 PERF-11) — debug-build test seam
259    /// returning the helper-content pointer recordings from the
260    /// most-recent `run()`. The integration test
261    /// `multistep_phase_1_helpers_receive_content_borrow_not_clone`
262    /// pins the borrow invariant by asserting every entry is the
263    /// same pointer.
264    ///
265    /// Hidden from rustdoc because it is a test seam, not a
266    /// production API. The recorder is only populated under
267    /// `debug_assertions` (debug builds); release builds return an
268    /// empty vec so the call has zero observable overhead in
269    /// production.
270    #[doc(hidden)]
271    #[must_use]
272    pub fn helper_content_ptrs(&self) -> Vec<usize> {
273        self.helper_content_ptrs
274            .lock()
275            .map(|g| g.clone())
276            .unwrap_or_default()
277    }
278
279    /// Run a pipeline against an incoming content blob + candidate
280    /// memory set.
281    ///
282    /// # Errors
283    ///
284    /// - [`ExecutorError::EmptyPipeline`] if the descriptor has no
285    ///   stages.
286    /// - [`ExecutorError::InvalidTrustSlot`] if an LLM stage references
287    ///   a stage index that hasn't run yet or doesn't refer to a
288    ///   helper.
289    /// - [`ExecutorError::LlmDispatch`] if the underlying LLM call
290    ///   fails.
291    pub fn run(
292        &self,
293        pipeline: &Pipeline,
294        content: &str,
295        candidates: &[MemoryHandle],
296        content_embedding: Option<&[f32]>,
297        namespace: Option<&str>,
298    ) -> Result<ExecutionTrace, ExecutorError> {
299        if pipeline.stages.is_empty() {
300            return Err(ExecutorError::EmptyPipeline);
301        }
302
303        let mut helper_outputs: Vec<Option<HelperOutput>> = vec![None; pipeline.stages.len()];
304        let mut stage_outcomes: Vec<StageOutcome> = Vec::with_capacity(pipeline.stages.len());
305        let mut bytes_per_stage: Vec<usize> = Vec::with_capacity(pipeline.stages.len());
306
307        // v0.7.0 polish (issue #782 PERF-11): build the borrowed helper
308        // context ONCE per run. Every helper stage in Phase 1 receives
309        // the SAME `&str` slice — the executor never clones `content`
310        // into a per-stage `HelperParams::content`. The
311        // `multistep_phase_1_helpers_receive_content_borrow_not_clone`
312        // integration test pins this invariant by asserting the
313        // pointer recorded for each helper is identical.
314        let helper_ctx = HelperContext::new(content, candidates, content_embedding, namespace);
315        // v0.7.0 polish (issue #782 PERF-11): record the caller's
316        // pointer once so the borrow-not-clone invariant can be
317        // observed by the integration test. Debug builds only —
318        // release builds skip the recording entirely.
319        #[cfg(debug_assertions)]
320        let content_ptr_for_test = content.as_ptr() as usize;
321
322        // Phase 1: run every helper stage in declaration order. Helpers
323        // are pure functions of their `HelperParams`, so a future
324        // optimisation could parallelise them via rayon; for now the
325        // serial walk keeps the trace deterministic for tests.
326        for (idx, stage) in pipeline.stages.iter().enumerate() {
327            if let Stage::Helper { kind, params } = stage {
328                #[cfg(debug_assertions)]
329                {
330                    // Record the pointer of the EFFECTIVE content slice
331                    // for the borrow-not-clone acceptance test. The
332                    // ctx's `effective_content` returns either the
333                    // descriptor override (rare) or the same borrowed
334                    // slice across every stage.
335                    let effective_ptr = helper_ctx.effective_content(params).as_ptr() as usize;
336                    if let Ok(mut g) = self.helper_content_ptrs.lock() {
337                        g.push(effective_ptr);
338                    }
339                    // Pin against accidental drift: if no descriptor
340                    // override is present, the pointer MUST equal the
341                    // caller's `content.as_ptr()`.
342                    if params.content.is_empty() {
343                        debug_assert_eq!(effective_ptr, content_ptr_for_test);
344                    }
345                }
346                let out = run_helper_with(*kind, params, &helper_ctx);
347                // Helpers see the borrowed slice — the byte count is
348                // the size of the SAME backing string across every
349                // stage in the run.
350                bytes_per_stage.push(content.len());
351                stage_outcomes.push(StageOutcome::Helper {
352                    index: idx,
353                    helper: out.kind.as_str().to_string(),
354                    summary: out.summary.clone(),
355                    payload: out.payload.clone(),
356                    content_bytes: content.len(),
357                });
358                helper_outputs[idx] = Some(out);
359            }
360        }
361
362        // Phase 2: walk the LLM stages, assembling the shared-prefix
363        // prompt and resolving trust slots against helper_outputs.
364        let prefix = build_shared_prefix(pipeline.variant_tag(), &pipeline.system_prompt);
365        let cache_key = CacheKey::from_prefix(&prefix);
366        let llm_cap = self
367            .max_content_chars
368            .unwrap_or(DEFAULT_MULTISTEP_MAX_CONTENT_CHARS);
369
370        let mut last_llm_response: Option<Value> = None;
371
372        for (idx, stage) in pipeline.stages.iter().enumerate() {
373            let Stage::LlmCall {
374                prompt_template,
375                trust_inputs,
376                output_schema,
377                label,
378            } = stage
379            else {
380                continue;
381            };
382
383            // Build the trust-slot block.
384            let trust_block = render_trust_inputs(trust_inputs, &helper_outputs)?;
385
386            // v0.7.0 polish (issue #782 PERF-11): cap the content
387            // inlined into the LLM prompt to `llm_cap` characters
388            // (default 1500, mirroring Cluster B's PERF-7 synthesis
389            // cap). Truncation only affects the LLM prompt — the
390            // helper payloads and the caller-visible final output are
391            // untouched. The truncation marker keeps the LLM informed
392            // that it's seeing a clipped view so it doesn't
393            // hallucinate "completeness" claims.
394            let (content_view, truncated) = truncate_content_for_llm(content, llm_cap);
395
396            // Compose the full prompt: shared prefix + stage tail.
397            let stage_tail = format!(
398                "\n[STAGE label={label} index={idx}]\n\
399                 [INCOMING CONTENT]\n{content_view}\n\
400                 [TRUST INPUTS]\n{trust_block}\n\
401                 [TASK]\n{prompt_template}\n\
402                 [OUTPUT SCHEMA]\n{schema}\n",
403                schema = serde_json::to_string(output_schema).unwrap_or_else(|_| "{}".to_string()),
404            );
405            let prompt = format!("{prefix}{stage_tail}");
406
407            // Telemetry: cache key is the same for every LLM stage in
408            // the run because the prefix is identical.
409            self.telemetry.record(cache_key.clone());
410
411            let response_text = self
412                .dispatch
413                .dispatch(&prompt)
414                .map_err(ExecutorError::LlmDispatch)?;
415
416            let response_value = match serde_json::from_str::<Value>(&response_text) {
417                Ok(v) => v,
418                Err(_) => json!({ "raw": response_text }),
419            };
420
421            let content_bytes = content_view.len();
422            bytes_per_stage.push(content_bytes);
423            stage_outcomes.push(StageOutcome::LlmCall {
424                index: idx,
425                label: label.clone(),
426                prompt,
427                cache_key: cache_key.as_hex().to_string(),
428                response: response_value.clone(),
429                content_bytes,
430                content_truncated: truncated,
431            });
432            last_llm_response = Some(response_value);
433        }
434
435        let distinct_cache_keys: Vec<String> = {
436            let mut seen: Vec<String> =
437                self.telemetry.snapshot().into_iter().map(|k| k.0).collect();
438            seen.sort();
439            seen.dedup();
440            seen
441        };
442        let prompt_cache_consistent = self.telemetry.all_keys_match();
443
444        // Choose the final output: last LLM response if any, else the
445        // last helper payload.
446        let final_output = last_llm_response.unwrap_or_else(|| {
447            helper_outputs
448                .iter()
449                .rev()
450                .find_map(|o| o.as_ref().map(|h| h.payload.clone()))
451                .unwrap_or_else(|| json!({}))
452        });
453
454        Ok(ExecutionTrace {
455            variant: pipeline.variant_tag().to_string(),
456            stages: stage_outcomes,
457            distinct_cache_keys,
458            prompt_cache_consistent,
459            final_output,
460            bytes_per_stage,
461        })
462    }
463}
464
/// v0.7.0 polish (issue #782 PERF-11) — clip `content` to its first
/// `cap` characters for inlining into an LLM prompt.
///
/// Counting is done in Unicode scalar values, so the cut never lands
/// inside a multi-byte character. When anything was cut off, a
/// ` [...truncated N chars]` marker (N = number of dropped characters)
/// is appended and the returned flag is `true`; otherwise the original
/// slice is returned borrowed, with the flag `false`.
///
/// A `cap` of `0` means "do not truncate"; callers who want to drop the
/// LLM content slot entirely should compose a different prompt template
/// instead.
fn truncate_content_for_llm(content: &str, cap: usize) -> (std::borrow::Cow<'_, str>, bool) {
    use std::borrow::Cow;
    use std::fmt::Write as _;

    if cap == 0 {
        return (Cow::Borrowed(content), false);
    }

    // Byte offset of the first character past the cap. `None` means the
    // content holds at most `cap` characters and fits as-is.
    let Some((cut, _)) = content.char_indices().nth(cap) else {
        return (Cow::Borrowed(content), false);
    };

    let dropped = content[cut..].chars().count();
    let mut clipped = String::from(&content[..cut]);
    // Writing into a `String` cannot fail; the result is discarded to
    // satisfy clippy::format_push_string.
    let _ = write!(clipped, " [...truncated {dropped} chars]");
    (Cow::Owned(clipped), true)
}
493
494/// Render the trust-slot block for an LLM stage's prompt. Each slot's
495/// label and payload appears under the explicit-trust banner so the
496/// LLM sees the same instruction every stage.
497fn render_trust_inputs(
498    inputs: &[HelperOutputRef],
499    helper_outputs: &[Option<HelperOutput>],
500) -> Result<String, ExecutorError> {
501    if inputs.is_empty() {
502        return Ok(format!("(none — but: {EXPLICIT_TRUST_INSTRUCTION})"));
503    }
504    let mut out = String::new();
505    out.push_str(EXPLICIT_TRUST_INSTRUCTION);
506    out.push_str("\n\n");
507    for input in inputs {
508        let payload = helper_outputs
509            .get(input.stage_index)
510            .and_then(|o| o.as_ref())
511            .ok_or_else(|| ExecutorError::InvalidTrustSlot {
512                stage_index: input.stage_index,
513                label: input.label.clone(),
514            })?;
515        out.push_str(&format!(
516            "<<TRUST label={} helper={}>>\n{}\n<<END TRUST>>\n\n",
517            input.label,
518            payload.kind.as_str(),
519            serde_json::to_string_pretty(&payload.payload).unwrap_or_default()
520        ));
521    }
522    Ok(out)
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::multistep_ingest::pipeline::{four_step_default, two_phase_default};

    /// Minimal candidate memory with no embedding and no namespace.
    fn mh(id: &str, body: &str) -> MemoryHandle {
        MemoryHandle {
            id: id.to_string(),
            body: body.to_string(),
            embedding: None,
            namespace: None,
        }
    }

    /// Executor wired to a mock that replays `replies` (all successful)
    /// in order.
    fn executor_with(replies: &[&str]) -> IngestExecutor<MockLlmDispatch> {
        let queue: Vec<Result<String, String>> =
            replies.iter().map(|reply| Ok((*reply).to_string())).collect();
        IngestExecutor::new(Arc::new(MockLlmDispatch::new(queue)))
    }

    #[test]
    fn helper_then_llm_runs_in_order_and_renders_trust_slot() {
        let exec = executor_with(&[r#"{"title":"T","summary":"S","tags":[],"atoms":[]}"#]);
        let pipeline = two_phase_default();
        let candidates = [mh("c1", "a quick fox")];
        let trace = exec
            .run(
                &pipeline,
                "the quick brown fox",
                &candidates,
                None,
                Some("global"),
            )
            .expect("pipeline runs");

        // Stages 0 and 1 are the helpers; stage 2 is the LLM call.
        assert!(matches!(trace.stages[0], StageOutcome::Helper { .. }));
        assert!(matches!(trace.stages[1], StageOutcome::Helper { .. }));
        assert!(matches!(trace.stages[2], StageOutcome::LlmCall { .. }));

        // The prompt sent to the LLM must quote the trust instruction
        // and name at least one helper from the trust slots.
        let StageOutcome::LlmCall { prompt, .. } = &trace.stages[2] else {
            panic!("stage 2 must be an LLM call");
        };
        assert!(
            prompt.contains(EXPLICIT_TRUST_INSTRUCTION),
            "LLM prompt must carry the explicit-trust instruction verbatim"
        );
        assert!(
            prompt.contains("jaccard_overlap") || prompt.contains("fts_classifier"),
            "LLM prompt must cite a helper kind from the trust slots"
        );
    }

    #[test]
    fn two_phase_pipeline_produces_structured_output() {
        let exec =
            executor_with(&[r#"{"title":"T","summary":"S","tags":["a"],"atoms":["one","two"]}"#]);
        let pipeline = two_phase_default();
        let trace = exec
            .run(&pipeline, "anything", &[], None, None)
            .expect("ok");
        assert_eq!(trace.variant, "two_phase");
        assert_eq!(trace.final_output["title"], "T");
        assert_eq!(trace.final_output["atoms"].as_array().unwrap().len(), 2);
    }

    #[test]
    fn four_step_pipeline_produces_structured_output() {
        let exec = executor_with(&[
            r#"{"fact_kind":"declarative","confidence":0.9}"#,
            r#"{"entities":["a"],"claims":["c"],"relations":[]}"#,
            r#"{"title":"X","summary":"Y","tags":[],"proposed_links":[]}"#,
        ]);
        let pipeline = four_step_default();
        let trace = exec
            .run(
                &pipeline,
                "Paris is the capital of France.",
                &[],
                None,
                None,
            )
            .expect("ok");
        assert_eq!(trace.variant, "four_step");
        // Three LLM stages → three LLM entries in the trace.
        let llm_count = trace
            .stages
            .iter()
            .filter(|outcome| matches!(outcome, StageOutcome::LlmCall { .. }))
            .count();
        assert_eq!(llm_count, 3);
        // The emit stage runs last, so its response is the final output.
        assert_eq!(trace.final_output["title"], "X");
    }

    #[test]
    fn prompt_cache_key_is_consistent_across_stages_within_a_run() {
        let exec = executor_with(&[
            r#"{"fact_kind":"declarative","confidence":0.5}"#,
            r#"{"entities":[],"claims":[],"relations":[]}"#,
            r#"{"title":"T","summary":"S","tags":[],"proposed_links":[]}"#,
        ]);
        let pipeline = four_step_default();
        let trace = exec.run(&pipeline, "content", &[], None, None).expect("ok");
        assert!(
            trace.prompt_cache_consistent,
            "every LLM stage within a run must share the cache key"
        );
        assert_eq!(
            trace.distinct_cache_keys.len(),
            1,
            "exactly one distinct cache key for a single-variant run"
        );
    }

    #[test]
    fn explicit_trust_instruction_appears_in_every_llm_prompt() {
        let exec = executor_with(&["{}", "{}", "{}"]);
        let pipeline = four_step_default();
        let trace = exec.run(&pipeline, "content", &[], None, None).expect("ok");
        let llm_prompts = trace.stages.iter().filter_map(|outcome| match outcome {
            StageOutcome::LlmCall { prompt, .. } => Some(prompt),
            StageOutcome::Helper { .. } => None,
        });
        for prompt in llm_prompts {
            assert!(
                prompt.contains(EXPLICIT_TRUST_INSTRUCTION),
                "every LLM prompt must carry the explicit-trust phrase"
            );
        }
    }

    #[test]
    fn empty_pipeline_returns_structured_error() {
        let exec = executor_with(&[]);
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![],
            system_prompt: String::new(),
        };
        let err = exec
            .run(&pipeline, "x", &[], None, None)
            .expect_err("empty pipeline should error");
        assert!(matches!(err, ExecutorError::EmptyPipeline));
    }

    #[test]
    fn helper_only_pipeline_uses_last_helper_payload_as_final_output() {
        let exec = executor_with(&[]);
        let only_stage = Stage::Helper {
            kind: super::super::helpers::HelperKind::FtsClassifier,
            params: HelperParams::default(),
        };
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![only_stage],
            system_prompt: String::new(),
        };
        let trace = exec
            .run(&pipeline, "first, do X. then do Y.", &[], None, None)
            .expect("ok");
        assert_eq!(trace.final_output["helper"], "fts_classifier");
        assert_eq!(trace.final_output["fact_kind"], "procedural");
    }

    #[test]
    fn invalid_trust_slot_index_returns_structured_error() {
        let exec = executor_with(&["{}"]);
        // The only stage is an LLM call whose trust slot points far
        // outside the pipeline.
        let dangling_slot = HelperOutputRef {
            stage_index: 99,
            label: "missing".to_string(),
        };
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![Stage::LlmCall {
                prompt_template: "anything".to_string(),
                trust_inputs: vec![dangling_slot],
                output_schema: json!({}),
                label: "broken".to_string(),
            }],
            system_prompt: "x".to_string(),
        };
        let err = exec
            .run(&pipeline, "y", &[], None, None)
            .expect_err("invalid trust slot must error");
        assert!(matches!(err, ExecutorError::InvalidTrustSlot { .. }));
    }

    #[test]
    fn telemetry_records_one_key_per_llm_stage() {
        let exec = executor_with(&["{}", "{}", "{}"]);
        let telemetry = exec.telemetry();
        let pipeline = four_step_default();
        exec.run(&pipeline, "content", &[], None, None).unwrap();
        assert_eq!(telemetry.len(), 3, "four-step has 3 LLM stages");
        assert!(telemetry.all_keys_match());
    }
}