1use std::sync::Arc;
14
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17
18use super::cache::{
19 CacheKey, EXPLICIT_TRUST_INSTRUCTION, PromptCacheTelemetry, build_shared_prefix,
20};
21#[cfg(test)]
22use super::helpers::HelperParams;
23use super::helpers::{HelperContext, HelperOutput, MemoryHandle, run_helper_with};
24use super::pipeline::{HelperOutputRef, Pipeline, Stage};
25
/// Default cap, in chars, on the incoming content embedded in each LLM stage prompt.
///
/// Used by `IngestExecutor::run` whenever no explicit cap was set through
/// `IngestExecutor::with_max_content_chars`.
pub const DEFAULT_MULTISTEP_MAX_CONTENT_CHARS: usize = 1_500;
32
/// Record of one executed stage inside an [`ExecutionTrace`].
///
/// Serialized as an internally tagged object: the variant name, converted to snake_case
/// (`"helper"` / `"llm_call"`), is stored under the `"stage_type"` key next to the fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "stage_type", rename_all = "snake_case")]
pub enum StageOutcome {
    /// A deterministic helper stage (`Stage::Helper`).
    Helper {
        /// Position of this stage in `Pipeline::stages`.
        index: usize,
        /// String form of the helper's kind (e.g. `"fts_classifier"`).
        helper: String,
        /// Summary string produced by the helper.
        summary: String,
        /// Helper payload; the same value is rendered into any LLM trust slot that
        /// references this stage.
        payload: Value,
        /// Byte length of the run's full, untruncated incoming content.
        content_bytes: usize,
    },
    /// An LLM stage (`Stage::LlmCall`).
    LlmCall {
        /// Position of this stage in `Pipeline::stages`.
        index: usize,
        /// Stage label taken from the pipeline definition.
        label: String,
        /// Exact prompt sent to the dispatch backend (shared prefix + stage tail).
        prompt: String,
        /// Hex form of the cache key derived from the shared prompt prefix.
        cache_key: String,
        /// Parsed JSON reply, or `{"raw": <text>}` when the reply was not valid JSON.
        response: Value,
        /// Byte length of the content view embedded in the prompt, including any
        /// truncation marker.
        content_bytes: usize,
        /// Whether the incoming content was truncated before being embedded.
        content_truncated: bool,
    },
}
82
/// Result of one `IngestExecutor::run` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionTrace {
    /// The pipeline's variant tag (e.g. `"two_phase"`, `"four_step"`).
    pub variant: String,
    /// Stage outcomes: all helper stages first (pipeline order), then all LLM stages
    /// (pipeline order). Each outcome carries its original pipeline `index`.
    pub stages: Vec<StageOutcome>,
    /// Sorted, deduplicated cache keys taken from the executor's telemetry snapshot.
    /// `run` never resets that telemetry, so on a reused executor this also includes keys
    /// recorded by earlier runs.
    pub distinct_cache_keys: Vec<String>,
    /// `all_keys_match()` of the executor's telemetry after this run; like
    /// `distinct_cache_keys`, it spans every run made on the same executor.
    pub prompt_cache_consistent: bool,
    /// The last LLM response; for pipelines without LLM stages, the payload of the last
    /// helper stage; `{}` if neither exists.
    pub final_output: Value,
    /// Content bytes per entry of `stages`, in the same order.
    pub bytes_per_stage: Vec<usize>,
}
107
/// Failures reported by `IngestExecutor::run`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ExecutorError {
    /// An LLM stage's trust input references a stage index that has no helper output
    /// (out of range, or not a helper stage).
    InvalidTrustSlot {
        /// The stage index the trust input pointed at.
        stage_index: usize,
        /// Label of the offending trust input.
        label: String,
    },
    /// The dispatch backend returned an error; carries the backend's message.
    LlmDispatch(String),
    /// The pipeline contained no stages at all.
    EmptyPipeline,
}

impl std::fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ExecutorError::EmptyPipeline => f.write_str("pipeline has no stages"),
            ExecutorError::LlmDispatch(message) => {
                write!(f, "llm dispatch failed: {}", message)
            }
            ExecutorError::InvalidTrustSlot { stage_index, label } => write!(
                f,
                "invalid trust slot: stage_index={} (label={})",
                stage_index, label
            ),
        }
    }
}

impl std::error::Error for ExecutorError {}
139
/// Backend that sends a fully rendered prompt to a model and returns its raw text reply.
///
/// Implementors must be `Send + Sync`. `IngestExecutor::run` tries to parse the returned
/// text as JSON and wraps it as `{"raw": <text>}` when that fails.
pub trait LlmDispatch: Send + Sync {
    /// Sends `prompt` and returns the reply text, or an error message that
    /// `IngestExecutor::run` surfaces as `ExecutorError::LlmDispatch`.
    fn dispatch(&self, prompt: &str) -> Result<String, String>;
}
154
155pub struct OllamaDispatch {
158 client: Arc<crate::llm::OllamaClient>,
159}
160
161impl OllamaDispatch {
162 #[must_use]
164 pub fn new(client: Arc<crate::llm::OllamaClient>) -> Self {
165 Self { client }
166 }
167}
168
169impl LlmDispatch for OllamaDispatch {
170 fn dispatch(&self, prompt: &str) -> Result<String, String> {
171 self.client
172 .generate(prompt, None)
173 .map_err(|e| e.to_string())
174 }
175}
176
177pub struct MockLlmDispatch {
182 responses: std::sync::Mutex<Vec<Result<String, String>>>,
183}
184
185impl MockLlmDispatch {
186 #[must_use]
188 pub fn new(responses: Vec<Result<String, String>>) -> Self {
189 Self {
190 responses: std::sync::Mutex::new(responses),
191 }
192 }
193}
194
195impl LlmDispatch for MockLlmDispatch {
196 fn dispatch(&self, _prompt: &str) -> Result<String, String> {
197 let mut q = self.responses.lock().expect("mutex not poisoned in tests");
198 if q.is_empty() {
199 return Err("mock: queue exhausted".to_string());
200 }
201 q.remove(0)
202 }
203}
204
/// Runs multistep ingest [`Pipeline`]s: helper stages locally, LLM stages through `D`.
///
/// `D: ?Sized` allows trait objects, e.g. `IngestExecutor<dyn LlmDispatch>` over an
/// `Arc<dyn LlmDispatch>`.
pub struct IngestExecutor<D: LlmDispatch + ?Sized> {
    /// Backend every LLM stage prompt is sent through.
    dispatch: Arc<D>,
    /// Receives the cache key of every LLM stage; created in `new` and never reset by
    /// `run`, so it accumulates across runs on this executor.
    telemetry: Arc<PromptCacheTelemetry>,
    /// Per-LLM-stage content cap in chars; `None` falls back to
    /// `DEFAULT_MULTISTEP_MAX_CONTENT_CHARS`.
    max_content_chars: Option<usize>,
    /// Debug-build instrumentation: address of the content slice each helper stage
    /// received, appended per helper stage across runs.
    helper_content_ptrs: Arc<std::sync::Mutex<Vec<usize>>>,
}
228
229impl<D: LlmDispatch + ?Sized> IngestExecutor<D> {
230 #[must_use]
232 pub fn new(dispatch: Arc<D>) -> Self {
233 Self {
234 dispatch,
235 telemetry: Arc::new(PromptCacheTelemetry::new()),
236 max_content_chars: None,
237 helper_content_ptrs: Arc::new(std::sync::Mutex::new(Vec::new())),
238 }
239 }
240
241 #[must_use]
246 pub fn with_max_content_chars(mut self, cap: usize) -> Self {
247 self.max_content_chars = Some(cap);
248 self
249 }
250
251 #[must_use]
254 pub fn telemetry(&self) -> Arc<PromptCacheTelemetry> {
255 Arc::clone(&self.telemetry)
256 }
257
258 #[doc(hidden)]
271 #[must_use]
272 pub fn helper_content_ptrs(&self) -> Vec<usize> {
273 self.helper_content_ptrs
274 .lock()
275 .map(|g| g.clone())
276 .unwrap_or_default()
277 }
278
279 pub fn run(
292 &self,
293 pipeline: &Pipeline,
294 content: &str,
295 candidates: &[MemoryHandle],
296 content_embedding: Option<&[f32]>,
297 namespace: Option<&str>,
298 ) -> Result<ExecutionTrace, ExecutorError> {
299 if pipeline.stages.is_empty() {
300 return Err(ExecutorError::EmptyPipeline);
301 }
302
303 let mut helper_outputs: Vec<Option<HelperOutput>> = vec![None; pipeline.stages.len()];
304 let mut stage_outcomes: Vec<StageOutcome> = Vec::with_capacity(pipeline.stages.len());
305 let mut bytes_per_stage: Vec<usize> = Vec::with_capacity(pipeline.stages.len());
306
307 let helper_ctx = HelperContext::new(content, candidates, content_embedding, namespace);
315 #[cfg(debug_assertions)]
320 let content_ptr_for_test = content.as_ptr() as usize;
321
322 for (idx, stage) in pipeline.stages.iter().enumerate() {
327 if let Stage::Helper { kind, params } = stage {
328 #[cfg(debug_assertions)]
329 {
330 let effective_ptr = helper_ctx.effective_content(params).as_ptr() as usize;
336 if let Ok(mut g) = self.helper_content_ptrs.lock() {
337 g.push(effective_ptr);
338 }
339 if params.content.is_empty() {
343 debug_assert_eq!(effective_ptr, content_ptr_for_test);
344 }
345 }
346 let out = run_helper_with(*kind, params, &helper_ctx);
347 bytes_per_stage.push(content.len());
351 stage_outcomes.push(StageOutcome::Helper {
352 index: idx,
353 helper: out.kind.as_str().to_string(),
354 summary: out.summary.clone(),
355 payload: out.payload.clone(),
356 content_bytes: content.len(),
357 });
358 helper_outputs[idx] = Some(out);
359 }
360 }
361
362 let prefix = build_shared_prefix(pipeline.variant_tag(), &pipeline.system_prompt);
365 let cache_key = CacheKey::from_prefix(&prefix);
366 let llm_cap = self
367 .max_content_chars
368 .unwrap_or(DEFAULT_MULTISTEP_MAX_CONTENT_CHARS);
369
370 let mut last_llm_response: Option<Value> = None;
371
372 for (idx, stage) in pipeline.stages.iter().enumerate() {
373 let Stage::LlmCall {
374 prompt_template,
375 trust_inputs,
376 output_schema,
377 label,
378 } = stage
379 else {
380 continue;
381 };
382
383 let trust_block = render_trust_inputs(trust_inputs, &helper_outputs)?;
385
386 let (content_view, truncated) = truncate_content_for_llm(content, llm_cap);
395
396 let stage_tail = format!(
398 "\n[STAGE label={label} index={idx}]\n\
399 [INCOMING CONTENT]\n{content_view}\n\
400 [TRUST INPUTS]\n{trust_block}\n\
401 [TASK]\n{prompt_template}\n\
402 [OUTPUT SCHEMA]\n{schema}\n",
403 schema = serde_json::to_string(output_schema).unwrap_or_else(|_| "{}".to_string()),
404 );
405 let prompt = format!("{prefix}{stage_tail}");
406
407 self.telemetry.record(cache_key.clone());
410
411 let response_text = self
412 .dispatch
413 .dispatch(&prompt)
414 .map_err(ExecutorError::LlmDispatch)?;
415
416 let response_value = match serde_json::from_str::<Value>(&response_text) {
417 Ok(v) => v,
418 Err(_) => json!({ "raw": response_text }),
419 };
420
421 let content_bytes = content_view.len();
422 bytes_per_stage.push(content_bytes);
423 stage_outcomes.push(StageOutcome::LlmCall {
424 index: idx,
425 label: label.clone(),
426 prompt,
427 cache_key: cache_key.as_hex().to_string(),
428 response: response_value.clone(),
429 content_bytes,
430 content_truncated: truncated,
431 });
432 last_llm_response = Some(response_value);
433 }
434
435 let distinct_cache_keys: Vec<String> = {
436 let mut seen: Vec<String> =
437 self.telemetry.snapshot().into_iter().map(|k| k.0).collect();
438 seen.sort();
439 seen.dedup();
440 seen
441 };
442 let prompt_cache_consistent = self.telemetry.all_keys_match();
443
444 let final_output = last_llm_response.unwrap_or_else(|| {
447 helper_outputs
448 .iter()
449 .rev()
450 .find_map(|o| o.as_ref().map(|h| h.payload.clone()))
451 .unwrap_or_else(|| json!({}))
452 });
453
454 Ok(ExecutionTrace {
455 variant: pipeline.variant_tag().to_string(),
456 stages: stage_outcomes,
457 distinct_cache_keys,
458 prompt_cache_consistent,
459 final_output,
460 bytes_per_stage,
461 })
462 }
463}
464
/// Caps `content` at `cap` chars (Unicode scalar values) for embedding in an LLM prompt.
///
/// Returns `(view, truncated)`:
/// - if `cap == 0` (no cap) or `content` has at most `cap` chars, `view` borrows `content`
///   unchanged and `truncated` is `false`;
/// - otherwise `view` is an owned string with the first `cap` chars followed by
///   ` [...truncated N chars]`, where `N` is the number of chars dropped, and `truncated`
///   is `true`.
fn truncate_content_for_llm(content: &str, cap: usize) -> (std::borrow::Cow<'_, str>, bool) {
    use std::borrow::Cow;
    use std::fmt::Write as _;

    // A string never has more chars than bytes, so a byte length within the cap proves
    // the content fits without scanning it.
    if cap == 0 || content.len() <= cap {
        return (Cow::Borrowed(content), false);
    }
    // Byte offset of the first char beyond the cap; `None` means at most `cap` chars.
    let Some((cut, _)) = content.char_indices().nth(cap) else {
        return (Cow::Borrowed(content), false);
    };
    let (kept, rest) = content.split_at(cut);
    let dropped = rest.chars().count();

    let mut out = String::with_capacity(kept.len() + 32);
    out.push_str(kept);
    // Writing into a `String` cannot fail.
    let _ = write!(out, " [...truncated {dropped} chars]");
    (Cow::Owned(out), true)
}
493
494fn render_trust_inputs(
498 inputs: &[HelperOutputRef],
499 helper_outputs: &[Option<HelperOutput>],
500) -> Result<String, ExecutorError> {
501 if inputs.is_empty() {
502 return Ok(format!("(none — but: {EXPLICIT_TRUST_INSTRUCTION})"));
503 }
504 let mut out = String::new();
505 out.push_str(EXPLICIT_TRUST_INSTRUCTION);
506 out.push_str("\n\n");
507 for input in inputs {
508 let payload = helper_outputs
509 .get(input.stage_index)
510 .and_then(|o| o.as_ref())
511 .ok_or_else(|| ExecutorError::InvalidTrustSlot {
512 stage_index: input.stage_index,
513 label: input.label.clone(),
514 })?;
515 out.push_str(&format!(
516 "<<TRUST label={} helper={}>>\n{}\n<<END TRUST>>\n\n",
517 input.label,
518 payload.kind.as_str(),
519 serde_json::to_string_pretty(&payload.payload).unwrap_or_default()
520 ));
521 }
522 Ok(out)
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::multistep_ingest::pipeline::{four_step_default, two_phase_default};

    /// Builds a candidate memory handle with no embedding and no namespace.
    fn mh(id: &str, body: &str) -> MemoryHandle {
        MemoryHandle {
            id: id.to_string(),
            body: body.to_string(),
            embedding: None,
            namespace: None,
        }
    }

    /// Returns `(prompt, content_bytes, content_truncated)` of the first LLM stage in `trace`.
    fn first_llm_stage(trace: &ExecutionTrace) -> (&str, usize, bool) {
        trace
            .stages
            .iter()
            .find_map(|stage| match stage {
                StageOutcome::LlmCall {
                    prompt,
                    content_bytes,
                    content_truncated,
                    ..
                } => Some((prompt.as_str(), *content_bytes, *content_truncated)),
                StageOutcome::Helper { .. } => None,
            })
            .expect("pipeline under test has an LLM stage")
    }

    #[test]
    fn helper_then_llm_runs_in_order_and_renders_trust_slot() {
        let mock = MockLlmDispatch::new(vec![Ok(
            r#"{"title":"T","summary":"S","tags":[],"atoms":[]}"#.to_string(),
        )]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = two_phase_default();
        let trace = exec
            .run(
                &pipeline,
                "the quick brown fox",
                &[mh("c1", "a quick fox")],
                None,
                Some("global"),
            )
            .expect("pipeline runs");

        // Helper outcomes come first, then the LLM call.
        assert!(matches!(trace.stages[0], StageOutcome::Helper { .. }));
        assert!(matches!(trace.stages[1], StageOutcome::Helper { .. }));
        let StageOutcome::LlmCall { prompt, .. } = &trace.stages[2] else {
            panic!("stage 2 must be an LLM call");
        };
        assert!(
            prompt.contains(EXPLICIT_TRUST_INSTRUCTION),
            "LLM prompt must carry the explicit-trust instruction verbatim"
        );
        assert!(
            prompt.contains("jaccard_overlap") || prompt.contains("fts_classifier"),
            "LLM prompt must cite a helper kind from the trust slots"
        );
    }

    #[test]
    fn two_phase_pipeline_produces_structured_output() {
        let mock = MockLlmDispatch::new(vec![Ok(
            r#"{"title":"T","summary":"S","tags":["a"],"atoms":["one","two"]}"#.to_string(),
        )]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = two_phase_default();
        let trace = exec
            .run(&pipeline, "anything", &[], None, None)
            .expect("ok");
        assert_eq!(trace.variant, "two_phase");
        assert_eq!(trace.final_output["title"], "T");
        assert_eq!(trace.final_output["atoms"].as_array().unwrap().len(), 2);
    }

    #[test]
    fn four_step_pipeline_produces_structured_output() {
        let mock = MockLlmDispatch::new(vec![
            Ok(r#"{"fact_kind":"declarative","confidence":0.9}"#.to_string()),
            Ok(r#"{"entities":["a"],"claims":["c"],"relations":[]}"#.to_string()),
            Ok(r#"{"title":"X","summary":"Y","tags":[],"proposed_links":[]}"#.to_string()),
        ]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = four_step_default();
        let trace = exec
            .run(
                &pipeline,
                "Paris is the capital of France.",
                &[],
                None,
                None,
            )
            .expect("ok");
        assert_eq!(trace.variant, "four_step");
        let llm_count = trace
            .stages
            .iter()
            .filter(|s| matches!(s, StageOutcome::LlmCall { .. }))
            .count();
        assert_eq!(llm_count, 3);
        assert_eq!(trace.final_output["title"], "X");
    }

    #[test]
    fn prompt_cache_key_is_consistent_across_stages_within_a_run() {
        let mock = MockLlmDispatch::new(vec![
            Ok(r#"{"fact_kind":"declarative","confidence":0.5}"#.to_string()),
            Ok(r#"{"entities":[],"claims":[],"relations":[]}"#.to_string()),
            Ok(r#"{"title":"T","summary":"S","tags":[],"proposed_links":[]}"#.to_string()),
        ]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = four_step_default();
        let trace = exec.run(&pipeline, "content", &[], None, None).expect("ok");
        assert!(
            trace.prompt_cache_consistent,
            "every LLM stage within a run must share the cache key"
        );
        assert_eq!(
            trace.distinct_cache_keys.len(),
            1,
            "exactly one distinct cache key for a single-variant run"
        );
    }

    #[test]
    fn explicit_trust_instruction_appears_in_every_llm_prompt() {
        let mock = MockLlmDispatch::new(vec![
            Ok("{}".to_string()),
            Ok("{}".to_string()),
            Ok("{}".to_string()),
        ]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = four_step_default();
        let trace = exec.run(&pipeline, "content", &[], None, None).expect("ok");
        for stage in &trace.stages {
            if let StageOutcome::LlmCall { prompt, .. } = stage {
                assert!(
                    prompt.contains(EXPLICIT_TRUST_INSTRUCTION),
                    "every LLM prompt must carry the explicit-trust phrase"
                );
            }
        }
    }

    #[test]
    fn empty_pipeline_returns_structured_error() {
        let mock = MockLlmDispatch::new(vec![]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![],
            system_prompt: String::new(),
        };
        let err = exec
            .run(&pipeline, "x", &[], None, None)
            .expect_err("empty pipeline should error");
        assert!(matches!(err, ExecutorError::EmptyPipeline));
    }

    #[test]
    fn helper_only_pipeline_uses_last_helper_payload_as_final_output() {
        let mock = MockLlmDispatch::new(vec![]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![Stage::Helper {
                kind: super::super::helpers::HelperKind::FtsClassifier,
                params: HelperParams::default(),
            }],
            system_prompt: String::new(),
        };
        let trace = exec
            .run(&pipeline, "first, do X. then do Y.", &[], None, None)
            .expect("ok");
        assert_eq!(trace.final_output["helper"], "fts_classifier");
        assert_eq!(trace.final_output["fact_kind"], "procedural");
    }

    #[test]
    fn invalid_trust_slot_index_returns_structured_error() {
        let mock = MockLlmDispatch::new(vec![Ok("{}".to_string())]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let pipeline = Pipeline {
            variant: super::super::pipeline::PipelineVariant::TwoPhase,
            stages: vec![Stage::LlmCall {
                prompt_template: "anything".to_string(),
                trust_inputs: vec![HelperOutputRef {
                    stage_index: 99,
                    label: "missing".to_string(),
                }],
                output_schema: json!({}),
                label: "broken".to_string(),
            }],
            system_prompt: "x".to_string(),
        };
        let err = exec
            .run(&pipeline, "y", &[], None, None)
            .expect_err("invalid trust slot must error");
        assert!(matches!(err, ExecutorError::InvalidTrustSlot { .. }));
    }

    #[test]
    fn telemetry_records_one_key_per_llm_stage() {
        let mock = MockLlmDispatch::new(vec![
            Ok("{}".to_string()),
            Ok("{}".to_string()),
            Ok("{}".to_string()),
        ]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let telemetry = exec.telemetry();
        let pipeline = four_step_default();
        exec.run(&pipeline, "content", &[], None, None).unwrap();
        assert_eq!(telemetry.len(), 3, "four-step has 3 LLM stages");
        assert!(telemetry.all_keys_match());
    }

    #[test]
    fn llm_dispatch_error_aborts_run_with_structured_error() {
        let mock = MockLlmDispatch::new(vec![Err("backend down".to_string())]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let err = exec
            .run(&two_phase_default(), "anything", &[], None, None)
            .expect_err("dispatch failure must surface");
        assert_eq!(err, ExecutorError::LlmDispatch("backend down".to_string()));
    }

    #[test]
    fn non_json_response_is_wrapped_as_raw() {
        let mock = MockLlmDispatch::new(vec![Ok("not json".to_string())]);
        let exec = IngestExecutor::new(Arc::new(mock));
        let trace = exec
            .run(&two_phase_default(), "anything", &[], None, None)
            .expect("ok");
        assert_eq!(trace.final_output["raw"], "not json");
    }

    #[test]
    fn llm_stage_truncates_content_beyond_cap() {
        let mock = MockLlmDispatch::new(vec![Ok("{}".to_string())]);
        let exec = IngestExecutor::new(Arc::new(mock)).with_max_content_chars(5);
        let trace = exec
            .run(&two_phase_default(), "the quick brown fox", &[], None, None)
            .expect("ok");
        let expected_view = "the q [...truncated 14 chars]";
        let (prompt, content_bytes, content_truncated) = first_llm_stage(&trace);
        assert!(content_truncated);
        assert_eq!(content_bytes, expected_view.len());
        assert!(prompt.contains(expected_view));
    }

    #[test]
    fn zero_cap_disables_llm_truncation() {
        let mock = MockLlmDispatch::new(vec![Ok("{}".to_string())]);
        let exec = IngestExecutor::new(Arc::new(mock)).with_max_content_chars(0);
        let content = "x".repeat(DEFAULT_MULTISTEP_MAX_CONTENT_CHARS + 10);
        let trace = exec
            .run(&two_phase_default(), &content, &[], None, None)
            .expect("ok");
        let (_, content_bytes, content_truncated) = first_llm_stage(&trace);
        assert!(!content_truncated);
        assert_eq!(content_bytes, content.len());
        assert!(trace.bytes_per_stage.iter().all(|&b| b == content.len()));
    }

    #[test]
    fn truncate_content_counts_chars_not_bytes() {
        // 5 chars but 6 bytes: fits a cap of 5.
        let (view, truncated) = truncate_content_for_llm("héllo", 5);
        assert_eq!(view, "héllo");
        assert!(!truncated);

        let (view, truncated) = truncate_content_for_llm("héllo wörld", 10);
        assert_eq!(view, "héllo wörl [...truncated 1 chars]");
        assert!(truncated);
    }

    #[test]
    fn truncate_content_zero_cap_is_unlimited() {
        let (view, truncated) = truncate_content_for_llm("abcdef", 0);
        assert!(matches!(view, std::borrow::Cow::Borrowed("abcdef")));
        assert!(!truncated);
    }

    #[test]
    fn mock_dispatch_reports_exhaustion() {
        let mock = MockLlmDispatch::new(vec![Ok("only".to_string())]);
        assert_eq!(mock.dispatch("p"), Ok("only".to_string()));
        assert_eq!(
            mock.dispatch("p"),
            Err("mock: queue exhausted".to_string())
        );
    }
}