Skip to main content

ai_memory/mcp/tools/
ingest_multistep.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 Form 3 (issue #756) — `memory_ingest_multistep` MCP tool.
5//!
6//! Surfaces the multi-step ingest orchestrator
7//! ([`crate::multistep_ingest`]) at the Family::Power tier. Tier-gated
8//! to smart+ — the keyword and semantic tiers short-circuit with a
9//! tier-locked advisory envelope, matching the convention from
10//! `memory_atomise` / `memory_consolidate`.
11//!
12//! # Tool contract
13//!
14//! Input arguments:
15//!
16//! - `content` (string, required) — the content to ingest.
17//! - `namespace` (string, optional) — routing hint for the FTS
18//!   classifier helper. Default `"global"`.
19//! - `pipeline_variant` (string, optional, default `"two_phase"`) —
20//!   one of `"two_phase"` | `"four_step"`. Picks the default
21//!   pipeline.
22//! - `pipeline_override` (object, optional) — full
23//!   [`Pipeline`](crate::multistep_ingest::Pipeline) JSON. Overrides
24//!   `pipeline_variant` when both are present.
25//!
26//! Output JSON envelope:
27//!
28//! ```json
29//! {
30//!   "variant": "two_phase",
31//!   "stages": [ ... per-stage trace ... ],
32//!   "distinct_cache_keys": ["<hex>"],
33//!   "prompt_cache_consistent": true,
34//!   "final_output": { ... },
35//!   "ingested_memory_ids": []
36//! }
37//! ```
38//!
39//! `ingested_memory_ids` is reserved for the follow-up wave that wires
40//! the substrate `memory_store` writer behind a Form 3 emit-stage
41//! dispatcher. For the initial Form 3 closeout, the tool returns the
42//! structured pipeline trace + final output so operators and
43//! downstream automation can route the synthesis result themselves.
44
45use std::sync::Arc;
46
47use serde_json::{Value, json};
48
49use crate::config::FeatureTier;
50use crate::mcp::param_names;
51use crate::models::field_names;
52#[cfg(test)]
53use crate::multistep_ingest::MockLlmDispatch;
54use crate::multistep_ingest::{
55    IngestExecutor, LlmDispatch, Pipeline, PipelineVariant, four_step_default, two_phase_default,
56};
57
58/// Handler bundle. Keeps the dispatch implementation behind an `Arc<dyn
59/// LlmDispatch>` so the daemon-runtime side can construct it once at
60/// MCP boot and re-use across calls. The dispatch is `None` until the
61/// daemon wires an LLM client (semantic-tier and below); the tier gate
62/// in [`handle_ingest_multistep`] short-circuits before consulting the
63/// dispatch in that case.
64pub struct IngestMultistepHandler {
65    /// LLM dispatch — production binding via
66    /// [`crate::multistep_ingest::executor::OllamaDispatch`]; a mock
67    /// queue under tests.
68    pub dispatch: Arc<dyn LlmDispatch>,
69    /// Daemon's resolved feature tier. Retained as defense-in-depth so
70    /// callers outside the MCP path still have it available.
71    #[allow(dead_code)]
72    pub tier: FeatureTier,
73}
74
75impl IngestMultistepHandler {
76    /// Construct a handler with the supplied dispatch + tier.
77    #[must_use]
78    pub fn new(dispatch: Arc<dyn LlmDispatch>, tier: FeatureTier) -> Self {
79        Self { dispatch, tier }
80    }
81}
82
/// Minimum tier at which `memory_ingest_multistep` runs; echoed as the
/// required-tier value of the tier-locked advisory envelope.
const REQUIRED_TIER: &str = "smart";
85
86/// Handle a `memory_ingest_multistep` MCP tool call.
87///
88/// # Arguments
89///
90/// - `params` — JSON-RPC `arguments` object.
91/// - `handler` — pre-built handler bundle, or `None` when the daemon
92///   has no LLM wired (collapses to the tier-locked advisory).
93/// - `tier` — fallback tier for the advisory envelope when `handler`
94///   is `None`.
95///
96/// # Errors
97///
98/// Returns `Err(String)` on input validation failure or pipeline
99/// execution failure; the dispatcher wraps the string into the MCP
100/// `isError: true` envelope.
101pub fn handle_ingest_multistep(
102    params: &Value,
103    handler: Option<&IngestMultistepHandler>,
104    tier: FeatureTier,
105) -> Result<Value, String> {
106    // ── Argument validation ─────────────────────────────────────────
107    let content = params
108        .get(param_names::CONTENT)
109        .ok_or(crate::errors::msg::CONTENT_REQUIRED)?
110        .as_str()
111        .ok_or("content must be a string")?;
112    if content.is_empty() {
113        return Err("content must not be empty".to_string());
114    }
115
116    let namespace = params
117        .get(param_names::NAMESPACE)
118        .and_then(Value::as_str)
119        .unwrap_or(crate::DEFAULT_NAMESPACE);
120
121    // ── Tier gate ───────────────────────────────────────────────────
122    if tier == FeatureTier::Keyword || handler.is_none() {
123        return Ok(json!({
124            (field_names::TIER_LOCKED): "memory_ingest_multistep requires smart tier or higher",
125            (field_names::CURRENT_TIER): tier.as_str(),
126            (field_names::REQUIRED_TIER): REQUIRED_TIER,
127        }));
128    }
129    let handler = handler.expect("checked above");
130
131    // ── Pipeline resolution ─────────────────────────────────────────
132    let pipeline = if let Some(override_value) = params.get(param_names::PIPELINE_OVERRIDE) {
133        if !override_value.is_null() {
134            serde_json::from_value::<Pipeline>(override_value.clone())
135                .map_err(|e| format!("pipeline_override is malformed: {e}"))?
136        } else {
137            resolve_variant(params)?
138        }
139    } else {
140        resolve_variant(params)?
141    };
142
143    // ── Execute ────────────────────────────────────────────────────
144    let executor: IngestExecutor<dyn LlmDispatch> =
145        IngestExecutor::new(Arc::clone(&handler.dispatch));
146    let trace = executor
147        .run(&pipeline, content, &[], None, Some(namespace))
148        .map_err(|e| format!("INGEST_MULTISTEP_FAILED: {e}"))?;
149
150    Ok(json!({
151        "variant": trace.variant,
152        "stages": trace.stages,
153        "distinct_cache_keys": trace.distinct_cache_keys,
154        "prompt_cache_consistent": trace.prompt_cache_consistent,
155        "final_output": trace.final_output,
156        "ingested_memory_ids": Vec::<String>::new(),
157    }))
158}
159
160fn resolve_variant(params: &Value) -> Result<Pipeline, String> {
161    let variant_tag = params
162        .get("pipeline_variant")
163        .and_then(Value::as_str)
164        .unwrap_or("two_phase");
165    let variant = PipelineVariant::from_str(variant_tag).ok_or_else(|| {
166        format!(
167            "pipeline_variant must be one of \"two_phase\" | \"four_step\"; got {variant_tag:?}"
168        )
169    })?;
170    Ok(match variant {
171        PipelineVariant::TwoPhase => two_phase_default(),
172        PipelineVariant::FourStep => four_step_default(),
173    })
174}
175
/// Test-only helper: build a handler bundle whose dispatch is a
/// `MockLlmDispatch` pre-loaded with the supplied canned responses, so
/// tests can drive [`handle_ingest_multistep`] without spinning up a real
/// `OllamaClient`.
///
/// Visibility: the item only exists in `cfg(test)` builds of this crate
/// and is `pub(crate)`. It is therefore reachable from in-crate unit tests
/// (such as the `tests` module in this file), but not from a Cargo
/// integration test under the top-level `tests/` directory, which links
/// against the non-test build of the library.
/// NOTE(review): if `tests/form_3_multistep_ingest.rs` is such an
/// integration test, it cannot call this helper — confirm where that suite
/// lives.
#[cfg(test)]
pub(crate) fn handler_with_mock_responses(
    responses: Vec<Result<String, String>>,
    tier: FeatureTier,
) -> IngestMultistepHandler {
    // `Arc<MockLlmDispatch>` coerces to `Arc<dyn LlmDispatch>` at the call.
    IngestMultistepHandler::new(Arc::new(MockLlmDispatch::new(responses)), tier)
}
189
190// --- D1.5 (#986): per-tool McpTool impl for memory_ingest_multistep ---
191
192use crate::mcp::registry::McpTool;
193use schemars::JsonSchema;
194use serde::Deserialize;
195
/// v0.7.0 #972 D1.5 (#986) — request body for `memory_ingest_multistep`.
//
// Typed mirror of the tool's arguments, used to derive the advertised input
// schema (see `IngestMultistepTool::input_schema`). The handler itself reads
// the raw JSON `Value`, so this struct is not constructed on the request
// path — presumably why `dead_code` is allowed.
//
// The `///` field comments below become schema property descriptions via
// the `JsonSchema` derive, and the parity test in `d1_5_986_tests` compares
// descriptions; treat their text as part of the wire contract.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct IngestMultistepRequest {
    /// Content to ingest.
    pub content: String,

    /// FTS classifier hint. Default 'global'.
    // The handler falls back to `crate::DEFAULT_NAMESPACE` when this is
    // absent or not a string.
    #[serde(default)]
    pub namespace: Option<String>,

    /// Named pipeline; ignored if pipeline_override set.
    // Handler default when absent or null: "two_phase".
    #[serde(default)]
    pub pipeline_variant: Option<String>,

    /// Custom Pipeline descriptor.
    // Deserialized into `crate::multistep_ingest::Pipeline` by the handler
    // when present and non-null.
    #[serde(default)]
    pub pipeline_override: Option<serde_json::Value>,
}
215
216/// v0.7.0 #972 D1.5 (#986) — `McpTool` impl for `memory_ingest_multistep`.
217#[allow(dead_code)]
218pub struct IngestMultistepTool;
219
220impl McpTool for IngestMultistepTool {
221    fn name() -> &'static str {
222        crate::mcp::registry::tool_names::MEMORY_INGEST_MULTISTEP
223    }
224    fn description() -> &'static str {
225        "Form 3 multi-step ingest: deterministic helpers + LLM stages."
226    }
227    fn docs() -> &'static str {
228        "Form 3 (#756): two_phase (FTS + Jaccard -> synthesise) or four_step (load_context -> classify -> enrich -> emit). Helpers run first; LLM stages receive helper output under explicit-trust banner + SHARED PREFIX for cache-key reuse. Response carries trace + cache-key set + final output. Smart+ tier only."
229    }
230    fn input_schema() -> Value {
231        crate::mcp::registry::input_schema_for::<IngestMultistepRequest>()
232    }
233    fn family() -> &'static str {
234        crate::profile::Family::Power.name()
235    }
236}
237
#[cfg(test)]
mod d1_5_986_tests {
    //! D1.5 (#986) — schema parity for `memory_ingest_multistep`.
    //! Shared helpers live at [`crate::mcp::parity_test_helpers`].
    use super::*;
    use crate::mcp::parity_test_helpers::{
        assert_descriptions_match, assert_property_set_parity, derived_props_for,
    };

    /// Wire name of the tool under test.
    const TOOL: &str = "memory_ingest_multistep";

    #[test]
    fn ingest_multistep_parity_986() {
        let props = derived_props_for::<IngestMultistepRequest>();
        assert_property_set_parity(TOOL, &props);
        assert_descriptions_match(TOOL, &props);
    }

    #[test]
    fn ingest_multistep_tool_metadata_986() {
        assert_eq!(IngestMultistepTool::name(), TOOL);
        assert_eq!(IngestMultistepTool::family(), "power");
    }
}
260
#[cfg(test)]
mod tests {
    //! Unit tests for `handle_ingest_multistep`: argument validation, the
    //! tier-locked advisory, pipeline resolution, and the trace envelope.
    use super::*;

    #[test]
    fn missing_content_errors() {
        let err = handle_ingest_multistep(&json!({}), None, FeatureTier::Smart).unwrap_err();
        assert!(err.contains("content is required"), "got: {err}");
    }

    #[test]
    fn non_string_content_errors() {
        let err =
            handle_ingest_multistep(&json!({"content": 42}), None, FeatureTier::Smart).unwrap_err();
        assert!(err.contains("must be a string"), "got: {err}");
    }

    #[test]
    fn empty_content_errors() {
        let err =
            handle_ingest_multistep(&json!({"content": ""}), None, FeatureTier::Smart).unwrap_err();
        assert!(err.contains("must not be empty"), "got: {err}");
    }

    #[test]
    fn keyword_tier_returns_tier_locked_advisory() {
        let h = handler_with_mock_responses(vec![Ok("{}".to_string())], FeatureTier::Smart);
        let resp = handle_ingest_multistep(
            &json!({"content": "hello world"}),
            Some(&h),
            FeatureTier::Keyword,
        )
        .expect("tier-locked is informational");
        assert_eq!(
            resp["tier-locked"].as_str(),
            Some("memory_ingest_multistep requires smart tier or higher")
        );
        assert_eq!(resp["current_tier"].as_str(), Some("keyword"));
    }

    #[test]
    fn tier_locked_advisory_names_required_tier() {
        let resp = handle_ingest_multistep(
            &json!({"content": "hello world"}),
            None,
            FeatureTier::Keyword,
        )
        .expect("tier-locked is informational");
        assert_eq!(resp[field_names::REQUIRED_TIER].as_str(), Some(REQUIRED_TIER));
    }

    #[test]
    fn handler_none_returns_tier_locked_at_higher_tier() {
        let resp = handle_ingest_multistep(
            &json!({"content": "hello world"}),
            None,
            FeatureTier::Semantic,
        )
        .expect("none-handler degrades to advisory");
        assert!(resp["tier-locked"].is_string());
    }

    #[test]
    fn handler_none_returns_tier_locked_at_smart_tier() {
        // A sufficient tier alone is not enough: without a wired handler the
        // call must still degrade to the advisory rather than error.
        let resp = handle_ingest_multistep(
            &json!({"content": "hello world"}),
            None,
            FeatureTier::Smart,
        )
        .expect("none-handler degrades to advisory");
        assert!(resp["tier-locked"].is_string());
        assert_eq!(
            resp["current_tier"].as_str(),
            Some(FeatureTier::Smart.as_str())
        );
    }

    #[test]
    fn unknown_variant_errors_with_explicit_options() {
        let h = handler_with_mock_responses(vec![Ok("{}".to_string())], FeatureTier::Smart);
        let err = handle_ingest_multistep(
            &json!({"content": "hi", "pipeline_variant": "magic"}),
            Some(&h),
            FeatureTier::Smart,
        )
        .unwrap_err();
        assert!(err.contains("two_phase"), "got: {err}");
        assert!(err.contains("four_step"), "got: {err}");
    }

    #[test]
    fn two_phase_run_returns_structured_envelope() {
        let h = handler_with_mock_responses(
            vec![Ok(
                r#"{"title":"T","summary":"S","tags":[],"atoms":[]}"#.to_string()
            )],
            FeatureTier::Smart,
        );
        let resp = handle_ingest_multistep(
            &json!({"content": "Paris is the capital of France."}),
            Some(&h),
            FeatureTier::Smart,
        )
        .expect("ok");
        assert_eq!(resp["variant"], "two_phase");
        assert_eq!(resp["prompt_cache_consistent"], true);
        assert!(resp["stages"].as_array().unwrap().len() >= 3);
    }

    #[test]
    fn null_pipeline_override_falls_back_to_variant() {
        let h = handler_with_mock_responses(
            vec![Ok(
                r#"{"title":"T","summary":"S","tags":[],"atoms":[]}"#.to_string()
            )],
            FeatureTier::Smart,
        );
        let resp = handle_ingest_multistep(
            &json!({
                "content": "Paris is the capital of France.",
                "pipeline_override": null,
            }),
            Some(&h),
            FeatureTier::Smart,
        )
        .expect("null override behaves like an absent one");
        assert_eq!(resp["variant"], "two_phase");
    }

    #[test]
    fn malformed_pipeline_override_errors() {
        let h = handler_with_mock_responses(vec![], FeatureTier::Smart);
        let err = handle_ingest_multistep(
            &json!({"content": "hi", "pipeline_override": 42}),
            Some(&h),
            FeatureTier::Smart,
        )
        .unwrap_err();
        assert!(err.contains("pipeline_override is malformed"), "got: {err}");
    }

    #[test]
    fn four_step_run_returns_structured_envelope() {
        let h = handler_with_mock_responses(
            vec![
                Ok(r#"{"fact_kind":"declarative","confidence":0.9}"#.to_string()),
                Ok(r#"{"entities":[],"claims":[],"relations":[]}"#.to_string()),
                Ok(r#"{"title":"X","summary":"Y","tags":[],"proposed_links":[]}"#.to_string()),
            ],
            FeatureTier::Smart,
        );
        let resp = handle_ingest_multistep(
            &json!({"content": "Paris", "pipeline_variant": "four_step"}),
            Some(&h),
            FeatureTier::Smart,
        )
        .expect("ok");
        assert_eq!(resp["variant"], "four_step");
        // All LLM stages within the run must share the cache key.
        assert_eq!(resp["distinct_cache_keys"].as_array().unwrap().len(), 1);
    }

    #[test]
    fn pipeline_override_drives_custom_pipeline() {
        use crate::multistep_ingest::HelperKind;
        use crate::multistep_ingest::pipeline::{Pipeline, PipelineVariant, Stage};
        let pipeline = Pipeline {
            variant: PipelineVariant::TwoPhase,
            system_prompt: "Custom system prompt".to_string(),
            stages: vec![Stage::Helper {
                kind: HelperKind::FtsClassifier,
                params: Default::default(),
            }],
        };
        let h = handler_with_mock_responses(vec![], FeatureTier::Smart);
        let resp = handle_ingest_multistep(
            &json!({
                "content": "First, do step one. Then do step two.",
                "pipeline_override": pipeline,
            }),
            Some(&h),
            FeatureTier::Smart,
        )
        .expect("ok");
        // Helper-only pipeline → final output is the helper payload.
        assert_eq!(resp["final_output"]["fact_kind"], "procedural");
    }
}