Skip to main content

spool/mcp/
mod.rs

1mod mcp_sampling;
2mod parse;
3mod protocol;
4mod schemas;
5
6use crate::app;
7use crate::daemon::{LifecycleReadOptions, read_history, read_record, read_workbench};
8use crate::distill::pipeline::DistillRequest;
9use crate::domain::{OutputFormat, RouteInput};
10use crate::lifecycle_service::{LifecycleAction, LifecycleService};
11use crate::sampling::SamplingClient;
12
13use crate::lifecycle_summary;
14use crate::memory_gateway::{self, wakeup_request};
15use crate::output;
16use mcp_sampling::{McpSamplingClient, StdSamplingChannel};
17use parse::{
18    handle_prompt_get, handle_resource_read, optional_object_field, parse_metadata,
19    parse_prompt_optimize_request, parse_propose_request, parse_record_request,
20    parse_route_request, parse_wakeup_request, required_object, required_string,
21    required_string_from_object,
22};
23
24use protocol::{
25    JsonRpcError, is_successful_tool_response, jsonrpc_error, jsonrpc_result, tool_failure,
26    tool_runtime_error, tool_success,
27};
28use schemas::{prompt_definitions, resource_definitions, tool_definitions};
29use serde_json::{Value, json};
30use std::path::{Path, PathBuf};
31
32const PROTOCOL_VERSION: &str = "2025-03-26";
33
34#[derive(Debug, Default, Clone)]
35struct ServerState {
36    initialized: bool,
37    /// Client-declared capabilities from the `initialize` request,
38    /// stored verbatim so [`Self::client_supports_sampling`] can
39    /// decide whether `sampling/createMessage` reverse calls are
40    /// available before the distill pipeline tries them. Absent /
41    /// null capabilities → server treats sampling as unsupported.
42    client_capabilities: Value,
43}
44
45impl ServerState {
46    /// True when the connected MCP client advertised
47    /// `capabilities.sampling` during `initialize`. Per the MCP spec
48    /// the value is an opaque object — its presence is what matters,
49    /// not the keys inside.
50    fn client_supports_sampling(&self) -> bool {
51        self.client_capabilities
52            .get("sampling")
53            .map(|value| !value.is_null())
54            .unwrap_or(false)
55    }
56}
57
58pub fn serve_stdio(config_path: &Path, daemon_bin: Option<&Path>) -> anyhow::Result<()> {
59    if !config_path.exists() {
60        anyhow::bail!("config not found: {}", config_path.display());
61    }
62
63    // Design (R4b-2): stdin/stdout live on dedicated OS threads so
64    // their blocking reads/writes never stall the tokio runtime, and
65    // the runtime itself only exists to drive the sampling
66    // reverse-call future inside the distill_pending handler.
67    //
68    //   [OS thread: reader]   raw stdin → JSON → either:
69    //                         - `sampling/createMessage` responses →
70    //                           routed to the waiting oneshot via
71    //                           `SamplingChannel::pending`
72    //                         - everything else → `in_tx` queue,
73    //                           consumed by the main dispatcher.
74    //   [main thread]         drains `in_rx`. Sync handlers run
75    //                         inline. `memory_distill_pending` runs
76    //                         on the tokio runtime via `block_on`,
77    //                         which awaits the sampling future; the
78    //                         runtime does not own stdin so the
79    //                         reader thread can fulfill the waiter
80    //                         concurrently.
81    //   [OS thread: writer]   drains `out_rx` and writes each JSON
82    //                         envelope followed by `\n` + flush. All
83    //                         stdout writes go through here so the
84    //                         main thread and the distill handler
85    //                         never interleave bytes.
86    //
87    // `tokio` is a single-threaded `current_thread` runtime with the
88    // timer enabled so `SAMPLING_REVERSE_TIMEOUT` works.
89    let runtime = tokio::runtime::Builder::new_current_thread()
90        .enable_time()
91        .build()?;
92
93    let (in_tx, in_rx) = std::sync::mpsc::channel::<Value>();
94    let (out_tx, out_rx) = std::sync::mpsc::channel::<Value>();
95    let sampling_channel = StdSamplingChannel::new(out_tx.clone());
96
97    let reader_handle = {
98        let sampling_channel = sampling_channel.clone();
99        let out_tx = out_tx.clone();
100        std::thread::spawn(move || reader_thread(in_tx, out_tx, sampling_channel))
101    };
102    let writer_handle = std::thread::spawn(move || writer_thread(out_rx));
103
104    let mut state = ServerState::default();
105    while let Ok(message) = in_rx.recv() {
106        if is_distill_pending_call(&message) {
107            if let Err(err) = require_initialized(&state) {
108                if let Some(id) = message.get("id").cloned() {
109                    let _ = out_tx.send(err.with_id(id));
110                }
111                continue;
112            }
113            let id = message.get("id").cloned().unwrap_or(Value::Null);
114            let arguments = message
115                .get("params")
116                .and_then(|p| p.get("arguments"))
117                .cloned()
118                .unwrap_or_else(|| json!({}));
119            let supports = state.client_supports_sampling();
120            let client = McpSamplingClient::new(sampling_channel.clone(), supports);
121            let outcome = runtime.block_on(execute_distill_pending_tool_async(
122                config_path,
123                &arguments,
124                &client,
125            ));
126            let envelope = match outcome {
127                Ok(value) => jsonrpc_result(id, value),
128                Err(err) => err.with_id(id),
129            };
130            if out_tx.send(envelope).is_err() {
131                break;
132            }
133            continue;
134        }
135
136        if is_crystallize_call(&message) {
137            if let Err(err) = require_initialized(&state) {
138                if let Some(id) = message.get("id").cloned() {
139                    let _ = out_tx.send(err.with_id(id));
140                }
141                continue;
142            }
143            let id = message.get("id").cloned().unwrap_or(Value::Null);
144            let arguments = message
145                .get("params")
146                .and_then(|p| p.get("arguments"))
147                .cloned()
148                .unwrap_or_else(|| json!({}));
149            let supports = state.client_supports_sampling();
150            let client = McpSamplingClient::new(sampling_channel.clone(), supports);
151            let outcome = runtime.block_on(execute_crystallize_tool_async(
152                config_path,
153                &arguments,
154                &client,
155            ));
156            let envelope = match outcome {
157                Ok(value) => jsonrpc_result(id, value),
158                Err(err) => err.with_id(id),
159            };
160            if out_tx.send(envelope).is_err() {
161                break;
162            }
163            continue;
164        }
165
166        if is_lifecycle_write_call(&message) {
167            let response = sync_dispatch_message(&mut state, config_path, daemon_bin, message);
168            if let Some(ref resp) = response {
169                if out_tx.send(resp.clone()).is_err() {
170                    break;
171                }
172                // Fire-and-forget LLM-assisted auto-compile after successful lifecycle write.
173                // Runs on a dedicated thread so the main dispatch loop is never blocked
174                // waiting for the sampling reverse-call round-trip (up to 180 s timeout).
175                // The sampling channel is cloned into the thread; if the client never
176                // responds the timeout fires and compile falls back to template synthesis.
177                if is_successful_tool_response(resp) && state.client_supports_sampling() {
178                    let client = McpSamplingClient::new(sampling_channel.clone(), true);
179                    let config = config_path.to_path_buf();
180                    std::thread::spawn(move || {
181                        let rt = tokio::runtime::Builder::new_current_thread()
182                            .enable_time()
183                            .build();
184                        if let Ok(rt) = rt {
185                            let _ = rt.block_on(crate::knowledge::synthesize_with_sampling(
186                                &config,
187                                &client,
188                                None,
189                                "mcp-auto-compile",
190                            ));
191                        }
192                    });
193                }
194            }
195            continue;
196        }
197
198        if let Some(response) = sync_dispatch_message(&mut state, config_path, daemon_bin, message)
199            && out_tx.send(response).is_err()
200        {
201            break;
202        }
203    }
204
205    // Drop every `out_tx` clone the main thread still owns so the
206    // writer's `out_rx.recv()` can observe EOF and the worker thread
207    // can exit. Without dropping `sampling_channel` here the writer
208    // would block forever and the child would never exit on stdin
209    // close — which the smoke harness measures as `wait()` hanging.
210    drop(out_tx);
211    drop(sampling_channel);
212    let _ = writer_handle.join();
213    let _ = reader_handle.join();
214    Ok(())
215}
216
217fn reader_thread(
218    in_tx: std::sync::mpsc::Sender<Value>,
219    out_tx: std::sync::mpsc::Sender<Value>,
220    sampling_channel: StdSamplingChannel,
221) {
222    use std::io::BufRead;
223    let stdin = std::io::stdin();
224    let mut locked = stdin.lock();
225    let mut line = String::new();
226    loop {
227        line.clear();
228        match locked.read_line(&mut line) {
229            Ok(0) => break,
230            Ok(_) => {}
231            Err(_) => break,
232        }
233        let trimmed = line.trim();
234        if trimmed.is_empty() {
235            continue;
236        }
237        let message = match serde_json::from_str::<Value>(trimmed) {
238            Ok(v) => v,
239            Err(error) => {
240                let _ = out_tx.send(jsonrpc_error(
241                    Value::Null,
242                    -32700,
243                    format!("parse error: {error}"),
244                ));
245                continue;
246            }
247        };
248        for item in unwrap_batch(message) {
249            if is_response_envelope(&item) {
250                if let Some(id_val) = item.get("id")
251                    && let Some(id_key) = json_id_to_key(id_val)
252                    && sampling_channel.route_response(&id_key, item)
253                {
254                    continue;
255                }
256                // stray response — drop quietly.
257                continue;
258            }
259            if in_tx.send(item).is_err() {
260                return;
261            }
262        }
263    }
264}
265
266fn writer_thread(out_rx: std::sync::mpsc::Receiver<Value>) {
267    use std::io::Write;
268    let stdout = std::io::stdout();
269    let mut locked = stdout.lock();
270    while let Ok(message) = out_rx.recv() {
271        let payload = match serde_json::to_vec(&message) {
272            Ok(bytes) => bytes,
273            Err(_) => continue,
274        };
275        if locked.write_all(&payload).is_err() {
276            break;
277        }
278        if locked.write_all(b"\n").is_err() {
279            break;
280        }
281        if locked.flush().is_err() {
282            break;
283        }
284    }
285}
286
287fn unwrap_batch(message: Value) -> Vec<Value> {
288    match message {
289        Value::Array(items) => items,
290        single => vec![single],
291    }
292}
293
294fn is_response_envelope(message: &Value) -> bool {
295    message.get("method").is_none()
296        && (message.get("result").is_some() || message.get("error").is_some())
297}
298
299fn json_id_to_key(id: &Value) -> Option<String> {
300    match id {
301        Value::String(s) => Some(s.clone()),
302        Value::Number(n) => Some(n.to_string()),
303        _ => None,
304    }
305}
306
307fn is_distill_pending_call(message: &Value) -> bool {
308    message.get("method").and_then(Value::as_str) == Some("tools/call")
309        && message
310            .get("params")
311            .and_then(|p| p.get("name"))
312            .and_then(Value::as_str)
313            == Some("memory_distill_pending")
314}
315
316fn is_crystallize_call(message: &Value) -> bool {
317    message.get("method").and_then(Value::as_str) == Some("tools/call")
318        && message
319            .get("params")
320            .and_then(|p| p.get("name"))
321            .and_then(Value::as_str)
322            == Some("memory_crystallize")
323}
324
325const LIFECYCLE_WRITE_TOOLS: &[&str] = &[
326    "memory_record_manual",
327    "memory_propose",
328    "memory_accept",
329    "memory_promote",
330    "memory_archive",
331];
332
333fn is_lifecycle_write_call(message: &Value) -> bool {
334    if message.get("method").and_then(Value::as_str) != Some("tools/call") {
335        return false;
336    }
337    message
338        .get("params")
339        .and_then(|p| p.get("name"))
340        .and_then(Value::as_str)
341        .is_some_and(|name| LIFECYCLE_WRITE_TOOLS.contains(&name))
342}
343
344fn sync_dispatch_message(
345    state: &mut ServerState,
346    config_path: &Path,
347    daemon_bin: Option<&Path>,
348    message: Value,
349) -> Option<Value> {
350    let id = message.get("id").cloned();
351    let method = message
352        .get("method")
353        .and_then(Value::as_str)
354        .map(str::to_string);
355    let Some(method) = method else {
356        return id.map(|id| jsonrpc_error(id, -32600, "invalid request"));
357    };
358    if id.is_none() {
359        handle_notification(state, &method, &message);
360        return None;
361    }
362    let id = id.unwrap();
363    match handle_request(
364        state,
365        config_path,
366        daemon_bin,
367        &method,
368        message.get("params"),
369    ) {
370        Ok(result) => Some(jsonrpc_result(id, result)),
371        Err(error) => Some(error.with_id(id)),
372    }
373}
374
375/// Synchronous dispatch helper used by unit tests. The production
376/// stdio loop drives [`sync_dispatch_message`] inline (and spawns
377/// the distill_pending handler separately so it can issue a real
378/// sampling reverse-call). This sync flavor never invokes sampling —
379/// tests that need the sampling path drive the binary through
380/// `mcp_smoke`.
381#[cfg(test)]
382fn process_message(
383    state: &mut ServerState,
384    config_path: &Path,
385    daemon_bin: Option<&Path>,
386    message: Value,
387) -> Option<Value> {
388    fn process_single(
389        state: &mut ServerState,
390        config_path: &Path,
391        daemon_bin: Option<&Path>,
392        message: Value,
393    ) -> Option<Value> {
394        let id = message.get("id").cloned();
395        let method = message
396            .get("method")
397            .and_then(Value::as_str)
398            .map(str::to_string);
399        let Some(method) = method else {
400            return id.map(|id| jsonrpc_error(id, -32600, "invalid request"));
401        };
402        if id.is_none() {
403            handle_notification(state, &method, &message);
404            return None;
405        }
406        let id = id.unwrap();
407        match handle_request(
408            state,
409            config_path,
410            daemon_bin,
411            &method,
412            message.get("params"),
413        ) {
414            Ok(result) => Some(jsonrpc_result(id, result)),
415            Err(error) => Some(error.with_id(id)),
416        }
417    }
418    match message {
419        Value::Array(items) => {
420            let responses: Vec<Value> = items
421                .into_iter()
422                .filter_map(|item| process_single(state, config_path, daemon_bin, item))
423                .collect();
424            if responses.is_empty() {
425                None
426            } else {
427                Some(Value::Array(responses))
428            }
429        }
430        value => process_single(state, config_path, daemon_bin, value),
431    }
432}
433
434fn handle_notification(state: &mut ServerState, method: &str, _message: &Value) {
435    if method == "notifications/initialized" {
436        state.initialized = true;
437    }
438}
439
440fn handle_request(
441    state: &mut ServerState,
442    config_path: &Path,
443    daemon_bin: Option<&Path>,
444    method: &str,
445    params: Option<&Value>,
446) -> Result<Value, JsonRpcError> {
447    match method {
448        "initialize" => {
449            state.initialized = true;
450            // Cache the client-declared capabilities so later
451            // handlers (specifically `memory_distill_pending`) can
452            // tell whether `sampling/createMessage` is available.
453            // Spec-compliant clients put them at
454            // `params.capabilities`; absent / non-object values are
455            // treated as "no capabilities advertised".
456            state.client_capabilities = params
457                .and_then(|p| p.get("capabilities").cloned())
458                .unwrap_or(Value::Null);
459            Ok(json!({
460                "protocolVersion": PROTOCOL_VERSION,
461                "serverInfo": {
462                    "name": "spool-mcp",
463                    "version": env!("CARGO_PKG_VERSION")
464                },
465                "capabilities": {
466                    "tools": {
467                        "listChanged": false
468                    },
469                    "prompts": {
470                        "listChanged": false
471                    },
472                    "resources": {
473                        "listChanged": false
474                    }
475                }
476            }))
477        }
478        "ping" => Ok(json!({})),
479        "tools/list" => {
480            require_initialized(state)?;
481            Ok(json!({ "tools": tool_definitions() }))
482        }
483        "tools/call" => {
484            require_initialized(state)?;
485            let params = params.ok_or_else(|| JsonRpcError::new(-32602, "missing params"))?;
486            handle_tool_call(config_path, daemon_bin, params)
487        }
488        "prompts/list" => {
489            require_initialized(state)?;
490            Ok(json!({ "prompts": prompt_definitions() }))
491        }
492        "prompts/get" => {
493            require_initialized(state)?;
494            let params = params.ok_or_else(|| JsonRpcError::new(-32602, "missing params"))?;
495            handle_prompt_get(params)
496        }
497        "resources/list" => {
498            require_initialized(state)?;
499            Ok(json!({ "resources": resource_definitions() }))
500        }
501        "resources/read" => {
502            require_initialized(state)?;
503            let params = params.ok_or_else(|| JsonRpcError::new(-32602, "missing params"))?;
504            handle_resource_read(config_path, params)
505        }
506        _ => Err(JsonRpcError::new(
507            -32601,
508            format!("method not found: {method}"),
509        )),
510    }
511}
512
513fn require_initialized(state: &ServerState) -> Result<(), JsonRpcError> {
514    if state.initialized {
515        Ok(())
516    } else {
517        Err(JsonRpcError::new(-32002, "server not initialized"))
518    }
519}
520
521fn handle_tool_call(
522    config_path: &Path,
523    daemon_bin: Option<&Path>,
524    params: &Value,
525) -> Result<Value, JsonRpcError> {
526    let params = required_object(params, "params")?;
527    let name = required_string_from_object(params, "name")?;
528    let arguments = optional_object_field(params, "arguments")?.unwrap_or_else(|| json!({}));
529    let service = LifecycleService::new();
530
531    match name.as_str() {
532        "prompt_optimize" => {
533            let request = parse_prompt_optimize_request(&arguments)?;
534            let response = memory_gateway::execute_prompt_optimize(
535                config_path,
536                memory_gateway::prompt_optimize_request(
537                    RouteInput {
538                        task: request.task,
539                        cwd: PathBuf::from(request.cwd),
540                        files: request.files,
541                        target: request.target,
542                        format: OutputFormat::Prompt,
543                    },
544                    request.profile,
545                    request.provider,
546                    request.session_id,
547                    true,
548                ),
549                None,
550            )
551            .map_err(tool_runtime_error)?;
552
553            Ok(tool_success(
554                "Prompt optimization bundle generated.",
555                json!(response),
556            ))
557        }
558        "memory_search" => {
559            let request = parse_route_request(&arguments, OutputFormat::Prompt)?;
560            let result = app::run(config_path, request.input, Some(request.format))
561                .map_err(tool_runtime_error)?;
562            Ok(tool_success(
563                "Context bundle loaded.",
564                json!({
565                    "rendered": result.rendered,
566                    "explain": result.explain,
567                    "bundle": result.bundle,
568                    "used_format": result.used_format.as_str(),
569                    "used_vault_root": result.used_vault_root
570                }),
571            ))
572        }
573        "memory_explain" => {
574            let request = parse_route_request(&arguments, OutputFormat::Markdown)?;
575            let result = app::run(config_path, request.input, Some(request.format))
576                .map_err(tool_runtime_error)?;
577            Ok(tool_success(
578                "Route explanation loaded.",
579                json!({
580                    "explain": result.explain,
581                    "bundle": result.bundle,
582                    "used_vault_root": result.used_vault_root
583                }),
584            ))
585        }
586        "memory_wakeup" => {
587            let request = parse_wakeup_request(&arguments)?;
588            let response = memory_gateway::execute(
589                config_path,
590                wakeup_request(request.input, request.profile),
591                None,
592            )
593            .map_err(tool_runtime_error)?;
594            let packet = response
595                .wakeup_packet()
596                .cloned()
597                .ok_or_else(|| JsonRpcError::new(-32603, "missing wakeup packet"))?;
598            Ok(tool_success(
599                "Wakeup packet loaded.",
600                json!({
601                    "rendered": output::wakeup::render(&packet, request.format),
602                    "packet": packet,
603                    "bundle": response.bundle,
604                    "used_vault_root": response.used_vault_root
605                }),
606            ))
607        }
608        "memory_review_queue" => {
609            let snapshot = read_workbench(config_path, &lifecycle_read_options(daemon_bin))
610                .map_err(tool_runtime_error)?;
611            Ok(tool_success(
612                "Pending review queue loaded.",
613                lifecycle_summary::queue_payload(&snapshot.pending_review, "pending_review"),
614            ))
615        }
616        "memory_wakeup_ready" => {
617            let snapshot = read_workbench(config_path, &lifecycle_read_options(daemon_bin))
618                .map_err(tool_runtime_error)?;
619            Ok(tool_success(
620                "Wakeup-ready queue loaded.",
621                lifecycle_summary::queue_payload(&snapshot.wakeup_ready, "wakeup_ready"),
622            ))
623        }
624        "memory_get" => {
625            let record_id = required_string(&arguments, "record_id")?;
626            match read_record(config_path, &record_id, &lifecycle_read_options(daemon_bin))
627                .map_err(tool_runtime_error)?
628            {
629                Some(entry) => Ok(tool_success(
630                    "Memory record loaded.",
631                    lifecycle_summary::record_payload(&entry),
632                )),
633                None => Ok(tool_failure(
634                    &format!("memory record not found: {record_id}"),
635                    lifecycle_summary::not_found_payload(&record_id),
636                )),
637            }
638        }
639        "memory_history" => {
640            let record_id = required_string(&arguments, "record_id")?;
641            let history =
642                read_history(config_path, &record_id, &lifecycle_read_options(daemon_bin))
643                    .map_err(tool_runtime_error)?;
644            if history.is_empty() {
645                Ok(tool_failure(
646                    &format!("memory history not found: {record_id}"),
647                    lifecycle_summary::not_found_payload(&record_id),
648                ))
649            } else {
650                Ok(tool_success(
651                    "Memory history loaded.",
652                    lifecycle_summary::history_payload(&record_id, &history),
653                ))
654            }
655        }
656        "memory_record_manual" => {
657            let result = service
658                .record_manual(config_path, parse_record_request(&arguments)?)
659                .map_err(tool_runtime_error)?;
660            crate::vault_writer::writeback_from_config_no_compile(config_path, &result.entry);
661            mcp_embedding_auto_append(config_path, &result.entry);
662            Ok(tool_success(
663                "Manual memory recorded.",
664                lifecycle_summary::create_payload("record_manual", &result.entry, &result.snapshot),
665            ))
666        }
667        "memory_propose" => {
668            let result = service
669                .propose_ai(config_path, parse_propose_request(&arguments)?)
670                .map_err(tool_runtime_error)?;
671            crate::vault_writer::writeback_from_config_no_compile(config_path, &result.entry);
672            Ok(tool_success(
673                "AI memory proposed.",
674                lifecycle_summary::create_payload("propose", &result.entry, &result.snapshot),
675            ))
676        }
677        "memory_accept" => {
678            execute_action_tool(service, config_path, &arguments, LifecycleAction::Accept)
679        }
680        "memory_promote" => execute_action_tool(
681            service,
682            config_path,
683            &arguments,
684            LifecycleAction::PromoteToCanonical,
685        ),
686        "memory_archive" => {
687            execute_action_tool(service, config_path, &arguments, LifecycleAction::Archive)
688        }
689        "memory_import_session" => execute_import_session_tool(config_path, &arguments),
690        "memory_sync_vault" => execute_sync_vault_tool(config_path, &arguments),
691        "memory_distill_pending" => execute_distill_pending_tool(config_path, &arguments),
692        "memory_check_contradictions" => execute_check_contradictions_tool(config_path, &arguments),
693        "memory_staleness_report" => execute_staleness_report_tool(config_path),
694        "memory_import_git" => execute_import_git_tool(config_path, &arguments),
695        "memory_dedup_suggestions" => execute_dedup_suggestions_tool(config_path),
696        "memory_consolidate" => execute_consolidate_tool(config_path, &arguments),
697        "memory_prune" => execute_prune_tool(config_path, &arguments),
698        "memory_crystallize" => execute_crystallize_tool(config_path, &arguments),
699        "memory_lint" => execute_lint_tool(config_path),
700        _ => Err(JsonRpcError::new(-32601, format!("tool not found: {name}"))),
701    }
702}
703
704/// `memory_distill_pending` tool — drains the post-tool-use queue,
705/// scans the session transcript with Tier 1 heuristics (self-tag +
706/// incident extraction), and writes accepted / candidate records via
707/// the lifecycle ledger.
708///
709/// **Synchronous fallback** path. Used by:
710/// - the unit-test handler which has no live tokio runtime;
711/// - any future surface that wants distill semantics without
712///   sampling support.
713///
714/// The MCP stdio dispatcher routes `tools/call name=memory_distill_pending`
715/// requests to [`execute_distill_pending_tool_async`] instead, so
716/// real client sessions get the sampling reverse-call path.
717fn execute_distill_pending_tool(
718    config_path: &Path,
719    arguments: &Value,
720) -> Result<Value, JsonRpcError> {
721    use crate::distill::pipeline;
722
723    let request = parse_distill_request(config_path, arguments)?;
724    let report = pipeline::run(request).map_err(tool_runtime_error)?;
725    Ok(distill_report_response(report))
726}
727
728/// Async variant of [`execute_distill_pending_tool`] that drives a
729/// real `sampling/createMessage` reverse-call when the connected
730/// client advertised the capability. The pipeline still runs Tier 1
731/// underneath so an unhelpful or missing LLM response can never
732/// suppress the baseline heuristic signal — see PRD §C.13.
733async fn execute_distill_pending_tool_async(
734    config_path: &Path,
735    arguments: &Value,
736    sampling: &(dyn SamplingClient + Send),
737) -> Result<Value, JsonRpcError> {
738    use crate::distill::pipeline;
739
740    let request = parse_distill_request(config_path, arguments)?;
741    let report = pipeline::run_with_sampling(request, sampling)
742        .await
743        .map_err(tool_runtime_error)?;
744    Ok(distill_report_response(report))
745}
746
747fn execute_dedup_suggestions_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
748    use crate::lifecycle_store::{
749        LifecycleStore, lifecycle_root_from_config, wakeup_ready_entries,
750    };
751
752    let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
753    let lifecycle_root = lifecycle_root_from_config(config_dir);
754    let store = LifecycleStore::new(lifecycle_root.as_path());
755    let entries = wakeup_ready_entries(&store).map_err(tool_runtime_error)?;
756    let records: Vec<(String, crate::domain::MemoryRecord)> = entries
757        .into_iter()
758        .map(|e| (e.record_id, e.record))
759        .collect();
760
761    let suggestions = crate::contradiction::find_duplicates(&records, 0.5);
762
763    Ok(json!({
764        "suggestions": suggestions,
765        "checked": records.len(),
766    }))
767}
768
769fn execute_consolidate_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
770    use crate::knowledge::cluster as consolidation;
771
772    let dry_run = arguments
773        .get("dry_run")
774        .and_then(Value::as_bool)
775        .unwrap_or(true);
776
777    let entries = consolidation::load_entries(config_path).map_err(tool_runtime_error)?;
778    let suggestions = consolidation::detect_consolidation_candidates(&entries);
779
780    if dry_run {
781        Ok(tool_success(
782            &format!(
783                "Consolidation check: {} cluster(s) found.",
784                suggestions.len()
785            ),
786            json!({
787                "dry_run": true,
788                "suggestions": suggestions,
789            }),
790        ))
791    } else {
792        let mut results = Vec::new();
793        for s in &suggestions {
794            let result = consolidation::apply_consolidation(config_path, s, &entries)
795                .map_err(tool_runtime_error)?;
796            results.push(json!({
797                "merged_record_id": result.merged_record_id,
798                "archived_record_ids": result.archived_record_ids,
799            }));
800        }
801        Ok(tool_success(
802            &format!("Consolidated {} cluster(s).", results.len()),
803            json!({
804                "dry_run": false,
805                "applied": results,
806            }),
807        ))
808    }
809}
810
811fn execute_prune_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
812    use crate::knowledge::cluster as consolidation;
813
814    let dry_run = arguments
815        .get("dry_run")
816        .and_then(Value::as_bool)
817        .unwrap_or(true);
818
819    let entries = consolidation::load_entries(config_path).map_err(tool_runtime_error)?;
820    let lifecycle_root = consolidation::resolve_lifecycle_root(config_path);
821    let suggestions = consolidation::detect_prune_candidates(&entries, &lifecycle_root);
822
823    if dry_run {
824        Ok(tool_success(
825            &format!("Prune check: {} record(s) to archive.", suggestions.len()),
826            json!({
827                "dry_run": true,
828                "suggestions": suggestions,
829            }),
830        ))
831    } else {
832        let result =
833            consolidation::apply_prune(config_path, &suggestions).map_err(tool_runtime_error)?;
834        Ok(tool_success(
835            &format!("Pruned {} record(s).", result.archived_record_ids.len()),
836            json!({
837                "dry_run": false,
838                "archived_record_ids": result.archived_record_ids,
839            }),
840        ))
841    }
842}
843
844/// `memory_crystallize` tool — synchronous fallback (no sampling).
845/// Used by the unit-test handler and any surface without a live tokio
846/// runtime. The MCP stdio dispatcher routes crystallize calls to
847/// [`execute_crystallize_tool_async`] instead for real sampling.
848fn execute_crystallize_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
849    use crate::knowledge;
850
851    let topic = arguments.get("topic").and_then(Value::as_str);
852    let dry_run = arguments
853        .get("dry_run")
854        .and_then(Value::as_bool)
855        .unwrap_or(false);
856
857    if dry_run {
858        let drafts =
859            knowledge::detect_knowledge_clusters(config_path).map_err(tool_runtime_error)?;
860        let filtered: Vec<&knowledge::KnowledgePageDraft> = if let Some(topic) = topic {
861            let topic_lower = topic.to_lowercase();
862            drafts
863                .iter()
864                .filter(|d| {
865                    d.entities
866                        .iter()
867                        .any(|e| e.to_lowercase().contains(&topic_lower))
868                        || d.tags
869                            .iter()
870                            .any(|t| t.to_lowercase().contains(&topic_lower))
871                        || d.title.to_lowercase().contains(&topic_lower)
872                })
873                .collect()
874        } else {
875            drafts.iter().collect()
876        };
877        return Ok(tool_success(
878            &format!(
879                "Knowledge crystallize dry-run: {} cluster(s) detected.",
880                filtered.len()
881            ),
882            json!({
883                "dry_run": true,
884                "clusters": filtered.len(),
885                "drafts": filtered.iter().map(|d| json!({
886                    "title": d.title,
887                    "domain": d.domain,
888                    "tags": d.tags,
889                    "entities": d.entities,
890                    "source_count": d.source_record_ids.len(),
891                })).collect::<Vec<_>>(),
892            }),
893        ));
894    }
895
896    // Sync path: template synthesis only (no sampling available)
897    let drafts = knowledge::detect_knowledge_clusters(config_path).map_err(tool_runtime_error)?;
898    let filtered: Vec<knowledge::KnowledgePageDraft> = if let Some(topic) = topic {
899        let topic_lower = topic.to_lowercase();
900        drafts
901            .into_iter()
902            .filter(|d| {
903                d.entities
904                    .iter()
905                    .any(|e| e.to_lowercase().contains(&topic_lower))
906                    || d.tags
907                        .iter()
908                        .any(|t| t.to_lowercase().contains(&topic_lower))
909                    || d.title.to_lowercase().contains(&topic_lower)
910            })
911            .collect()
912    } else {
913        drafts
914    };
915
916    if filtered.is_empty() {
917        return Ok(tool_success(
918            "No clusters found for crystallization.",
919            json!({
920                "pages_created": 0,
921                "sampling_used": false,
922                "fallback_reason": "no clusters found",
923            }),
924        ));
925    }
926
927    let persisted_ids = knowledge::apply_distill(config_path, &filtered, "mcp-memory-crystallize")
928        .map_err(tool_runtime_error)?;
929
930    Ok(tool_success(
931        &format!(
932            "Crystallized {} knowledge page(s) (template synthesis).",
933            persisted_ids.len()
934        ),
935        json!({
936            "pages_created": persisted_ids.len(),
937            "persisted_ids": persisted_ids,
938            "sampling_used": false,
939            "fallback_reason": "sync path (no sampling)",
940        }),
941    ))
942}
943
944/// Async variant of [`execute_crystallize_tool`] that drives LLM
945/// synthesis via `sampling/createMessage` when the connected client
946/// advertised the capability. Falls back to template synthesis when
947/// sampling is unavailable or fails.
948async fn execute_crystallize_tool_async(
949    config_path: &Path,
950    arguments: &Value,
951    sampling: &(dyn SamplingClient + Send),
952) -> Result<Value, JsonRpcError> {
953    use crate::knowledge;
954
955    let topic = arguments.get("topic").and_then(Value::as_str);
956    let dry_run = arguments
957        .get("dry_run")
958        .and_then(Value::as_bool)
959        .unwrap_or(false);
960
961    if dry_run {
962        let drafts =
963            knowledge::detect_knowledge_clusters(config_path).map_err(tool_runtime_error)?;
964        let filtered: Vec<&knowledge::KnowledgePageDraft> = if let Some(topic) = topic {
965            let topic_lower = topic.to_lowercase();
966            drafts
967                .iter()
968                .filter(|d| {
969                    d.entities
970                        .iter()
971                        .any(|e| e.to_lowercase().contains(&topic_lower))
972                        || d.tags
973                            .iter()
974                            .any(|t| t.to_lowercase().contains(&topic_lower))
975                        || d.title.to_lowercase().contains(&topic_lower)
976                })
977                .collect()
978        } else {
979            drafts.iter().collect()
980        };
981        return Ok(tool_success(
982            &format!(
983                "Knowledge crystallize dry-run: {} cluster(s) detected.",
984                filtered.len()
985            ),
986            json!({
987                "dry_run": true,
988                "clusters": filtered.len(),
989                "drafts": filtered.iter().map(|d| json!({
990                    "title": d.title,
991                    "domain": d.domain,
992                    "tags": d.tags,
993                    "entities": d.entities,
994                    "source_count": d.source_record_ids.len(),
995                })).collect::<Vec<_>>(),
996            }),
997        ));
998    }
999
1000    let result =
1001        knowledge::synthesize_with_sampling(config_path, sampling, topic, "mcp-memory-crystallize")
1002            .await
1003            .map_err(tool_runtime_error)?;
1004
1005    let summary_text = if result.sampling_used {
1006        format!(
1007            "Crystallized {} knowledge page(s) via LLM synthesis.",
1008            result.pages_created
1009        )
1010    } else {
1011        format!(
1012            "Crystallized {} knowledge page(s) (template fallback: {}).",
1013            result.pages_created,
1014            result.fallback_reason.as_deref().unwrap_or("unknown")
1015        )
1016    };
1017
1018    Ok(tool_success(
1019        &summary_text,
1020        json!({
1021            "pages_created": result.pages_created,
1022            "persisted_ids": result.persisted_ids,
1023            "sampling_used": result.sampling_used,
1024            "fallback_reason": result.fallback_reason,
1025            "drafts": result.drafts.iter().map(|d| json!({
1026                "title": d.title,
1027                "domain": d.domain,
1028                "tags": d.tags,
1029                "entities": d.entities,
1030                "source_count": d.source_record_ids.len(),
1031            })).collect::<Vec<_>>(),
1032        }),
1033    ))
1034}
1035
1036fn execute_import_git_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
1037    let cwd = arguments
1038        .get("cwd")
1039        .and_then(Value::as_str)
1040        .map(std::path::PathBuf::from)
1041        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
1042    let limit = arguments.get("limit").and_then(Value::as_u64).unwrap_or(30) as usize;
1043    let dry_run = arguments
1044        .get("dry_run")
1045        .and_then(Value::as_bool)
1046        .unwrap_or(false);
1047
1048    let report = crate::git_importer::import_git_activity(config_path, &cwd, limit, dry_run)
1049        .map_err(tool_runtime_error)?;
1050
1051    Ok(json!({
1052        "commits_scanned": report.commits_scanned,
1053        "candidates_found": report.candidates_found,
1054        "candidates_persisted": report.candidates_persisted.len(),
1055        "candidates_duplicate_dropped": report.candidates_duplicate_dropped,
1056        "dry_run": dry_run,
1057    }))
1058}
1059
1060fn execute_staleness_report_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
1061    use crate::lifecycle_store::{
1062        LifecycleStore, lifecycle_root_from_config, wakeup_ready_entries,
1063    };
1064    use crate::reference_tracker;
1065
1066    let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
1067    let lifecycle_root = lifecycle_root_from_config(config_dir);
1068    let store = LifecycleStore::new(lifecycle_root.as_path());
1069    let ref_map = reference_tracker::read(&lifecycle_root);
1070
1071    let entries = wakeup_ready_entries(&store).map_err(tool_runtime_error)?;
1072    let mut stale: Vec<Value> = Vec::new();
1073    let mut fresh: usize = 0;
1074    let mut never_referenced: usize = 0;
1075
1076    for entry in &entries {
1077        let age = ref_map
1078            .records
1079            .get(&entry.record_id)
1080            .and_then(reference_tracker::age_days);
1081        let penalty = reference_tracker::staleness_penalty(age);
1082        match age {
1083            None => never_referenced += 1,
1084            Some(_days) if penalty >= 0 => fresh += 1,
1085            Some(days) => {
1086                stale.push(json!({
1087                    "record_id": entry.record_id,
1088                    "title": entry.record.title,
1089                    "memory_type": entry.record.memory_type,
1090                    "days_since_reference": days,
1091                    "penalty": penalty,
1092                }));
1093            }
1094        }
1095    }
1096
1097    stale.sort_by(|a, b| {
1098        b["days_since_reference"]
1099            .as_u64()
1100            .cmp(&a["days_since_reference"].as_u64())
1101    });
1102
1103    Ok(json!({
1104        "total_wakeup_ready": entries.len(),
1105        "fresh": fresh,
1106        "never_referenced": never_referenced,
1107        "stale_count": stale.len(),
1108        "stale": stale,
1109    }))
1110}
1111
1112fn execute_lint_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
1113    let report = crate::wiki_lint::run_lint_from_config(config_path).map_err(tool_runtime_error)?;
1114    let markdown = crate::wiki_lint::render_lint_markdown(&report);
1115    let mut value = serde_json::to_value(&report).map_err(|err| {
1116        JsonRpcError::new(-32603, format!("failed to serialize lint report: {err}"))
1117    })?;
1118    if let Some(obj) = value.as_object_mut() {
1119        obj.insert("markdown".to_string(), Value::String(markdown));
1120    }
1121    Ok(value)
1122}
1123
1124fn execute_check_contradictions_tool(
1125    config_path: &Path,
1126    arguments: &Value,
1127) -> Result<Value, JsonRpcError> {
1128    use crate::lifecycle_store::{LifecycleStore, lifecycle_root_from_config};
1129
1130    let record_id = required_string(arguments, "record_id")?;
1131    let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
1132    let lifecycle_root = lifecycle_root_from_config(config_dir);
1133    let store = LifecycleStore::new(lifecycle_root.as_path());
1134
1135    let target_entry = crate::lifecycle_store::read_events_for_record(&store, &record_id)
1136        .map_err(tool_runtime_error)?
1137        .into_iter()
1138        .last()
1139        .ok_or_else(|| JsonRpcError::new(-32602, format!("record not found: {record_id}")))?;
1140
1141    let existing: Vec<(String, crate::domain::MemoryRecord)> =
1142        crate::lifecycle_store::wakeup_ready_entries(&store)
1143            .map_err(tool_runtime_error)?
1144            .into_iter()
1145            .filter(|e| e.record_id != record_id)
1146            .map(|e| (e.record_id, e.record))
1147            .collect();
1148
1149    let hits = crate::contradiction::detect(
1150        &target_entry.record.summary,
1151        &target_entry.record.memory_type,
1152        &existing,
1153    );
1154
1155    Ok(json!({
1156        "record_id": record_id,
1157        "contradictions": hits,
1158        "checked_against": existing.len(),
1159    }))
1160}
1161
1162fn parse_distill_request(
1163    config_path: &Path,
1164    arguments: &Value,
1165) -> Result<DistillRequest, JsonRpcError> {
1166    let cwd_str = arguments
1167        .get("cwd")
1168        .and_then(Value::as_str)
1169        .ok_or_else(|| JsonRpcError::new(-32602, "missing 'cwd' (absolute path)"))?;
1170    let cwd = PathBuf::from(cwd_str);
1171    if !cwd.is_absolute() {
1172        return Err(JsonRpcError::new(
1173            -32602,
1174            format!("cwd must be absolute, got: {cwd_str}"),
1175        ));
1176    }
1177
1178    let transcript_path = arguments
1179        .get("transcript_path")
1180        .and_then(Value::as_str)
1181        .map(PathBuf::from);
1182
1183    Ok(
1184        DistillRequest::new(config_path.to_path_buf(), cwd, transcript_path)
1185            .with_actor("mcp-memory-distill-pending")
1186            .with_source_refs("mcp:distill:self-tag", "mcp:distill:extraction"),
1187    )
1188}
1189
1190fn distill_report_response(report: crate::distill::pipeline::DistillReport) -> Value {
1191    let summary_text = format!(
1192        "Distill pending complete ({}): {} accepted, {} candidates, {} queued signals drained.",
1193        report.fallback_used,
1194        report.signals_persisted.len(),
1195        report.candidates_persisted.len(),
1196        report.queue_drained,
1197    );
1198    let report_json = serde_json::to_value(&report).unwrap_or_else(|_| json!({}));
1199    tool_success(&summary_text, report_json)
1200}
1201
1202fn execute_import_session_tool(
1203    config_path: &Path,
1204    arguments: &Value,
1205) -> Result<Value, JsonRpcError> {
1206    use crate::memory_importer::{ImportProvider, import_session};
1207
1208    let provider_str = arguments
1209        .get("provider")
1210        .and_then(Value::as_str)
1211        .ok_or_else(|| JsonRpcError::new(-32602, "missing 'provider' (claude | codex)"))?;
1212    let provider = ImportProvider::parse(provider_str)
1213        .map_err(|err| JsonRpcError::new(-32602, err.to_string()))?;
1214    let session_id = arguments
1215        .get("session_id")
1216        .and_then(Value::as_str)
1217        .ok_or_else(|| JsonRpcError::new(-32602, "missing 'session_id'"))?;
1218    let apply = arguments
1219        .get("apply")
1220        .and_then(Value::as_bool)
1221        .unwrap_or(false);
1222    let actor = arguments
1223        .get("actor")
1224        .and_then(Value::as_str)
1225        .map(|s| s.to_string());
1226
1227    let response = import_session(config_path, provider, session_id, apply, actor)
1228        .map_err(tool_runtime_error)?;
1229    let payload = serde_json::to_value(&response)
1230        .map_err(|err| JsonRpcError::new(-32603, format!("serialize import response: {err}")))?;
1231    let text = if response.applied {
1232        format!(
1233            "Imported {} candidate(s) from {} and applied to ledger.",
1234            response.candidate_count, response.session_ref
1235        )
1236    } else {
1237        format!(
1238            "Previewed {} candidate(s) from {} (dry run, use apply=true to write).",
1239            response.candidate_count, response.session_ref
1240        )
1241    };
1242    Ok(tool_success(&text, payload))
1243}
1244
1245fn execute_sync_vault_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
1246    use crate::domain::MemoryLifecycleState;
1247    use crate::lifecycle_store::{
1248        LifecycleStore, latest_state_entries, lifecycle_root_from_config,
1249    };
1250    use crate::vault_writer::{self, WriteStatus, memory_note_path};
1251
1252    let dry_run = arguments
1253        .get("dry_run")
1254        .and_then(Value::as_bool)
1255        .unwrap_or(false);
1256    let enrich = arguments
1257        .get("enrich")
1258        .and_then(Value::as_bool)
1259        .unwrap_or(false);
1260
1261    let config = crate::app::load(config_path).map_err(tool_runtime_error)?;
1262    let vault_root = crate::app::resolve_override_path(&config.vault.root, config_path)
1263        .map_err(tool_runtime_error)?;
1264    let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
1265    let lifecycle_root = lifecycle_root_from_config(config_dir);
1266    let store = LifecycleStore::new(lifecycle_root.as_path());
1267    let entries = latest_state_entries(&store).map_err(tool_runtime_error)?;
1268
1269    if enrich {
1270        return execute_enrich_tool(&entries, vault_root.as_path(), dry_run);
1271    }
1272
1273    let mut counters: std::collections::HashMap<&'static str, u64> =
1274        std::collections::HashMap::new();
1275    let mut errors: Vec<(String, String)> = Vec::new();
1276    let bump = |counters: &mut std::collections::HashMap<&'static str, u64>, key: &'static str| {
1277        *counters.entry(key).or_insert(0) += 1;
1278    };
1279
1280    for entry in &entries {
1281        match entry.record.state {
1282            MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical => {
1283                if dry_run {
1284                    let path = memory_note_path(vault_root.as_path(), &entry.record_id);
1285                    if path.exists() {
1286                        bump(&mut counters, "would_update");
1287                    } else {
1288                        bump(&mut counters, "would_create");
1289                    }
1290                    continue;
1291                }
1292                match vault_writer::write_memory_note(
1293                    vault_root.as_path(),
1294                    &entry.record_id,
1295                    &entry.record,
1296                ) {
1297                    Ok(result) => match result.status {
1298                        WriteStatus::Created => bump(&mut counters, "created"),
1299                        WriteStatus::UpdatedAll => bump(&mut counters, "updated_all"),
1300                        WriteStatus::UpdatedPreserveBody => {
1301                            bump(&mut counters, "updated_preserve_body")
1302                        }
1303                        WriteStatus::Unchanged => bump(&mut counters, "unchanged"),
1304                    },
1305                    Err(error) => errors.push((entry.record_id.clone(), error.to_string())),
1306                }
1307            }
1308            MemoryLifecycleState::Archived => {
1309                if dry_run {
1310                    let path = memory_note_path(vault_root.as_path(), &entry.record_id);
1311                    if path.exists() {
1312                        bump(&mut counters, "would_archive");
1313                    } else {
1314                        bump(&mut counters, "skipped_missing");
1315                    }
1316                    continue;
1317                }
1318                match vault_writer::archive_memory_note(vault_root.as_path(), &entry.record_id) {
1319                    Ok(Some(result)) => match result.status {
1320                        WriteStatus::Unchanged => bump(&mut counters, "unchanged"),
1321                        _ => bump(&mut counters, "archived"),
1322                    },
1323                    Ok(None) => bump(&mut counters, "skipped_missing"),
1324                    Err(error) => errors.push((entry.record_id.clone(), error.to_string())),
1325                }
1326            }
1327            MemoryLifecycleState::Draft | MemoryLifecycleState::Candidate => {
1328                bump(&mut counters, "skipped_draft_or_candidate")
1329            }
1330        }
1331    }
1332
1333    let counters_json: serde_json::Map<String, Value> = counters
1334        .into_iter()
1335        .map(|(k, v)| (k.to_string(), json!(v)))
1336        .collect();
1337    let errors_json: Vec<Value> = errors
1338        .iter()
1339        .map(|(id, msg)| json!({ "record_id": id, "error": msg }))
1340        .collect();
1341    let payload = json!({
1342        "vault_root": vault_root.display().to_string(),
1343        "ledger_records": entries.len(),
1344        "dry_run": dry_run,
1345        "counters": Value::Object(counters_json),
1346        "errors": errors_json,
1347    });
1348    let text = if dry_run {
1349        "Vault sync preview generated."
1350    } else {
1351        "Vault sync completed."
1352    };
1353    Ok(tool_success(text, payload))
1354}
1355
1356fn execute_enrich_tool(
1357    entries: &[crate::lifecycle_store::LedgerEntry],
1358    vault_root: &Path,
1359    dry_run: bool,
1360) -> Result<Value, JsonRpcError> {
1361    use crate::domain::MemoryLifecycleState;
1362    use crate::enrich;
1363    use crate::vault_writer;
1364
1365    let mut enriched_count = 0_u64;
1366    let mut skipped_count = 0_u64;
1367    let mut enriched_records: Vec<Value> = Vec::new();
1368
1369    for entry in entries {
1370        if !matches!(
1371            entry.record.state,
1372            MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical
1373        ) {
1374            continue;
1375        }
1376
1377        let patch = enrich::enrich_record(&entry.record);
1378        if patch.is_empty() {
1379            skipped_count += 1;
1380            continue;
1381        }
1382
1383        enriched_records.push(json!({
1384            "record_id": entry.record_id,
1385            "title": entry.record.title,
1386            "entities": patch.entities,
1387            "tags": patch.tags,
1388            "triggers": patch.triggers,
1389        }));
1390
1391        if !dry_run {
1392            let mut enriched_record = entry.record.clone();
1393            if enriched_record.entities.is_empty() {
1394                enriched_record.entities = patch.entities.clone();
1395            }
1396            if enriched_record.tags.is_empty() {
1397                enriched_record.tags = patch.tags.clone();
1398            }
1399            if enriched_record.triggers.is_empty() {
1400                enriched_record.triggers = patch.triggers.clone();
1401            }
1402            if let Err(error) =
1403                vault_writer::write_memory_note(vault_root, &entry.record_id, &enriched_record)
1404            {
1405                eprintln!(
1406                    "[spool] enrich writeback failed for {}: {error}",
1407                    entry.record_id
1408                );
1409            }
1410        }
1411
1412        enriched_count += 1;
1413    }
1414
1415    let payload = json!({
1416        "dry_run": dry_run,
1417        "enriched": enriched_count,
1418        "skipped": skipped_count,
1419        "records": enriched_records,
1420    });
1421    let text = if dry_run {
1422        format!("Enrich preview: {enriched_count} records would be enriched.")
1423    } else {
1424        format!("Enriched {enriched_count} records.")
1425    };
1426    Ok(tool_success(&text, payload))
1427}
1428
1429fn lifecycle_read_options(daemon_bin: Option<&Path>) -> LifecycleReadOptions {
1430    daemon_bin
1431        .map(LifecycleReadOptions::with_daemon)
1432        .unwrap_or_default()
1433}
1434
1435fn execute_action_tool(
1436    service: LifecycleService,
1437    config_path: &Path,
1438    arguments: &Value,
1439    action: LifecycleAction,
1440) -> Result<Value, JsonRpcError> {
1441    let record_id = required_string(arguments, "record_id")?;
1442    match service.apply_action_with_metadata(
1443        config_path,
1444        &record_id,
1445        action,
1446        parse_metadata(arguments)?,
1447    ) {
1448        Ok(result) => {
1449            crate::vault_writer::writeback_from_config_no_compile(config_path, &result.entry);
1450            mcp_embedding_auto_append(config_path, &result.entry);
1451            Ok(tool_success(
1452                &format!("Lifecycle action {} applied.", action.label()),
1453                lifecycle_summary::action_payload(&result.entry, &result.snapshot, action),
1454            ))
1455        }
1456        Err(error) => Ok(tool_failure(
1457            &error.to_string(),
1458            json!({ "record_id": record_id, "action": action.label() }),
1459        )),
1460    }
1461}
1462
1463fn mcp_embedding_auto_append(config_path: &Path, entry: &crate::lifecycle_store::LedgerEntry) {
1464    #[cfg(feature = "embedding")]
1465    {
1466        use crate::domain::MemoryLifecycleState;
1467        if !matches!(
1468            entry.record.state,
1469            MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical
1470        ) {
1471            return;
1472        }
1473        if let Ok(config) = crate::config::load_from_path(config_path) {
1474            crate::engine::embedding::try_append_record(
1475                &config.embedding,
1476                &entry.record_id,
1477                &entry.record,
1478            );
1479        }
1480    }
1481    #[cfg(not(feature = "embedding"))]
1482    {
1483        let _ = (config_path, entry);
1484    }
1485}
1486
1487#[cfg(test)]
1488mod tests {
1489    use super::parse::{parse_files, parse_metadata};
1490    use super::schemas::{RESOURCE_CURRENT_PLAN_URI, RESOURCE_SESSION_HANDOFF_URI};
1491    use super::{
1492        ServerState, handle_prompt_get, handle_request, handle_resource_read, handle_tool_call,
1493        process_message, prompt_definitions, resource_definitions, tool_definitions,
1494    };
1495    use crate::daemon_client::{
1496        daemon_session_pid_for_test, daemon_test_lock_for_test, kill_daemon_session_for_test,
1497        reset_daemon_sessions,
1498    };
1499    use crate::enhancement_trace::read_latest_prompt_optimize_trace;
1500    use crate::lifecycle_service::LifecycleService;
1501    use assert_cmd::cargo::cargo_bin;
1502    use serde_json::json;
1503    use std::fs;
1504    use tempfile::tempdir;
1505
1506    fn setup_config_path() -> (tempfile::TempDir, std::path::PathBuf) {
1507        let temp = tempdir().unwrap();
1508        let docs_dir = temp.path().join("docs");
1509        fs::create_dir_all(&docs_dir).unwrap();
1510        fs::write(
1511            docs_dir.join("MCP_PROMPTS_ROUND_8_PLAN.md"),
1512            "Round 8 test doc",
1513        )
1514        .unwrap();
1515        fs::write(
1516            docs_dir.join("SESSION_HANDOFF.md"),
1517            "Session handoff test doc",
1518        )
1519        .unwrap();
1520        let config_path = temp.path().join("spool.toml");
1521        fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
1522        (temp, config_path)
1523    }
1524
1525    #[test]
1526    fn tools_list_should_return_all_lifecycle_tools() {
1527        let (_temp, config_path) = setup_config_path();
1528        let mut state = ServerState {
1529            initialized: true,
1530            ..Default::default()
1531        };
1532        let result =
1533            handle_request(&mut state, config_path.as_path(), None, "tools/list", None).unwrap();
1534        let tools = result["tools"].as_array().unwrap();
1535        assert_eq!(tools.len(), tool_definitions().len());
1536        assert!(
1537            tools
1538                .iter()
1539                .any(|tool| tool["name"] == "memory_record_manual")
1540        );
1541    }
1542
1543    #[test]
1544    fn prompts_and_resources_should_list_and_read() {
1545        let (_temp, config_path) = setup_config_path();
1546        let prompts = prompt_definitions();
1547        let resources = resource_definitions();
1548        assert!(
1549            prompts
1550                .iter()
1551                .any(|prompt| prompt["name"] == "review_lifecycle_queue")
1552        );
1553        assert!(
1554            resources
1555                .iter()
1556                .any(|resource| resource["uri"] == RESOURCE_SESSION_HANDOFF_URI)
1557        );
1558
1559        let prompt = handle_prompt_get(&json!({
1560            "name": "retrieve_project_context",
1561            "arguments": { "cwd": "/tmp/repo", "task": "understand spool" }
1562        }))
1563        .unwrap();
1564        assert!(
1565            prompt["messages"][0]["content"]["text"]
1566                .as_str()
1567                .unwrap()
1568                .contains("memory_search")
1569        );
1570
1571        let resource = handle_resource_read(
1572            config_path.as_path(),
1573            &json!({
1574                "uri": RESOURCE_CURRENT_PLAN_URI
1575            }),
1576        )
1577        .unwrap();
1578        assert_eq!(resource["contents"][0]["mimeType"], json!("text/markdown"));
1579        assert!(
1580            resource["contents"][0]["text"]
1581                .as_str()
1582                .unwrap()
1583                .contains("Round 8")
1584        );
1585    }
1586
1587    #[test]
1588    fn lifecycle_tool_calls_should_create_and_read_records() {
1589        let (_temp, config_path) = setup_config_path();
1590        let create = handle_tool_call(
1591            config_path.as_path(),
1592            None,
1593            &json!({
1594                "name": "memory_propose",
1595                "arguments": {
1596                    "title": "测试偏好",
1597                    "summary": "先 smoke 再收口",
1598                    "memory_type": "workflow",
1599                    "scope": "user",
1600                    "source_ref": "session:1",
1601                    "user_id": "long",
1602                    "actor": "codex",
1603                    "reason": "captured during MCP review",
1604                    "evidence_refs": ["session:1"]
1605                }
1606            }),
1607        )
1608        .unwrap();
1609        let record_id = create["structuredContent"]["entry"]["record_id"]
1610            .as_str()
1611            .unwrap()
1612            .to_string();
1613        assert_eq!(
1614            create["structuredContent"]["entry"]["metadata"]["actor"],
1615            json!("codex")
1616        );
1617
1618        let get = handle_tool_call(
1619            config_path.as_path(),
1620            None,
1621            &json!({
1622                "name": "memory_get",
1623                "arguments": { "record_id": record_id }
1624            }),
1625        )
1626        .unwrap();
1627        assert_eq!(
1628            get["structuredContent"]["record"]["record"]["state"],
1629            json!("candidate")
1630        );
1631    }
1632
1633    #[test]
1634    fn lifecycle_action_tool_should_persist_metadata_arguments() {
1635        let (_temp, config_path) = setup_config_path();
1636        let create = handle_tool_call(
1637            config_path.as_path(),
1638            None,
1639            &json!({
1640                "name": "memory_propose",
1641                "arguments": {
1642                    "title": "测试偏好",
1643                    "summary": "先 smoke 再收口",
1644                    "memory_type": "workflow",
1645                    "scope": "user",
1646                    "source_ref": "session:1",
1647                    "user_id": "long"
1648                }
1649            }),
1650        )
1651        .unwrap();
1652        let record_id = create["structuredContent"]["entry"]["record_id"]
1653            .as_str()
1654            .unwrap()
1655            .to_string();
1656
1657        let accept = handle_tool_call(
1658            config_path.as_path(),
1659            None,
1660            &json!({
1661                "name": "memory_accept",
1662                "arguments": {
1663                    "record_id": record_id,
1664                    "actor": "long",
1665                    "reason": "approved after review",
1666                    "evidence_refs": ["session:1", "session:2"]
1667                }
1668            }),
1669        )
1670        .unwrap();
1671        assert_eq!(
1672            accept["structuredContent"]["entry"]["metadata"]["actor"],
1673            json!("long")
1674        );
1675        assert_eq!(
1676            accept["structuredContent"]["entry"]["metadata"]["evidence_refs"],
1677            json!(["session:1", "session:2"])
1678        );
1679    }
1680
1681    #[test]
1682    fn retrieval_tool_calls_should_build_context_and_wakeup() {
1683        let temp = tempdir().unwrap();
1684        let vault_dir = temp.path().join("vault");
1685        let repo_dir = temp.path().join("repo");
1686        fs::create_dir_all(vault_dir.join("10-Projects")).unwrap();
1687        fs::create_dir_all(&repo_dir).unwrap();
1688        fs::write(
1689            vault_dir.join("10-Projects/spool.md"),
1690            "# spool\n\ncontext\n",
1691        )
1692        .unwrap();
1693        let config_path = temp.path().join("spool.toml");
1694        fs::write(
1695            &config_path,
1696            format!(
1697                r#"[vault]
1698root = "{}"
1699
1700[output]
1701default_format = "markdown"
1702max_chars = 12000
1703max_notes = 8
1704
1705[[projects]]
1706id = "spool"
1707name = "spool"
1708repo_paths = ["{}"]
1709note_roots = ["10-Projects"]
1710"#,
1711                vault_dir.display(),
1712                repo_dir.display()
1713            ),
1714        )
1715        .unwrap();
1716
1717        let search = handle_tool_call(
1718            config_path.as_path(),
1719            None,
1720            &json!({
1721                "name": "memory_search",
1722                "arguments": {
1723                    "task": "spool context",
1724                    "cwd": repo_dir,
1725                    "target": "codex"
1726                }
1727            }),
1728        )
1729        .unwrap();
1730        assert!(
1731            search["structuredContent"]["rendered"]
1732                .as_str()
1733                .unwrap()
1734                .contains("Codex")
1735        );
1736
1737        let wakeup = handle_tool_call(
1738            config_path.as_path(),
1739            None,
1740            &json!({
1741                "name": "memory_wakeup",
1742                "arguments": {
1743                    "task": "spool wakeup",
1744                    "cwd": repo_dir,
1745                    "profile": "project",
1746                    "format": "json"
1747                }
1748            }),
1749        )
1750        .unwrap();
1751        assert_eq!(
1752            wakeup["structuredContent"]["packet"]["profile"],
1753            json!("project")
1754        );
1755    }
1756
1757    #[test]
1758    fn prompt_optimize_tool_should_return_combined_prompt_bundle() {
1759        let temp = tempdir().unwrap();
1760        let vault_dir = temp.path().join("vault");
1761        let repo_dir = temp.path().join("repo");
1762        fs::create_dir_all(vault_dir.join("10-Projects")).unwrap();
1763        fs::create_dir_all(&repo_dir).unwrap();
1764        fs::write(
1765            vault_dir.join("10-Projects/spool.md"),
1766            "# spool\n\nproject context for prompt optimize\n",
1767        )
1768        .unwrap();
1769        let config_path = temp.path().join("spool.toml");
1770        fs::write(
1771            &config_path,
1772            format!(
1773                r#"[vault]
1774root = "{}"
1775
1776[output]
1777default_format = "markdown"
1778max_chars = 12000
1779max_notes = 8
1780
1781[[projects]]
1782id = "spool"
1783name = "spool"
1784repo_paths = ["{}"]
1785note_roots = ["10-Projects"]
1786"#,
1787                vault_dir.display(),
1788                repo_dir.display()
1789            ),
1790        )
1791        .unwrap();
1792
1793        let optimized = handle_tool_call(
1794            config_path.as_path(),
1795            None,
1796            &json!({
1797                "name": "prompt_optimize",
1798                "arguments": {
1799                    "task": "continue the spool desktop refactor",
1800                    "cwd": repo_dir,
1801                    "target": "codex",
1802                    "profile": "project",
1803                    "provider": "codex",
1804                    "session_id": "codex:session-42"
1805                }
1806            }),
1807        )
1808        .unwrap();
1809
1810        let combined = optimized["structuredContent"]["combined_prompt"]
1811            .as_str()
1812            .unwrap();
1813        assert!(combined.contains("Codex"));
1814        assert_eq!(optimized["structuredContent"]["profile"], json!("project"));
1815        assert_eq!(optimized["structuredContent"]["target"], json!("codex"));
1816        assert_eq!(optimized["structuredContent"]["provider"], json!("codex"));
1817        assert_eq!(
1818            optimized["structuredContent"]["session_id"],
1819            json!("codex:session-42")
1820        );
1821        assert_eq!(
1822            optimized["structuredContent"]["runtime_trace"]["source"],
1823            json!("mcp.prompt_optimize")
1824        );
1825
1826        let trace = read_latest_prompt_optimize_trace(config_path.as_path())
1827            .unwrap()
1828            .unwrap();
1829        assert_eq!(trace.provider.as_deref(), Some("codex"));
1830        assert_eq!(trace.session_id.as_deref(), Some("codex:session-42"));
1831        assert_eq!(trace.target, "codex");
1832        assert_eq!(trace.profile, "project");
1833    }
1834
1835    #[test]
1836    fn parse_array_fields_should_reject_non_string_items() {
1837        let metadata_error = parse_metadata(&json!({
1838            "evidence_refs": ["session:1", 2]
1839        }))
1840        .unwrap_err();
1841        assert!(metadata_error.message.contains("array of strings"));
1842
1843        let files_error = parse_files(&json!({
1844            "files": ["src/mcp.rs", true]
1845        }))
1846        .unwrap_err();
1847        assert!(files_error.message.contains("array of strings"));
1848    }
1849
1850    #[test]
1851    fn tool_and_prompt_calls_should_reject_non_object_arguments() {
1852        let (_temp, config_path) = setup_config_path();
1853
1854        let tool_error = handle_tool_call(
1855            config_path.as_path(),
1856            None,
1857            &json!({
1858                "name": "memory_get",
1859                "arguments": "record-1"
1860            }),
1861        )
1862        .unwrap_err();
1863        assert!(
1864            tool_error
1865                .message
1866                .contains("field must be an object: arguments")
1867        );
1868
1869        let prompt_error = handle_prompt_get(&json!({
1870            "name": "retrieve_project_context",
1871            "arguments": "not-an-object"
1872        }))
1873        .unwrap_err();
1874        assert!(
1875            prompt_error
1876                .message
1877                .contains("field must be an object: arguments")
1878        );
1879    }
1880
1881    #[test]
1882    fn resource_reads_should_reject_non_object_params() {
1883        let (_temp, config_path) = setup_config_path();
1884        let error = handle_resource_read(config_path.as_path(), &json!("bad-params")).unwrap_err();
1885        assert!(error.message.contains("field must be an object: params"));
1886    }
1887
1888    #[test]
1889    fn resource_reads_should_use_config_directory_as_base() {
1890        let temp = tempdir().unwrap();
1891        let config_dir = temp.path().join("config");
1892        let docs_dir = config_dir.join("docs");
1893        fs::create_dir_all(&docs_dir).unwrap();
1894        let config_path = config_dir.join("spool.toml");
1895        fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
1896        fs::write(
1897            docs_dir.join("MCP_PROMPTS_ROUND_8_PLAN.md"),
1898            "Round 8 custom doc",
1899        )
1900        .unwrap();
1901        fs::write(
1902            docs_dir.join("SESSION_HANDOFF.md"),
1903            "Session handoff custom doc",
1904        )
1905        .unwrap();
1906
1907        let resource = handle_resource_read(
1908            config_path.as_path(),
1909            &json!({ "uri": RESOURCE_CURRENT_PLAN_URI }),
1910        )
1911        .unwrap();
1912        assert!(
1913            resource["contents"][0]["text"]
1914                .as_str()
1915                .unwrap()
1916                .contains("Round 8 custom doc")
1917        );
1918    }
1919
1920    #[test]
1921    fn lifecycle_action_tool_should_return_is_error_for_invalid_transition() {
1922        let (_temp, config_path) = setup_config_path();
1923        let result = LifecycleService::new()
1924            .record_manual(
1925                config_path.as_path(),
1926                crate::lifecycle_store::RecordMemoryRequest {
1927                    title: "简洁输出".to_string(),
1928                    summary: "偏好简洁".to_string(),
1929                    memory_type: "preference".to_string(),
1930                    scope: crate::domain::MemoryScope::User,
1931                    source_ref: "manual:cli".to_string(),
1932                    project_id: None,
1933                    user_id: Some("long".to_string()),
1934                    sensitivity: None,
1935                    metadata: crate::lifecycle_store::TransitionMetadata::default(),
1936                    entities: Vec::new(),
1937                    tags: Vec::new(),
1938                    triggers: Vec::new(),
1939                    related_files: Vec::new(),
1940                    related_records: Vec::new(),
1941                    supersedes: None,
1942                    applies_to: Vec::new(),
1943                    valid_until: None,
1944                },
1945            )
1946            .unwrap();
1947        let response = handle_tool_call(
1948            config_path.as_path(),
1949            None,
1950            &json!({
1951                "name": "memory_accept",
1952                "arguments": { "record_id": result.entry.record_id }
1953            }),
1954        )
1955        .unwrap();
1956        assert_eq!(response["isError"], json!(true));
1957    }
1958
1959    #[test]
1960    fn lifecycle_read_tools_should_use_daemon_when_configured() {
1961        let _guard = daemon_test_lock_for_test()
1962            .lock()
1963            .unwrap_or_else(|error| error.into_inner());
1964        reset_daemon_sessions();
1965        let (_temp, config_path) = setup_config_path();
1966        let create = LifecycleService::new()
1967            .propose_ai(
1968                config_path.as_path(),
1969                crate::lifecycle_store::ProposeMemoryRequest {
1970                    title: "测试偏好".to_string(),
1971                    summary: "先 smoke 再收口".to_string(),
1972                    memory_type: "workflow".to_string(),
1973                    scope: crate::domain::MemoryScope::User,
1974                    source_ref: "session:1".to_string(),
1975                    project_id: None,
1976                    user_id: Some("long".to_string()),
1977                    sensitivity: None,
1978                    metadata: crate::lifecycle_store::TransitionMetadata::default(),
1979                    entities: Vec::new(),
1980                    tags: Vec::new(),
1981                    triggers: Vec::new(),
1982                    related_files: Vec::new(),
1983                    related_records: Vec::new(),
1984                    supersedes: None,
1985                    applies_to: Vec::new(),
1986                    valid_until: None,
1987                },
1988            )
1989            .unwrap();
1990        let daemon_bin = cargo_bin("spool-daemon");
1991
1992        let review_queue = handle_tool_call(
1993            config_path.as_path(),
1994            Some(daemon_bin.as_path()),
1995            &json!({ "name": "memory_review_queue", "arguments": {} }),
1996        )
1997        .unwrap();
1998        assert_eq!(review_queue["isError"], json!(false));
1999        assert_eq!(
2000            review_queue["structuredContent"]["pending_review"]
2001                .as_array()
2002                .unwrap()
2003                .len(),
2004            1
2005        );
2006        let first_pid =
2007            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
2008
2009        let get = handle_tool_call(
2010            config_path.as_path(),
2011            Some(daemon_bin.as_path()),
2012            &json!({
2013                "name": "memory_get",
2014                "arguments": { "record_id": create.entry.record_id }
2015            }),
2016        )
2017        .unwrap();
2018        assert_eq!(get["isError"], json!(false));
2019        assert_eq!(
2020            get["structuredContent"]["record"]["record"]["state"],
2021            json!("candidate")
2022        );
2023        let second_pid =
2024            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
2025        assert_eq!(first_pid, second_pid);
2026        reset_daemon_sessions();
2027    }
2028
2029    #[test]
2030    fn lifecycle_read_tools_should_rebuild_daemon_session_after_exit() {
2031        let _guard = daemon_test_lock_for_test()
2032            .lock()
2033            .unwrap_or_else(|error| error.into_inner());
2034        reset_daemon_sessions();
2035        let (_temp, config_path) = setup_config_path();
2036        let create = LifecycleService::new()
2037            .propose_ai(
2038                config_path.as_path(),
2039                crate::lifecycle_store::ProposeMemoryRequest {
2040                    title: "测试偏好".to_string(),
2041                    summary: "先 smoke 再收口".to_string(),
2042                    memory_type: "workflow".to_string(),
2043                    scope: crate::domain::MemoryScope::User,
2044                    source_ref: "session:1".to_string(),
2045                    project_id: None,
2046                    user_id: Some("long".to_string()),
2047                    sensitivity: None,
2048                    metadata: crate::lifecycle_store::TransitionMetadata::default(),
2049                    entities: Vec::new(),
2050                    tags: Vec::new(),
2051                    triggers: Vec::new(),
2052                    related_files: Vec::new(),
2053                    related_records: Vec::new(),
2054                    supersedes: None,
2055                    applies_to: Vec::new(),
2056                    valid_until: None,
2057                },
2058            )
2059            .unwrap();
2060        let daemon_bin = cargo_bin("spool-daemon");
2061
2062        let review_queue = handle_tool_call(
2063            config_path.as_path(),
2064            Some(daemon_bin.as_path()),
2065            &json!({ "name": "memory_review_queue", "arguments": {} }),
2066        )
2067        .unwrap();
2068        assert_eq!(review_queue["isError"], json!(false));
2069        let first_pid =
2070            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
2071
2072        kill_daemon_session_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
2073
2074        let get = handle_tool_call(
2075            config_path.as_path(),
2076            Some(daemon_bin.as_path()),
2077            &json!({
2078                "name": "memory_get",
2079                "arguments": { "record_id": create.entry.record_id }
2080            }),
2081        )
2082        .unwrap();
2083        assert_eq!(get["isError"], json!(false));
2084        assert_eq!(
2085            get["structuredContent"]["record"]["record"]["state"],
2086            json!("candidate")
2087        );
2088        let second_pid =
2089            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
2090        assert_ne!(first_pid, second_pid);
2091        reset_daemon_sessions();
2092    }
2093
2094    #[test]
2095    fn lifecycle_read_tools_should_fallback_when_daemon_is_unavailable() {
2096        let (_temp, config_path) = setup_config_path();
2097        let create = LifecycleService::new()
2098            .propose_ai(
2099                config_path.as_path(),
2100                crate::lifecycle_store::ProposeMemoryRequest {
2101                    title: "测试偏好".to_string(),
2102                    summary: "先 smoke 再收口".to_string(),
2103                    memory_type: "workflow".to_string(),
2104                    scope: crate::domain::MemoryScope::User,
2105                    source_ref: "session:1".to_string(),
2106                    project_id: None,
2107                    user_id: Some("long".to_string()),
2108                    sensitivity: None,
2109                    metadata: crate::lifecycle_store::TransitionMetadata::default(),
2110                    entities: Vec::new(),
2111                    tags: Vec::new(),
2112                    triggers: Vec::new(),
2113                    related_files: Vec::new(),
2114                    related_records: Vec::new(),
2115                    supersedes: None,
2116                    applies_to: Vec::new(),
2117                    valid_until: None,
2118                },
2119            )
2120            .unwrap();
2121
2122        let get = handle_tool_call(
2123            config_path.as_path(),
2124            Some(std::path::Path::new("/definitely/missing/spool-daemon")),
2125            &json!({
2126                "name": "memory_get",
2127                "arguments": { "record_id": create.entry.record_id }
2128            }),
2129        )
2130        .unwrap();
2131        assert_eq!(get["isError"], json!(false));
2132        assert_eq!(
2133            get["structuredContent"]["record"]["record"]["state"],
2134            json!("candidate")
2135        );
2136    }
2137
2138    #[test]
2139    fn initialize_request_should_return_server_capabilities() {
2140        let (_temp, config_path) = setup_config_path();
2141        let mut state = ServerState::default();
2142        let response = process_message(
2143            &mut state,
2144            config_path.as_path(),
2145            None,
2146            json!({
2147                "jsonrpc": "2.0",
2148                "id": 1,
2149                "method": "initialize",
2150                "params": {}
2151            }),
2152        )
2153        .unwrap();
2154        assert_eq!(
2155            response["result"]["protocolVersion"],
2156            json!(super::PROTOCOL_VERSION)
2157        );
2158        assert_eq!(
2159            response["result"]["capabilities"]["tools"]["listChanged"],
2160            json!(false)
2161        );
2162        assert_eq!(
2163            response["result"]["capabilities"]["prompts"]["listChanged"],
2164            json!(false)
2165        );
2166        assert_eq!(
2167            response["result"]["capabilities"]["resources"]["listChanged"],
2168            json!(false)
2169        );
2170    }
2171
2172    #[test]
2173    fn initialize_should_capture_client_capabilities_for_sampling_detection() {
2174        // R4b foundation: when the MCP client advertises
2175        // `capabilities.sampling` during initialize, the server must
2176        // remember it so a later `memory_distill_pending` call can
2177        // attempt the reverse `sampling/createMessage` path. Absent /
2178        // null capabilities → server treats sampling as unavailable
2179        // and the distill pipeline stays on Tier 1.
2180        let (_temp, config_path) = setup_config_path();
2181        let mut state = ServerState::default();
2182
2183        // Case 1: client advertises sampling.
2184        process_message(
2185            &mut state,
2186            config_path.as_path(),
2187            None,
2188            json!({
2189                "jsonrpc": "2.0",
2190                "id": 1,
2191                "method": "initialize",
2192                "params": {
2193                    "capabilities": { "sampling": {} }
2194                }
2195            }),
2196        )
2197        .unwrap();
2198        assert!(
2199            state.client_supports_sampling(),
2200            "advertised sampling capability must be detected: {state:?}"
2201        );
2202
2203        // Case 2: client sends no capabilities at all.
2204        let mut bare_state = ServerState::default();
2205        process_message(
2206            &mut bare_state,
2207            config_path.as_path(),
2208            None,
2209            json!({
2210                "jsonrpc": "2.0",
2211                "id": 1,
2212                "method": "initialize",
2213                "params": {}
2214            }),
2215        )
2216        .unwrap();
2217        assert!(
2218            !bare_state.client_supports_sampling(),
2219            "missing capabilities must be treated as no sampling support: {bare_state:?}"
2220        );
2221
2222        // Case 3: capabilities exists but sampling is absent.
2223        let mut other_state = ServerState::default();
2224        process_message(
2225            &mut other_state,
2226            config_path.as_path(),
2227            None,
2228            json!({
2229                "jsonrpc": "2.0",
2230                "id": 1,
2231                "method": "initialize",
2232                "params": {
2233                    "capabilities": { "tools": { "listChanged": true } }
2234                }
2235            }),
2236        )
2237        .unwrap();
2238        assert!(
2239            !other_state.client_supports_sampling(),
2240            "absent sampling key must be treated as unsupported: {other_state:?}"
2241        );
2242    }
2243
2244    #[test]
2245    fn malformed_json_should_return_parse_error_without_exiting() {
2246        let (_temp, config_path) = setup_config_path();
2247        let mut state = ServerState::default();
2248
2249        let invalid = "{";
2250        let response = match serde_json::from_str::<serde_json::Value>(invalid) {
2251            Ok(message) => process_message(&mut state, config_path.as_path(), None, message),
2252            Err(error) => Some(super::jsonrpc_error(
2253                serde_json::Value::Null,
2254                -32700,
2255                format!("parse error: {error}"),
2256            )),
2257        }
2258        .unwrap();
2259
2260        assert_eq!(response["error"]["code"], json!(-32700));
2261        assert!(
2262            response["error"]["message"]
2263                .as_str()
2264                .unwrap()
2265                .contains("parse error:")
2266        );
2267    }
2268}