trusty_memory/lib.rs
1//! MCP server (stdio + HTTP/SSE) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name.
6//! What: Provides `run_stdio` (JSON-RPC 2.0 over stdin/stdout) and `run_http`
7//! (axum HTTP/SSE stub), plus an `AppState` that carries the shared
8//! `PalaceRegistry`, on-disk data root, and a lazily-initialized embedder.
9//! Test: `cargo test -p trusty-memory-mcp` validates handshake + dispatch.
10
11use anyhow::Result;
12use serde_json::{json, Value};
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::{broadcast, OnceCell};
16use tracing::info;
17use trusty_common::mcp::{error_codes, initialize_response, Request, Response};
18use trusty_common::memory_core::embed::FastEmbedder;
19use trusty_common::memory_core::store::ChatSessionStore;
20use trusty_common::memory_core::PalaceRegistry;
21use trusty_common::ChatProvider;
22
23pub mod commands;
24pub mod openrpc;
25pub mod service;
26pub mod tools;
27pub mod web;
28
29pub use service::MemoryMcpService;
30pub use tools::MemoryMcpServer;
31
32/// Live daemon events broadcast to connected SSE subscribers.
33///
34/// Why: The dashboard needs push-driven updates so palace creation, drawer
35/// add/delete, dream cycles, and aggregate status changes are visible without
36/// polling. A single broadcast channel fans out to every connected browser.
37/// What: Tagged enum serialized as `{"type": "...", ...fields}` over SSE.
38/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
39/// mutation, and asserts the frame arrives.
40#[derive(Clone, Debug, serde::Serialize)]
41#[serde(tag = "type", rename_all = "snake_case")]
42pub enum DaemonEvent {
43 PalaceCreated {
44 id: String,
45 name: String,
46 },
47 DrawerAdded {
48 palace_id: String,
49 drawer_count: usize,
50 },
51 DrawerDeleted {
52 palace_id: String,
53 drawer_count: usize,
54 },
55 DreamCompleted {
56 palace_id: Option<String>,
57 merged: usize,
58 pruned: usize,
59 compacted: usize,
60 closets_updated: usize,
61 duration_ms: u64,
62 },
63 StatusChanged {
64 total_drawers: usize,
65 total_vectors: usize,
66 total_kg_triples: usize,
67 },
68}
69
70/// Shared application state passed to every request handler.
71///
72/// Why: The stdio loop and HTTP server need the same handles to the registry,
73/// data root, and embedder so MCP tools can perform real reads/writes against
74/// the live trusty-memory core. The embedder is heavy (loads ONNX weights) so
75/// we hold it behind a `OnceCell` and initialize lazily on first use.
76/// What: `Clone`-able via `Arc` fields. The registry / data root are eager;
77/// `embedder` is `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use
78/// races resolve to a single shared instance.
79/// Test: `app_state_default_constructs` confirms construction without panic.
80#[derive(Clone)]
81pub struct AppState {
82 pub version: String,
83 pub registry: Arc<PalaceRegistry>,
84 pub data_root: PathBuf,
85 pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
86 /// Optional default palace applied to MCP tool calls when the caller
87 /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
88 pub default_palace: Option<String>,
89 /// Active chat provider selected at startup. `None` means no upstream is
90 /// configured (no Ollama detected and no OpenRouter key) — callers must
91 /// degrade gracefully (chat endpoint returns 412).
92 pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
93 /// Per-palace chat-session stores, opened lazily so cold-start cost is
94 /// paid only when chat-history endpoints are hit.
95 pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
96 /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
97 ///
98 /// Why: Lets mutating handlers emit events that any connected dashboard
99 /// receives instantly. Cap of 128 buffers transient slow readers; if a
100 /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
101 pub events: Arc<broadcast::Sender<DaemonEvent>>,
102 /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
103 ///
104 /// Why (issue #35): `GET /health` reports how long the daemon has been
105 /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
106 /// handler compute the elapsed seconds cheaply and without a clock-skew
107 /// hazard.
108 /// What: a wall-monotonic `Instant`; `AppState::new` stamps it at startup.
109 /// Test: `health_endpoint_includes_resource_fields`.
110 pub started_at: std::time::Instant,
111 /// In-memory ring buffer of recent tracing log lines (issue #35).
112 ///
113 /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
114 /// so operators can inspect a running daemon without tailing a file. The
115 /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
116 /// HTTP handler (reader).
117 /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
118 /// to. Defaults to an empty buffer for states that never install the
119 /// layer (tests, the stdio path).
120 /// Test: `logs_tail_returns_recent_lines`.
121 pub log_buffer: trusty_common::log_buffer::LogBuffer,
122 /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
123 ///
124 /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
125 /// every health request would make a frequent health poll do unbounded
126 /// I/O; a background task recomputes it every 10 s and stores it here so
127 /// the handler reads it lock-free.
128 /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
129 /// `0` until the first walk completes.
130 /// Test: `health_endpoint_includes_resource_fields`.
131 pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
132 /// Per-process RSS + CPU sampler, refreshed on each `/health` request
133 /// (issue #35).
134 ///
135 /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
136 /// sampler must persist between requests — hence the shared `Mutex`.
137 /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
138 /// can sample without blocking the runtime.
139 /// Test: `health_endpoint_includes_resource_fields`.
140 pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
141}
142
143impl AppState {
144 /// Construct an `AppState` rooted at the given on-disk data directory.
145 ///
146 /// Why: The CLI (`serve`) and integration tests need to point the MCP
147 /// server at different roots — production at `dirs::data_dir`, tests at a
148 /// `tempfile::tempdir()`.
149 /// What: Builds an empty `PalaceRegistry`, captures the version, and
150 /// allocates an empty `OnceCell` for the embedder. `default_palace` is
151 /// `None`; use `with_default_palace` to set it.
152 /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
153 /// AppState pointed at a tempdir and round-trips a palace through it.
154 pub fn new(data_root: PathBuf) -> Self {
155 let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
156 Self {
157 version: env!("CARGO_PKG_VERSION").to_string(),
158 registry: Arc::new(PalaceRegistry::new()),
159 data_root,
160 embedder: Arc::new(OnceCell::new()),
161 default_palace: None,
162 chat_provider: Arc::new(OnceCell::new()),
163 session_stores: Arc::new(dashmap::DashMap::new()),
164 events: Arc::new(events_tx),
165 started_at: std::time::Instant::now(),
166 // Default to an empty buffer — `with_log_buffer` overrides this
167 // when the daemon installs the `LogBufferLayer` (HTTP mode).
168 log_buffer: trusty_common::log_buffer::LogBuffer::new(
169 trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
170 ),
171 disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
172 sys_metrics: Arc::new(tokio::sync::Mutex::new(
173 trusty_common::sys_metrics::SysMetrics::new(),
174 )),
175 }
176 }
177
178 /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
179 /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
180 /// subscriber captures (issue #35).
181 ///
182 /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
183 /// constructing the `AppState`, then hands a clone here so the HTTP
184 /// handler and the tracing layer observe the same ring.
185 /// What: replaces the empty default buffer with the supplied one.
186 /// Test: `logs_tail_returns_recent_lines`.
187 #[must_use]
188 pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
189 self.log_buffer = buffer;
190 self
191 }
192
193 /// Send a `DaemonEvent` to all connected SSE subscribers.
194 ///
195 /// Why: Mutating handlers call this after a successful write so the
196 /// dashboard can update without polling. The send is best-effort —
197 /// `broadcast::Sender::send` returns `Err` only when there are no live
198 /// receivers, which is fine (no listeners == no work to do).
199 /// What: Drops the result, so callers don't need to care whether anyone
200 /// is listening.
201 /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
202 /// subscriber observes the emitted event.
203 pub fn emit(&self, event: DaemonEvent) {
204 let _ = self.events.send(event);
205 }
206
207 /// Open (or return cached) the chat-session store for a palace.
208 ///
209 /// Why: Chat session persistence lives in a dedicated SQLite file under
210 /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
211 /// with the KG's transactional load. The store is cheap to clone via
212 /// `Arc` but the underlying r2d2 pool should be reused, so cache by id.
213 /// What: Creates the palace data dir if missing, opens (or reuses) a
214 /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
215 /// Test: Indirectly via the session HTTP handlers in `web::tests`.
216 pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
217 if let Some(entry) = self.session_stores.get(palace_id) {
218 return Ok(entry.clone());
219 }
220 let dir = self.data_root.join(palace_id);
221 std::fs::create_dir_all(&dir)
222 .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
223 let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
224 self.session_stores
225 .insert(palace_id.to_string(), store.clone());
226 Ok(store)
227 }
228
229 /// Builder-style setter for the default palace name.
230 ///
231 /// Why: `serve --palace <name>` wants to bind every tool call to a
232 /// project-scoped namespace without forcing every MCP request to repeat
233 /// the palace argument.
234 /// What: Returns `self` with `default_palace = Some(name)`.
235 /// Test: `default_palace_used_when_arg_omitted` covers the resolution
236 /// path; this setter is exercised there.
237 pub fn with_default_palace(mut self, name: Option<String>) -> Self {
238 self.default_palace = name;
239 self
240 }
241
242 /// Resolve (or initialize) the shared embedder.
243 ///
244 /// Why: FastEmbedder load is expensive — we share one instance across all
245 /// tool calls; the `OnceCell` ensures concurrent first-use races collapse
246 /// to a single load.
247 /// What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
248 /// underlying ONNX load.
249 /// Test: Indirectly via `dispatch_remember_then_recall`.
250 /// Resolve the active chat provider, auto-detecting on first call.
251 ///
252 /// Why: Provider selection depends on filesystem-loaded config plus a
253 /// network probe (Ollama liveness), so it must be lazily initialised at
254 /// runtime. Caching the choice in a `OnceCell` keeps it stable across
255 /// concurrent requests without re-probing on every chat call.
256 /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
257 /// auto-detected Ollama instance (when `local_model.enabled`), and falls
258 /// back to OpenRouter when an API key is set. Returns `Ok(None)` when
259 /// neither is available so the caller can emit a 412.
260 /// Test: `web::tests::providers_endpoint_returns_payload` covers the
261 /// detection path indirectly through `/api/v1/chat/providers`.
262 pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
263 self.chat_provider
264 .get_or_init(|| async {
265 let cfg = crate::web::load_user_config().unwrap_or_default();
266 if cfg.local_model.enabled {
267 if let Some(mut p) =
268 trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
269 {
270 // auto_detect returns an empty model id; callers must
271 // set the configured model name themselves.
272 p.model = cfg.local_model.model.clone();
273 return Some(Arc::new(p) as Arc<dyn ChatProvider>);
274 }
275 }
276 if !cfg.openrouter_api_key.is_empty() {
277 return Some(Arc::new(trusty_common::OpenRouterProvider::new(
278 cfg.openrouter_api_key,
279 cfg.openrouter_model,
280 )) as Arc<dyn ChatProvider>);
281 }
282 None
283 })
284 .await
285 .clone()
286 }
287
288 pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
289 let cell = self.embedder.clone();
290 let embedder = cell
291 .get_or_try_init(|| async {
292 let e = FastEmbedder::new().await?;
293 Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
294 })
295 .await?
296 .clone();
297 Ok(embedder)
298 }
299}
300
301impl std::fmt::Debug for AppState {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 f.debug_struct("AppState")
304 .field("version", &self.version)
305 .field("data_root", &self.data_root)
306 .field("registry_len", &self.registry.len())
307 .finish()
308 }
309}
310
311/// Handle a single MCP JSON-RPC message and produce its response.
312///
313/// Why: Pulled out of the stdio loop so unit tests can drive every method
314/// without touching real stdin/stdout.
315/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
316/// `notifications/initialized` notification (which returns `Value::Null`).
317/// Test: See unit tests below — initialize/list/call all return expected
318/// JSON-RPC envelopes; notifications return `Null` (no response written).
319pub async fn handle_message(state: &AppState, msg: Value) -> Value {
320 let id = msg.get("id").cloned().unwrap_or(Value::Null);
321 let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
322
323 match method {
324 "initialize" => {
325 let extra = state
326 .default_palace
327 .as_ref()
328 .map(|dp| json!({ "default_palace": dp }));
329 let result = initialize_response("trusty-memory", &state.version, extra);
330 json!({
331 "jsonrpc": "2.0",
332 "id": id,
333 "result": result,
334 })
335 }
336 // Notifications must NOT receive a response.
337 "notifications/initialized" | "notifications/cancelled" => Value::Null,
338 "tools/list" => json!({
339 "jsonrpc": "2.0",
340 "id": id,
341 "result": tools::tool_definitions_with(state.default_palace.is_some())
342 }),
343 // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
344 // service description so orchestrators (open-mpm, etc.) can
345 // introspect every tool and its required `memory.read`/`memory.write`
346 // scope without bespoke per-server adapters.
347 "rpc.discover" => json!({
348 "jsonrpc": "2.0",
349 "id": id,
350 "result": openrpc::build_discover_response(
351 &state.version,
352 state.default_palace.is_some(),
353 ),
354 }),
355 "tools/call" => {
356 let params = msg.get("params").cloned().unwrap_or_default();
357 let tool_name = params
358 .get("name")
359 .and_then(|n| n.as_str())
360 .unwrap_or("")
361 .to_string();
362 let args = params.get("arguments").cloned().unwrap_or_default();
363 match tools::dispatch_tool(state, &tool_name, args).await {
364 Ok(content) => json!({
365 "jsonrpc": "2.0",
366 "id": id,
367 "result": {
368 "content": [{"type": "text", "text": content.to_string()}]
369 }
370 }),
371 Err(e) => json!({
372 "jsonrpc": "2.0",
373 "id": id,
374 "error": {"code": -32603, "message": e.to_string()}
375 }),
376 }
377 }
378 "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
379 _ => json!({
380 "jsonrpc": "2.0",
381 "id": id,
382 "error": {
383 "code": -32601,
384 "message": format!("Method not found: {method}")
385 }
386 }),
387 }
388}
389
390/// Run the MCP stdio JSON-RPC 2.0 server loop.
391///
392/// Why: Claude Code launches MCP servers as child processes and speaks
393/// JSON-RPC over stdin/stdout — this is the primary integration path.
394/// What: Delegates to `trusty_mcp_core::run_stdio_loop`, adapting each
395/// shared `Request` back into the JSON `Value` shape `handle_message`
396/// expects, and translating the returned `Value` into a `Response`.
397/// Notifications (where `handle_message` returns `Value::Null`) become
398/// suppressed responses so the loop emits nothing on the wire.
399/// Test: `handle_message` covers protocol behaviour in unit tests.
400pub async fn run_stdio(state: AppState) -> Result<()> {
401 info!("trusty-memory MCP stdio server starting");
402 let state = Arc::new(state);
403 trusty_common::mcp::run_stdio_loop(move |req: Request| {
404 let state = state.clone();
405 async move {
406 // Re-serialise the Request into the JSON shape handle_message expects.
407 // (handle_message predates the shared types and reads loose Values.)
408 let msg = json!({
409 "jsonrpc": req.jsonrpc.unwrap_or_else(|| "2.0".to_string()),
410 "id": req.id.clone().unwrap_or(Value::Null),
411 "method": req.method,
412 "params": req.params.unwrap_or(Value::Null),
413 });
414 let resp_value = handle_message(&state, msg).await;
415 // handle_message returns Value::Null for notifications.
416 if resp_value.is_null() {
417 return Response::suppressed();
418 }
419 // Otherwise it returns the full JSON-RPC envelope as a Value;
420 // re-encode into the shared Response struct so the loop can serialise.
421 let id = resp_value.get("id").cloned();
422 if let Some(result) = resp_value.get("result").cloned() {
423 Response::ok(id, result)
424 } else if let Some(err) = resp_value.get("error") {
425 let code =
426 err.get("code")
427 .and_then(|c| c.as_i64())
428 .unwrap_or(error_codes::INTERNAL_ERROR as i64) as i32;
429 let message = err
430 .get("message")
431 .and_then(|m| m.as_str())
432 .unwrap_or("internal error")
433 .to_string();
434 Response::err(id, code, message)
435 } else {
436 Response::err(
437 id,
438 error_codes::INTERNAL_ERROR,
439 "malformed handler response",
440 )
441 }
442 }
443 })
444 .await
445}
446
447/// Run the optional HTTP/SSE + web admin server.
448///
449/// Why: A long-running daemon mode lets non-stdio clients (browsers, curl,
450/// future remote agents) hit `/health`, the `/api/v1/*` REST surface, and the
451/// embedded admin SPA.
452/// What: axum router built from `web::router()` plus a `/sse` stub for the
453/// existing MCP-over-SSE clients. Caller provides a pre-bound listener so
454/// port auto-detection lives at the call site.
455/// Test: `cargo test -p trusty-memory-mcp web::tests` exercises the router
456/// shape; manual: `curl http://127.0.0.1:<port>/health` returns `ok`.
457pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
458 use axum::routing::get;
459
460 // Issue #35: recompute the `data_root` disk footprint every 10 s on a
461 // background task so `GET /health` reports `disk_bytes` without doing a
462 // recursive directory walk on the request path.
463 spawn_disk_size_ticker(state.clone());
464
465 let app = web::router()
466 .route("/sse", get(sse_handler))
467 .with_state(state);
468
469 let local = listener.local_addr().ok();
470 if let Some(a) = local {
471 info!("HTTP server listening on http://{a}");
472 eprintln!("HTTP server listening on http://{a}");
473 }
474 axum::serve(listener, app).await?;
475 Ok(())
476}
477
478/// Convenience: bind `addr` and serve via [`run_http_on`].
479pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
480 let listener = tokio::net::TcpListener::bind(addr).await?;
481 run_http_on(state, listener).await
482}
483
484/// Spawn a background ticker that recomputes the `data_root` disk footprint
485/// every 10 seconds and stores it in `state.disk_bytes` (issue #35).
486///
487/// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
488/// every health request would turn a frequent health poll into unbounded
489/// recursive I/O. Computing it off the request path on a fixed cadence keeps
490/// `/health` cheap and bounds the staleness to ~10 s — fine for an
491/// at-a-glance footprint figure.
492/// What: spawns a detached tokio task. `AppState` is cheap to `Clone` (all
493/// `Arc` fields), so the task holds a full clone; the daemon process lives
494/// for the lifetime of the server anyway, so no `Weak` downgrade is needed.
495/// Each tick runs the blocking directory walk on `spawn_blocking` so it never
496/// stalls the async runtime, then stores the byte total atomically.
497/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
498/// the ticker cadence is not unit-tested (timing-dependent).
499fn spawn_disk_size_ticker(state: AppState) {
500 tokio::spawn(async move {
501 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
502 loop {
503 interval.tick().await;
504 let dir = state.data_root.clone();
505 // The directory walk is blocking filesystem I/O — run it on the
506 // blocking pool so it never parks an async worker thread.
507 let bytes = tokio::task::spawn_blocking(move || {
508 trusty_common::sys_metrics::dir_size_bytes(&dir)
509 })
510 .await
511 .unwrap_or(0);
512 state
513 .disk_bytes
514 .store(bytes, std::sync::atomic::Ordering::Relaxed);
515 }
516 });
517}
518
519/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
520///
521/// Why: The dashboard subscribes once and reacts to live pushes (palace
522/// created, drawer added/deleted, dream completed, status changed) instead of
523/// polling `/api/v1/*` endpoints.
524/// What: Subscribes to `state.events`, emits an initial `connected` frame,
525/// then forwards every `DaemonEvent` as `data: <json>\n\n`. Lagged
526/// subscribers receive a `lag` frame indicating skipped events; channel
527/// closure ends the stream.
528/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
529/// emit + receive); manual: `curl -N http://.../sse`.
530pub(crate) async fn sse_handler(
531 axum::extract::State(state): axum::extract::State<AppState>,
532) -> impl axum::response::IntoResponse {
533 use futures::StreamExt;
534 use tokio_stream::wrappers::BroadcastStream;
535
536 let rx = state.events.subscribe();
537 let initial = futures::stream::once(async {
538 Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
539 "data: {\"type\":\"connected\"}\n\n",
540 ))
541 });
542 let events = BroadcastStream::new(rx).map(|res| {
543 let frame = match res {
544 Ok(event) => match serde_json::to_string(&event) {
545 Ok(json) => format!("data: {json}\n\n"),
546 Err(e) => format!("data: {{\"type\":\"error\",\"message\":\"{e}\"}}\n\n"),
547 },
548 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
549 format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
550 }
551 };
552 Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
553 });
554 let stream = initial.chain(events);
555
556 axum::response::Response::builder()
557 .header("Content-Type", "text/event-stream")
558 .header("Cache-Control", "no-cache")
559 .header("X-Accel-Buffering", "no")
560 .body(axum::body::Body::from_stream(stream))
561 .expect("valid SSE response")
562}
563
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `AppState` over a fresh temp dir. The `TempDir` guard is
    /// intentionally leaked so the directory outlives the state; tests are
    /// short-lived, so the leak is harmless.
    fn test_state() -> AppState {
        let dir = tempfile::tempdir().expect("tempdir");
        let root = dir.path().to_path_buf();
        std::mem::forget(dir);
        AppState::new(root)
    }

    /// Shorthand for a parameterless JSON-RPC 2.0 request.
    fn request(id: u64, method: &str) -> Value {
        json!({"jsonrpc": "2.0", "id": id, "method": method})
    }

    #[tokio::test]
    async fn initialize_returns_protocol_version_and_capabilities() {
        let state = test_state();
        let init_req = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "test", "version": "0"}
            }
        });
        let reply = handle_message(&state, init_req).await;
        let result = &reply["result"];
        assert_eq!(reply["jsonrpc"], "2.0");
        assert_eq!(reply["id"], 1);
        assert_eq!(result["protocolVersion"], "2024-11-05");
        assert!(result["capabilities"]["tools"].is_object());
        assert_eq!(result["serverInfo"]["name"], "trusty-memory");
    }

    #[tokio::test]
    async fn initialized_notification_returns_null() {
        let state = test_state();
        let notification = json!({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
            "params": {}
        });
        assert!(handle_message(&state, notification).await.is_null());
    }

    #[tokio::test]
    async fn tools_list_returns_all_tools() {
        let state = test_state();
        let reply = handle_message(&state, request(2, "tools/list")).await;
        let tool_count = reply["result"]["tools"]
            .as_array()
            .expect("tools array")
            .len();
        assert_eq!(tool_count, 12);
    }

    #[tokio::test]
    async fn unknown_method_returns_error() {
        let state = test_state();
        let reply = handle_message(&state, request(4, "wat")).await;
        assert_eq!(reply["error"]["code"], -32601);
    }

    #[tokio::test]
    async fn ping_returns_empty_result() {
        let state = test_state();
        let reply = handle_message(&state, request(5, "ping")).await;
        assert!(reply["result"].is_object());
    }

    #[tokio::test]
    async fn app_state_default_constructs() {
        let fresh = test_state();
        assert!(!fresh.version.is_empty());
        assert!(fresh.registry.is_empty());
        assert!(fresh.default_palace.is_none());
    }

    /// Why: Issue #26 — when `serve --palace <name>` is set, the MCP server
    /// must (a) report the default in the `initialize` `serverInfo`, (b)
    /// drop `palace` from the required schema in `tools/list`, and (c) let
    /// `tools/call` use the default when the caller omits `palace`.
    /// Test: Construct an AppState with a default palace, create that palace
    /// on disk via the registry, then call `memory_remember` without a
    /// `palace` argument and confirm it resolves to the default.
    #[tokio::test]
    async fn default_palace_used_when_arg_omitted() {
        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};

        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        // Pre-create the default palace so remember has somewhere to land.
        PalaceRegistry::new()
            .create_palace(
                &root,
                Palace {
                    id: PalaceId::new("default-pal"),
                    name: "default-pal".to_string(),
                    description: None,
                    created_at: chrono::Utc::now(),
                    data_dir: root.join("default-pal"),
                },
            )
            .expect("create_palace");

        let state = AppState::new(root).with_default_palace(Some("default-pal".to_string()));

        // (a) initialize advertises the default.
        let init = handle_message(&state, request(1, "initialize")).await;
        assert_eq!(
            init["result"]["serverInfo"]["default_palace"], "default-pal",
            "initialize must echo default_palace in serverInfo"
        );

        // (b) tools/list drops `palace` from required when default is set.
        let listing = handle_message(&state, request(2, "tools/list")).await;
        let remember = listing["result"]["tools"]
            .as_array()
            .expect("tools array")
            .iter()
            .find(|tool| tool["name"] == "memory_remember")
            .expect("memory_remember tool");
        let required: Vec<&str> = remember["inputSchema"]["required"]
            .as_array()
            .expect("required array")
            .iter()
            .filter_map(Value::as_str)
            .collect();
        assert!(
            !required.contains(&"palace"),
            "palace must not be required when default is configured; got {required:?}"
        );
        assert!(required.contains(&"text"));

        // (c) tools/call resolves the default when arg is omitted.
        let call_req = json!({
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {
                "name": "memory_remember",
                "arguments": {"text": "default-palace test memory"},
            },
        });
        let call = handle_message(&state, call_req).await;
        // Successful dispatch returns `result.content[0].text` JSON.
        let Some(text) = call["result"]["content"][0]["text"].as_str() else {
            panic!("expected success result, got {call}");
        };
        let payload: Value = serde_json::from_str(text).expect("parse content json");
        assert_eq!(payload["palace"], "default-pal");
        assert_eq!(payload["status"], "stored");
        assert!(payload["drawer_id"].as_str().is_some());
    }

    /// Why: When no default is set, `tools/call` for a palace-bound tool
    /// without a `palace` argument should error helpfully rather than panic.
    #[tokio::test]
    async fn missing_palace_without_default_errors() {
        let state = test_state();
        let call_req = json!({
            "jsonrpc": "2.0",
            "id": 7,
            "method": "tools/call",
            "params": {
                "name": "memory_recall",
                "arguments": {"query": "anything"},
            },
        });
        let reply = handle_message(&state, call_req).await;
        assert_eq!(reply["error"]["code"], -32603);
        let msg = reply["error"]["message"].as_str().unwrap_or("");
        assert!(
            msg.contains("missing 'palace'"),
            "expected helpful error, got: {msg}"
        );
    }

    /// Why: initialize without a default palace must omit `default_palace`
    /// from `serverInfo` so clients can detect the unbound mode.
    #[tokio::test]
    async fn initialize_without_default_palace_omits_field() {
        let state = test_state();
        let init = handle_message(&state, request(1, "initialize")).await;
        assert!(init["result"]["serverInfo"]["default_palace"].is_null());
    }
}