mod mcp_sampling;
mod parse;
mod protocol;
mod schemas;
use crate::app;
use crate::daemon::{LifecycleReadOptions, read_history, read_record, read_workbench};
use crate::distill::pipeline::DistillRequest;
use crate::domain::{OutputFormat, RouteInput};
use crate::lifecycle_service::{LifecycleAction, LifecycleService};
use crate::sampling::SamplingClient;
use crate::lifecycle_summary;
use crate::memory_gateway::{self, wakeup_request};
use crate::output;
use mcp_sampling::{McpSamplingClient, StdSamplingChannel};
use parse::{
handle_prompt_get, handle_resource_read, optional_object_field, parse_metadata,
parse_prompt_optimize_request, parse_propose_request, parse_record_request,
parse_route_request, parse_wakeup_request, required_object, required_string,
required_string_from_object,
};
use protocol::{
JsonRpcError, is_successful_tool_response, jsonrpc_error, jsonrpc_result, tool_failure,
tool_runtime_error, tool_success,
};
use schemas::{prompt_definitions, resource_definitions, tool_definitions};
use serde_json::{Value, json};
use std::path::{Path, PathBuf};
/// Protocol version string reported as `protocolVersion` in the `initialize` response.
const PROTOCOL_VERSION: &str = "2025-03-26";
/// Connection state tracked while serving requests.
#[derive(Debug, Default, Clone)]
struct ServerState {
    /// Set once `initialize` or `notifications/initialized` has been handled.
    initialized: bool,
    /// The `capabilities` value from the client's `initialize` params
    /// (`Value::Null` when the client sent none).
    client_capabilities: Value,
}

impl ServerState {
    /// Returns `true` when the stored client capabilities contain a non-null
    /// `sampling` entry.
    fn client_supports_sampling(&self) -> bool {
        self.client_capabilities
            .get("sampling")
            .is_some_and(|capability| !capability.is_null())
    }
}
pub fn serve_stdio(config_path: &Path, daemon_bin: Option<&Path>) -> anyhow::Result<()> {
if !config_path.exists() {
anyhow::bail!("config not found: {}", config_path.display());
}
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()?;
let (in_tx, in_rx) = std::sync::mpsc::channel::<Value>();
let (out_tx, out_rx) = std::sync::mpsc::channel::<Value>();
let sampling_channel = StdSamplingChannel::new(out_tx.clone());
let reader_handle = {
let sampling_channel = sampling_channel.clone();
let out_tx = out_tx.clone();
std::thread::spawn(move || reader_thread(in_tx, out_tx, sampling_channel))
};
let writer_handle = std::thread::spawn(move || writer_thread(out_rx));
let mut state = ServerState::default();
while let Ok(message) = in_rx.recv() {
if is_distill_pending_call(&message) {
if let Err(err) = require_initialized(&state) {
if let Some(id) = message.get("id").cloned() {
let _ = out_tx.send(err.with_id(id));
}
continue;
}
let id = message.get("id").cloned().unwrap_or(Value::Null);
let arguments = message
.get("params")
.and_then(|p| p.get("arguments"))
.cloned()
.unwrap_or_else(|| json!({}));
let supports = state.client_supports_sampling();
let client = McpSamplingClient::new(sampling_channel.clone(), supports);
let outcome = runtime.block_on(execute_distill_pending_tool_async(
config_path,
&arguments,
&client,
));
let envelope = match outcome {
Ok(value) => jsonrpc_result(id, value),
Err(err) => err.with_id(id),
};
if out_tx.send(envelope).is_err() {
break;
}
continue;
}
if is_crystallize_call(&message) {
if let Err(err) = require_initialized(&state) {
if let Some(id) = message.get("id").cloned() {
let _ = out_tx.send(err.with_id(id));
}
continue;
}
let id = message.get("id").cloned().unwrap_or(Value::Null);
let arguments = message
.get("params")
.and_then(|p| p.get("arguments"))
.cloned()
.unwrap_or_else(|| json!({}));
let supports = state.client_supports_sampling();
let client = McpSamplingClient::new(sampling_channel.clone(), supports);
let outcome = runtime.block_on(execute_crystallize_tool_async(
config_path,
&arguments,
&client,
));
let envelope = match outcome {
Ok(value) => jsonrpc_result(id, value),
Err(err) => err.with_id(id),
};
if out_tx.send(envelope).is_err() {
break;
}
continue;
}
if is_lifecycle_write_call(&message) {
let response = sync_dispatch_message(&mut state, config_path, daemon_bin, message);
if let Some(ref resp) = response {
if out_tx.send(resp.clone()).is_err() {
break;
}
if is_successful_tool_response(resp) && state.client_supports_sampling() {
let client = McpSamplingClient::new(sampling_channel.clone(), true);
let config = config_path.to_path_buf();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build();
if let Ok(rt) = rt {
let _ = rt.block_on(crate::knowledge::synthesize_with_sampling(
&config,
&client,
None,
"mcp-auto-compile",
));
}
});
}
}
continue;
}
if let Some(response) = sync_dispatch_message(&mut state, config_path, daemon_bin, message)
&& out_tx.send(response).is_err()
{
break;
}
}
drop(out_tx);
drop(sampling_channel);
let _ = writer_handle.join();
let _ = reader_handle.join();
Ok(())
}
fn reader_thread(
in_tx: std::sync::mpsc::Sender<Value>,
out_tx: std::sync::mpsc::Sender<Value>,
sampling_channel: StdSamplingChannel,
) {
use std::io::BufRead;
let stdin = std::io::stdin();
let mut locked = stdin.lock();
let mut line = String::new();
loop {
line.clear();
match locked.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {}
Err(_) => break,
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let message = match serde_json::from_str::<Value>(trimmed) {
Ok(v) => v,
Err(error) => {
let _ = out_tx.send(jsonrpc_error(
Value::Null,
-32700,
format!("parse error: {error}"),
));
continue;
}
};
for item in unwrap_batch(message) {
if is_response_envelope(&item) {
if let Some(id_val) = item.get("id")
&& let Some(id_key) = json_id_to_key(id_val)
&& sampling_channel.route_response(&id_key, item)
{
continue;
}
continue;
}
if in_tx.send(item).is_err() {
return;
}
}
}
}
/// Writes every message received on `out_rx` to stdout as one JSON line,
/// flushing after each one.
///
/// Messages that fail to serialize are skipped. The loop ends when all
/// senders are gone or any write/flush fails.
fn writer_thread(out_rx: std::sync::mpsc::Receiver<Value>) {
    use std::io::Write;

    let stdout = std::io::stdout();
    let mut sink = stdout.lock();
    for envelope in out_rx {
        let Ok(bytes) = serde_json::to_vec(&envelope) else {
            continue;
        };
        let delivered = sink
            .write_all(&bytes)
            .and_then(|()| sink.write_all(b"\n"))
            .and_then(|()| sink.flush());
        if delivered.is_err() {
            break;
        }
    }
}
/// Flattens a possibly-batched message: a JSON array yields its elements,
/// any other value yields a single-element vector.
fn unwrap_batch(message: Value) -> Vec<Value> {
    if let Value::Array(batch) = message {
        batch
    } else {
        vec![message]
    }
}
/// Returns `true` for messages that carry `result` or `error` and have no
/// `method` member.
fn is_response_envelope(message: &Value) -> bool {
    let has_outcome = message.get("result").is_some() || message.get("error").is_some();
    has_outcome && message.get("method").is_none()
}
/// Converts a JSON-RPC id into a string key: strings are used as-is, numbers
/// are rendered with `to_string`, and every other kind yields `None`.
fn json_id_to_key(id: &Value) -> Option<String> {
    if let Value::String(text) = id {
        return Some(text.clone());
    }
    if let Value::Number(number) = id {
        return Some(number.to_string());
    }
    None
}
/// Returns `true` when `message` is a `tools/call` whose `params.name` is
/// `memory_distill_pending`.
fn is_distill_pending_call(message: &Value) -> bool {
    if message.get("method").and_then(Value::as_str) != Some("tools/call") {
        return false;
    }
    let tool = message
        .get("params")
        .and_then(|params| params.get("name"))
        .and_then(Value::as_str);
    tool == Some("memory_distill_pending")
}
/// Returns `true` when `message` is a `tools/call` whose `params.name` is
/// `memory_crystallize`.
fn is_crystallize_call(message: &Value) -> bool {
    let method = message.get("method").and_then(Value::as_str);
    matches!(method, Some("tools/call"))
        && matches!(
            message
                .get("params")
                .and_then(|params| params.get("name"))
                .and_then(Value::as_str),
            Some("memory_crystallize")
        )
}
/// Tool names that `is_lifecycle_write_call` matches against a `tools/call`
/// request's `params.name`.
const LIFECYCLE_WRITE_TOOLS: &[&str] = &[
"memory_record_manual",
"memory_propose",
"memory_accept",
"memory_promote",
"memory_archive",
];
/// Returns `true` when `message` is a `tools/call` whose `params.name` is one
/// of [`LIFECYCLE_WRITE_TOOLS`].
fn is_lifecycle_write_call(message: &Value) -> bool {
    let method = message.get("method").and_then(Value::as_str);
    let tool = message
        .get("params")
        .and_then(|params| params.get("name"))
        .and_then(Value::as_str);
    matches!(
        (method, tool),
        (Some("tools/call"), Some(name)) if LIFECYCLE_WRITE_TOOLS.contains(&name)
    )
}
/// Handles one JSON-RPC message synchronously.
///
/// * No string `method`: answers `-32600` when an `id` is present, otherwise
///   returns `None`.
/// * No `id`: treated as a notification; returns `None`.
/// * Otherwise the request is handled and its result or error is wrapped in
///   an envelope carrying the request id.
fn sync_dispatch_message(
    state: &mut ServerState,
    config_path: &Path,
    daemon_bin: Option<&Path>,
    message: Value,
) -> Option<Value> {
    let request_id = message.get("id").cloned();
    let Some(method) = message.get("method").and_then(Value::as_str) else {
        return request_id.map(|id| jsonrpc_error(id, -32600, "invalid request"));
    };
    let Some(request_id) = request_id else {
        handle_notification(state, method, &message);
        return None;
    };
    let outcome = handle_request(
        state,
        config_path,
        daemon_bin,
        method,
        message.get("params"),
    );
    let envelope = match outcome {
        Ok(result) => jsonrpc_result(request_id, result),
        Err(error) => error.with_id(request_id),
    };
    Some(envelope)
}
/// Test-only entry point that handles a single message or a batch.
///
/// Each message goes through `sync_dispatch_message`, so tests exercise the
/// same dispatch path as the server loop rather than a private copy of it.
/// A batch (JSON array) yields an array of the non-empty responses, or `None`
/// when no element produced a response.
#[cfg(test)]
fn process_message(
    state: &mut ServerState,
    config_path: &Path,
    daemon_bin: Option<&Path>,
    message: Value,
) -> Option<Value> {
    match message {
        Value::Array(items) => {
            let responses: Vec<Value> = items
                .into_iter()
                .filter_map(|item| sync_dispatch_message(state, config_path, daemon_bin, item))
                .collect();
            if responses.is_empty() {
                None
            } else {
                Some(Value::Array(responses))
            }
        }
        single => sync_dispatch_message(state, config_path, daemon_bin, single),
    }
}
/// Applies a notification to the server state. Only
/// `notifications/initialized` has an effect (it marks the server
/// initialized); every other method is ignored.
fn handle_notification(state: &mut ServerState, method: &str, _message: &Value) {
    match method {
        "notifications/initialized" => state.initialized = true,
        _ => {}
    }
}
/// Routes a JSON-RPC request to its handler and returns the result payload.
///
/// `initialize` and `ping` are always served; the list/get/read/call methods
/// fail with `-32002` until the server is initialized. Methods that need
/// `params` fail with `-32602` when it is missing, and unknown methods fail
/// with `-32601`.
fn handle_request(
    state: &mut ServerState,
    config_path: &Path,
    daemon_bin: Option<&Path>,
    method: &str,
    params: Option<&Value>,
) -> Result<Value, JsonRpcError> {
    // Shared lookup for the methods that cannot run without `params`.
    let required_params = || params.ok_or_else(|| JsonRpcError::new(-32602, "missing params"));

    match method {
        "initialize" => {
            let capabilities = params
                .and_then(|p| p.get("capabilities"))
                .cloned()
                .unwrap_or(Value::Null);
            state.initialized = true;
            state.client_capabilities = capabilities;
            Ok(json!({
                "protocolVersion": PROTOCOL_VERSION,
                "serverInfo": {
                    "name": "spool-mcp",
                    "version": env!("CARGO_PKG_VERSION")
                },
                "capabilities": {
                    "tools": {
                        "listChanged": false
                    },
                    "prompts": {
                        "listChanged": false
                    },
                    "resources": {
                        "listChanged": false
                    }
                }
            }))
        }
        "ping" => Ok(json!({})),
        "tools/list" => {
            require_initialized(state)?;
            Ok(json!({ "tools": tool_definitions() }))
        }
        "tools/call" => {
            require_initialized(state)?;
            handle_tool_call(config_path, daemon_bin, required_params()?)
        }
        "prompts/list" => {
            require_initialized(state)?;
            Ok(json!({ "prompts": prompt_definitions() }))
        }
        "prompts/get" => {
            require_initialized(state)?;
            handle_prompt_get(required_params()?)
        }
        "resources/list" => {
            require_initialized(state)?;
            Ok(json!({ "resources": resource_definitions() }))
        }
        "resources/read" => {
            require_initialized(state)?;
            handle_resource_read(config_path, required_params()?)
        }
        unknown => Err(JsonRpcError::new(
            -32601,
            format!("method not found: {unknown}"),
        )),
    }
}
/// Fails with `-32002` ("server not initialized") unless the state has been
/// marked initialized.
fn require_initialized(state: &ServerState) -> Result<(), JsonRpcError> {
    if !state.initialized {
        return Err(JsonRpcError::new(-32002, "server not initialized"));
    }
    Ok(())
}
/// Dispatches a `tools/call` request to the tool named by `params.name`.
///
/// `params` must be a JSON object with a string `name`; `arguments` is
/// optional and defaults to an empty object. Runtime failures of the
/// underlying operations are converted with `tool_runtime_error`.
///
/// # Errors
///
/// Returns `-32601` when no tool matches `name`, plus whatever errors the
/// argument parsers and the individual tool implementations produce.
fn handle_tool_call(
config_path: &Path,
daemon_bin: Option<&Path>,
params: &Value,
) -> Result<Value, JsonRpcError> {
let params = required_object(params, "params")?;
let name = required_string_from_object(params, "name")?;
let arguments = optional_object_field(params, "arguments")?.unwrap_or_else(|| json!({}));
// Constructed up front; only the lifecycle write arms below use it.
let service = LifecycleService::new();
match name.as_str() {
// --- Routing / context tools ---
"prompt_optimize" => {
let request = parse_prompt_optimize_request(&arguments)?;
let response = memory_gateway::execute_prompt_optimize(
config_path,
memory_gateway::prompt_optimize_request(
RouteInput {
task: request.task,
cwd: PathBuf::from(request.cwd),
files: request.files,
target: request.target,
format: OutputFormat::Prompt,
},
request.profile,
request.provider,
request.session_id,
true,
),
None,
)
.map_err(tool_runtime_error)?;
Ok(tool_success(
"Prompt optimization bundle generated.",
json!(response),
))
}
"memory_search" => {
let request = parse_route_request(&arguments, OutputFormat::Prompt)?;
let result = app::run(config_path, request.input, Some(request.format))
.map_err(tool_runtime_error)?;
Ok(tool_success(
"Context bundle loaded.",
json!({
"rendered": result.rendered,
"explain": result.explain,
"bundle": result.bundle,
"used_format": result.used_format.as_str(),
"used_vault_root": result.used_vault_root
}),
))
}
"memory_explain" => {
let request = parse_route_request(&arguments, OutputFormat::Markdown)?;
let result = app::run(config_path, request.input, Some(request.format))
.map_err(tool_runtime_error)?;
Ok(tool_success(
"Route explanation loaded.",
json!({
"explain": result.explain,
"bundle": result.bundle,
"used_vault_root": result.used_vault_root
}),
))
}
"memory_wakeup" => {
let request = parse_wakeup_request(&arguments)?;
let response = memory_gateway::execute(
config_path,
wakeup_request(request.input, request.profile),
None,
)
.map_err(tool_runtime_error)?;
// A response without a wakeup packet is reported as an internal error (-32603).
let packet = response
.wakeup_packet()
.cloned()
.ok_or_else(|| JsonRpcError::new(-32603, "missing wakeup packet"))?;
Ok(tool_success(
"Wakeup packet loaded.",
json!({
"rendered": output::wakeup::render(&packet, request.format),
"packet": packet,
"bundle": response.bundle,
"used_vault_root": response.used_vault_root
}),
))
}
// --- Lifecycle read tools ---
"memory_review_queue" => {
let snapshot = read_workbench(config_path, &lifecycle_read_options(daemon_bin))
.map_err(tool_runtime_error)?;
Ok(tool_success(
"Pending review queue loaded.",
lifecycle_summary::queue_payload(&snapshot.pending_review, "pending_review"),
))
}
"memory_wakeup_ready" => {
let snapshot = read_workbench(config_path, &lifecycle_read_options(daemon_bin))
.map_err(tool_runtime_error)?;
Ok(tool_success(
"Wakeup-ready queue loaded.",
lifecycle_summary::queue_payload(&snapshot.wakeup_ready, "wakeup_ready"),
))
}
"memory_get" => {
let record_id = required_string(&arguments, "record_id")?;
// A missing record is returned via `tool_failure` inside `Ok`, not as a JSON-RPC error.
match read_record(config_path, &record_id, &lifecycle_read_options(daemon_bin))
.map_err(tool_runtime_error)?
{
Some(entry) => Ok(tool_success(
"Memory record loaded.",
lifecycle_summary::record_payload(&entry),
)),
None => Ok(tool_failure(
&format!("memory record not found: {record_id}"),
lifecycle_summary::not_found_payload(&record_id),
)),
}
}
"memory_history" => {
let record_id = required_string(&arguments, "record_id")?;
let history =
read_history(config_path, &record_id, &lifecycle_read_options(daemon_bin))
.map_err(tool_runtime_error)?;
// An empty history is reported the same way as a missing record.
if history.is_empty() {
Ok(tool_failure(
&format!("memory history not found: {record_id}"),
lifecycle_summary::not_found_payload(&record_id),
))
} else {
Ok(tool_success(
"Memory history loaded.",
lifecycle_summary::history_payload(&record_id, &history),
))
}
}
// --- Lifecycle write tools ---
"memory_record_manual" => {
let result = service
.record_manual(config_path, parse_record_request(&arguments)?)
.map_err(tool_runtime_error)?;
crate::vault_writer::writeback_from_config_no_compile(config_path, &result.entry);
mcp_embedding_auto_append(config_path, &result.entry);
Ok(tool_success(
"Manual memory recorded.",
lifecycle_summary::create_payload("record_manual", &result.entry, &result.snapshot),
))
}
"memory_propose" => {
let result = service
.propose_ai(config_path, parse_propose_request(&arguments)?)
.map_err(tool_runtime_error)?;
crate::vault_writer::writeback_from_config_no_compile(config_path, &result.entry);
Ok(tool_success(
"AI memory proposed.",
lifecycle_summary::create_payload("propose", &result.entry, &result.snapshot),
))
}
"memory_accept" => {
execute_action_tool(service, config_path, &arguments, LifecycleAction::Accept)
}
"memory_promote" => execute_action_tool(
service,
config_path,
&arguments,
LifecycleAction::PromoteToCanonical,
),
"memory_archive" => {
execute_action_tool(service, config_path, &arguments, LifecycleAction::Archive)
}
// --- Maintenance tools, each delegated to a dedicated function ---
"memory_import_session" => execute_import_session_tool(config_path, &arguments),
"memory_sync_vault" => execute_sync_vault_tool(config_path, &arguments),
// `serve_stdio` intercepts this tool name before synchronous dispatch and runs
// the async variant; this arm is the synchronous variant.
"memory_distill_pending" => execute_distill_pending_tool(config_path, &arguments),
"memory_check_contradictions" => execute_check_contradictions_tool(config_path, &arguments),
"memory_staleness_report" => execute_staleness_report_tool(config_path),
"memory_import_git" => execute_import_git_tool(config_path, &arguments),
"memory_dedup_suggestions" => execute_dedup_suggestions_tool(config_path),
"memory_consolidate" => execute_consolidate_tool(config_path, &arguments),
"memory_prune" => execute_prune_tool(config_path, &arguments),
// Also intercepted by `serve_stdio`; this is the synchronous variant.
"memory_crystallize" => execute_crystallize_tool(config_path, &arguments),
"memory_lint" => execute_lint_tool(config_path),
_ => Err(JsonRpcError::new(-32601, format!("tool not found: {name}"))),
}
}
fn execute_distill_pending_tool(
config_path: &Path,
arguments: &Value,
) -> Result<Value, JsonRpcError> {
use crate::distill::pipeline;
let request = parse_distill_request(config_path, arguments)?;
let report = pipeline::run(request).map_err(tool_runtime_error)?;
Ok(distill_report_response(report))
}
async fn execute_distill_pending_tool_async(
config_path: &Path,
arguments: &Value,
sampling: &(dyn SamplingClient + Send),
) -> Result<Value, JsonRpcError> {
use crate::distill::pipeline;
let request = parse_distill_request(config_path, arguments)?;
let report = pipeline::run_with_sampling(request, sampling)
.await
.map_err(tool_runtime_error)?;
Ok(distill_report_response(report))
}
/// `memory_dedup_suggestions`: loads the wakeup-ready entries from the
/// lifecycle store next to the config file and passes them to
/// `contradiction::find_duplicates` with a threshold argument of `0.5`.
///
/// Returns the suggestions together with the number of records checked.
fn execute_dedup_suggestions_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
    use crate::lifecycle_store::{
        LifecycleStore, lifecycle_root_from_config, wakeup_ready_entries,
    };

    // Fall back to the current directory when the config path has no parent.
    let lifecycle_root =
        lifecycle_root_from_config(config_path.parent().unwrap_or_else(|| Path::new(".")));
    let store = LifecycleStore::new(lifecycle_root.as_path());

    let mut records: Vec<(String, crate::domain::MemoryRecord)> = Vec::new();
    for entry in wakeup_ready_entries(&store).map_err(tool_runtime_error)? {
        records.push((entry.record_id, entry.record));
    }

    let suggestions = crate::contradiction::find_duplicates(&records, 0.5);
    Ok(json!({
        "suggestions": suggestions,
        "checked": records.len(),
    }))
}
/// `memory_consolidate`: detects consolidation candidates and, unless
/// `dry_run` is set (it defaults to `true`), applies each one in order.
///
/// Application stops at the first failing cluster and returns its error.
fn execute_consolidate_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
    use crate::knowledge::cluster;

    let preview_only = arguments
        .get("dry_run")
        .and_then(Value::as_bool)
        .unwrap_or(true);
    let entries = cluster::load_entries(config_path).map_err(tool_runtime_error)?;
    let suggestions = cluster::detect_consolidation_candidates(&entries);

    if preview_only {
        let summary = format!(
            "Consolidation check: {} cluster(s) found.",
            suggestions.len()
        );
        return Ok(tool_success(
            &summary,
            json!({
                "dry_run": true,
                "suggestions": suggestions,
            }),
        ));
    }

    // The iterator is lazy, so collecting into `Result` stops applying
    // clusters as soon as one fails.
    let applied = suggestions
        .iter()
        .map(|suggestion| {
            cluster::apply_consolidation(config_path, suggestion, &entries)
                .map(|outcome| {
                    json!({
                        "merged_record_id": outcome.merged_record_id,
                        "archived_record_ids": outcome.archived_record_ids,
                    })
                })
                .map_err(tool_runtime_error)
        })
        .collect::<Result<Vec<_>, _>>()?;

    let summary = format!("Consolidated {} cluster(s).", applied.len());
    Ok(tool_success(
        &summary,
        json!({
            "dry_run": false,
            "applied": applied,
        }),
    ))
}
/// `memory_prune`: detects prune candidates and, unless `dry_run` is set (it
/// defaults to `true`), applies them via `apply_prune`.
fn execute_prune_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
    use crate::knowledge::cluster;

    let preview_only = arguments
        .get("dry_run")
        .and_then(Value::as_bool)
        .unwrap_or(true);
    let entries = cluster::load_entries(config_path).map_err(tool_runtime_error)?;
    let lifecycle_root = cluster::resolve_lifecycle_root(config_path);
    let suggestions = cluster::detect_prune_candidates(&entries, &lifecycle_root);

    if preview_only {
        let summary = format!("Prune check: {} record(s) to archive.", suggestions.len());
        return Ok(tool_success(
            &summary,
            json!({
                "dry_run": true,
                "suggestions": suggestions,
            }),
        ));
    }

    let outcome = cluster::apply_prune(config_path, &suggestions).map_err(tool_runtime_error)?;
    let summary = format!("Pruned {} record(s).", outcome.archived_record_ids.len());
    Ok(tool_success(
        &summary,
        json!({
            "dry_run": false,
            "archived_record_ids": outcome.archived_record_ids,
        }),
    ))
}
/// Synchronous `memory_crystallize` (no sampling).
///
/// Detects knowledge clusters, optionally keeps only those whose entities,
/// tags, or title contain `topic` (case-insensitive substring match), and
/// then either reports them (`dry_run`, default `false`) or persists them via
/// `knowledge::apply_distill`.
fn execute_crystallize_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
    use crate::knowledge;

    let dry_run = arguments
        .get("dry_run")
        .and_then(Value::as_bool)
        .unwrap_or(false);
    let needle = arguments
        .get("topic")
        .and_then(Value::as_str)
        .map(str::to_lowercase);

    // Both modes start from the same detected-and-filtered cluster list.
    let mut clusters: Vec<knowledge::KnowledgePageDraft> =
        knowledge::detect_knowledge_clusters(config_path).map_err(tool_runtime_error)?;
    if let Some(needle) = needle.as_deref() {
        clusters.retain(|draft| {
            draft
                .entities
                .iter()
                .any(|entity| entity.to_lowercase().contains(needle))
                || draft
                    .tags
                    .iter()
                    .any(|tag| tag.to_lowercase().contains(needle))
                || draft.title.to_lowercase().contains(needle)
        });
    }

    if dry_run {
        let summary = format!(
            "Knowledge crystallize dry-run: {} cluster(s) detected.",
            clusters.len()
        );
        let previews: Vec<Value> = clusters
            .iter()
            .map(|draft| {
                json!({
                    "title": draft.title,
                    "domain": draft.domain,
                    "tags": draft.tags,
                    "entities": draft.entities,
                    "source_count": draft.source_record_ids.len(),
                })
            })
            .collect();
        return Ok(tool_success(
            &summary,
            json!({
                "dry_run": true,
                "clusters": clusters.len(),
                "drafts": previews,
            }),
        ));
    }

    if clusters.is_empty() {
        return Ok(tool_success(
            "No clusters found for crystallization.",
            json!({
                "pages_created": 0,
                "sampling_used": false,
                "fallback_reason": "no clusters found",
            }),
        ));
    }

    let persisted_ids = knowledge::apply_distill(config_path, &clusters, "mcp-memory-crystallize")
        .map_err(tool_runtime_error)?;
    let summary = format!(
        "Crystallized {} knowledge page(s) (template synthesis).",
        persisted_ids.len()
    );
    Ok(tool_success(
        &summary,
        json!({
            "pages_created": persisted_ids.len(),
            "persisted_ids": persisted_ids,
            "sampling_used": false,
            "fallback_reason": "sync path (no sampling)",
        }),
    ))
}
async fn execute_crystallize_tool_async(
config_path: &Path,
arguments: &Value,
sampling: &(dyn SamplingClient + Send),
) -> Result<Value, JsonRpcError> {
use crate::knowledge;
let topic = arguments.get("topic").and_then(Value::as_str);
let dry_run = arguments
.get("dry_run")
.and_then(Value::as_bool)
.unwrap_or(false);
if dry_run {
let drafts =
knowledge::detect_knowledge_clusters(config_path).map_err(tool_runtime_error)?;
let filtered: Vec<&knowledge::KnowledgePageDraft> = if let Some(topic) = topic {
let topic_lower = topic.to_lowercase();
drafts
.iter()
.filter(|d| {
d.entities
.iter()
.any(|e| e.to_lowercase().contains(&topic_lower))
|| d.tags
.iter()
.any(|t| t.to_lowercase().contains(&topic_lower))
|| d.title.to_lowercase().contains(&topic_lower)
})
.collect()
} else {
drafts.iter().collect()
};
return Ok(tool_success(
&format!(
"Knowledge crystallize dry-run: {} cluster(s) detected.",
filtered.len()
),
json!({
"dry_run": true,
"clusters": filtered.len(),
"drafts": filtered.iter().map(|d| json!({
"title": d.title,
"domain": d.domain,
"tags": d.tags,
"entities": d.entities,
"source_count": d.source_record_ids.len(),
})).collect::<Vec<_>>(),
}),
));
}
let result =
knowledge::synthesize_with_sampling(config_path, sampling, topic, "mcp-memory-crystallize")
.await
.map_err(tool_runtime_error)?;
let summary_text = if result.sampling_used {
format!(
"Crystallized {} knowledge page(s) via LLM synthesis.",
result.pages_created
)
} else {
format!(
"Crystallized {} knowledge page(s) (template fallback: {}).",
result.pages_created,
result.fallback_reason.as_deref().unwrap_or("unknown")
)
};
Ok(tool_success(
&summary_text,
json!({
"pages_created": result.pages_created,
"persisted_ids": result.persisted_ids,
"sampling_used": result.sampling_used,
"fallback_reason": result.fallback_reason,
"drafts": result.drafts.iter().map(|d| json!({
"title": d.title,
"domain": d.domain,
"tags": d.tags,
"entities": d.entities,
"source_count": d.source_record_ids.len(),
})).collect::<Vec<_>>(),
}),
))
}
/// `memory_import_git`: imports git activity from `cwd` as memory candidates.
///
/// Arguments (all optional):
/// * `cwd` – repository directory; defaults to the process working directory
///   (or an empty path if that cannot be determined).
/// * `limit` – value forwarded to `import_git_activity`; defaults to `30`.
/// * `dry_run` – defaults to `false`.
///
/// Returns the counters from the import report plus the `dry_run` flag.
fn execute_import_git_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
    let cwd = arguments
        .get("cwd")
        .and_then(Value::as_str)
        .map(std::path::PathBuf::from)
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    // Saturate instead of `as usize`: on targets where `usize` is narrower
    // than 64 bits a plain cast would silently wrap a large limit to a small one.
    let limit = arguments
        .get("limit")
        .and_then(Value::as_u64)
        .map_or(30, |raw| usize::try_from(raw).unwrap_or(usize::MAX));
    let dry_run = arguments
        .get("dry_run")
        .and_then(Value::as_bool)
        .unwrap_or(false);
    let report = crate::git_importer::import_git_activity(config_path, &cwd, limit, dry_run)
        .map_err(tool_runtime_error)?;
    Ok(json!({
        "commits_scanned": report.commits_scanned,
        "candidates_found": report.candidates_found,
        "candidates_persisted": report.candidates_persisted.len(),
        "candidates_duplicate_dropped": report.candidates_duplicate_dropped,
        "dry_run": dry_run,
    }))
}
/// `memory_staleness_report`: classifies every wakeup-ready record as never
/// referenced, fresh (non-negative staleness penalty), or stale.
///
/// Stale records are listed in descending order of `days_since_reference`.
fn execute_staleness_report_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
    use crate::lifecycle_store::{
        LifecycleStore, lifecycle_root_from_config, wakeup_ready_entries,
    };
    use crate::reference_tracker;

    let lifecycle_root =
        lifecycle_root_from_config(config_path.parent().unwrap_or_else(|| Path::new(".")));
    let store = LifecycleStore::new(lifecycle_root.as_path());
    let ref_map = reference_tracker::read(&lifecycle_root);
    let entries = wakeup_ready_entries(&store).map_err(tool_runtime_error)?;

    let (mut fresh, mut never_referenced) = (0_usize, 0_usize);
    let mut stale = Vec::<Value>::new();
    for entry in &entries {
        let age = ref_map
            .records
            .get(&entry.record_id)
            .and_then(reference_tracker::age_days);
        let penalty = reference_tracker::staleness_penalty(age);
        let Some(days) = age else {
            never_referenced += 1;
            continue;
        };
        if penalty >= 0 {
            fresh += 1;
            continue;
        }
        stale.push(json!({
            "record_id": entry.record_id,
            "title": entry.record.title,
            "memory_type": entry.record.memory_type,
            "days_since_reference": days,
            "penalty": penalty,
        }));
    }

    // Stable sort, largest `days_since_reference` first.
    stale.sort_by_key(|item| std::cmp::Reverse(item["days_since_reference"].as_u64()));

    Ok(json!({
        "total_wakeup_ready": entries.len(),
        "fresh": fresh,
        "never_referenced": never_referenced,
        "stale_count": stale.len(),
        "stale": stale,
    }))
}
fn execute_lint_tool(config_path: &Path) -> Result<Value, JsonRpcError> {
let report = crate::wiki_lint::run_lint_from_config(config_path).map_err(tool_runtime_error)?;
let markdown = crate::wiki_lint::render_lint_markdown(&report);
let mut value = serde_json::to_value(&report).map_err(|err| {
JsonRpcError::new(-32603, format!("failed to serialize lint report: {err}"))
})?;
if let Some(obj) = value.as_object_mut() {
obj.insert("markdown".to_string(), Value::String(markdown));
}
Ok(value)
}
fn execute_check_contradictions_tool(
config_path: &Path,
arguments: &Value,
) -> Result<Value, JsonRpcError> {
use crate::lifecycle_store::{LifecycleStore, lifecycle_root_from_config};
let record_id = required_string(arguments, "record_id")?;
let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
let lifecycle_root = lifecycle_root_from_config(config_dir);
let store = LifecycleStore::new(lifecycle_root.as_path());
let target_entry = crate::lifecycle_store::read_events_for_record(&store, &record_id)
.map_err(tool_runtime_error)?
.into_iter()
.last()
.ok_or_else(|| JsonRpcError::new(-32602, format!("record not found: {record_id}")))?;
let existing: Vec<(String, crate::domain::MemoryRecord)> =
crate::lifecycle_store::wakeup_ready_entries(&store)
.map_err(tool_runtime_error)?
.into_iter()
.filter(|e| e.record_id != record_id)
.map(|e| (e.record_id, e.record))
.collect();
let hits = crate::contradiction::detect(
&target_entry.record.summary,
&target_entry.record.memory_type,
&existing,
);
Ok(json!({
"record_id": record_id,
"contradictions": hits,
"checked_against": existing.len(),
}))
}
/// Builds a [`DistillRequest`] from tool arguments.
///
/// `cwd` is required and must be an absolute path (`-32602` otherwise);
/// `transcript_path` is optional. The request is tagged with the MCP distill
/// actor and source refs.
fn parse_distill_request(
    config_path: &Path,
    arguments: &Value,
) -> Result<DistillRequest, JsonRpcError> {
    let Some(raw_cwd) = arguments.get("cwd").and_then(Value::as_str) else {
        return Err(JsonRpcError::new(-32602, "missing 'cwd' (absolute path)"));
    };
    let cwd = Path::new(raw_cwd);
    if cwd.is_relative() {
        return Err(JsonRpcError::new(
            -32602,
            format!("cwd must be absolute, got: {raw_cwd}"),
        ));
    }
    let transcript_path = arguments
        .get("transcript_path")
        .and_then(Value::as_str)
        .map(PathBuf::from);

    let request = DistillRequest::new(
        config_path.to_path_buf(),
        cwd.to_path_buf(),
        transcript_path,
    )
    .with_actor("mcp-memory-distill-pending")
    .with_source_refs("mcp:distill:self-tag", "mcp:distill:extraction");
    Ok(request)
}
/// Wraps a distill report as a successful tool response with a one-line
/// summary. If the report cannot be serialized, the payload is an empty
/// object.
fn distill_report_response(report: crate::distill::pipeline::DistillReport) -> Value {
    let accepted = report.signals_persisted.len();
    let candidates = report.candidates_persisted.len();
    let headline = format!(
        "Distill pending complete ({}): {} accepted, {} candidates, {} queued signals drained.",
        report.fallback_used, accepted, candidates, report.queue_drained,
    );
    let payload = match serde_json::to_value(&report) {
        Ok(value) => value,
        Err(_) => json!({}),
    };
    tool_success(&headline, payload)
}
/// `memory_import_session`: imports (or previews) memory candidates from a
/// provider session.
///
/// Required arguments: `provider` and `session_id`. Optional: `apply`
/// (default `false`, i.e. preview only) and `actor`. Missing or invalid
/// arguments fail with `-32602`; a response that cannot be serialized fails
/// with `-32603`.
fn execute_import_session_tool(
    config_path: &Path,
    arguments: &Value,
) -> Result<Value, JsonRpcError> {
    use crate::memory_importer::{ImportProvider, import_session};

    let Some(provider_name) = arguments.get("provider").and_then(Value::as_str) else {
        return Err(JsonRpcError::new(
            -32602,
            "missing 'provider' (claude | codex)",
        ));
    };
    let provider = match ImportProvider::parse(provider_name) {
        Ok(provider) => provider,
        Err(err) => return Err(JsonRpcError::new(-32602, err.to_string())),
    };
    let Some(session_id) = arguments.get("session_id").and_then(Value::as_str) else {
        return Err(JsonRpcError::new(-32602, "missing 'session_id'"));
    };
    let apply = arguments
        .get("apply")
        .and_then(Value::as_bool)
        .unwrap_or(false);
    let actor = arguments
        .get("actor")
        .and_then(Value::as_str)
        .map(str::to_owned);

    let response = import_session(config_path, provider, session_id, apply, actor)
        .map_err(tool_runtime_error)?;
    let payload = serde_json::to_value(&response)
        .map_err(|err| JsonRpcError::new(-32603, format!("serialize import response: {err}")))?;

    let text = match response.applied {
        true => format!(
            "Imported {} candidate(s) from {} and applied to ledger.",
            response.candidate_count, response.session_ref
        ),
        false => format!(
            "Previewed {} candidate(s) from {} (dry run, use apply=true to write).",
            response.candidate_count, response.session_ref
        ),
    };
    Ok(tool_success(&text, payload))
}
fn execute_sync_vault_tool(config_path: &Path, arguments: &Value) -> Result<Value, JsonRpcError> {
use crate::domain::MemoryLifecycleState;
use crate::lifecycle_store::{
LifecycleStore, latest_state_entries, lifecycle_root_from_config,
};
use crate::vault_writer::{self, WriteStatus, memory_note_path};
let dry_run = arguments
.get("dry_run")
.and_then(Value::as_bool)
.unwrap_or(false);
let enrich = arguments
.get("enrich")
.and_then(Value::as_bool)
.unwrap_or(false);
let config = crate::app::load(config_path).map_err(tool_runtime_error)?;
let vault_root = crate::app::resolve_override_path(&config.vault.root, config_path)
.map_err(tool_runtime_error)?;
let config_dir = config_path.parent().unwrap_or_else(|| Path::new("."));
let lifecycle_root = lifecycle_root_from_config(config_dir);
let store = LifecycleStore::new(lifecycle_root.as_path());
let entries = latest_state_entries(&store).map_err(tool_runtime_error)?;
if enrich {
return execute_enrich_tool(&entries, vault_root.as_path(), dry_run);
}
let mut counters: std::collections::HashMap<&'static str, u64> =
std::collections::HashMap::new();
let mut errors: Vec<(String, String)> = Vec::new();
let bump = |counters: &mut std::collections::HashMap<&'static str, u64>, key: &'static str| {
*counters.entry(key).or_insert(0) += 1;
};
for entry in &entries {
match entry.record.state {
MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical => {
if dry_run {
let path = memory_note_path(vault_root.as_path(), &entry.record_id);
if path.exists() {
bump(&mut counters, "would_update");
} else {
bump(&mut counters, "would_create");
}
continue;
}
match vault_writer::write_memory_note(
vault_root.as_path(),
&entry.record_id,
&entry.record,
) {
Ok(result) => match result.status {
WriteStatus::Created => bump(&mut counters, "created"),
WriteStatus::UpdatedAll => bump(&mut counters, "updated_all"),
WriteStatus::UpdatedPreserveBody => {
bump(&mut counters, "updated_preserve_body")
}
WriteStatus::Unchanged => bump(&mut counters, "unchanged"),
},
Err(error) => errors.push((entry.record_id.clone(), error.to_string())),
}
}
MemoryLifecycleState::Archived => {
if dry_run {
let path = memory_note_path(vault_root.as_path(), &entry.record_id);
if path.exists() {
bump(&mut counters, "would_archive");
} else {
bump(&mut counters, "skipped_missing");
}
continue;
}
match vault_writer::archive_memory_note(vault_root.as_path(), &entry.record_id) {
Ok(Some(result)) => match result.status {
WriteStatus::Unchanged => bump(&mut counters, "unchanged"),
_ => bump(&mut counters, "archived"),
},
Ok(None) => bump(&mut counters, "skipped_missing"),
Err(error) => errors.push((entry.record_id.clone(), error.to_string())),
}
}
MemoryLifecycleState::Draft | MemoryLifecycleState::Candidate => {
bump(&mut counters, "skipped_draft_or_candidate")
}
}
}
let counters_json: serde_json::Map<String, Value> = counters
.into_iter()
.map(|(k, v)| (k.to_string(), json!(v)))
.collect();
let errors_json: Vec<Value> = errors
.iter()
.map(|(id, msg)| json!({ "record_id": id, "error": msg }))
.collect();
let payload = json!({
"vault_root": vault_root.display().to_string(),
"ledger_records": entries.len(),
"dry_run": dry_run,
"counters": Value::Object(counters_json),
"errors": errors_json,
});
let text = if dry_run {
"Vault sync preview generated."
} else {
"Vault sync completed."
};
Ok(tool_success(text, payload))
}
/// Runs the enrich pass over the given ledger entries.
///
/// Only entries in the `Accepted` or `Canonical` state are considered. For each of them a
/// patch is computed with `enrich::enrich_record`; an empty patch counts as skipped. When
/// `dry_run` is `false`, the patch fills only those of `entities` / `tags` / `triggers`
/// that are currently empty on the record, and the note is rewritten through
/// `vault_writer::write_memory_note`.
///
/// A failed writeback does not abort the run. It is logged to stderr and reported in the
/// payload's `errors` array as `{ "record_id", "error" }`, and the affected record is
/// neither counted in `enriched` nor listed in `records`, so the summary only reflects
/// records that were actually processed successfully.
///
/// Always returns `Ok` with a tool-success result; the `Result` wrapper exists for
/// signature parity with the other tool executors.
fn execute_enrich_tool(
    entries: &[crate::lifecycle_store::LedgerEntry],
    vault_root: &Path,
    dry_run: bool,
) -> Result<Value, JsonRpcError> {
    use crate::domain::MemoryLifecycleState;
    use crate::enrich;
    use crate::vault_writer;
    let mut enriched_count = 0_u64;
    let mut skipped_count = 0_u64;
    let mut enriched_records: Vec<Value> = Vec::new();
    let mut errors: Vec<Value> = Vec::new();
    for entry in entries {
        if !matches!(
            entry.record.state,
            MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical
        ) {
            continue;
        }
        let patch = enrich::enrich_record(&entry.record);
        if patch.is_empty() {
            skipped_count += 1;
            continue;
        }
        if !dry_run {
            // Never overwrite values that are already present on the record.
            let mut enriched_record = entry.record.clone();
            if enriched_record.entities.is_empty() {
                enriched_record.entities = patch.entities.clone();
            }
            if enriched_record.tags.is_empty() {
                enriched_record.tags = patch.tags.clone();
            }
            if enriched_record.triggers.is_empty() {
                enriched_record.triggers = patch.triggers.clone();
            }
            if let Err(error) =
                vault_writer::write_memory_note(vault_root, &entry.record_id, &enriched_record)
            {
                eprintln!(
                    "[spool] enrich writeback failed for {}: {error}",
                    entry.record_id
                );
                // Surface the failure to the caller instead of reporting the record as
                // enriched.
                errors.push(json!({
                    "record_id": entry.record_id,
                    "error": error.to_string(),
                }));
                continue;
            }
        }
        enriched_records.push(json!({
            "record_id": entry.record_id,
            "title": entry.record.title,
            "entities": patch.entities,
            "tags": patch.tags,
            "triggers": patch.triggers,
        }));
        enriched_count += 1;
    }
    let payload = json!({
        "dry_run": dry_run,
        "enriched": enriched_count,
        "skipped": skipped_count,
        "records": enriched_records,
        "errors": errors,
    });
    let text = if dry_run {
        format!("Enrich preview: {enriched_count} records would be enriched.")
    } else {
        format!("Enriched {enriched_count} records.")
    };
    Ok(tool_success(&text, payload))
}
/// Builds the read options for lifecycle queries: constructed through
/// `LifecycleReadOptions::with_daemon` when a daemon binary path is supplied, and the
/// type's `Default` otherwise.
fn lifecycle_read_options(daemon_bin: Option<&Path>) -> LifecycleReadOptions {
    match daemon_bin {
        Some(bin) => LifecycleReadOptions::with_daemon(bin),
        None => LifecycleReadOptions::default(),
    }
}
/// Applies a lifecycle `action` to the record named by the `record_id` argument.
///
/// Transition metadata is parsed from `arguments` before the action is applied. On success
/// the vault writeback and the embedding auto-append hook run, and the success result
/// carries the payload built by `lifecycle_summary::action_payload`. A failed action is
/// returned as `Ok(tool_failure(..))` holding the record id and the action label; only
/// argument parsing errors propagate as `Err`.
fn execute_action_tool(
    service: LifecycleService,
    config_path: &Path,
    arguments: &Value,
    action: LifecycleAction,
) -> Result<Value, JsonRpcError> {
    let record_id = required_string(arguments, "record_id")?;
    let metadata = parse_metadata(arguments)?;
    let applied =
        match service.apply_action_with_metadata(config_path, &record_id, action, metadata) {
            Ok(applied) => applied,
            Err(error) => {
                let message = error.to_string();
                return Ok(tool_failure(
                    &message,
                    json!({ "record_id": record_id, "action": action.label() }),
                ));
            }
        };
    // Post-action hooks run only after the action itself succeeded.
    crate::vault_writer::writeback_from_config_no_compile(config_path, &applied.entry);
    mcp_embedding_auto_append(config_path, &applied.entry);
    let message = format!("Lifecycle action {} applied.", action.label());
    let payload = lifecycle_summary::action_payload(&applied.entry, &applied.snapshot, action);
    Ok(tool_success(&message, payload))
}
/// Forwards a ledger entry to `crate::engine::embedding::try_append_record` when the
/// `embedding` feature is compiled in.
///
/// Only entries in the `Accepted` or `Canonical` state are forwarded, and a config that
/// fails to load is ignored silently. Without the feature the function is a no-op.
fn mcp_embedding_auto_append(config_path: &Path, entry: &crate::lifecycle_store::LedgerEntry) {
    #[cfg(feature = "embedding")]
    {
        use crate::domain::MemoryLifecycleState;
        let eligible = matches!(
            entry.record.state,
            MemoryLifecycleState::Accepted | MemoryLifecycleState::Canonical
        );
        if !eligible {
            return;
        }
        let Ok(config) = crate::config::load_from_path(config_path) else {
            return;
        };
        crate::engine::embedding::try_append_record(
            &config.embedding,
            &entry.record_id,
            &entry.record,
        );
    }
    #[cfg(not(feature = "embedding"))]
    {
        // Consume the parameters so the feature-less build has no unused-variable warnings.
        let _ = (config_path, entry);
    }
}
#[cfg(test)]
mod tests {
use super::parse::{parse_files, parse_metadata};
use super::schemas::{RESOURCE_CURRENT_PLAN_URI, RESOURCE_SESSION_HANDOFF_URI};
use super::{
ServerState, handle_prompt_get, handle_request, handle_resource_read, handle_tool_call,
process_message, prompt_definitions, resource_definitions, tool_definitions,
};
use crate::daemon_client::{
daemon_session_pid_for_test, daemon_test_lock_for_test, kill_daemon_session_for_test,
reset_daemon_sessions,
};
use crate::enhancement_trace::read_latest_prompt_optimize_trace;
use crate::lifecycle_service::LifecycleService;
use assert_cmd::cargo::cargo_bin;
use serde_json::json;
use std::fs;
use tempfile::tempdir;
/// Builds an isolated fixture for the MCP tests.
///
/// The temp directory contains the two `docs/` resources, an empty `vault/` directory and
/// a `spool.toml` whose vault root points at that directory. Returns the `TempDir` guard
/// (it must stay alive for the duration of the test) together with the config path.
fn setup_config_path() -> (tempfile::TempDir, std::path::PathBuf) {
    let temp = tempdir().unwrap();
    let docs_dir = temp.path().join("docs");
    fs::create_dir_all(&docs_dir).unwrap();
    fs::write(
        docs_dir.join("MCP_PROMPTS_ROUND_8_PLAN.md"),
        "Round 8 test doc",
    )
    .unwrap();
    fs::write(
        docs_dir.join("SESSION_HANDOFF.md"),
        "Session handoff test doc",
    )
    .unwrap();
    // Keep the vault inside the fixture instead of the shared `/tmp`: successful lifecycle
    // actions trigger a vault writeback, which presumably writes under the configured vault
    // root, so a shared root would let tests leave files behind.
    let vault_dir = temp.path().join("vault");
    fs::create_dir_all(&vault_dir).unwrap();
    let config_path = temp.path().join("spool.toml");
    fs::write(
        &config_path,
        format!("[vault]\nroot = \"{}\"\n", vault_dir.display()),
    )
    .unwrap();
    (temp, config_path)
}
/// `tools/list` must return one entry per tool definition, including
/// `memory_record_manual`.
#[test]
fn tools_list_should_return_all_lifecycle_tools() {
    let (_temp, config_path) = setup_config_path();
    let mut state = ServerState {
        initialized: true,
        ..Default::default()
    };
    let listing =
        handle_request(&mut state, config_path.as_path(), None, "tools/list", None).unwrap();
    let listed = listing["tools"].as_array().unwrap();
    assert_eq!(listed.len(), tool_definitions().len());
    let names: Vec<&str> = listed
        .iter()
        .filter_map(|tool| tool["name"].as_str())
        .collect();
    assert!(names.contains(&"memory_record_manual"));
}
/// Prompt and resource listings must contain the expected entries, and both a prompt and
/// a resource must be retrievable.
#[test]
fn prompts_and_resources_should_list_and_read() {
    let (_temp, config_path) = setup_config_path();

    // Listings.
    let prompts = prompt_definitions();
    let resources = resource_definitions();
    let has_review_prompt = prompts
        .iter()
        .any(|prompt| prompt["name"] == "review_lifecycle_queue");
    assert!(has_review_prompt);
    let has_handoff_resource = resources
        .iter()
        .any(|resource| resource["uri"] == RESOURCE_SESSION_HANDOFF_URI);
    assert!(has_handoff_resource);

    // Prompt retrieval.
    let prompt_params = json!({
        "name": "retrieve_project_context",
        "arguments": { "cwd": "/tmp/repo", "task": "understand spool" }
    });
    let prompt = handle_prompt_get(&prompt_params).unwrap();
    let prompt_text = prompt["messages"][0]["content"]["text"].as_str().unwrap();
    assert!(prompt_text.contains("memory_search"));

    // Resource retrieval.
    let resource_params = json!({
        "uri": RESOURCE_CURRENT_PLAN_URI
    });
    let resource = handle_resource_read(config_path.as_path(), &resource_params).unwrap();
    let content = &resource["contents"][0];
    assert_eq!(content["mimeType"], json!("text/markdown"));
    assert!(content["text"].as_str().unwrap().contains("Round 8"));
}
/// `memory_propose` must create a record carrying the supplied metadata, and `memory_get`
/// must read it back in the `candidate` state.
#[test]
fn lifecycle_tool_calls_should_create_and_read_records() {
    let (_temp, config_path) = setup_config_path();
    let propose_call = json!({
        "name": "memory_propose",
        "arguments": {
            "title": "测试偏好",
            "summary": "先 smoke 再收口",
            "memory_type": "workflow",
            "scope": "user",
            "source_ref": "session:1",
            "user_id": "long",
            "actor": "codex",
            "reason": "captured during MCP review",
            "evidence_refs": ["session:1"]
        }
    });
    let proposed = handle_tool_call(config_path.as_path(), None, &propose_call).unwrap();
    let proposed_entry = &proposed["structuredContent"]["entry"];
    let new_id = proposed_entry["record_id"].as_str().unwrap().to_string();
    assert_eq!(proposed_entry["metadata"]["actor"], json!("codex"));

    let get_call = json!({
        "name": "memory_get",
        "arguments": { "record_id": new_id }
    });
    let fetched = handle_tool_call(config_path.as_path(), None, &get_call).unwrap();
    assert_eq!(
        fetched["structuredContent"]["record"]["record"]["state"],
        json!("candidate")
    );
}
/// Metadata passed to `memory_accept` (actor, evidence refs) must appear on the returned
/// entry.
#[test]
fn lifecycle_action_tool_should_persist_metadata_arguments() {
    let (_temp, config_path) = setup_config_path();
    let propose_call = json!({
        "name": "memory_propose",
        "arguments": {
            "title": "测试偏好",
            "summary": "先 smoke 再收口",
            "memory_type": "workflow",
            "scope": "user",
            "source_ref": "session:1",
            "user_id": "long"
        }
    });
    let proposed = handle_tool_call(config_path.as_path(), None, &propose_call).unwrap();
    let candidate_id = proposed["structuredContent"]["entry"]["record_id"]
        .as_str()
        .unwrap()
        .to_string();

    let accept_call = json!({
        "name": "memory_accept",
        "arguments": {
            "record_id": candidate_id,
            "actor": "long",
            "reason": "approved after review",
            "evidence_refs": ["session:1", "session:2"]
        }
    });
    let accepted = handle_tool_call(config_path.as_path(), None, &accept_call).unwrap();
    let metadata = &accepted["structuredContent"]["entry"]["metadata"];
    assert_eq!(metadata["actor"], json!("long"));
    assert_eq!(
        metadata["evidence_refs"],
        json!(["session:1", "session:2"])
    );
}
/// `memory_search` and `memory_wakeup` must work against a minimal vault with a single
/// project note.
#[test]
fn retrieval_tool_calls_should_build_context_and_wakeup() {
    // Fixture: a vault with one project note, an empty repo dir and a config tying them
    // together.
    let temp = tempdir().unwrap();
    let vault_dir = temp.path().join("vault");
    let repo_dir = temp.path().join("repo");
    fs::create_dir_all(vault_dir.join("10-Projects")).unwrap();
    fs::create_dir_all(&repo_dir).unwrap();
    fs::write(
        vault_dir.join("10-Projects/spool.md"),
        "# spool\n\ncontext\n",
    )
    .unwrap();
    let config_path = temp.path().join("spool.toml");
    let config_body = format!(
        r#"[vault]
root = "{}"
[output]
default_format = "markdown"
max_chars = 12000
max_notes = 8
[[projects]]
id = "spool"
name = "spool"
repo_paths = ["{}"]
note_roots = ["10-Projects"]
"#,
        vault_dir.display(),
        repo_dir.display()
    );
    fs::write(&config_path, config_body).unwrap();

    let search_call = json!({
        "name": "memory_search",
        "arguments": {
            "task": "spool context",
            "cwd": repo_dir,
            "target": "codex"
        }
    });
    let search = handle_tool_call(config_path.as_path(), None, &search_call).unwrap();
    let rendered = search["structuredContent"]["rendered"].as_str().unwrap();
    assert!(rendered.contains("Codex"));

    let wakeup_call = json!({
        "name": "memory_wakeup",
        "arguments": {
            "task": "spool wakeup",
            "cwd": repo_dir,
            "profile": "project",
            "format": "json"
        }
    });
    let wakeup = handle_tool_call(config_path.as_path(), None, &wakeup_call).unwrap();
    assert_eq!(
        wakeup["structuredContent"]["packet"]["profile"],
        json!("project")
    );
}
/// `prompt_optimize` must echo the request parameters in its structured content and
/// persist a matching trace that can be read back.
#[test]
fn prompt_optimize_tool_should_return_combined_prompt_bundle() {
    // Fixture: a vault with one project note, an empty repo dir and a config tying them
    // together.
    let temp = tempdir().unwrap();
    let vault_dir = temp.path().join("vault");
    let repo_dir = temp.path().join("repo");
    fs::create_dir_all(vault_dir.join("10-Projects")).unwrap();
    fs::create_dir_all(&repo_dir).unwrap();
    fs::write(
        vault_dir.join("10-Projects/spool.md"),
        "# spool\n\nproject context for prompt optimize\n",
    )
    .unwrap();
    let config_path = temp.path().join("spool.toml");
    let config_body = format!(
        r#"[vault]
root = "{}"
[output]
default_format = "markdown"
max_chars = 12000
max_notes = 8
[[projects]]
id = "spool"
name = "spool"
repo_paths = ["{}"]
note_roots = ["10-Projects"]
"#,
        vault_dir.display(),
        repo_dir.display()
    );
    fs::write(&config_path, config_body).unwrap();

    let optimize_call = json!({
        "name": "prompt_optimize",
        "arguments": {
            "task": "continue the spool desktop refactor",
            "cwd": repo_dir,
            "target": "codex",
            "profile": "project",
            "provider": "codex",
            "session_id": "codex:session-42"
        }
    });
    let optimized = handle_tool_call(config_path.as_path(), None, &optimize_call).unwrap();

    // Structured response.
    let content = &optimized["structuredContent"];
    let combined = content["combined_prompt"].as_str().unwrap();
    assert!(combined.contains("Codex"));
    assert_eq!(content["profile"], json!("project"));
    assert_eq!(content["target"], json!("codex"));
    assert_eq!(content["provider"], json!("codex"));
    assert_eq!(content["session_id"], json!("codex:session-42"));
    assert_eq!(
        content["runtime_trace"]["source"],
        json!("mcp.prompt_optimize")
    );

    // Persisted trace.
    let trace = read_latest_prompt_optimize_trace(config_path.as_path())
        .unwrap()
        .unwrap();
    assert_eq!(trace.provider.as_deref(), Some("codex"));
    assert_eq!(trace.session_id.as_deref(), Some("codex:session-42"));
    assert_eq!(trace.target, "codex");
    assert_eq!(trace.profile, "project");
}
/// Array-valued arguments must be rejected when any item is not a string.
#[test]
fn parse_array_fields_should_reject_non_string_items() {
    let bad_metadata = json!({
        "evidence_refs": ["session:1", 2]
    });
    let metadata_error = parse_metadata(&bad_metadata).unwrap_err();
    assert!(metadata_error.message.contains("array of strings"));

    let bad_files = json!({
        "files": ["src/mcp.rs", true]
    });
    let files_error = parse_files(&bad_files).unwrap_err();
    assert!(files_error.message.contains("array of strings"));
}
/// Tool calls and prompt lookups must both fail when `arguments` is not a JSON object.
#[test]
fn tool_and_prompt_calls_should_reject_non_object_arguments() {
    let (_temp, config_path) = setup_config_path();

    let tool_call = json!({
        "name": "memory_get",
        "arguments": "record-1"
    });
    let tool_error = handle_tool_call(config_path.as_path(), None, &tool_call).unwrap_err();
    let tool_message = &tool_error.message;
    assert!(tool_message.contains("field must be an object: arguments"));

    let prompt_call = json!({
        "name": "retrieve_project_context",
        "arguments": "not-an-object"
    });
    let prompt_error = handle_prompt_get(&prompt_call).unwrap_err();
    let prompt_message = &prompt_error.message;
    assert!(prompt_message.contains("field must be an object: arguments"));
}
/// A resource read whose params are not a JSON object must fail with a descriptive
/// message.
#[test]
fn resource_reads_should_reject_non_object_params() {
    let (_temp, config_path) = setup_config_path();
    let params = json!("bad-params");
    let outcome = handle_resource_read(config_path.as_path(), &params);
    let error = outcome.unwrap_err();
    assert!(error.message.contains("field must be an object: params"));
}
/// Resource documents must be resolved relative to the directory that holds the config
/// file, here a `config/` subdirectory of the temp dir.
#[test]
fn resource_reads_should_use_config_directory_as_base() {
    let temp = tempdir().unwrap();
    let config_dir = temp.path().join("config");
    let docs_dir = config_dir.join("docs");
    fs::create_dir_all(&docs_dir).unwrap();
    let config_path = config_dir.join("spool.toml");
    fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
    let plan_doc = docs_dir.join("MCP_PROMPTS_ROUND_8_PLAN.md");
    fs::write(plan_doc, "Round 8 custom doc").unwrap();
    let handoff_doc = docs_dir.join("SESSION_HANDOFF.md");
    fs::write(handoff_doc, "Session handoff custom doc").unwrap();

    let params = json!({ "uri": RESOURCE_CURRENT_PLAN_URI });
    let resource = handle_resource_read(config_path.as_path(), &params).unwrap();
    let text = resource["contents"][0]["text"].as_str().unwrap();
    assert!(text.contains("Round 8 custom doc"));
}
/// Calling `memory_accept` on a manually recorded memory must come back as a tool result
/// with `isError: true` rather than a JSON-RPC error.
#[test]
fn lifecycle_action_tool_should_return_is_error_for_invalid_transition() {
    let (_temp, config_path) = setup_config_path();
    let service = LifecycleService::new();
    let request = crate::lifecycle_store::RecordMemoryRequest {
        title: "简洁输出".to_string(),
        summary: "偏好简洁".to_string(),
        memory_type: "preference".to_string(),
        scope: crate::domain::MemoryScope::User,
        source_ref: "manual:cli".to_string(),
        project_id: None,
        user_id: Some("long".to_string()),
        sensitivity: None,
        metadata: crate::lifecycle_store::TransitionMetadata::default(),
        entities: Vec::new(),
        tags: Vec::new(),
        triggers: Vec::new(),
        related_files: Vec::new(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: Vec::new(),
        valid_until: None,
    };
    let recorded = service
        .record_manual(config_path.as_path(), request)
        .unwrap();

    let accept_call = json!({
        "name": "memory_accept",
        "arguments": { "record_id": recorded.entry.record_id }
    });
    let response = handle_tool_call(config_path.as_path(), None, &accept_call).unwrap();
    assert_eq!(response["isError"], json!(true));
}
/// With a daemon binary configured, consecutive read tools must succeed and report the
/// same daemon session pid.
#[test]
fn lifecycle_read_tools_should_use_daemon_when_configured() {
    // Hold the daemon test lock for the whole test; a poisoned lock is recovered.
    let _guard = daemon_test_lock_for_test()
        .lock()
        .unwrap_or_else(|error| error.into_inner());
    reset_daemon_sessions();
    let (_temp, config_path) = setup_config_path();
    let service = LifecycleService::new();
    let request = crate::lifecycle_store::ProposeMemoryRequest {
        title: "测试偏好".to_string(),
        summary: "先 smoke 再收口".to_string(),
        memory_type: "workflow".to_string(),
        scope: crate::domain::MemoryScope::User,
        source_ref: "session:1".to_string(),
        project_id: None,
        user_id: Some("long".to_string()),
        sensitivity: None,
        metadata: crate::lifecycle_store::TransitionMetadata::default(),
        entities: Vec::new(),
        tags: Vec::new(),
        triggers: Vec::new(),
        related_files: Vec::new(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: Vec::new(),
        valid_until: None,
    };
    let proposed = service.propose_ai(config_path.as_path(), request).unwrap();
    let daemon_bin = cargo_bin("spool-daemon");

    // First read: the review queue must list the single pending candidate.
    let queue_call = json!({ "name": "memory_review_queue", "arguments": {} });
    let review_queue =
        handle_tool_call(config_path.as_path(), Some(daemon_bin.as_path()), &queue_call).unwrap();
    assert_eq!(review_queue["isError"], json!(false));
    let pending = review_queue["structuredContent"]["pending_review"]
        .as_array()
        .unwrap();
    assert_eq!(pending.len(), 1);
    let first_pid =
        daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

    // Second read: fetching the record must report the same session pid.
    let get_call = json!({
        "name": "memory_get",
        "arguments": { "record_id": proposed.entry.record_id }
    });
    let fetched =
        handle_tool_call(config_path.as_path(), Some(daemon_bin.as_path()), &get_call).unwrap();
    assert_eq!(fetched["isError"], json!(false));
    assert_eq!(
        fetched["structuredContent"]["record"]["record"]["state"],
        json!("candidate")
    );
    let second_pid =
        daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
    assert_eq!(first_pid, second_pid);
    reset_daemon_sessions();
}
/// After the daemon session is killed, the next read tool must still succeed and report a
/// different session pid.
#[test]
fn lifecycle_read_tools_should_rebuild_daemon_session_after_exit() {
    // Hold the daemon test lock for the whole test; a poisoned lock is recovered.
    let _guard = daemon_test_lock_for_test()
        .lock()
        .unwrap_or_else(|error| error.into_inner());
    reset_daemon_sessions();
    let (_temp, config_path) = setup_config_path();
    let service = LifecycleService::new();
    let request = crate::lifecycle_store::ProposeMemoryRequest {
        title: "测试偏好".to_string(),
        summary: "先 smoke 再收口".to_string(),
        memory_type: "workflow".to_string(),
        scope: crate::domain::MemoryScope::User,
        source_ref: "session:1".to_string(),
        project_id: None,
        user_id: Some("long".to_string()),
        sensitivity: None,
        metadata: crate::lifecycle_store::TransitionMetadata::default(),
        entities: Vec::new(),
        tags: Vec::new(),
        triggers: Vec::new(),
        related_files: Vec::new(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: Vec::new(),
        valid_until: None,
    };
    let proposed = service.propose_ai(config_path.as_path(), request).unwrap();
    let daemon_bin = cargo_bin("spool-daemon");

    // First read establishes a session whose pid is recorded before it is killed.
    let queue_call = json!({ "name": "memory_review_queue", "arguments": {} });
    let review_queue =
        handle_tool_call(config_path.as_path(), Some(daemon_bin.as_path()), &queue_call).unwrap();
    assert_eq!(review_queue["isError"], json!(false));
    let first_pid =
        daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
    kill_daemon_session_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

    // The next read must succeed and report a new pid.
    let get_call = json!({
        "name": "memory_get",
        "arguments": { "record_id": proposed.entry.record_id }
    });
    let fetched =
        handle_tool_call(config_path.as_path(), Some(daemon_bin.as_path()), &get_call).unwrap();
    assert_eq!(fetched["isError"], json!(false));
    assert_eq!(
        fetched["structuredContent"]["record"]["record"]["state"],
        json!("candidate")
    );
    let second_pid =
        daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
    assert_ne!(first_pid, second_pid);
    reset_daemon_sessions();
}
/// When the configured daemon binary does not exist, `memory_get` must still return the
/// record successfully.
#[test]
fn lifecycle_read_tools_should_fallback_when_daemon_is_unavailable() {
    let (_temp, config_path) = setup_config_path();
    let service = LifecycleService::new();
    let request = crate::lifecycle_store::ProposeMemoryRequest {
        title: "测试偏好".to_string(),
        summary: "先 smoke 再收口".to_string(),
        memory_type: "workflow".to_string(),
        scope: crate::domain::MemoryScope::User,
        source_ref: "session:1".to_string(),
        project_id: None,
        user_id: Some("long".to_string()),
        sensitivity: None,
        metadata: crate::lifecycle_store::TransitionMetadata::default(),
        entities: Vec::new(),
        tags: Vec::new(),
        triggers: Vec::new(),
        related_files: Vec::new(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: Vec::new(),
        valid_until: None,
    };
    let proposed = service.propose_ai(config_path.as_path(), request).unwrap();

    let missing_daemon = std::path::Path::new("/definitely/missing/spool-daemon");
    let get_call = json!({
        "name": "memory_get",
        "arguments": { "record_id": proposed.entry.record_id }
    });
    let fetched =
        handle_tool_call(config_path.as_path(), Some(missing_daemon), &get_call).unwrap();
    assert_eq!(fetched["isError"], json!(false));
    assert_eq!(
        fetched["structuredContent"]["record"]["record"]["state"],
        json!("candidate")
    );
}
/// `initialize` must answer with the server protocol version and with `listChanged: false`
/// for tools, prompts and resources.
#[test]
fn initialize_request_should_return_server_capabilities() {
    let (_temp, config_path) = setup_config_path();
    let mut state = ServerState::default();
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {}
    });
    let response = process_message(&mut state, config_path.as_path(), None, request).unwrap();
    let result = &response["result"];
    assert_eq!(result["protocolVersion"], json!(super::PROTOCOL_VERSION));
    let capabilities = &result["capabilities"];
    assert_eq!(capabilities["tools"]["listChanged"], json!(false));
    assert_eq!(capabilities["prompts"]["listChanged"], json!(false));
    assert_eq!(capabilities["resources"]["listChanged"], json!(false));
}
/// Sampling support must be detected only when the client's `initialize` params advertise
/// a `sampling` capability.
#[test]
fn initialize_should_capture_client_capabilities_for_sampling_detection() {
    let (_temp, config_path) = setup_config_path();
    // Runs `initialize` with the given params on a fresh state and returns that state.
    let initialize_with = |params: serde_json::Value| {
        let mut fresh = ServerState::default();
        process_message(
            &mut fresh,
            config_path.as_path(),
            None,
            json!({
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": params
            }),
        )
        .unwrap();
        fresh
    };

    // Client advertises sampling.
    let state = initialize_with(json!({
        "capabilities": { "sampling": {} }
    }));
    assert!(
        state.client_supports_sampling(),
        "advertised sampling capability must be detected: {state:?}"
    );

    // Client sends no capabilities at all.
    let bare_state = initialize_with(json!({}));
    assert!(
        !bare_state.client_supports_sampling(),
        "missing capabilities must be treated as no sampling support: {bare_state:?}"
    );

    // Client advertises unrelated capabilities only.
    let other_state = initialize_with(json!({
        "capabilities": { "tools": { "listChanged": true } }
    }));
    assert!(
        !other_state.client_supports_sampling(),
        "absent sampling key must be treated as unsupported: {other_state:?}"
    );
}
/// Unparseable input must map to a JSON-RPC `-32700` error whose message starts with the
/// parse-error prefix. The mapping is reproduced inline here.
#[test]
fn malformed_json_should_return_parse_error_without_exiting() {
    let (_temp, config_path) = setup_config_path();
    let mut state = ServerState::default();
    let invalid = "{";
    let parsed = serde_json::from_str::<serde_json::Value>(invalid);
    let outcome = match parsed {
        Err(error) => Some(super::jsonrpc_error(
            serde_json::Value::Null,
            -32700,
            format!("parse error: {error}"),
        )),
        Ok(message) => process_message(&mut state, config_path.as_path(), None, message),
    };
    let response = outcome.unwrap();
    let error = &response["error"];
    assert_eq!(error["code"], json!(-32700));
    let message = error["message"].as_str().unwrap();
    assert!(message.contains("parse error:"));
}
}