Skip to main content

trusty_memory/
lib.rs

1//! MCP server (stdio + HTTP/SSE) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name.
//! What: Provides `run_stdio` (JSON-RPC 2.0 over stdin/stdout) and `run_http`
//! (an axum server mounting the web router plus a live `/sse` event stream),
//! plus an `AppState` that carries the shared `PalaceRegistry`, on-disk data
//! root, a lazily-initialized embedder, and the other shared daemon handles.
9//! Test: `cargo test -p trusty-memory-mcp` validates handshake + dispatch.
10
11use anyhow::Result;
12use serde_json::{json, Value};
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::{broadcast, OnceCell};
16use tracing::info;
17use trusty_common::mcp::{error_codes, initialize_response, Request, Response};
18use trusty_common::memory_core::embed::FastEmbedder;
19use trusty_common::memory_core::store::ChatSessionStore;
20use trusty_common::memory_core::PalaceRegistry;
21use trusty_common::ChatProvider;
22
23pub mod commands;
24pub mod openrpc;
25pub mod service;
26pub mod tools;
27pub mod web;
28
29pub use service::MemoryMcpService;
30pub use tools::MemoryMcpServer;
31
32/// Live daemon events broadcast to connected SSE subscribers.
33///
34/// Why: The dashboard needs push-driven updates so palace creation, drawer
35/// add/delete, dream cycles, and aggregate status changes are visible without
36/// polling. A single broadcast channel fans out to every connected browser.
37/// What: Tagged enum serialized as `{"type": "...", ...fields}` over SSE.
38/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
39/// mutation, and asserts the frame arrives.
40#[derive(Clone, Debug, serde::Serialize)]
41#[serde(tag = "type", rename_all = "snake_case")]
42pub enum DaemonEvent {
43    PalaceCreated {
44        id: String,
45        name: String,
46    },
47    DrawerAdded {
48        palace_id: String,
49        drawer_count: usize,
50    },
51    DrawerDeleted {
52        palace_id: String,
53        drawer_count: usize,
54    },
55    DreamCompleted {
56        palace_id: Option<String>,
57        merged: usize,
58        pruned: usize,
59        compacted: usize,
60        closets_updated: usize,
61        duration_ms: u64,
62    },
63    StatusChanged {
64        total_drawers: usize,
65        total_vectors: usize,
66        total_kg_triples: usize,
67    },
68}
69
70/// Shared application state passed to every request handler.
71///
72/// Why: The stdio loop and HTTP server need the same handles to the registry,
73/// data root, and embedder so MCP tools can perform real reads/writes against
74/// the live trusty-memory core. The embedder is heavy (loads ONNX weights) so
75/// we hold it behind a `OnceCell` and initialize lazily on first use.
76/// What: `Clone`-able via `Arc` fields. The registry / data root are eager;
77/// `embedder` is `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use
78/// races resolve to a single shared instance.
79/// Test: `app_state_default_constructs` confirms construction without panic.
80#[derive(Clone)]
81pub struct AppState {
82    pub version: String,
83    pub registry: Arc<PalaceRegistry>,
84    pub data_root: PathBuf,
85    pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
86    /// Optional default palace applied to MCP tool calls when the caller
87    /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
88    pub default_palace: Option<String>,
89    /// Active chat provider selected at startup. `None` means no upstream is
90    /// configured (no Ollama detected and no OpenRouter key) — callers must
91    /// degrade gracefully (chat endpoint returns 412).
92    pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
93    /// Per-palace chat-session stores, opened lazily so cold-start cost is
94    /// paid only when chat-history endpoints are hit.
95    pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
96    /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
97    ///
98    /// Why: Lets mutating handlers emit events that any connected dashboard
99    /// receives instantly. Cap of 128 buffers transient slow readers; if a
100    /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
101    pub events: Arc<broadcast::Sender<DaemonEvent>>,
102    /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
103    ///
104    /// Why (issue #35): `GET /health` reports how long the daemon has been
105    /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
106    /// handler compute the elapsed seconds cheaply and without a clock-skew
107    /// hazard.
108    /// What: a wall-monotonic `Instant`; `AppState::new` stamps it at startup.
109    /// Test: `health_endpoint_includes_resource_fields`.
110    pub started_at: std::time::Instant,
111    /// In-memory ring buffer of recent tracing log lines (issue #35).
112    ///
113    /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
114    /// so operators can inspect a running daemon without tailing a file. The
115    /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
116    /// HTTP handler (reader).
117    /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
118    /// to. Defaults to an empty buffer for states that never install the
119    /// layer (tests, the stdio path).
120    /// Test: `logs_tail_returns_recent_lines`.
121    pub log_buffer: trusty_common::log_buffer::LogBuffer,
122    /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
123    ///
124    /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
125    /// every health request would make a frequent health poll do unbounded
126    /// I/O; a background task recomputes it every 10 s and stores it here so
127    /// the handler reads it lock-free.
128    /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
129    /// `0` until the first walk completes.
130    /// Test: `health_endpoint_includes_resource_fields`.
131    pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
132    /// Per-process RSS + CPU sampler, refreshed on each `/health` request
133    /// (issue #35).
134    ///
135    /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
136    /// sampler must persist between requests — hence the shared `Mutex`.
137    /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
138    /// can sample without blocking the runtime.
139    /// Test: `health_endpoint_includes_resource_fields`.
140    pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
141}
142
143impl AppState {
144    /// Construct an `AppState` rooted at the given on-disk data directory.
145    ///
146    /// Why: The CLI (`serve`) and integration tests need to point the MCP
147    /// server at different roots — production at `dirs::data_dir`, tests at a
148    /// `tempfile::tempdir()`.
149    /// What: Builds an empty `PalaceRegistry`, captures the version, and
150    /// allocates an empty `OnceCell` for the embedder. `default_palace` is
151    /// `None`; use `with_default_palace` to set it.
152    /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
153    /// AppState pointed at a tempdir and round-trips a palace through it.
154    pub fn new(data_root: PathBuf) -> Self {
155        let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
156        Self {
157            version: env!("CARGO_PKG_VERSION").to_string(),
158            registry: Arc::new(PalaceRegistry::new()),
159            data_root,
160            embedder: Arc::new(OnceCell::new()),
161            default_palace: None,
162            chat_provider: Arc::new(OnceCell::new()),
163            session_stores: Arc::new(dashmap::DashMap::new()),
164            events: Arc::new(events_tx),
165            started_at: std::time::Instant::now(),
166            // Default to an empty buffer — `with_log_buffer` overrides this
167            // when the daemon installs the `LogBufferLayer` (HTTP mode).
168            log_buffer: trusty_common::log_buffer::LogBuffer::new(
169                trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
170            ),
171            disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
172            sys_metrics: Arc::new(tokio::sync::Mutex::new(
173                trusty_common::sys_metrics::SysMetrics::new(),
174            )),
175        }
176    }
177
178    /// Scan `data_root/palaces/` and re-register every persisted palace into
179    /// the in-memory [`PalaceRegistry`].
180    ///
181    /// Why: `AppState::new` builds an *empty* registry, so after a daemon
182    /// restart `palace_list` / the dashboard reported zero palaces even though
183    /// dozens existed on disk — palace metadata was persisted by
184    /// `palace_create` but never re-hydrated on startup. This method closes
185    /// that gap by walking the on-disk layout (each subdirectory holding a
186    /// `palace.json` is one palace) and rebuilding a live `PalaceHandle` for
187    /// each, so recall paths see the full set immediately after a restart.
188    /// What: Runs the blocking filesystem walk + per-palace `PalaceHandle::open`
189    /// on a `spawn_blocking` thread (so it never stalls the async runtime),
190    /// registers each successfully opened palace via `register_arc`, logs every
191    /// load at `debug!`, and returns the count loaded. A palace that fails to
192    /// open (corrupt index, unreadable `kg.db`, etc.) is logged at `warn!` and
193    /// skipped — one bad palace must not abort startup or crash the daemon.
194    /// Test: `tests::load_palaces_from_disk_rehydrates_registry` writes two
195    /// palaces into a tempdir, constructs an `AppState`, calls this method, and
196    /// asserts the returned count and registry contents.
197    pub async fn load_palaces_from_disk(&self) -> Result<usize> {
198        let data_root = self.data_root.clone();
199        let registry = self.registry.clone();
200        // The directory walk and each `PalaceHandle::open` perform blocking
201        // filesystem + redb/usearch I/O — run the whole hydration on the
202        // blocking pool so it never parks an async worker thread.
203        let count = tokio::task::spawn_blocking(move || -> Result<usize> {
204            let palaces = PalaceRegistry::list_palaces(&data_root)?;
205            let mut loaded = 0usize;
206            for palace in palaces {
207                match trusty_common::memory_core::PalaceHandle::open(&palace) {
208                    Ok(handle) => {
209                        tracing::debug!(
210                            palace = %palace.id,
211                            data_dir = %palace.data_dir.display(),
212                            "loaded palace from disk"
213                        );
214                        registry.register_arc(handle);
215                        loaded += 1;
216                    }
217                    Err(e) => {
218                        tracing::warn!(
219                            palace = %palace.id,
220                            "skipping palace during startup hydration: {e:#}"
221                        );
222                    }
223                }
224            }
225            Ok(loaded)
226        })
227        .await
228        .map_err(|e| anyhow::anyhow!("join load_palaces_from_disk: {e}"))??;
229        Ok(count)
230    }
231
232    /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
233    /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
234    /// subscriber captures (issue #35).
235    ///
236    /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
237    /// constructing the `AppState`, then hands a clone here so the HTTP
238    /// handler and the tracing layer observe the same ring.
239    /// What: replaces the empty default buffer with the supplied one.
240    /// Test: `logs_tail_returns_recent_lines`.
241    #[must_use]
242    pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
243        self.log_buffer = buffer;
244        self
245    }
246
247    /// Send a `DaemonEvent` to all connected SSE subscribers.
248    ///
249    /// Why: Mutating handlers call this after a successful write so the
250    /// dashboard can update without polling. The send is best-effort —
251    /// `broadcast::Sender::send` returns `Err` only when there are no live
252    /// receivers, which is fine (no listeners == no work to do).
253    /// What: Drops the result, so callers don't need to care whether anyone
254    /// is listening.
255    /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
256    /// subscriber observes the emitted event.
257    pub fn emit(&self, event: DaemonEvent) {
258        let _ = self.events.send(event);
259    }
260
261    /// Open (or return cached) the chat-session store for a palace.
262    ///
263    /// Why: Chat session persistence lives in a dedicated SQLite file under
264    /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
265    /// with the KG's transactional load. The store is cheap to clone via
266    /// `Arc` but the underlying r2d2 pool should be reused, so cache by id.
267    /// What: Creates the palace data dir if missing, opens (or reuses) a
268    /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
269    /// Test: Indirectly via the session HTTP handlers in `web::tests`.
270    pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
271        if let Some(entry) = self.session_stores.get(palace_id) {
272            return Ok(entry.clone());
273        }
274        let dir = self.data_root.join(palace_id);
275        std::fs::create_dir_all(&dir)
276            .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
277        let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
278        self.session_stores
279            .insert(palace_id.to_string(), store.clone());
280        Ok(store)
281    }
282
283    /// Builder-style setter for the default palace name.
284    ///
285    /// Why: `serve --palace <name>` wants to bind every tool call to a
286    /// project-scoped namespace without forcing every MCP request to repeat
287    /// the palace argument.
288    /// What: Returns `self` with `default_palace = Some(name)`.
289    /// Test: `default_palace_used_when_arg_omitted` covers the resolution
290    /// path; this setter is exercised there.
291    pub fn with_default_palace(mut self, name: Option<String>) -> Self {
292        self.default_palace = name;
293        self
294    }
295
296    /// Resolve (or initialize) the shared embedder.
297    ///
298    /// Why: FastEmbedder load is expensive — we share one instance across all
299    /// tool calls; the `OnceCell` ensures concurrent first-use races collapse
300    /// to a single load.
301    /// What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
302    /// underlying ONNX load.
303    /// Test: Indirectly via `dispatch_remember_then_recall`.
304    /// Resolve the active chat provider, auto-detecting on first call.
305    ///
306    /// Why: Provider selection depends on filesystem-loaded config plus a
307    /// network probe (Ollama liveness), so it must be lazily initialised at
308    /// runtime. Caching the choice in a `OnceCell` keeps it stable across
309    /// concurrent requests without re-probing on every chat call.
310    /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
311    /// auto-detected Ollama instance (when `local_model.enabled`), and falls
312    /// back to OpenRouter when an API key is set. Returns `Ok(None)` when
313    /// neither is available so the caller can emit a 412.
314    /// Test: `web::tests::providers_endpoint_returns_payload` covers the
315    /// detection path indirectly through `/api/v1/chat/providers`.
316    pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
317        self.chat_provider
318            .get_or_init(|| async {
319                let cfg = crate::web::load_user_config().unwrap_or_default();
320                if cfg.local_model.enabled {
321                    if let Some(mut p) =
322                        trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
323                    {
324                        // auto_detect returns an empty model id; callers must
325                        // set the configured model name themselves.
326                        p.model = cfg.local_model.model.clone();
327                        return Some(Arc::new(p) as Arc<dyn ChatProvider>);
328                    }
329                }
330                if !cfg.openrouter_api_key.is_empty() {
331                    return Some(Arc::new(trusty_common::OpenRouterProvider::new(
332                        cfg.openrouter_api_key,
333                        cfg.openrouter_model,
334                    )) as Arc<dyn ChatProvider>);
335                }
336                None
337            })
338            .await
339            .clone()
340    }
341
342    pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
343        let cell = self.embedder.clone();
344        let embedder = cell
345            .get_or_try_init(|| async {
346                let e = FastEmbedder::new().await?;
347                Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
348            })
349            .await?
350            .clone();
351        Ok(embedder)
352    }
353}
354
355impl std::fmt::Debug for AppState {
356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357        f.debug_struct("AppState")
358            .field("version", &self.version)
359            .field("data_root", &self.data_root)
360            .field("registry_len", &self.registry.len())
361            .finish()
362    }
363}
364
365/// Handle a single MCP JSON-RPC message and produce its response.
366///
367/// Why: Pulled out of the stdio loop so unit tests can drive every method
368/// without touching real stdin/stdout.
369/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
370/// `notifications/initialized` notification (which returns `Value::Null`).
371/// Test: See unit tests below — initialize/list/call all return expected
372/// JSON-RPC envelopes; notifications return `Null` (no response written).
373pub async fn handle_message(state: &AppState, msg: Value) -> Value {
374    let id = msg.get("id").cloned().unwrap_or(Value::Null);
375    let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
376
377    match method {
378        "initialize" => {
379            let extra = state
380                .default_palace
381                .as_ref()
382                .map(|dp| json!({ "default_palace": dp }));
383            let result = initialize_response("trusty-memory", &state.version, extra);
384            json!({
385                "jsonrpc": "2.0",
386                "id": id,
387                "result": result,
388            })
389        }
390        // Notifications must NOT receive a response.
391        "notifications/initialized" | "notifications/cancelled" => Value::Null,
392        "tools/list" => json!({
393            "jsonrpc": "2.0",
394            "id": id,
395            "result": tools::tool_definitions_with(state.default_palace.is_some())
396        }),
397        // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
398        // service description so orchestrators (open-mpm, etc.) can
399        // introspect every tool and its required `memory.read`/`memory.write`
400        // scope without bespoke per-server adapters.
401        "rpc.discover" => json!({
402            "jsonrpc": "2.0",
403            "id": id,
404            "result": openrpc::build_discover_response(
405                &state.version,
406                state.default_palace.is_some(),
407            ),
408        }),
409        "tools/call" => {
410            let params = msg.get("params").cloned().unwrap_or_default();
411            let tool_name = params
412                .get("name")
413                .and_then(|n| n.as_str())
414                .unwrap_or("")
415                .to_string();
416            let args = params.get("arguments").cloned().unwrap_or_default();
417            match tools::dispatch_tool(state, &tool_name, args).await {
418                Ok(content) => json!({
419                    "jsonrpc": "2.0",
420                    "id": id,
421                    "result": {
422                        "content": [{"type": "text", "text": content.to_string()}]
423                    }
424                }),
425                Err(e) => json!({
426                    "jsonrpc": "2.0",
427                    "id": id,
428                    "error": {"code": -32603, "message": e.to_string()}
429                }),
430            }
431        }
432        "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
433        _ => json!({
434            "jsonrpc": "2.0",
435            "id": id,
436            "error": {
437                "code": -32601,
438                "message": format!("Method not found: {method}")
439            }
440        }),
441    }
442}
443
444/// Run the MCP stdio JSON-RPC 2.0 server loop.
445///
446/// Why: Claude Code launches MCP servers as child processes and speaks
447/// JSON-RPC over stdin/stdout — this is the primary integration path.
448/// What: Delegates to `trusty_mcp_core::run_stdio_loop`, adapting each
449/// shared `Request` back into the JSON `Value` shape `handle_message`
450/// expects, and translating the returned `Value` into a `Response`.
451/// Notifications (where `handle_message` returns `Value::Null`) become
452/// suppressed responses so the loop emits nothing on the wire.
453/// Test: `handle_message` covers protocol behaviour in unit tests.
454pub async fn run_stdio(state: AppState) -> Result<()> {
455    info!("trusty-memory MCP stdio server starting");
456    let state = Arc::new(state);
457    trusty_common::mcp::run_stdio_loop(move |req: Request| {
458        let state = state.clone();
459        async move {
460            // Re-serialise the Request into the JSON shape handle_message expects.
461            // (handle_message predates the shared types and reads loose Values.)
462            let msg = json!({
463                "jsonrpc": req.jsonrpc.unwrap_or_else(|| "2.0".to_string()),
464                "id": req.id.clone().unwrap_or(Value::Null),
465                "method": req.method,
466                "params": req.params.unwrap_or(Value::Null),
467            });
468            let resp_value = handle_message(&state, msg).await;
469            // handle_message returns Value::Null for notifications.
470            if resp_value.is_null() {
471                return Response::suppressed();
472            }
473            // Otherwise it returns the full JSON-RPC envelope as a Value;
474            // re-encode into the shared Response struct so the loop can serialise.
475            let id = resp_value.get("id").cloned();
476            if let Some(result) = resp_value.get("result").cloned() {
477                Response::ok(id, result)
478            } else if let Some(err) = resp_value.get("error") {
479                let code =
480                    err.get("code")
481                        .and_then(|c| c.as_i64())
482                        .unwrap_or(error_codes::INTERNAL_ERROR as i64) as i32;
483                let message = err
484                    .get("message")
485                    .and_then(|m| m.as_str())
486                    .unwrap_or("internal error")
487                    .to_string();
488                Response::err(id, code, message)
489            } else {
490                Response::err(
491                    id,
492                    error_codes::INTERNAL_ERROR,
493                    "malformed handler response",
494                )
495            }
496        }
497    })
498    .await
499}
500
501/// Run the optional HTTP/SSE + web admin server.
502///
503/// Why: A long-running daemon mode lets non-stdio clients (browsers, curl,
504/// future remote agents) hit `/health`, the `/api/v1/*` REST surface, and the
505/// embedded admin SPA.
506/// What: axum router built from `web::router()` plus a `/sse` stub for the
507/// existing MCP-over-SSE clients. Caller provides a pre-bound listener so
508/// port auto-detection lives at the call site.
509/// Test: `cargo test -p trusty-memory-mcp web::tests` exercises the router
510/// shape; manual: `curl http://127.0.0.1:<port>/health` returns `ok`.
511pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
512    use axum::routing::get;
513
514    // Issue #35: recompute the `data_root` disk footprint every 10 s on a
515    // background task so `GET /health` reports `disk_bytes` without doing a
516    // recursive directory walk on the request path.
517    spawn_disk_size_ticker(state.clone());
518
519    let app = web::router()
520        .route("/sse", get(sse_handler))
521        .with_state(state);
522
523    let local = listener.local_addr().ok();
524    if let Some(a) = local {
525        info!("HTTP server listening on http://{a}");
526        eprintln!("HTTP server listening on http://{a}");
527    }
528    axum::serve(listener, app).await?;
529    Ok(())
530}
531
532/// Convenience: bind `addr` and serve via [`run_http_on`].
533pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
534    let listener = tokio::net::TcpListener::bind(addr).await?;
535    run_http_on(state, listener).await
536}
537
538/// Spawn a background ticker that recomputes the `data_root` disk footprint
539/// every 10 seconds and stores it in `state.disk_bytes` (issue #35).
540///
541/// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
542/// every health request would turn a frequent health poll into unbounded
543/// recursive I/O. Computing it off the request path on a fixed cadence keeps
544/// `/health` cheap and bounds the staleness to ~10 s — fine for an
545/// at-a-glance footprint figure.
546/// What: spawns a detached tokio task. `AppState` is cheap to `Clone` (all
547/// `Arc` fields), so the task holds a full clone; the daemon process lives
548/// for the lifetime of the server anyway, so no `Weak` downgrade is needed.
549/// Each tick runs the blocking directory walk on `spawn_blocking` so it never
550/// stalls the async runtime, then stores the byte total atomically.
551/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
552/// the ticker cadence is not unit-tested (timing-dependent).
553fn spawn_disk_size_ticker(state: AppState) {
554    tokio::spawn(async move {
555        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
556        loop {
557            interval.tick().await;
558            let dir = state.data_root.clone();
559            // The directory walk is blocking filesystem I/O — run it on the
560            // blocking pool so it never parks an async worker thread.
561            let bytes = tokio::task::spawn_blocking(move || {
562                trusty_common::sys_metrics::dir_size_bytes(&dir)
563            })
564            .await
565            .unwrap_or(0);
566            state
567                .disk_bytes
568                .store(bytes, std::sync::atomic::Ordering::Relaxed);
569        }
570    });
571}
572
573/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
574///
575/// Why: The dashboard subscribes once and reacts to live pushes (palace
576/// created, drawer added/deleted, dream completed, status changed) instead of
577/// polling `/api/v1/*` endpoints.
578/// What: Subscribes to `state.events`, emits an initial `connected` frame,
579/// then forwards every `DaemonEvent` as `data: <json>\n\n`. Lagged
580/// subscribers receive a `lag` frame indicating skipped events; channel
581/// closure ends the stream.
582/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
583/// emit + receive); manual: `curl -N http://.../sse`.
584pub(crate) async fn sse_handler(
585    axum::extract::State(state): axum::extract::State<AppState>,
586) -> impl axum::response::IntoResponse {
587    use futures::StreamExt;
588    use tokio_stream::wrappers::BroadcastStream;
589
590    let rx = state.events.subscribe();
591    let initial = futures::stream::once(async {
592        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
593            "data: {\"type\":\"connected\"}\n\n",
594        ))
595    });
596    let events = BroadcastStream::new(rx).map(|res| {
597        let frame = match res {
598            Ok(event) => match serde_json::to_string(&event) {
599                Ok(json) => format!("data: {json}\n\n"),
600                Err(e) => format!("data: {{\"type\":\"error\",\"message\":\"{e}\"}}\n\n"),
601            },
602            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
603                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
604            }
605        };
606        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
607    });
608    let stream = initial.chain(events);
609
610    axum::response::Response::builder()
611        .header("Content-Type", "text/event-stream")
612        .header("Cache-Control", "no-cache")
613        .header("X-Accel-Buffering", "no")
614        .body(axum::body::Body::from_stream(stream))
615        .expect("valid SSE response")
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621
622    fn test_state() -> AppState {
623        let tmp = tempfile::tempdir().expect("tempdir");
624        let root = tmp.path().to_path_buf();
625        // Leak the tempdir so it lives for the test process; tests are short.
626        std::mem::forget(tmp);
627        AppState::new(root)
628    }
629
630    #[tokio::test]
631    async fn initialize_returns_protocol_version_and_capabilities() {
632        let state = test_state();
633        let req = json!({
634            "jsonrpc": "2.0",
635            "id": 1,
636            "method": "initialize",
637            "params": {
638                "protocolVersion": "2024-11-05",
639                "capabilities": {},
640                "clientInfo": {"name": "test", "version": "0"}
641            }
642        });
643        let resp = handle_message(&state, req).await;
644        assert_eq!(resp["jsonrpc"], "2.0");
645        assert_eq!(resp["id"], 1);
646        assert_eq!(resp["result"]["protocolVersion"], "2024-11-05");
647        assert!(resp["result"]["capabilities"]["tools"].is_object());
648        assert_eq!(resp["result"]["serverInfo"]["name"], "trusty-memory");
649    }
650
651    #[tokio::test]
652    async fn initialized_notification_returns_null() {
653        let state = test_state();
654        let req = json!({
655            "jsonrpc": "2.0",
656            "method": "notifications/initialized",
657            "params": {}
658        });
659        let resp = handle_message(&state, req).await;
660        assert!(resp.is_null());
661    }
662
663    #[tokio::test]
664    async fn tools_list_returns_all_tools() {
665        let state = test_state();
666        let req = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"});
667        let resp = handle_message(&state, req).await;
668        let tools = resp["result"]["tools"].as_array().expect("tools array");
669        assert_eq!(tools.len(), 12);
670    }
671
672    #[tokio::test]
673    async fn unknown_method_returns_error() {
674        let state = test_state();
675        let req = json!({"jsonrpc": "2.0", "id": 4, "method": "wat"});
676        let resp = handle_message(&state, req).await;
677        assert_eq!(resp["error"]["code"], -32601);
678    }
679
680    #[tokio::test]
681    async fn ping_returns_empty_result() {
682        let state = test_state();
683        let req = json!({"jsonrpc": "2.0", "id": 5, "method": "ping"});
684        let resp = handle_message(&state, req).await;
685        assert!(resp["result"].is_object());
686    }
687
688    #[tokio::test]
689    async fn app_state_default_constructs() {
690        let s = test_state();
691        assert!(!s.version.is_empty());
692        assert!(s.registry.is_empty());
693        assert!(s.default_palace.is_none());
694    }
695
696    /// Why: Issue #26 — when `serve --palace <name>` is set, the MCP server
697    /// must (a) report the default in the `initialize` `serverInfo`, (b)
698    /// drop `palace` from the required schema in `tools/list`, and (c) let
699    /// `tools/call` use the default when the caller omits `palace`.
700    /// Test: Construct an AppState with a default palace, create that palace
701    /// on disk via the registry, then call `memory_remember` without a
702    /// `palace` argument and confirm it resolves to the default.
703    #[tokio::test]
704    async fn default_palace_used_when_arg_omitted() {
705        let tmp = tempfile::tempdir().expect("tempdir");
706        let root = tmp.path().to_path_buf();
707
708        // Pre-create the default palace so remember has somewhere to land.
709        let registry = trusty_common::memory_core::PalaceRegistry::new();
710        let palace = trusty_common::memory_core::Palace {
711            id: trusty_common::memory_core::PalaceId::new("default-pal"),
712            name: "default-pal".to_string(),
713            description: None,
714            created_at: chrono::Utc::now(),
715            data_dir: root.join("default-pal"),
716        };
717        registry
718            .create_palace(&root, palace)
719            .expect("create_palace");
720
721        let state = AppState::new(root).with_default_palace(Some("default-pal".to_string()));
722
723        // (a) initialize advertises the default.
724        let init = handle_message(
725            &state,
726            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
727        )
728        .await;
729        assert_eq!(
730            init["result"]["serverInfo"]["default_palace"], "default-pal",
731            "initialize must echo default_palace in serverInfo"
732        );
733
734        // (b) tools/list drops `palace` from required when default is set.
735        let list = handle_message(
736            &state,
737            json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"}),
738        )
739        .await;
740        let tools = list["result"]["tools"].as_array().expect("tools array");
741        let remember = tools
742            .iter()
743            .find(|t| t["name"] == "memory_remember")
744            .expect("memory_remember tool");
745        let required: Vec<&str> = remember["inputSchema"]["required"]
746            .as_array()
747            .expect("required array")
748            .iter()
749            .filter_map(|v| v.as_str())
750            .collect();
751        assert!(
752            !required.contains(&"palace"),
753            "palace must not be required when default is configured; got {required:?}"
754        );
755        assert!(required.contains(&"text"));
756
757        // (c) tools/call resolves the default when arg is omitted.
758        let call = handle_message(
759            &state,
760            json!({
761                "jsonrpc": "2.0",
762                "id": 3,
763                "method": "tools/call",
764                "params": {
765                    "name": "memory_remember",
766                    "arguments": {"text": "default-palace test memory"},
767                },
768            }),
769        )
770        .await;
771        // Successful dispatch returns `result.content[0].text` JSON.
772        let text = call["result"]["content"][0]["text"]
773            .as_str()
774            .unwrap_or_else(|| panic!("expected success result, got {call}"));
775        let parsed: Value = serde_json::from_str(text).expect("parse content json");
776        assert_eq!(parsed["palace"], "default-pal");
777        assert_eq!(parsed["status"], "stored");
778        assert!(parsed["drawer_id"].as_str().is_some());
779    }
780
781    /// Why: When no default is set, `tools/call` for a palace-bound tool
782    /// without a `palace` argument should error helpfully rather than panic.
783    #[tokio::test]
784    async fn missing_palace_without_default_errors() {
785        let state = test_state();
786        let resp = handle_message(
787            &state,
788            json!({
789                "jsonrpc": "2.0",
790                "id": 7,
791                "method": "tools/call",
792                "params": {
793                    "name": "memory_recall",
794                    "arguments": {"query": "anything"},
795                },
796            }),
797        )
798        .await;
799        assert_eq!(resp["error"]["code"], -32603);
800        let msg = resp["error"]["message"].as_str().unwrap_or("");
801        assert!(
802            msg.contains("missing 'palace'"),
803            "expected helpful error, got: {msg}"
804        );
805    }
806
807    /// Why: regression for the "palaces lost on restart" bug — `AppState::new`
808    /// builds an empty registry, so the daemon must call
809    /// `load_palaces_from_disk` on startup to re-register palaces persisted by
810    /// a previous run. Without that call the registry stays empty even though
811    /// `palace.json` files exist on disk.
812    /// What: persists two palaces under a tempdir (via the same
813    /// `create_palace` path the `palace_create` tool uses), constructs a fresh
814    /// `AppState` rooted there, calls `load_palaces_from_disk`, and asserts the
815    /// returned count and registry contents.
816    /// Test: this test itself.
817    #[tokio::test]
818    async fn load_palaces_from_disk_rehydrates_registry() {
819        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
820
821        let tmp = tempfile::tempdir().expect("tempdir");
822        let root = tmp.path().to_path_buf();
823
824        // Phase 1: persist two palaces to disk, then drop the writer registry
825        // so nothing is held in memory — simulating a prior daemon run.
826        {
827            let writer = PalaceRegistry::new();
828            for id in ["alpha", "beta"] {
829                let palace = Palace {
830                    id: PalaceId::new(id),
831                    name: id.to_string(),
832                    description: None,
833                    created_at: chrono::Utc::now(),
834                    data_dir: root.join(id),
835                };
836                writer
837                    .create_palace(&root, palace)
838                    .expect("persist palace to disk");
839            }
840        }
841
842        // Add a stray non-palace subdirectory; the walker must ignore it.
843        std::fs::create_dir_all(root.join("not-a-palace")).expect("mkdir");
844
845        // Phase 2: fresh AppState starts with an empty registry (the bug).
846        let state = AppState::new(root);
847        assert!(
848            state.registry.is_empty(),
849            "AppState::new must start with an empty registry"
850        );
851
852        // The fix: hydrate from disk.
853        let count = state
854            .load_palaces_from_disk()
855            .await
856            .expect("load_palaces_from_disk");
857
858        assert_eq!(count, 2, "both persisted palaces should be loaded");
859        assert_eq!(state.registry.len(), 2, "registry should hold both palaces");
860        let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
861        assert!(ids.contains(&"alpha".to_string()));
862        assert!(ids.contains(&"beta".to_string()));
863    }
864
865    /// Why: an empty (or missing) `palaces/` directory must not error — a
866    /// brand-new install has nothing to hydrate and should report zero.
867    #[tokio::test]
868    async fn load_palaces_from_disk_empty_root_returns_zero() {
869        let state = test_state();
870        let count = state
871            .load_palaces_from_disk()
872            .await
873            .expect("load_palaces_from_disk on empty root");
874        assert_eq!(count, 0);
875        assert!(state.registry.is_empty());
876    }
877
878    /// Why: initialize without a default palace must omit `default_palace`
879    /// from `serverInfo` so clients can detect the unbound mode.
880    #[tokio::test]
881    async fn initialize_without_default_palace_omits_field() {
882        let state = test_state();
883        let init = handle_message(
884            &state,
885            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
886        )
887        .await;
888        assert!(init["result"]["serverInfo"]["default_palace"].is_null());
889    }
890}