use std::sync::Arc;
use serde_json::{Value, json};
use crate::config::FeatureTier;
use crate::mcp::param_names;
use crate::models::field_names;
#[cfg(test)]
use crate::multistep_ingest::MockLlmDispatch;
use crate::multistep_ingest::{
IngestExecutor, LlmDispatch, Pipeline, PipelineVariant, four_step_default, two_phase_default,
};
/// Bundles the LLM dispatch backend used by `handle_ingest_multistep` with a feature tier.
pub struct IngestMultistepHandler {
/// Shared LLM dispatch backend; `handle_ingest_multistep` hands an `Arc::clone` of it
/// to the `IngestExecutor` it builds for each call.
pub dispatch: Arc<dyn LlmDispatch>,
/// Feature tier supplied at construction time.
// NOTE(review): nothing in the visible code reads this field (`handle_ingest_multistep`
// gates on its own `tier` argument), which is presumably why `dead_code` is allowed here.
// Confirm against the rest of the crate.
#[allow(dead_code)]
pub tier: FeatureTier,
}
impl IngestMultistepHandler {
/// Creates a handler from a dispatch backend and a feature tier.
///
/// Both arguments are stored unchanged in the corresponding fields.
#[must_use]
pub fn new(dispatch: Arc<dyn LlmDispatch>, tier: FeatureTier) -> Self {
Self { dispatch, tier }
}
}
/// Tier name reported under `field_names::REQUIRED_TIER` in the tier-locked advisory
/// returned by `handle_ingest_multistep`.
const REQUIRED_TIER: &str = "smart";
/// Handles a `memory_ingest_multistep` request.
///
/// Validates `params`, applies the tier gate, resolves the pipeline to run, executes it
/// through an [`IngestExecutor`] built on the handler's dispatch, and returns the
/// execution trace as a JSON object.
///
/// # Parameters
///
/// * `params` - request object. `content` must be a non-empty string. `namespace` falls
///   back to `crate::DEFAULT_NAMESPACE` when absent or not a string. A non-null
///   `pipeline_override` is deserialized into a [`Pipeline`]; otherwise the pipeline is
///   chosen from `pipeline_variant` by `resolve_variant`.
/// * `handler` - dispatch bundle; `None` yields the tier-locked advisory.
/// * `tier` - current feature tier; `FeatureTier::Keyword` yields the tier-locked advisory.
///
/// # Returns
///
/// * The tier-locked advisory (an `Ok` value, not an error) carrying the lock message,
///   the current tier and the required tier, when `tier` is `Keyword` or `handler` is `None`.
/// * Otherwise an object with `variant`, `stages`, `distinct_cache_keys`,
///   `prompt_cache_consistent`, `final_output` and `ingested_memory_ids`. This function
///   always emits `ingested_memory_ids` as an empty list.
///
/// # Errors
///
/// Returns `Err` when `content` is missing, not a string, or empty; when
/// `pipeline_override` cannot be deserialized; when `pipeline_variant` is not recognised;
/// or when the executor fails (message prefixed with `INGEST_MULTISTEP_FAILED: `).
/// Content validation runs before the tier gate, so invalid content is an error even on
/// a locked tier.
pub fn handle_ingest_multistep(
    params: &Value,
    handler: Option<&IngestMultistepHandler>,
    tier: FeatureTier,
) -> Result<Value, String> {
    let content = params
        .get(param_names::CONTENT)
        .ok_or(crate::errors::msg::CONTENT_REQUIRED)?
        .as_str()
        .ok_or("content must be a string")?;
    if content.is_empty() {
        return Err("content must not be empty".to_string());
    }
    let namespace = params
        .get(param_names::NAMESPACE)
        .and_then(Value::as_str)
        .unwrap_or(crate::DEFAULT_NAMESPACE);

    // Gate and bind in one step: the handler is only usable when it exists AND the tier
    // is above keyword, so no later `expect` (and no panic path) is needed.
    let handler = match handler {
        Some(handler) if tier != FeatureTier::Keyword => handler,
        _ => {
            return Ok(json!({
                (field_names::TIER_LOCKED): "memory_ingest_multistep requires smart tier or higher",
                (field_names::CURRENT_TIER): tier.as_str(),
                (field_names::REQUIRED_TIER): REQUIRED_TIER,
            }));
        }
    };

    // An explicit JSON `null` override is treated exactly like an absent one.
    let pipeline = match params
        .get(param_names::PIPELINE_OVERRIDE)
        .filter(|value| !value.is_null())
    {
        Some(override_value) => serde_json::from_value::<Pipeline>(override_value.clone())
            .map_err(|e| format!("pipeline_override is malformed: {e}"))?,
        None => resolve_variant(params)?,
    };

    let executor: IngestExecutor<dyn LlmDispatch> =
        IngestExecutor::new(Arc::clone(&handler.dispatch));
    // NOTE(review): the empty slice and `None` are passed through as-is; their meaning is
    // defined by `IngestExecutor::run`, which is not visible from this file.
    let trace = executor
        .run(&pipeline, content, &[], None, Some(namespace))
        .map_err(|e| format!("INGEST_MULTISTEP_FAILED: {e}"))?;

    Ok(json!({
        "variant": trace.variant,
        "stages": trace.stages,
        "distinct_cache_keys": trace.distinct_cache_keys,
        "prompt_cache_consistent": trace.prompt_cache_consistent,
        "final_output": trace.final_output,
        "ingested_memory_ids": Vec::<String>::new(),
    }))
}
/// Picks one of the built-in pipelines from the request's `pipeline_variant` field.
///
/// When the field is absent or not a string, `"two_phase"` is used. The tag is parsed
/// with `PipelineVariant::from_str`; a recognised tag maps to the matching default
/// pipeline constructor.
///
/// # Errors
///
/// Returns a message listing the accepted tags, followed by the rejected tag, when the
/// tag is not recognised.
fn resolve_variant(params: &Value) -> Result<Pipeline, String> {
    let requested = match params.get("pipeline_variant").and_then(Value::as_str) {
        Some(tag) => tag,
        None => "two_phase",
    };
    match PipelineVariant::from_str(requested) {
        Some(PipelineVariant::TwoPhase) => Ok(two_phase_default()),
        Some(PipelineVariant::FourStep) => Ok(four_step_default()),
        None => Err(format!(
            "pipeline_variant must be one of \"two_phase\" | \"four_step\"; got {requested:?}"
        )),
    }
}
/// Test-only helper: builds an [`IngestMultistepHandler`] whose dispatch is a
/// `MockLlmDispatch` constructed from `responses`, paired with the given `tier`.
#[cfg(test)]
pub(crate) fn handler_with_mock_responses(
    responses: Vec<Result<String, String>>,
    tier: FeatureTier,
) -> IngestMultistepHandler {
    let mock = MockLlmDispatch::new(responses);
    // `Arc<MockLlmDispatch>` coerces to `Arc<dyn LlmDispatch>` at the call site.
    IngestMultistepHandler::new(Arc::new(mock), tier)
}
use crate::mcp::registry::McpTool;
use schemars::JsonSchema;
use serde::Deserialize;
// Typed mirror of the `memory_ingest_multistep` parameters. In the visible code it is
// only used to derive the tool's input schema (`input_schema_for`) and by the parity
// tests; `handle_ingest_multistep` reads the raw JSON params instead.
//
// NOTE: plain `//` comments are used on purpose. schemars copies `///` doc comments into
// the generated schema as `description`, and the parity test compares descriptions, so
// doc comments here would change observable output.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct IngestMultistepRequest {
// Text to ingest. No serde default, so deserialization requires it.
pub content: String,
// Optional namespace; defaults to `None` when omitted.
#[serde(default)]
pub namespace: Option<String>,
// Optional variant tag; `resolve_variant` accepts "two_phase" and "four_step".
#[serde(default)]
pub pipeline_variant: Option<String>,
// Optional raw JSON pipeline; the handler deserializes a non-null value into `Pipeline`.
#[serde(default)]
pub pipeline_override: Option<serde_json::Value>,
}
/// Marker type carrying the `McpTool` metadata for `memory_ingest_multistep`.
// NOTE(review): `dead_code` is allowed presumably because the type is only referenced
// through its trait impl / registry wiring outside this file. Confirm.
#[allow(dead_code)]
pub struct IngestMultistepTool;
/// Static tool metadata: name, short description, long docs, input schema and family.
impl McpTool for IngestMultistepTool {
/// Tool name, taken from the registry's `tool_names` constants.
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_INGEST_MULTISTEP
}
/// One-line description of the tool.
fn description() -> &'static str {
"Form 3 multi-step ingest: deterministic helpers + LLM stages."
}
/// Long-form documentation string for the tool.
fn docs() -> &'static str {
"Form 3 (#756): two_phase (FTS + Jaccard -> synthesise) or four_step (load_context -> classify -> enrich -> emit). Helpers run first; LLM stages receive helper output under explicit-trust banner + SHARED PREFIX for cache-key reuse. Response carries trace + cache-key set + final output. Smart+ tier only."
}
/// Input schema derived from `IngestMultistepRequest`.
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<IngestMultistepRequest>()
}
/// Family name: the name of `Family::Power`.
fn family() -> &'static str {
crate::profile::Family::Power.name()
}
}
// Parity and metadata checks for the `memory_ingest_multistep` tool definition.
#[cfg(test)]
mod d1_5_986_tests {
use super::*;
use crate::mcp::parity_test_helpers::{
assert_descriptions_match, assert_property_set_parity, derived_props_for,
};
// Runs the property-set and description parity helpers against the properties derived
// from `IngestMultistepRequest`.
// NOTE(review): what the helpers compare against is defined in `parity_test_helpers`,
// which is not visible here.
#[test]
fn ingest_multistep_parity_986() {
let derived = derived_props_for::<IngestMultistepRequest>();
assert_property_set_parity("memory_ingest_multistep", &derived);
assert_descriptions_match("memory_ingest_multistep", &derived);
}
// Pins the tool's registered name and family strings.
#[test]
fn ingest_multistep_tool_metadata_986() {
assert_eq!(IngestMultistepTool::name(), "memory_ingest_multistep");
assert_eq!(IngestMultistepTool::family(), "power");
}
}
// Behavioural tests for `handle_ingest_multistep`, using mock LLM dispatch where needed.
#[cfg(test)]
mod tests {
use super::*;
// A request without `content` is rejected before the tier gate is consulted.
#[test]
fn missing_content_errors() {
let err = handle_ingest_multistep(&json!({}), None, FeatureTier::Smart).unwrap_err();
assert!(err.contains("content is required"), "got: {err}");
}
// `content` present but not a JSON string is rejected.
#[test]
fn non_string_content_errors() {
let err =
handle_ingest_multistep(&json!({"content": 42}), None, FeatureTier::Smart).unwrap_err();
assert!(err.contains("must be a string"), "got: {err}");
}
// An empty `content` string is rejected.
#[test]
fn empty_content_errors() {
let err =
handle_ingest_multistep(&json!({"content": ""}), None, FeatureTier::Smart).unwrap_err();
assert!(err.contains("must not be empty"), "got: {err}");
}
// Keyword tier gets the advisory as an `Ok` value even though a handler is supplied.
#[test]
fn keyword_tier_returns_tier_locked_advisory() {
let h = handler_with_mock_responses(vec![Ok("{}".to_string())], FeatureTier::Smart);
let resp = handle_ingest_multistep(
&json!({"content": "hello world"}),
Some(&h),
FeatureTier::Keyword,
)
.expect("tier-locked is informational");
assert_eq!(
resp["tier-locked"].as_str(),
Some("memory_ingest_multistep requires smart tier or higher")
);
assert_eq!(resp["current_tier"].as_str(), Some("keyword"));
}
// With no handler, a non-keyword tier still gets the advisory instead of an error.
#[test]
fn handler_none_returns_tier_locked_at_higher_tier() {
let resp = handle_ingest_multistep(
&json!({"content": "hello world"}),
None,
FeatureTier::Semantic,
)
.expect("none-handler degrades to advisory");
assert!(resp["tier-locked"].is_string());
}
// An unrecognised `pipeline_variant` produces an error that names both accepted tags.
#[test]
fn unknown_variant_errors_with_explicit_options() {
let h = handler_with_mock_responses(vec![Ok("{}".to_string())], FeatureTier::Smart);
let err = handle_ingest_multistep(
&json!({"content": "hi", "pipeline_variant": "magic"}),
Some(&h),
FeatureTier::Smart,
)
.unwrap_err();
assert!(err.contains("two_phase"), "got: {err}");
assert!(err.contains("four_step"), "got: {err}");
}
// No `pipeline_variant` is given, so `resolve_variant` falls back to "two_phase".
// One mock LLM response is queued; the trace is expected to hold at least three stages.
#[test]
fn two_phase_run_returns_structured_envelope() {
let h = handler_with_mock_responses(
vec![Ok(
r#"{"title":"T","summary":"S","tags":[],"atoms":[]}"#.to_string()
)],
FeatureTier::Smart,
);
let resp = handle_ingest_multistep(
&json!({"content": "Paris is the capital of France."}),
Some(&h),
FeatureTier::Smart,
)
.expect("ok");
assert_eq!(resp["variant"], "two_phase");
assert_eq!(resp["prompt_cache_consistent"], true);
assert!(resp["stages"].as_array().unwrap().len() >= 3);
}
// Explicit "four_step" variant with three queued mock responses; the run is expected
// to report exactly one distinct cache key.
#[test]
fn four_step_run_returns_structured_envelope() {
let h = handler_with_mock_responses(
vec![
Ok(r#"{"fact_kind":"declarative","confidence":0.9}"#.to_string()),
Ok(r#"{"entities":[],"claims":[],"relations":[]}"#.to_string()),
Ok(r#"{"title":"X","summary":"Y","tags":[],"proposed_links":[]}"#.to_string()),
],
FeatureTier::Smart,
);
let resp = handle_ingest_multistep(
&json!({"content": "Paris", "pipeline_variant": "four_step"}),
Some(&h),
FeatureTier::Smart,
)
.expect("ok");
assert_eq!(resp["variant"], "four_step");
assert_eq!(resp["distinct_cache_keys"].as_array().unwrap().len(), 1);
}
// Supplies a custom single-helper pipeline via `pipeline_override`; no mock LLM
// responses are queued. The final output is expected to carry the helper's
// `fact_kind` classification of the content.
#[test]
fn pipeline_override_drives_custom_pipeline() {
use crate::multistep_ingest::HelperKind;
use crate::multistep_ingest::pipeline::{Pipeline, PipelineVariant, Stage};
let pipeline = Pipeline {
variant: PipelineVariant::TwoPhase,
system_prompt: "Custom system prompt".to_string(),
stages: vec![Stage::Helper {
kind: HelperKind::FtsClassifier,
params: Default::default(),
}],
};
let h = handler_with_mock_responses(vec![], FeatureTier::Smart);
let resp = handle_ingest_multistep(
&json!({
"content": "First, do step one. Then do step two.",
"pipeline_override": pipeline,
}),
Some(&h),
FeatureTier::Smart,
)
.expect("ok");
assert_eq!(resp["final_output"]["fact_kind"], "procedural");
}
}