Skip to main content

trusty_memory/
lib.rs

1//! MCP server (stdio + HTTP/SSE) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name.
6//! What: Provides `run_stdio` (JSON-RPC 2.0 over stdin/stdout) and `run_http`
7//! (axum HTTP/SSE stub), plus an `AppState` that carries the shared
8//! `PalaceRegistry`, on-disk data root, and a lazily-initialized embedder.
9//! Test: `cargo test -p trusty-memory-mcp` validates handshake + dispatch.
10
11use anyhow::Result;
12use serde_json::{json, Value};
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::{broadcast, OnceCell};
16use tracing::info;
17use trusty_common::mcp::{error_codes, initialize_response, Request, Response};
18use trusty_common::memory_core::embed::FastEmbedder;
19use trusty_common::memory_core::store::ChatSessionStore;
20use trusty_common::memory_core::PalaceRegistry;
21use trusty_common::ChatProvider;
22
23pub mod commands;
24pub mod openrpc;
25pub mod service;
26pub mod tools;
27pub mod web;
28
29pub use service::MemoryMcpService;
30pub use tools::MemoryMcpServer;
31
/// Pick the directory under which the per-palace subdirectories actually live.
///
/// Why: two on-disk layouts exist in the wild. The current monorepo code
/// treats the registry directory *itself* as the parent of the per-palace
/// dirs (`<dir>/<id>/palace.json`), while the legacy standalone
/// `trusty-memory` repo nested everything one level deeper
/// (`<data_dir>/palaces/<id>/palace.json`) — which is where existing
/// installs keep their data. A daemon that scans the bare data dir on such an
/// install sees zero palaces because every `palace.json` sits one level below
/// where it looked (the "palaces lost on restart" bug).
/// What: returns `<data_dir>/palaces` when that path is an existing
/// directory, otherwise hands `data_dir` back unchanged. Resolving this once
/// in `main.rs` and feeding the result into `AppState::data_root` keeps every
/// call site consistent without forcing a data migration.
/// Test: `tests::resolve_palace_registry_dir_prefers_palaces_subdir` and
/// `resolve_palace_registry_dir_falls_back_to_data_dir`.
pub fn resolve_palace_registry_dir(data_dir: PathBuf) -> PathBuf {
    // Probe for the legacy nested layout first; anything that is not an
    // existing directory (missing, or a plain file) falls back to the flat one.
    Some(data_dir.join("palaces"))
        .filter(|candidate| candidate.is_dir())
        .unwrap_or(data_dir)
}
59
60/// Live daemon events broadcast to connected SSE subscribers.
61///
62/// Why: The dashboard needs push-driven updates so palace creation, drawer
63/// add/delete, dream cycles, and aggregate status changes are visible without
64/// polling. A single broadcast channel fans out to every connected browser.
65/// What: Tagged enum serialized as `{"type": "...", ...fields}` over SSE.
66/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
67/// mutation, and asserts the frame arrives.
68#[derive(Clone, Debug, serde::Serialize)]
69#[serde(tag = "type", rename_all = "snake_case")]
70pub enum DaemonEvent {
71    PalaceCreated {
72        id: String,
73        name: String,
74    },
75    DrawerAdded {
76        palace_id: String,
77        drawer_count: usize,
78    },
79    DrawerDeleted {
80        palace_id: String,
81        drawer_count: usize,
82    },
83    DreamCompleted {
84        palace_id: Option<String>,
85        merged: usize,
86        pruned: usize,
87        compacted: usize,
88        closets_updated: usize,
89        duration_ms: u64,
90    },
91    StatusChanged {
92        total_drawers: usize,
93        total_vectors: usize,
94        total_kg_triples: usize,
95    },
96}
97
/// Shared application state passed to every request handler.
///
/// Why: The stdio loop and HTTP server need the same handles to the registry,
/// data root, and embedder so MCP tools can perform real reads/writes against
/// the live trusty-memory core. The embedder is heavy (loads ONNX weights) so
/// we hold it behind a `OnceCell` and initialize lazily on first use.
/// What: `Clone`-able via `Arc` fields. The registry / data root are eager;
/// `embedder` is `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use
/// races resolve to a single shared instance.
/// Test: `app_state_default_constructs` confirms construction without panic.
#[derive(Clone)]
pub struct AppState {
    /// Crate version string; `AppState::new` captures it from
    /// `CARGO_PKG_VERSION` and it is reported in the `initialize` and
    /// `rpc.discover` responses.
    pub version: String,
    /// In-memory palace registry. Starts empty in `AppState::new` and is
    /// populated from disk by `load_palaces_from_disk`.
    pub registry: Arc<PalaceRegistry>,
    /// Directory that directly contains the per-palace subdirectories.
    /// `session_store` joins palace ids onto it and the disk-size ticker
    /// measures it.
    pub data_root: PathBuf,
    /// Lazily-initialized shared embedder; resolve it through
    /// `AppState::embedder` rather than touching the cell directly.
    pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
    /// Optional default palace applied to MCP tool calls when the caller
    /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
    pub default_palace: Option<String>,
    /// Active chat provider selected at startup. `None` means no upstream is
    /// configured (no Ollama detected and no OpenRouter key) — callers must
    /// degrade gracefully (chat endpoint returns 412).
    pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
    /// Per-palace chat-session stores, opened lazily so cold-start cost is
    /// paid only when chat-history endpoints are hit.
    pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
    /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
    ///
    /// Why: Lets mutating handlers emit events that any connected dashboard
    /// receives instantly. Cap of 128 buffers transient slow readers; if a
    /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
    pub events: Arc<broadcast::Sender<DaemonEvent>>,
    /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
    ///
    /// Why (issue #35): `GET /health` reports how long the daemon has been
    /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
    /// handler compute the elapsed seconds cheaply and without a clock-skew
    /// hazard.
    /// What: a monotonic `Instant`; `AppState::new` stamps it at startup.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub started_at: std::time::Instant,
    /// In-memory ring buffer of recent tracing log lines (issue #35).
    ///
    /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
    /// so operators can inspect a running daemon without tailing a file. The
    /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
    /// HTTP handler (reader).
    /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
    /// to. Defaults to an empty buffer for states that never install the
    /// layer (tests, the stdio path).
    /// Test: `logs_tail_returns_recent_lines`.
    pub log_buffer: trusty_common::log_buffer::LogBuffer,
    /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
    ///
    /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
    /// every health request would make a frequent health poll do unbounded
    /// I/O; a background task recomputes it every 10 s and stores it here so
    /// the handler reads it lock-free.
    /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
    /// `0` until the first walk completes.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
    /// Per-process RSS + CPU sampler, refreshed on each `/health` request
    /// (issue #35).
    ///
    /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
    /// sampler must persist between requests — hence the shared `Mutex`.
    /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
    /// can sample without blocking the runtime.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
}
170
171impl AppState {
172    /// Construct an `AppState` rooted at the given on-disk data directory.
173    ///
174    /// Why: The CLI (`serve`) and integration tests need to point the MCP
175    /// server at different roots — production at `dirs::data_dir`, tests at a
176    /// `tempfile::tempdir()`.
177    /// What: Builds an empty `PalaceRegistry`, captures the version, and
178    /// allocates an empty `OnceCell` for the embedder. `default_palace` is
179    /// `None`; use `with_default_palace` to set it.
180    /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
181    /// AppState pointed at a tempdir and round-trips a palace through it.
182    pub fn new(data_root: PathBuf) -> Self {
183        let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
184        Self {
185            version: env!("CARGO_PKG_VERSION").to_string(),
186            registry: Arc::new(PalaceRegistry::new()),
187            data_root,
188            embedder: Arc::new(OnceCell::new()),
189            default_palace: None,
190            chat_provider: Arc::new(OnceCell::new()),
191            session_stores: Arc::new(dashmap::DashMap::new()),
192            events: Arc::new(events_tx),
193            started_at: std::time::Instant::now(),
194            // Default to an empty buffer — `with_log_buffer` overrides this
195            // when the daemon installs the `LogBufferLayer` (HTTP mode).
196            log_buffer: trusty_common::log_buffer::LogBuffer::new(
197                trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
198            ),
199            disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
200            sys_metrics: Arc::new(tokio::sync::Mutex::new(
201                trusty_common::sys_metrics::SysMetrics::new(),
202            )),
203        }
204    }
205
206    /// Scan the palace registry directory and re-register every persisted
207    /// palace into the in-memory [`PalaceRegistry`].
208    ///
209    /// Why: `AppState::new` builds an *empty* registry, so after a daemon
210    /// restart `palace_list` / the dashboard reported zero palaces even though
211    /// dozens existed on disk — palace metadata was persisted by
212    /// `palace_create` but never re-hydrated on startup. This method closes
213    /// that gap by walking the on-disk layout (each subdirectory holding a
214    /// `palace.json` is one palace) and rebuilding a live `PalaceHandle` for
215    /// each, so recall paths see the full set immediately after a restart.
216    /// What: runs the blocking filesystem walk + per-palace `PalaceHandle::open`
217    /// on a `spawn_blocking` thread (so it never stalls the async runtime),
218    /// registers each successfully opened palace via `register_arc`, logs every
219    /// load at `debug!`, and returns the count loaded. A palace that fails to
220    /// open (corrupt index, unreadable `kg.db`, etc.) is logged at `warn!` and
221    /// skipped — one bad palace must not abort startup or crash the daemon.
222    /// `data_root` is expected to already be the palace registry directory —
223    /// `main.rs` resolves it via [`resolve_palace_registry_dir`] before
224    /// constructing the `AppState`, so the flat / legacy-`palaces/` layout
225    /// difference is handled exactly once.
226    /// Test: `tests::load_palaces_from_disk_rehydrates_registry` writes two
227    /// palaces into a tempdir, constructs an `AppState`, calls this method, and
228    /// asserts the returned count and registry contents.
229    pub async fn load_palaces_from_disk(&self) -> Result<usize> {
230        let registry_dir = self.data_root.clone();
231        let registry = self.registry.clone();
232        // The directory walk and each `PalaceHandle::open` perform blocking
233        // filesystem + redb/usearch I/O — run the whole hydration on the
234        // blocking pool so it never parks an async worker thread.
235        let count = tokio::task::spawn_blocking(move || -> Result<usize> {
236            let palaces = PalaceRegistry::list_palaces(&registry_dir)?;
237            let mut loaded = 0usize;
238            for palace in palaces {
239                match trusty_common::memory_core::PalaceHandle::open(&palace) {
240                    Ok(handle) => {
241                        tracing::debug!(
242                            palace = %palace.id,
243                            data_dir = %palace.data_dir.display(),
244                            "loaded palace from disk"
245                        );
246                        registry.register_arc(handle);
247                        loaded += 1;
248                    }
249                    Err(e) => {
250                        tracing::warn!(
251                            palace = %palace.id,
252                            "skipping palace during startup hydration: {e:#}"
253                        );
254                    }
255                }
256            }
257            Ok(loaded)
258        })
259        .await
260        .map_err(|e| anyhow::anyhow!("join load_palaces_from_disk: {e}"))??;
261        Ok(count)
262    }
263
264    /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
265    /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
266    /// subscriber captures (issue #35).
267    ///
268    /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
269    /// constructing the `AppState`, then hands a clone here so the HTTP
270    /// handler and the tracing layer observe the same ring.
271    /// What: replaces the empty default buffer with the supplied one.
272    /// Test: `logs_tail_returns_recent_lines`.
273    #[must_use]
274    pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
275        self.log_buffer = buffer;
276        self
277    }
278
279    /// Send a `DaemonEvent` to all connected SSE subscribers.
280    ///
281    /// Why: Mutating handlers call this after a successful write so the
282    /// dashboard can update without polling. The send is best-effort —
283    /// `broadcast::Sender::send` returns `Err` only when there are no live
284    /// receivers, which is fine (no listeners == no work to do).
285    /// What: Drops the result, so callers don't need to care whether anyone
286    /// is listening.
287    /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
288    /// subscriber observes the emitted event.
289    pub fn emit(&self, event: DaemonEvent) {
290        let _ = self.events.send(event);
291    }
292
293    /// Open (or return cached) the chat-session store for a palace.
294    ///
295    /// Why: Chat session persistence lives in a dedicated SQLite file under
296    /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
297    /// with the KG's transactional load. The store is cheap to clone via
298    /// `Arc` but the underlying r2d2 pool should be reused, so cache by id.
299    /// What: Creates the palace data dir if missing, opens (or reuses) a
300    /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
301    /// Test: Indirectly via the session HTTP handlers in `web::tests`.
302    pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
303        if let Some(entry) = self.session_stores.get(palace_id) {
304            return Ok(entry.clone());
305        }
306        let dir = self.data_root.join(palace_id);
307        std::fs::create_dir_all(&dir)
308            .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
309        let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
310        self.session_stores
311            .insert(palace_id.to_string(), store.clone());
312        Ok(store)
313    }
314
315    /// Builder-style setter for the default palace name.
316    ///
317    /// Why: `serve --palace <name>` wants to bind every tool call to a
318    /// project-scoped namespace without forcing every MCP request to repeat
319    /// the palace argument.
320    /// What: Returns `self` with `default_palace = Some(name)`.
321    /// Test: `default_palace_used_when_arg_omitted` covers the resolution
322    /// path; this setter is exercised there.
323    pub fn with_default_palace(mut self, name: Option<String>) -> Self {
324        self.default_palace = name;
325        self
326    }
327
328    /// Resolve (or initialize) the shared embedder.
329    ///
330    /// Why: FastEmbedder load is expensive — we share one instance across all
331    /// tool calls; the `OnceCell` ensures concurrent first-use races collapse
332    /// to a single load.
333    /// What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
334    /// underlying ONNX load.
335    /// Test: Indirectly via `dispatch_remember_then_recall`.
336    /// Resolve the active chat provider, auto-detecting on first call.
337    ///
338    /// Why: Provider selection depends on filesystem-loaded config plus a
339    /// network probe (Ollama liveness), so it must be lazily initialised at
340    /// runtime. Caching the choice in a `OnceCell` keeps it stable across
341    /// concurrent requests without re-probing on every chat call.
342    /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
343    /// auto-detected Ollama instance (when `local_model.enabled`), and falls
344    /// back to OpenRouter when an API key is set. Returns `Ok(None)` when
345    /// neither is available so the caller can emit a 412.
346    /// Test: `web::tests::providers_endpoint_returns_payload` covers the
347    /// detection path indirectly through `/api/v1/chat/providers`.
348    pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
349        self.chat_provider
350            .get_or_init(|| async {
351                let cfg = crate::web::load_user_config().unwrap_or_default();
352                if cfg.local_model.enabled {
353                    if let Some(mut p) =
354                        trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
355                    {
356                        // auto_detect returns an empty model id; callers must
357                        // set the configured model name themselves.
358                        p.model = cfg.local_model.model.clone();
359                        return Some(Arc::new(p) as Arc<dyn ChatProvider>);
360                    }
361                }
362                if !cfg.openrouter_api_key.is_empty() {
363                    return Some(Arc::new(trusty_common::OpenRouterProvider::new(
364                        cfg.openrouter_api_key,
365                        cfg.openrouter_model,
366                    )) as Arc<dyn ChatProvider>);
367                }
368                None
369            })
370            .await
371            .clone()
372    }
373
374    pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
375        let cell = self.embedder.clone();
376        let embedder = cell
377            .get_or_try_init(|| async {
378                let e = FastEmbedder::new().await?;
379                Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
380            })
381            .await?
382            .clone();
383        Ok(embedder)
384    }
385}
386
387impl std::fmt::Debug for AppState {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        f.debug_struct("AppState")
390            .field("version", &self.version)
391            .field("data_root", &self.data_root)
392            .field("registry_len", &self.registry.len())
393            .finish()
394    }
395}
396
397/// Handle a single MCP JSON-RPC message and produce its response.
398///
399/// Why: Pulled out of the stdio loop so unit tests can drive every method
400/// without touching real stdin/stdout.
401/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
402/// `notifications/initialized` notification (which returns `Value::Null`).
403/// Test: See unit tests below — initialize/list/call all return expected
404/// JSON-RPC envelopes; notifications return `Null` (no response written).
405pub async fn handle_message(state: &AppState, msg: Value) -> Value {
406    let id = msg.get("id").cloned().unwrap_or(Value::Null);
407    let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
408
409    match method {
410        "initialize" => {
411            let extra = state
412                .default_palace
413                .as_ref()
414                .map(|dp| json!({ "default_palace": dp }));
415            let result = initialize_response("trusty-memory", &state.version, extra);
416            json!({
417                "jsonrpc": "2.0",
418                "id": id,
419                "result": result,
420            })
421        }
422        // Notifications must NOT receive a response.
423        "notifications/initialized" | "notifications/cancelled" => Value::Null,
424        "tools/list" => json!({
425            "jsonrpc": "2.0",
426            "id": id,
427            "result": tools::tool_definitions_with(state.default_palace.is_some())
428        }),
429        // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
430        // service description so orchestrators (open-mpm, etc.) can
431        // introspect every tool and its required `memory.read`/`memory.write`
432        // scope without bespoke per-server adapters.
433        "rpc.discover" => json!({
434            "jsonrpc": "2.0",
435            "id": id,
436            "result": openrpc::build_discover_response(
437                &state.version,
438                state.default_palace.is_some(),
439            ),
440        }),
441        "tools/call" => {
442            let params = msg.get("params").cloned().unwrap_or_default();
443            let tool_name = params
444                .get("name")
445                .and_then(|n| n.as_str())
446                .unwrap_or("")
447                .to_string();
448            let args = params.get("arguments").cloned().unwrap_or_default();
449            match tools::dispatch_tool(state, &tool_name, args).await {
450                Ok(content) => json!({
451                    "jsonrpc": "2.0",
452                    "id": id,
453                    "result": {
454                        "content": [{"type": "text", "text": content.to_string()}]
455                    }
456                }),
457                Err(e) => json!({
458                    "jsonrpc": "2.0",
459                    "id": id,
460                    "error": {"code": -32603, "message": e.to_string()}
461                }),
462            }
463        }
464        "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
465        _ => json!({
466            "jsonrpc": "2.0",
467            "id": id,
468            "error": {
469                "code": -32601,
470                "message": format!("Method not found: {method}")
471            }
472        }),
473    }
474}
475
476/// Run the MCP stdio JSON-RPC 2.0 server loop.
477///
478/// Why: Claude Code launches MCP servers as child processes and speaks
479/// JSON-RPC over stdin/stdout — this is the primary integration path.
480/// What: Delegates to `trusty_mcp_core::run_stdio_loop`, adapting each
481/// shared `Request` back into the JSON `Value` shape `handle_message`
482/// expects, and translating the returned `Value` into a `Response`.
483/// Notifications (where `handle_message` returns `Value::Null`) become
484/// suppressed responses so the loop emits nothing on the wire.
485/// Test: `handle_message` covers protocol behaviour in unit tests.
486pub async fn run_stdio(state: AppState) -> Result<()> {
487    info!("trusty-memory MCP stdio server starting");
488    let state = Arc::new(state);
489    trusty_common::mcp::run_stdio_loop(move |req: Request| {
490        let state = state.clone();
491        async move {
492            // Re-serialise the Request into the JSON shape handle_message expects.
493            // (handle_message predates the shared types and reads loose Values.)
494            let msg = json!({
495                "jsonrpc": req.jsonrpc.unwrap_or_else(|| "2.0".to_string()),
496                "id": req.id.clone().unwrap_or(Value::Null),
497                "method": req.method,
498                "params": req.params.unwrap_or(Value::Null),
499            });
500            let resp_value = handle_message(&state, msg).await;
501            // handle_message returns Value::Null for notifications.
502            if resp_value.is_null() {
503                return Response::suppressed();
504            }
505            // Otherwise it returns the full JSON-RPC envelope as a Value;
506            // re-encode into the shared Response struct so the loop can serialise.
507            let id = resp_value.get("id").cloned();
508            if let Some(result) = resp_value.get("result").cloned() {
509                Response::ok(id, result)
510            } else if let Some(err) = resp_value.get("error") {
511                let code =
512                    err.get("code")
513                        .and_then(|c| c.as_i64())
514                        .unwrap_or(error_codes::INTERNAL_ERROR as i64) as i32;
515                let message = err
516                    .get("message")
517                    .and_then(|m| m.as_str())
518                    .unwrap_or("internal error")
519                    .to_string();
520                Response::err(id, code, message)
521            } else {
522                Response::err(
523                    id,
524                    error_codes::INTERNAL_ERROR,
525                    "malformed handler response",
526                )
527            }
528        }
529    })
530    .await
531}
532
533/// Run the optional HTTP/SSE + web admin server.
534///
535/// Why: A long-running daemon mode lets non-stdio clients (browsers, curl,
536/// future remote agents) hit `/health`, the `/api/v1/*` REST surface, and the
537/// embedded admin SPA.
538/// What: axum router built from `web::router()` plus a `/sse` stub for the
539/// existing MCP-over-SSE clients. Caller provides a pre-bound listener so
540/// port auto-detection lives at the call site.
541/// Test: `cargo test -p trusty-memory-mcp web::tests` exercises the router
542/// shape; manual: `curl http://127.0.0.1:<port>/health` returns `ok`.
543pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
544    use axum::routing::get;
545
546    // Issue #35: recompute the `data_root` disk footprint every 10 s on a
547    // background task so `GET /health` reports `disk_bytes` without doing a
548    // recursive directory walk on the request path.
549    spawn_disk_size_ticker(state.clone());
550
551    let app = web::router()
552        .route("/sse", get(sse_handler))
553        .with_state(state);
554
555    let local = listener.local_addr().ok();
556    if let Some(a) = local {
557        info!("HTTP server listening on http://{a}");
558        eprintln!("HTTP server listening on http://{a}");
559    }
560    axum::serve(listener, app).await?;
561    Ok(())
562}
563
564/// Convenience: bind `addr` and serve via [`run_http_on`].
565pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
566    let listener = tokio::net::TcpListener::bind(addr).await?;
567    run_http_on(state, listener).await
568}
569
570/// Spawn a background ticker that recomputes the `data_root` disk footprint
571/// every 10 seconds and stores it in `state.disk_bytes` (issue #35).
572///
573/// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
574/// every health request would turn a frequent health poll into unbounded
575/// recursive I/O. Computing it off the request path on a fixed cadence keeps
576/// `/health` cheap and bounds the staleness to ~10 s — fine for an
577/// at-a-glance footprint figure.
578/// What: spawns a detached tokio task. `AppState` is cheap to `Clone` (all
579/// `Arc` fields), so the task holds a full clone; the daemon process lives
580/// for the lifetime of the server anyway, so no `Weak` downgrade is needed.
581/// Each tick runs the blocking directory walk on `spawn_blocking` so it never
582/// stalls the async runtime, then stores the byte total atomically.
583/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
584/// the ticker cadence is not unit-tested (timing-dependent).
585fn spawn_disk_size_ticker(state: AppState) {
586    tokio::spawn(async move {
587        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
588        loop {
589            interval.tick().await;
590            let dir = state.data_root.clone();
591            // The directory walk is blocking filesystem I/O — run it on the
592            // blocking pool so it never parks an async worker thread.
593            let bytes = tokio::task::spawn_blocking(move || {
594                trusty_common::sys_metrics::dir_size_bytes(&dir)
595            })
596            .await
597            .unwrap_or(0);
598            state
599                .disk_bytes
600                .store(bytes, std::sync::atomic::Ordering::Relaxed);
601        }
602    });
603}
604
605/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
606///
607/// Why: The dashboard subscribes once and reacts to live pushes (palace
608/// created, drawer added/deleted, dream completed, status changed) instead of
609/// polling `/api/v1/*` endpoints.
610/// What: Subscribes to `state.events`, emits an initial `connected` frame,
611/// then forwards every `DaemonEvent` as `data: <json>\n\n`. Lagged
612/// subscribers receive a `lag` frame indicating skipped events; channel
613/// closure ends the stream.
614/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
615/// emit + receive); manual: `curl -N http://.../sse`.
616pub(crate) async fn sse_handler(
617    axum::extract::State(state): axum::extract::State<AppState>,
618) -> impl axum::response::IntoResponse {
619    use futures::StreamExt;
620    use tokio_stream::wrappers::BroadcastStream;
621
622    let rx = state.events.subscribe();
623    let initial = futures::stream::once(async {
624        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
625            "data: {\"type\":\"connected\"}\n\n",
626        ))
627    });
628    let events = BroadcastStream::new(rx).map(|res| {
629        let frame = match res {
630            Ok(event) => match serde_json::to_string(&event) {
631                Ok(json) => format!("data: {json}\n\n"),
632                Err(e) => format!("data: {{\"type\":\"error\",\"message\":\"{e}\"}}\n\n"),
633            },
634            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
635                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
636            }
637        };
638        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
639    });
640    let stream = initial.chain(events);
641
642    axum::response::Response::builder()
643        .header("Content-Type", "text/event-stream")
644        .header("Cache-Control", "no-cache")
645        .header("X-Accel-Buffering", "no")
646        .body(axum::body::Body::from_stream(stream))
647        .expect("valid SSE response")
648}
649
650#[cfg(test)]
651mod tests {
652    use super::*;
653
654    fn test_state() -> AppState {
655        let tmp = tempfile::tempdir().expect("tempdir");
656        let root = tmp.path().to_path_buf();
657        // Leak the tempdir so it lives for the test process; tests are short.
658        std::mem::forget(tmp);
659        AppState::new(root)
660    }
661
662    #[tokio::test]
663    async fn initialize_returns_protocol_version_and_capabilities() {
664        let state = test_state();
665        let req = json!({
666            "jsonrpc": "2.0",
667            "id": 1,
668            "method": "initialize",
669            "params": {
670                "protocolVersion": "2024-11-05",
671                "capabilities": {},
672                "clientInfo": {"name": "test", "version": "0"}
673            }
674        });
675        let resp = handle_message(&state, req).await;
676        assert_eq!(resp["jsonrpc"], "2.0");
677        assert_eq!(resp["id"], 1);
678        assert_eq!(resp["result"]["protocolVersion"], "2024-11-05");
679        assert!(resp["result"]["capabilities"]["tools"].is_object());
680        assert_eq!(resp["result"]["serverInfo"]["name"], "trusty-memory");
681    }
682
683    #[tokio::test]
684    async fn initialized_notification_returns_null() {
685        let state = test_state();
686        let req = json!({
687            "jsonrpc": "2.0",
688            "method": "notifications/initialized",
689            "params": {}
690        });
691        let resp = handle_message(&state, req).await;
692        assert!(resp.is_null());
693    }
694
695    #[tokio::test]
696    async fn tools_list_returns_all_tools() {
697        let state = test_state();
698        let req = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"});
699        let resp = handle_message(&state, req).await;
700        let tools = resp["result"]["tools"].as_array().expect("tools array");
701        assert_eq!(tools.len(), 12);
702    }
703
704    #[tokio::test]
705    async fn unknown_method_returns_error() {
706        let state = test_state();
707        let req = json!({"jsonrpc": "2.0", "id": 4, "method": "wat"});
708        let resp = handle_message(&state, req).await;
709        assert_eq!(resp["error"]["code"], -32601);
710    }
711
712    #[tokio::test]
713    async fn ping_returns_empty_result() {
714        let state = test_state();
715        let req = json!({"jsonrpc": "2.0", "id": 5, "method": "ping"});
716        let resp = handle_message(&state, req).await;
717        assert!(resp["result"].is_object());
718    }
719
720    #[tokio::test]
721    async fn app_state_default_constructs() {
722        let s = test_state();
723        assert!(!s.version.is_empty());
724        assert!(s.registry.is_empty());
725        assert!(s.default_palace.is_none());
726    }
727
728    /// Why: Issue #26 — when `serve --palace <name>` is set, the MCP server
729    /// must (a) report the default in the `initialize` `serverInfo`, (b)
730    /// drop `palace` from the required schema in `tools/list`, and (c) let
731    /// `tools/call` use the default when the caller omits `palace`.
732    /// Test: Construct an AppState with a default palace, create that palace
733    /// on disk via the registry, then call `memory_remember` without a
734    /// `palace` argument and confirm it resolves to the default.
735    #[tokio::test]
736    async fn default_palace_used_when_arg_omitted() {
737        let tmp = tempfile::tempdir().expect("tempdir");
738        let root = tmp.path().to_path_buf();
739
740        // Pre-create the default palace so remember has somewhere to land.
741        let registry = trusty_common::memory_core::PalaceRegistry::new();
742        let palace = trusty_common::memory_core::Palace {
743            id: trusty_common::memory_core::PalaceId::new("default-pal"),
744            name: "default-pal".to_string(),
745            description: None,
746            created_at: chrono::Utc::now(),
747            data_dir: root.join("default-pal"),
748        };
749        registry
750            .create_palace(&root, palace)
751            .expect("create_palace");
752
753        let state = AppState::new(root).with_default_palace(Some("default-pal".to_string()));
754
755        // (a) initialize advertises the default.
756        let init = handle_message(
757            &state,
758            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
759        )
760        .await;
761        assert_eq!(
762            init["result"]["serverInfo"]["default_palace"], "default-pal",
763            "initialize must echo default_palace in serverInfo"
764        );
765
766        // (b) tools/list drops `palace` from required when default is set.
767        let list = handle_message(
768            &state,
769            json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"}),
770        )
771        .await;
772        let tools = list["result"]["tools"].as_array().expect("tools array");
773        let remember = tools
774            .iter()
775            .find(|t| t["name"] == "memory_remember")
776            .expect("memory_remember tool");
777        let required: Vec<&str> = remember["inputSchema"]["required"]
778            .as_array()
779            .expect("required array")
780            .iter()
781            .filter_map(|v| v.as_str())
782            .collect();
783        assert!(
784            !required.contains(&"palace"),
785            "palace must not be required when default is configured; got {required:?}"
786        );
787        assert!(required.contains(&"text"));
788
789        // (c) tools/call resolves the default when arg is omitted.
790        let call = handle_message(
791            &state,
792            json!({
793                "jsonrpc": "2.0",
794                "id": 3,
795                "method": "tools/call",
796                "params": {
797                    "name": "memory_remember",
798                    "arguments": {"text": "default-palace test memory"},
799                },
800            }),
801        )
802        .await;
803        // Successful dispatch returns `result.content[0].text` JSON.
804        let text = call["result"]["content"][0]["text"]
805            .as_str()
806            .unwrap_or_else(|| panic!("expected success result, got {call}"));
807        let parsed: Value = serde_json::from_str(text).expect("parse content json");
808        assert_eq!(parsed["palace"], "default-pal");
809        assert_eq!(parsed["status"], "stored");
810        assert!(parsed["drawer_id"].as_str().is_some());
811    }
812
813    /// Why: When no default is set, `tools/call` for a palace-bound tool
814    /// without a `palace` argument should error helpfully rather than panic.
815    #[tokio::test]
816    async fn missing_palace_without_default_errors() {
817        let state = test_state();
818        let resp = handle_message(
819            &state,
820            json!({
821                "jsonrpc": "2.0",
822                "id": 7,
823                "method": "tools/call",
824                "params": {
825                    "name": "memory_recall",
826                    "arguments": {"query": "anything"},
827                },
828            }),
829        )
830        .await;
831        assert_eq!(resp["error"]["code"], -32603);
832        let msg = resp["error"]["message"].as_str().unwrap_or("");
833        assert!(
834            msg.contains("missing 'palace'"),
835            "expected helpful error, got: {msg}"
836        );
837    }
838
839    /// Why: regression for the "palaces lost on restart" bug — `AppState::new`
840    /// builds an empty registry, so the daemon must call
841    /// `load_palaces_from_disk` on startup to re-register palaces persisted by
842    /// a previous run. Without that call the registry stays empty even though
843    /// `palace.json` files exist on disk.
844    /// What: persists two palaces under a tempdir (via the same
845    /// `create_palace` path the `palace_create` tool uses), constructs a fresh
846    /// `AppState` rooted there, calls `load_palaces_from_disk`, and asserts the
847    /// returned count and registry contents.
848    /// Test: this test itself.
849    #[tokio::test]
850    async fn load_palaces_from_disk_rehydrates_registry() {
851        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
852
853        let tmp = tempfile::tempdir().expect("tempdir");
854        let root = tmp.path().to_path_buf();
855
856        // Phase 1: persist two palaces to disk, then drop the writer registry
857        // so nothing is held in memory — simulating a prior daemon run.
858        {
859            let writer = PalaceRegistry::new();
860            for id in ["alpha", "beta"] {
861                let palace = Palace {
862                    id: PalaceId::new(id),
863                    name: id.to_string(),
864                    description: None,
865                    created_at: chrono::Utc::now(),
866                    data_dir: root.join(id),
867                };
868                writer
869                    .create_palace(&root, palace)
870                    .expect("persist palace to disk");
871            }
872        }
873
874        // Add a stray non-palace subdirectory; the walker must ignore it.
875        std::fs::create_dir_all(root.join("not-a-palace")).expect("mkdir");
876
877        // Phase 2: fresh AppState starts with an empty registry (the bug).
878        let state = AppState::new(root);
879        assert!(
880            state.registry.is_empty(),
881            "AppState::new must start with an empty registry"
882        );
883
884        // The fix: hydrate from disk.
885        let count = state
886            .load_palaces_from_disk()
887            .await
888            .expect("load_palaces_from_disk");
889
890        assert_eq!(count, 2, "both persisted palaces should be loaded");
891        assert_eq!(state.registry.len(), 2, "registry should hold both palaces");
892        let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
893        assert!(ids.contains(&"alpha".to_string()));
894        assert!(ids.contains(&"beta".to_string()));
895    }
896
897    /// Why: existing installs (and the legacy standalone `trusty-memory` repo)
898    /// nest palaces one level deeper under a `palaces/` subdirectory. When that
899    /// subdirectory exists, `resolve_palace_registry_dir` must descend into it
900    /// so the daemon scans the level that actually holds the `palace.json`
901    /// files — otherwise it finds zero palaces, which is the restart bug.
902    /// What: creates `<dir>/palaces/`, resolves, and asserts the nested path is
903    /// returned.
904    /// Test: this test itself.
905    #[test]
906    fn resolve_palace_registry_dir_prefers_palaces_subdir() {
907        let tmp = tempfile::tempdir().expect("tempdir");
908        let data_dir = tmp.path().to_path_buf();
909        std::fs::create_dir_all(data_dir.join("palaces")).expect("mkdir palaces");
910
911        let resolved = resolve_palace_registry_dir(data_dir.clone());
912        assert_eq!(resolved, data_dir.join("palaces"));
913    }
914
915    /// Why: a fresh install with no `palaces/` subdirectory must fall back to
916    /// the data dir itself (the current flat monorepo layout).
917    #[test]
918    fn resolve_palace_registry_dir_falls_back_to_data_dir() {
919        let tmp = tempfile::tempdir().expect("tempdir");
920        let data_dir = tmp.path().to_path_buf();
921
922        let resolved = resolve_palace_registry_dir(data_dir.clone());
923        assert_eq!(resolved, data_dir);
924    }
925
926    /// Why: end-to-end check that the nested-`palaces/` layout hydrates — the
927    /// daemon resolves the registry dir via `resolve_palace_registry_dir`, so
928    /// an `AppState` rooted there must load palaces persisted one level below
929    /// the bare data dir.
930    /// What: persists two palaces under `<root>/palaces/<id>/`, constructs an
931    /// `AppState` rooted at the resolved registry dir, and asserts hydration
932    /// finds both.
933    /// Test: this test itself.
934    #[tokio::test]
935    async fn load_palaces_from_disk_handles_palaces_subdir() {
936        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
937
938        let tmp = tempfile::tempdir().expect("tempdir");
939        let root = tmp.path().to_path_buf();
940        let nested = root.join("palaces");
941
942        {
943            let writer = PalaceRegistry::new();
944            for id in ["cto", "engineering"] {
945                let palace = Palace {
946                    id: PalaceId::new(id),
947                    name: id.to_string(),
948                    description: None,
949                    created_at: chrono::Utc::now(),
950                    data_dir: nested.join(id),
951                };
952                // create_palace anchors data_dir under the passed root, so
953                // pass `nested` here to land palaces under `<root>/palaces/`.
954                writer
955                    .create_palace(&nested, palace)
956                    .expect("persist palace under palaces/ subdir");
957            }
958        }
959
960        // Mirror main.rs: resolve the registry dir, then root AppState there.
961        let registry_dir = resolve_palace_registry_dir(root);
962        assert_eq!(registry_dir, nested, "must resolve into palaces/ subdir");
963        let state = AppState::new(registry_dir);
964        let count = state
965            .load_palaces_from_disk()
966            .await
967            .expect("load_palaces_from_disk");
968
969        assert_eq!(count, 2, "both nested palaces should be loaded");
970        assert_eq!(state.registry.len(), 2);
971        let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
972        assert!(ids.contains(&"cto".to_string()));
973        assert!(ids.contains(&"engineering".to_string()));
974    }
975
976    /// Why: an empty (or missing) palace registry directory must not error — a
977    /// brand-new install has nothing to hydrate and should report zero.
978    #[tokio::test]
979    async fn load_palaces_from_disk_empty_root_returns_zero() {
980        let state = test_state();
981        let count = state
982            .load_palaces_from_disk()
983            .await
984            .expect("load_palaces_from_disk on empty root");
985        assert_eq!(count, 0);
986        assert!(state.registry.is_empty());
987    }
988
989    /// Why: initialize without a default palace must omit `default_palace`
990    /// from `serverInfo` so clients can detect the unbound mode.
991    #[tokio::test]
992    async fn initialize_without_default_palace_omits_field() {
993        let state = test_state();
994        let init = handle_message(
995            &state,
996            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
997        )
998        .await;
999        assert!(init["result"]["serverInfo"]["default_palace"].is_null());
1000    }
1001}