Skip to main content

trusty_memory/
lib.rs

1//! MCP server (HTTP/SSE + UDS) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name. Claude Code itself speaks stdio,
6//! but the in-process `serve --stdio` path was removed in issue #150
7//! because it deadlocked on the redb exclusive write lock whenever a
8//! long-lived daemon was already running — the canonical stdio integration
9//! is now the `trusty-memory-mcp-bridge` binary (PR #149), which pipes
10//! Claude Code's stdio over a Unix domain socket to the daemon.
11//! What: Provides `run_http` / `run_http_dynamic` / `run_http_on` (axum
12//! HTTP/SSE + REST + UI) and the `transport::uds` module (Unix-domain
13//! socket transport for the MCP bridge), plus an `AppState` that carries
14//! the shared `PalaceRegistry`, on-disk data root, and a lazily-initialized
15//! embedder.
16//! Test: `cargo test -p trusty-memory` validates handshake + dispatch via
17//! the in-process `handle_message` unit tests and the `tests/uds_roundtrip.rs`
18//! end-to-end harness.
19
20use anyhow::Result;
21use serde_json::{json, Value};
22use std::net::SocketAddr;
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::{Arc, OnceLock};
26use tokio::sync::{broadcast, OnceCell, RwLock};
27use trusty_common::bm25_client::Bm25Client;
28use trusty_common::mcp::initialize_response;
29use trusty_common::memory_core::embed::FastEmbedder;
30use trusty_common::memory_core::store::ChatSessionStore;
31use trusty_common::memory_core::PalaceRegistry;
32use trusty_common::ChatProvider;
33
34// Why: `tracing::info` is only used by the axum HTTP-serving helpers
35//      (`run_http_on`, `spawn_uds_listener`). Pulling it in unconditionally
36//      would trigger `unused_imports` warnings when the `axum-server`
37//      feature is disabled. `SocketAddr` is still used by `bound_addr` on
38//      `AppState` so it stays unconditional.
39#[cfg(feature = "axum-server")]
40use tracing::info;
41
42pub mod activity;
43pub mod attribution;
44pub mod bm25_supervisor;
45pub mod bootstrap;
46// Why (issue #226): `chat` and `web` are pure axum HTTP/SSE handler
47//      surfaces. Gating them behind the `axum-server` feature is what lets
48//      library consumers (e.g. `open-mpm` linking only `MemoryMcpService`)
49//      drop axum + tower-http entirely from their build graph.
50#[cfg(feature = "axum-server")]
51pub mod chat;
52pub mod commands;
53pub mod discovery;
54pub mod hook_emit;
55pub mod kg_extract;
56pub mod mcp_service;
57pub mod messaging;
58pub mod openrpc;
59/// Issue #88: project-root detection and palace-slug enforcement.
60///
61/// Why: prevents unbounded palace creation by anchoring palace names to the
62/// canonical slug of the project directory that contains the CWD, or to the
63/// `personal` sentinel for non-project contexts.
64/// What: exports `find_project_root`, `project_slug_at`, `project_slug`,
65/// `validate_palace_name`, `PERSONAL_PALACE`, and `PROJECT_MARKERS`.
66/// Test: see unit tests inside this module.
67pub mod project_root;
68pub mod prompt_facts;
69pub mod prompt_log;
70pub mod service;
71pub mod tools;
72pub mod transport;
73#[cfg(feature = "axum-server")]
74pub mod web;
75
76pub use activity::{ActivityEntry, ActivityFilter, ActivityLog, ActivitySource};
77pub use attribution::{CreatorInfo, CreatorSource};
78
/// Maximum number of characters kept in the trigger-prompt excerpt embedded
/// on a `HookFired` event.
///
/// Why: the full triggering prompt is sensitive and already lives in the
/// JSONL prompt log; the activity feed only needs enough text to give an
/// operator a glance — a single-line ~80 char preview matches the existing
/// `drawer_content_preview` convention so dashboard rows render uniformly.
/// What: 80 `char`s (Unicode scalar values, counted with `str::chars` — not
/// bytes). A truncated excerpt is exactly this many chars long, including
/// the trailing `…`.
/// Test: `hook_excerpt_truncates_long_prompts`.
pub const HOOK_PROMPT_EXCERPT_CHARS: usize = 80;

/// Reduce a triggering prompt to the short excerpt embedded on a
/// `HookFired` activity event.
///
/// Why: see [`HOOK_PROMPT_EXCERPT_CHARS`]. Centralising the truncation rule
/// keeps every emitter (HTTP, hook CLI handlers, future tests) producing
/// the same preview shape so UI rendering is uniform.
/// What: collapses every whitespace run in `prompt` (spaces, tabs, newlines)
/// to a single space and drops leading/trailing whitespace. If the result is
/// longer than [`HOOK_PROMPT_EXCERPT_CHARS`] chars, it keeps the first
/// `HOOK_PROMPT_EXCERPT_CHARS - 1` chars and appends `…`. Empty or
/// whitespace-only input returns an empty string. Truncation always happens
/// on a `char` boundary, so multi-byte text never panics.
/// Test: `hook_excerpt_truncates_long_prompts`,
/// `hook_excerpt_collapses_whitespace`.
pub fn hook_prompt_excerpt(prompt: &str) -> String {
    // Build the whitespace-collapsed form directly instead of collecting a
    // `Vec<&str>` and joining it. `split_whitespace` never yields empty
    // words, so the separator goes in only between real words.
    let mut excerpt = String::with_capacity(prompt.len());
    for word in prompt.split_whitespace() {
        if !excerpt.is_empty() {
            excerpt.push(' ');
        }
        excerpt.push_str(word);
    }

    if excerpt.chars().count() <= HOOK_PROMPT_EXCERPT_CHARS {
        return excerpt;
    }

    // Reserve one char of the budget for the ellipsis. Truncate at the byte
    // offset of the first dropped char so the existing allocation is reused.
    let kept_chars = HOOK_PROMPT_EXCERPT_CHARS.saturating_sub(1);
    if let Some((cut, _)) = excerpt.char_indices().nth(kept_chars) {
        excerpt.truncate(cut);
    }
    excerpt.push('…');
    excerpt
}
113
114pub use mcp_service::MemoryMcpService;
115pub use tools::MemoryMcpServer;
116
/// Pick the directory whose children are the per-palace subdirectories.
///
/// Why: two on-disk layouts exist. The current monorepo code uses the
/// registry directory *itself* as the parent of per-palace dirs
/// (`<dir>/<id>/palace.json`). The legacy standalone `trusty-memory` repo
/// nested them one level deeper, under a `palaces/` subdirectory
/// (`<data_dir>/palaces/<id>/palace.json`), and existing installs keep their
/// data there (e.g. 88 palaces under
/// `~/Library/Application Support/trusty-memory/palaces/`). A daemon that
/// uses the bare data dir as its registry root finds zero palaces, because
/// every `palace.json` sits one level below where it looked. This is the
/// "palaces lost on restart" bug.
/// What: returns `<data_dir>/palaces` when that path is an existing
/// directory; otherwise returns `data_dir` unchanged. This includes the
/// cases where `palaces` is missing or is a regular file. `main.rs` resolves
/// this once and stores the result as `AppState::data_root`. That keeps every
/// call site (`status`, `palace_list`, `open_palace`, `palace_create`,
/// `load_palaces_from_disk`) consistent without a data migration.
/// Test: `tests::resolve_palace_registry_dir_prefers_palaces_subdir` and
/// `resolve_palace_registry_dir_falls_back_to_data_dir`.
pub fn resolve_palace_registry_dir(data_dir: PathBuf) -> PathBuf {
    let legacy_layout = data_dir.join("palaces");
    // Current layout: no nested `palaces/` dir, so the data dir is the registry.
    if !legacy_layout.is_dir() {
        return data_dir;
    }
    legacy_layout
}
139        nested
140    } else {
141        data_dir
142    }
143}
144
145/// Hook type — labels the Claude Code hook that triggered a submission.
146///
147/// Why: every hook firing produces an activity-feed entry tagged with the
148/// originating hook so operators can tell whether activity came from a user
149/// prompt (`UserPromptSubmit`), a new session (`SessionStart`), or a future
150/// hook variant. Threading this through `DaemonEvent::HookFired` lets the
151/// dashboard badge each row with the hook label.
152/// What: serde-serialised in PascalCase so the wire format matches Claude
153/// Code's own hook-name strings exactly (e.g. `"UserPromptSubmit"`).
154/// Test: `hook_type_serde_round_trips`.
155#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
156pub enum HookType {
157    /// Claude Code's `UserPromptSubmit` hook — fires on every user prompt.
158    UserPromptSubmit,
159    /// Claude Code's `SessionStart` hook — fires once at session open.
160    SessionStart,
161}
162
163impl HookType {
164    /// Stable string label used for the wire format.
165    pub fn as_str(&self) -> &'static str {
166        match self {
167            Self::UserPromptSubmit => "UserPromptSubmit",
168            Self::SessionStart => "SessionStart",
169        }
170    }
171}
172
173/// Injection kind — labels what the hook actually injected (or attempted).
174///
175/// Why: distinct from `HookType` because one hook could in principle render
176/// more than one kind of injection (e.g. SessionStart can deliver both an
177/// inbox check and bootstrap context). Tagging the rendered kind explicitly
178/// keeps the activity log searchable when that fan-out lands.
179/// What: serde-serialised as kebab-case so it matches the labels already
180/// used in the JSONL prompt log (`prompt-context-facts`,
181/// `inbox-check-messages`).
182/// Test: `injection_kind_serde_round_trips`.
183#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
184#[serde(rename_all = "kebab-case")]
185pub enum InjectionKind {
186    /// `prompt-context` hook rendered the prompt-facts block.
187    PromptContext,
188    /// `inbox-check` hook delivered unread messages.
189    InboxCheck,
190}
191
192impl InjectionKind {
193    /// Stable string label used for the wire format.
194    pub fn as_str(&self) -> &'static str {
195        match self {
196            Self::PromptContext => "prompt-context",
197            Self::InboxCheck => "inbox-check",
198        }
199    }
200}
201
202/// Live daemon events broadcast to connected SSE subscribers.
203///
204/// Why: The dashboard needs push-driven updates so palace creation, drawer
205/// add/delete, dream cycles, and aggregate status changes are visible without
206/// polling. A single broadcast channel fans out to every connected browser.
207/// What: Tagged enum serialized as `{"type": "...", ...fields}` over SSE.
208/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
209/// mutation, and asserts the frame arrives.
210#[derive(Clone, Debug, serde::Serialize)]
211#[serde(tag = "type", rename_all = "snake_case")]
212pub enum DaemonEvent {
213    PalaceCreated {
214        id: String,
215        name: String,
216        /// Originating subsystem (HTTP, MCP, Hook). Why (issue #96): the
217        /// UI badges each row with its source so operators can tell at a
218        /// glance whether a write came from the dashboard form, an MCP
219        /// tool call, or a hook-driven path. The wire-format key is
220        /// `source` (lower-case strings via serde rename_all on
221        /// `ActivitySource`).
222        source: ActivitySource,
223    },
224    DrawerAdded {
225        palace_id: String,
226        /// Friendly palace name (Palace.name) at write time. Why: lets SSE
227        /// consumers (the dashboard activity feed) render the human-readable
228        /// label without a separate id→name lookup. Empty string if the
229        /// emitter could not resolve the name.
230        #[serde(default)]
231        palace_name: String,
232        drawer_count: usize,
233        /// Wall-clock timestamp when the drawer was added. Why: SSE
234        /// receivers want to render "just now / 2m ago" relative to the
235        /// daemon's clock, not the time the SSE frame happens to arrive.
236        timestamp: chrono::DateTime<chrono::Utc>,
237        /// Short preview of the drawer's content (whitespace-collapsed,
238        /// truncated to ~80 chars with an ellipsis when cut). Why: the TUI
239        /// activity feed and dashboard ticker want to show *what* was
240        /// stored, not just the running drawer count. Empty when the
241        /// emitter could not resolve the content (legacy clients tolerate
242        /// the missing field via `#[serde(default)]`).
243        #[serde(default)]
244        content_preview: String,
245        /// Originating subsystem (issue #96).
246        source: ActivitySource,
247    },
248    DrawerDeleted {
249        palace_id: String,
250        drawer_count: usize,
251        /// Originating subsystem (issue #96).
252        source: ActivitySource,
253    },
254    DreamCompleted {
255        palace_id: Option<String>,
256        merged: usize,
257        pruned: usize,
258        compacted: usize,
259        closets_updated: usize,
260        duration_ms: u64,
261        /// Originating subsystem (issue #96).
262        source: ActivitySource,
263    },
264    StatusChanged {
265        total_drawers: usize,
266        total_vectors: usize,
267        total_kg_triples: usize,
268    },
269    /// A Claude Code hook completed and rendered (or attempted to render) an
270    /// injection block.
271    ///
272    /// Why: pre-#XXX the activity feed only fired on drawer / palace / dream
273    /// writes, which meant a normal Claude Code session — whose only daemon
274    /// traffic is hook invocations — left the feed empty. Surfacing every
275    /// hook firing answers the user complaint "no activity in the TUI" and
276    /// gives operators a way to see how often each project palace is
277    /// actually picking up prompt-context / inbox-check work.
278    /// What: carries the resolved palace (or `None` if cwd resolution
279    /// failed), the [`HookType`] label, the [`InjectionKind`] label, the
280    /// rendered injection byte length, a short excerpt of the triggering
281    /// prompt (capped at ~80 chars; the full content stays in the JSONL
282    /// prompt log only), the timestamp, the hook's wall-clock duration,
283    /// and the [`ActivitySource`] tag (always `Hook` for this variant).
284    /// Backwards-compatible: SSE clients that do not recognise the
285    /// `hook_fired` `type` tag can safely ignore the frame.
286    HookFired {
287        /// Resolved palace id (slug) — `None` if cwd resolution failed.
288        #[serde(default)]
289        palace_id: Option<String>,
290        /// Friendly palace name at hook time — `None` if the registry
291        /// could not be consulted (HTTP path uses `palace_id` here when
292        /// no separate name is known).
293        #[serde(default)]
294        palace_name: Option<String>,
295        hook_type: HookType,
296        injection_kind: InjectionKind,
297        /// Rendered injection size in bytes (`0` when no injection was
298        /// emitted, e.g. SessionStart with an empty inbox).
299        injection_length: u64,
300        /// Short excerpt of the triggering prompt for the activity feed
301        /// display. Capped at ~80 chars with a trailing `…` when cut.
302        /// Why: the activity feed renders this directly; full prompt
303        /// content (which may be sensitive) stays in the JSONL log.
304        #[serde(default)]
305        trigger_prompt_excerpt: String,
306        timestamp: chrono::DateTime<chrono::Utc>,
307        /// Hook wall-clock duration in milliseconds.
308        duration_ms: u64,
309        /// Always `ActivitySource::Hook` for this variant; encoded explicitly
310        /// so the same dispatch path (`emit`) can persist + broadcast it.
311        source: ActivitySource,
312    },
313}
314
315/// Open the activity log under `data_root`, falling back to a per-process
316/// tempdir and finally to a no-op `Discard` variant when no writable
317/// directory is available.
318///
319/// Why (issues #96, #225): the activity log is a best-effort feature — if
320/// the data root is on a read-only mount, missing, or locked by another
321/// process, the daemon should still come up and serve every other endpoint.
322/// The first fallback is a `std::env::temp_dir()`-anchored subdirectory
323/// keyed by the daemon's process id. Issue #225: a previous version called
324/// `expect()` on the tempdir fallback, which crashed the daemon on hosts
325/// where neither `data_root` nor `std::env::temp_dir()` is writable
326/// (read-only containers, locked-down sandboxes). The contract is
327/// "best-effort", so the final fallback is now `ActivityLog::discard()` —
328/// a no-op variant that drops every append and returns empty reads. The
329/// dashboard's activity feed simply shows up empty in that degraded state.
330/// What: tries `ActivityLog::open(data_root)`; on error logs a warning and
331/// retries against `<temp>/trusty-memory-activity-<pid>/`. If both fail,
332/// emits a final warning and returns `ActivityLog::discard()`.
333/// Test: `open_activity_log_with_fallback_returns_discard_when_unwritable`
334/// covers the discard branch; existing `AppState` construction tests cover
335/// the happy and tempdir-fallback paths.
336fn open_activity_log_with_fallback(data_root: &Path) -> Arc<ActivityLog> {
337    match ActivityLog::open(data_root) {
338        Ok(log) => Arc::new(log),
339        Err(primary_err) => {
340            tracing::warn!(
341                "could not open activity log at {}: {primary_err:#}; falling back to per-process tempdir",
342                data_root.display()
343            );
344            let fallback =
345                std::env::temp_dir().join(format!("trusty-memory-activity-{}", std::process::id()));
346            match ActivityLog::open(&fallback) {
347                Ok(log) => Arc::new(log),
348                Err(fallback_err) => {
349                    tracing::warn!(
350                        "activity log tempdir fallback at {} also failed: {fallback_err:#}; \
351                         activity feed disabled for this process (no-op log)",
352                        fallback.display()
353                    );
354                    Arc::new(ActivityLog::discard())
355                }
356            }
357        }
358    }
359}
360
361impl DaemonEvent {
362    /// Short discriminant label matching the SSE `type` field.
363    ///
364    /// Why: the persisted activity log stores `event_type` as a string so
365    /// the UI can render the row without re-parsing the payload. Sharing
366    /// the same labels the SSE serializer uses keeps the wire and the
367    /// stored history consistent.
368    /// What: returns one of `palace_created`, `drawer_added`,
369    /// `drawer_deleted`, `dream_completed`, `status_changed`.
370    /// Test: `daemon_event_type_str_matches_sse_tag` in the lib tests.
371    pub fn type_str(&self) -> &'static str {
372        match self {
373            Self::PalaceCreated { .. } => "palace_created",
374            Self::DrawerAdded { .. } => "drawer_added",
375            Self::DrawerDeleted { .. } => "drawer_deleted",
376            Self::DreamCompleted { .. } => "dream_completed",
377            Self::StatusChanged { .. } => "status_changed",
378            Self::HookFired { .. } => "hook_fired",
379        }
380    }
381
382    /// `palace_id` if the event is scoped to a single palace.
383    ///
384    /// Why: the activity log indexes entries by palace id so the UI can
385    /// filter by palace; daemon-wide events (`status_changed`,
386    /// dream-across-all-palaces) return `None`.
387    /// What: returns a borrowed string when the variant carries a palace
388    /// id, otherwise `None`.
389    /// Test: `daemon_event_palace_id_extraction`.
390    pub fn palace_id(&self) -> Option<&str> {
391        match self {
392            Self::PalaceCreated { id, .. } => Some(id),
393            Self::DrawerAdded { palace_id, .. } | Self::DrawerDeleted { palace_id, .. } => {
394                Some(palace_id)
395            }
396            Self::DreamCompleted { palace_id, .. } => palace_id.as_deref(),
397            Self::HookFired { palace_id, .. } => palace_id.as_deref(),
398            Self::StatusChanged { .. } => None,
399        }
400    }
401
402    /// Originating subsystem if the event carries one.
403    ///
404    /// Why: only mutation events carry a `source`; the aggregate
405    /// `StatusChanged` is recomputed by the daemon and has no caller, so
406    /// it returns `None`.
407    /// What: returns the variant's `source` field where present.
408    /// Test: `daemon_event_source_extraction`.
409    pub fn source(&self) -> Option<ActivitySource> {
410        match self {
411            Self::PalaceCreated { source, .. }
412            | Self::DrawerAdded { source, .. }
413            | Self::DrawerDeleted { source, .. }
414            | Self::DreamCompleted { source, .. }
415            | Self::HookFired { source, .. } => Some(*source),
416            Self::StatusChanged { .. } => None,
417        }
418    }
419}
420
421/// Shared application state passed to every request handler.
422///
423/// Why: The stdio loop and HTTP server need the same handles to the registry,
424/// data root, and embedder so MCP tools can perform real reads/writes against
425/// the live trusty-memory core. The embedder is heavy (loads ONNX weights) so
426/// we hold it behind a `OnceCell` and initialize lazily on first use.
427/// What: `Clone`-able via `Arc` fields. The registry / data root are eager;
428/// `embedder` is `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use
429/// races resolve to a single shared instance.
430/// Test: `app_state_default_constructs` confirms construction without panic.
431#[derive(Clone)]
432pub struct AppState {
433    pub version: String,
434    pub registry: Arc<PalaceRegistry>,
435    pub data_root: PathBuf,
436    pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
437    /// Optional default palace applied to MCP tool calls when the caller
438    /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
439    pub default_palace: Option<String>,
440    /// Active chat provider selected at startup. `None` means no upstream is
441    /// configured (no Ollama detected and no OpenRouter key) — callers must
442    /// degrade gracefully (chat endpoint returns 412).
443    pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
444    /// Per-palace chat-session stores, opened lazily so cold-start cost is
445    /// paid only when chat-history endpoints are hit.
446    pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
447    /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
448    ///
449    /// Why: Lets mutating handlers emit events that any connected dashboard
450    /// receives instantly. Cap of 128 buffers transient slow readers; if a
451    /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
452    pub events: Arc<broadcast::Sender<DaemonEvent>>,
453    /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
454    ///
455    /// Why (issue #35): `GET /health` reports how long the daemon has been
456    /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
457    /// handler compute the elapsed seconds cheaply and without a clock-skew
458    /// hazard.
459    /// What: a wall-monotonic `Instant`; `AppState::new` stamps it at startup.
460    /// Test: `health_endpoint_includes_resource_fields`.
461    pub started_at: std::time::Instant,
462    /// In-memory ring buffer of recent tracing log lines (issue #35).
463    ///
464    /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
465    /// so operators can inspect a running daemon without tailing a file. The
466    /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
467    /// HTTP handler (reader).
468    /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
469    /// to. Defaults to an empty buffer for states that never install the
470    /// layer (tests, the stdio path).
471    /// Test: `logs_tail_returns_recent_lines`.
472    pub log_buffer: trusty_common::log_buffer::LogBuffer,
473    /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
474    ///
475    /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
476    /// every health request would make a frequent health poll do unbounded
477    /// I/O; a background task recomputes it every 10 s and stores it here so
478    /// the handler reads it lock-free.
479    /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
480    /// `0` until the first walk completes.
481    /// Test: `health_endpoint_includes_resource_fields`.
482    pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
483    /// Per-process RSS + CPU sampler, refreshed on each `/health` request
484    /// (issue #35).
485    ///
486    /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
487    /// sampler must persist between requests — hence the shared `Mutex`.
488    /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
489    /// can sample without blocking the runtime.
490    /// Test: `health_endpoint_includes_resource_fields`.
491    pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
492    /// HTTP listener address the daemon bound to, once `run_http_on` is running.
493    ///
494    /// Why: clients (and `/health` responses) need to advertise the live
495    /// `host:port` even though port selection happens dynamically (7070–7079
496    /// walk + OS fallback). Stashing it on `AppState` lets request handlers
497    /// surface the discovery value without re-querying the listener.
498    /// What: a `OnceLock<SocketAddr>` so `run_http_on` writes it exactly once
499    /// at bind time and every handler reads it lock-free thereafter. Empty
500    /// (`None` from `get()`) on the stdio path where no listener exists.
501    /// Test: `health_endpoint_reports_bound_addr` (added below).
502    pub bound_addr: Arc<OnceLock<SocketAddr>>,
503    /// Cached prompt-facts surface served by the MCP `get_prompt_context`
504    /// tool (issue #42).
505    ///
506    /// Why: The original session-init `prompts/get` design loaded context
507    /// once per connection; switching to a per-message tool lets the model
508    /// pull fresh, query-filtered context on demand. The cache holds both
509    /// the raw triples (for filtered lookups) and a pre-formatted Markdown
510    /// block (for the unfiltered hot path) so neither code path re-walks
511    /// the KG. The cache is rebuilt by
512    /// `prompt_facts::rebuild_prompt_cache` after any write that touches a
513    /// hot predicate (`kg_assert`, `add_alias`, `remove_prompt_fact`).
514    /// What: An `Arc<tokio::sync::RwLock<PromptFactsCache>>` so the hot
515    /// read path takes a brief read lock and clones the cache; rebuilds
516    /// take a write lock for the assignment only. The async-aware lock
517    /// (issue #229) yields to the tokio runtime instead of blocking a
518    /// runtime thread for the rebuild duration. An empty `triples` vec ↔
519    /// "no context stored yet" (the tool handler renders a hint).
520    /// Test: `get_prompt_context_returns_cached_or_hint`,
521    /// `get_prompt_context_filters_by_query`.
522    pub prompt_context_cache: Arc<RwLock<prompt_facts::PromptFactsCache>>,
523    /// Persistent activity log (issue #96).
524    ///
525    /// Why: the dashboard activity feed used to be a pure live-stream over
526    /// `/sse` — opening the UI showed an empty feed and any mutation from
527    /// the MCP path was invisible. Holding an `ActivityLog` on `AppState`
528    /// lets `emit` record an entry on every push so the
529    /// `GET /api/v1/activity` handler can return historical rows on mount
530    /// and the live SSE stream can continue prepending events on top of
531    /// the loaded history. `None` on builds that opt out (tests that use
532    /// `AppState::new` get a real log under their tempdir so behaviour
533    /// matches production).
534    /// What: an `Arc<ActivityLog>` shared with every emitter.
535    /// Test: `web::tests::activity_endpoint_lists_recent_emits`.
536    pub activity_log: Arc<ActivityLog>,
537    /// Optional per-palace BM25 lexical search lane (issue #156).
538    ///
539    /// Why: in-process BM25 would serialise the recall hot path on disk
540    /// I/O during writes and contend with the redb/usearch locks. Delegating
541    /// to the `trusty-bm25-daemon` subprocess (one socket per palace) keeps
542    /// BM25 ingestion and search off the critical path while still feeding
543    /// hits into the recall RRF fusion.
544    /// What: `Some(client)` only when `TRUSTY_BM25_DAEMON=1` at startup —
545    /// every code path that uses this field is gated on `is_some()` and
546    /// falls back to vector-only behaviour otherwise so existing deployments
547    /// see zero behavioural change.
548    /// Test: `bm25_client_disabled_by_default`,
549    /// `bm25_client_enabled_when_env_set`.
550    pub bm25_client: Option<Arc<Bm25Client>>,
551    /// Optional per-palace BM25 daemon spawn supervisor (issue #193).
552    ///
553    /// Why: without an in-process supervisor the BM25 daemon must be
554    /// launched out-of-band (launchd, manual `trusty-bm25-daemon`), which
555    /// is the same UX trap PR #190 fixed for trusty-embedderd. Holding a
556    /// supervisor here lets us spawn the daemon on first BM25 use for a
557    /// palace, restart it if it dies, and reap it on clean shutdown.
558    /// `Some` only when `TRUSTY_BM25_DAEMON=1` at startup — the same gate
559    /// that enables `bm25_client`. When set but `TRUSTY_BM25_EXTERNAL=1`,
560    /// the supervisor's `ensure_running` becomes a no-op that just returns
561    /// the canonical socket path so operators can keep using their own
562    /// process manager.
563    /// Test: covered by `bm25_supervisor_present_when_env_set` and the
564    /// `bm25_supervisor::tests` unit tests.
565    pub bm25_supervisor: Option<Arc<bm25_supervisor::Bm25Supervisor>>,
566    /// Per-palace write serialisation locks (issue #230).
567    ///
568    /// Why: the dedup gate in `tools.rs` previously read a snapshot of
569    /// existing drawers, checked for near-duplicates via Jaro-Winkler, and
570    /// then issued the write — a classic time-of-check/time-of-use race.
571    /// Two concurrent `memory_remember` calls with the same content could
572    /// both see the pre-write snapshot, both pass the gate, and both land
573    /// duplicate drawers. Serialising the gate-then-write sequence per
574    /// palace closes the window: while one task holds the mutex, any
575    /// concurrent writer for the same palace blocks until the first write
576    /// finishes and is visible to `list_drawers`. The lock is **per
577    /// palace** (not global) so writes to different palaces continue to
578    /// run in parallel.
579    /// What: a `DashMap` keyed by palace id, where each entry is an
580    /// `Arc<tokio::sync::Mutex<()>>`. The mutex is constructed lazily by
581    /// `palace_write_lock` on first access. `Arc` lets callers hold a
582    /// clone of the lock past the lifetime of the `DashMap` entry so the
583    /// map never needs to be held across an `.await`.
584    /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
585    pub palace_write_locks: Arc<dashmap::DashMap<String, Arc<tokio::sync::Mutex<()>>>>,
586    /// Counter of in-flight activity-log writes spawned by `emit`
587    /// (issue #232).
588    ///
589    /// Why: `emit` offloads the synchronous redb append to the tokio blocking
590    /// pool via `spawn_blocking` so the async runtime is never parked waiting
591    /// on fsync. The write is fire-and-forget — `emit` returns immediately
592    /// after spawning. Tests that observe the activity log right after a
593    /// burst of `emit` calls need a deterministic synchronization point;
594    /// holding an in-flight counter lets `flush_activity_writes` poll until
595    /// every spawned append has settled, which keeps the assertions
596    /// race-free without forcing every caller to `.await`.
597    /// What: an `Arc<AtomicUsize>` incremented before each `spawn_blocking`
598    /// and decremented inside the closure (after the append completes, even
599    /// if it errored). The counter is cheap (one atomic add per emit) and
600    /// stays at zero in steady-state production traffic.
601    /// Test: `web::tests::activity_endpoint_lists_recent_emits` and
602    /// `tests::emit_persists_mutations_but_skips_status_changed` call
603    /// `flush_activity_writes` to drain the counter before reading the log.
604    pub pending_activity_writes: Arc<AtomicUsize>,
605    /// In-memory cache mapping palace id → `Palace.name` (issue #228).
606    ///
607    /// Why: every `memory_remember` / `memory_note` write used to call
608    /// `PalaceRegistry::list_palaces` (a synchronous filesystem walk of the
609    /// data root) just to resolve a friendly palace name for the SSE
610    /// `DrawerAdded` event. With N palaces on disk the cost was O(N) opendirs
611    /// plus `palace.json` reads on every write, blocking the async runtime.
612    /// Caching the name in-memory turns the lookup into a `DashMap::get`.
613    /// What: `DashMap<String, String>` populated by `create_palace` and
614    /// `load_palaces_from_disk`, kept in sync by rename / delete paths.
615    /// Missing entries are treated as "name unknown" so callers fall back to
616    /// the palace id and the emit path never fails.
617    /// Test: `palace_name_cache_populated_after_hydration` and
618    /// `palace_name_cache_updates_on_create`.
619    pub palace_names: Arc<dashmap::DashMap<String, String>>,
620    /// Bounded sender for the BM25 index worker (issue #231).
621    ///
622    /// Why: the previous fire-and-forget design `tokio::spawn`ed one task per
623    /// `memory_remember` / `memory_note` call, so a write burst against a slow
624    /// or unreachable BM25 daemon grew an unbounded in-flight task queue. A
625    /// single long-lived worker draining a bounded mpsc channel caps that
626    /// back-pressure: writers `try_send` (never block), full-queue requests
627    /// are dropped with a `warn!`, and the worker exits cleanly when the last
628    /// sender is dropped on shutdown.
629    /// What: an `mpsc::Sender` cloned to every `AppState` clone (cheap). The
630    /// matching receiver is consumed by the worker spawned in
631    /// [`AppState::new`] via [`tools::spawn_bm25_index_worker`]. Capacity is
632    /// [`tools::BM25_INDEX_QUEUE_CAPACITY`] (256).
633    /// Test: `bm25_index_queue_drops_when_full` exercises the full-queue
634    /// branch via `bm25_index_enqueue`.
635    pub bm25_index_tx: tokio::sync::mpsc::Sender<tools::Bm25IndexRequest>,
636}
637
638impl AppState {
639    /// Construct an `AppState` rooted at the given on-disk data directory.
640    ///
641    /// Why: The CLI (`serve`) and integration tests need to point the MCP
642    /// server at different roots — production at `dirs::data_dir`, tests at a
643    /// `tempfile::tempdir()`.
644    /// What: Builds an empty `PalaceRegistry`, captures the version, and
645    /// allocates an empty `OnceCell` for the embedder. `default_palace` is
646    /// `None`; use `with_default_palace` to set it.
647    /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
648    /// AppState pointed at a tempdir and round-trips a palace through it.
649    pub fn new(data_root: PathBuf) -> Self {
650        let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
651        // Issue #96: open (or create) the persistent activity log under the
652        // daemon data root. Open failure is logged but never crashes the
653        // daemon — we fall back to a per-process tempdir so emits remain
654        // best-effort and the rest of the daemon keeps working.
655        let activity_log = open_activity_log_with_fallback(&data_root);
656        // Issue #231: bounded mpsc channel + single long-lived worker
657        // replaces the per-write `tokio::spawn` fire-and-forget pattern so
658        // BM25 indexing back-pressure is capped. The worker is spawned here
659        // unconditionally so the channel always has a drain — even when
660        // `bm25_client` is `None`, the worker just consumes and discards
661        // each request so senders never block on a full queue.
662        let (bm25_index_tx, bm25_index_rx) =
663            tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(tools::BM25_INDEX_QUEUE_CAPACITY);
664        // `bm25_client` / `bm25_supervisor` start as `None`; the builder
665        // `with_bm25_client_from_env` rebuilds the worker with the real
666        // client + supervisor once env-gated opt-in is resolved.
667        tools::spawn_bm25_index_worker(bm25_index_rx, None, None);
668        Self {
669            version: env!("CARGO_PKG_VERSION").to_string(),
670            registry: Arc::new(PalaceRegistry::new()),
671            data_root,
672            embedder: Arc::new(OnceCell::new()),
673            default_palace: None,
674            chat_provider: Arc::new(OnceCell::new()),
675            session_stores: Arc::new(dashmap::DashMap::new()),
676            events: Arc::new(events_tx),
677            started_at: std::time::Instant::now(),
678            // Default to an empty buffer — `with_log_buffer` overrides this
679            // when the daemon installs the `LogBufferLayer` (HTTP mode).
680            log_buffer: trusty_common::log_buffer::LogBuffer::new(
681                trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
682            ),
683            disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
684            sys_metrics: Arc::new(tokio::sync::Mutex::new(
685                trusty_common::sys_metrics::SysMetrics::new(),
686            )),
687            bound_addr: Arc::new(OnceLock::new()),
688            prompt_context_cache: Arc::new(RwLock::new(prompt_facts::PromptFactsCache::default())),
689            activity_log,
690            bm25_client: None,
691            bm25_supervisor: None,
692            palace_write_locks: Arc::new(dashmap::DashMap::new()),
693            pending_activity_writes: Arc::new(AtomicUsize::new(0)),
694            palace_names: Arc::new(dashmap::DashMap::new()),
695            bm25_index_tx,
696        }
697    }
698
699    /// Acquire (lazily, then clone) the per-palace write mutex.
700    ///
701    /// Why (issue #230): the dedup-check + `remember_with_options` write
702    /// sequence in `tools.rs` must be atomic per palace to prevent two
703    /// concurrent identical writes from both passing the dedup gate.
704    /// Callers hold the returned `Arc<Mutex<()>>`'s guard across the gate
705    /// check and the write so the second writer blocks until the first
706    /// write is visible to `list_drawers`. Returning a clone of the `Arc`
707    /// rather than a borrow into the `DashMap` lets the caller `.await`
708    /// while holding the lock without risking a deadlock against any
709    /// future map mutation (DashMap shards are sync mutexes).
710    /// What: looks up the palace id in `palace_write_locks` and returns
711    /// a clone of the existing mutex; on the first call for a palace,
712    /// inserts a freshly-constructed `tokio::sync::Mutex<()>` first. The
713    /// `DashMap::entry().or_insert_with` API guarantees the lazy
714    /// construction is racy-safe — only one mutex is ever inserted per
715    /// palace id.
716    /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
717    pub fn palace_write_lock(&self, palace_id: &str) -> Arc<tokio::sync::Mutex<()>> {
718        if let Some(existing) = self.palace_write_locks.get(palace_id) {
719            return existing.clone();
720        }
721        self.palace_write_locks
722            .entry(palace_id.to_string())
723            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
724            .clone()
725    }
726
727    /// Builder-style: opt-in to the BM25 lexical lane (issue #156).
728    ///
729    /// Why: the BM25 subprocess is gated behind `TRUSTY_BM25_DAEMON=1` so
730    /// the default `cargo install trusty-memory` / launchd plist deployment
731    /// stays vector-only and existing test fixtures keep passing without
732    /// having to provision a daemon. Reading the env var here keeps the
733    /// gating logic in one place (the helper in `main.rs` just plumbs the
734    /// result through).
735    /// What: when `TRUSTY_BM25_DAEMON=1`, constructs one `Bm25Client` per
736    /// palace by lazy-resolving the socket path the first time the palace
737    /// id is observed. Currently we install a shared `default` client up
738    /// front and re-key on the palace id at the call site — palaces with no
739    /// daemon socket simply see search/index errors which we log + ignore.
740    /// Returns `self` unchanged when the env var is unset or set to anything
741    /// other than `1`.
742    /// Test: `bm25_client_disabled_by_default`,
743    /// `bm25_client_enabled_when_env_set`.
744    #[must_use]
745    pub fn with_bm25_client_from_env(mut self) -> Self {
746        if std::env::var("TRUSTY_BM25_DAEMON").as_deref() == Ok("1") {
747            // Install the default-palace client; per-palace clients are
748            // constructed on demand via `Bm25Client::for_palace`.
749            let default_palace = self.default_palace.as_deref().unwrap_or("default");
750            self.bm25_client = Some(Arc::new(Bm25Client::for_palace(default_palace)));
751            // Issue #193: hand-in-hand with the client, attach a spawn
752            // supervisor so the BM25 daemon is auto-started on first use
753            // for any palace. Operators who want to manage daemons
754            // out-of-band (launchd, systemd, manual) set
755            // TRUSTY_BM25_EXTERNAL=1 which makes the supervisor a no-op.
756            self.bm25_supervisor = Some(Arc::new(bm25_supervisor::Bm25Supervisor::new()));
757            // Issue #231: rebuild the bounded indexer channel + worker so
758            // the worker holds the now-populated client + supervisor. The
759            // placeholder worker installed by `AppState::new` (with `None`
760            // / `None`) drained the channel into the void — replacing the
761            // sender here closes the placeholder receiver and the
762            // placeholder worker exits cleanly. The new worker takes over
763            // as the sole drain for the indexer queue.
764            let (tx, rx) = tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(
765                tools::BM25_INDEX_QUEUE_CAPACITY,
766            );
767            tools::spawn_bm25_index_worker(
768                rx,
769                self.bm25_client.clone(),
770                self.bm25_supervisor.clone(),
771            );
772            self.bm25_index_tx = tx;
773            tracing::info!(
774                palace = default_palace,
775                "BM25 daemon client + spawn supervisor enabled (TRUSTY_BM25_DAEMON=1)"
776            );
777        }
778        self
779    }
780
781    /// Scan the palace registry directory and re-register every persisted
782    /// palace into the in-memory [`PalaceRegistry`].
783    ///
784    /// Why: `AppState::new` builds an *empty* registry, so after a daemon
785    /// restart `palace_list` / the dashboard reported zero palaces even though
786    /// dozens existed on disk — palace metadata was persisted by
787    /// `palace_create` but never re-hydrated on startup. This method closes
788    /// that gap by walking the on-disk layout (each subdirectory holding a
789    /// `palace.json` is one palace) and rebuilding a live `PalaceHandle` for
790    /// each, so recall paths see the full set immediately after a restart.
791    /// What: runs the blocking filesystem walk + per-palace `PalaceHandle::open`
792    /// on a `spawn_blocking` thread (so it never stalls the async runtime),
793    /// registers each successfully opened palace via `register_arc`, logs every
794    /// load at `debug!`, and returns the count loaded. A palace that fails to
795    /// open (corrupt index, unreadable `kg.db`, etc.) is logged at `warn!` and
796    /// skipped — one bad palace must not abort startup or crash the daemon.
797    /// `data_root` is expected to already be the palace registry directory —
798    /// `main.rs` resolves it via [`resolve_palace_registry_dir`] before
799    /// constructing the `AppState`, so the flat / legacy-`palaces/` layout
800    /// difference is handled exactly once.
801    /// Test: `tests::load_palaces_from_disk_rehydrates_registry` writes two
802    /// palaces into a tempdir, constructs an `AppState`, calls this method, and
803    /// asserts the returned count and registry contents.
804    pub async fn load_palaces_from_disk(&self) -> Result<usize> {
805        let registry_dir = self.data_root.clone();
806        let registry = self.registry.clone();
807        let palace_names = self.palace_names.clone();
808        // The directory walk and each `PalaceHandle::open` perform blocking
809        // filesystem + redb/usearch I/O — run the whole hydration on the
810        // blocking pool so it never parks an async worker thread.
811        let count = tokio::task::spawn_blocking(move || -> Result<usize> {
812            let palaces = PalaceRegistry::list_palaces(&registry_dir)?;
813            let total = palaces.len();
814            let mut loaded = 0usize;
815            let mut skipped = 0usize;
816            for palace in palaces {
817                match trusty_common::memory_core::PalaceHandle::open(&palace) {
818                    Ok(handle) => {
819                        tracing::debug!(
820                            palace = %palace.id,
821                            data_dir = %palace.data_dir.display(),
822                            "loaded palace from disk"
823                        );
824                        // Issue #228: seed the in-memory name cache so write
825                        // hot paths (memory_remember / memory_note) can resolve
826                        // the friendly palace name without re-walking the data
827                        // root. Insert here (during hydration) is the single
828                        // point of truth for restart-time population.
829                        palace_names.insert(palace.id.0.clone(), palace.name.clone());
830                        registry.register_arc(handle);
831                        loaded += 1;
832                    }
833                    Err(e) => {
834                        // Why: a single bad palace (corrupt kg.db, stale WAL,
835                        // permissions) must never abort startup or block the
836                        // HTTP server from binding. Log per-palace and keep
837                        // going; the summary below tells operators how many
838                        // were skipped without trawling the log.
839                        tracing::warn!(
840                            palace = %palace.id,
841                            data_dir = %palace.data_dir.display(),
842                            "skipping palace during startup hydration: {e:#}"
843                        );
844                        skipped += 1;
845                    }
846                }
847            }
848            tracing::info!(
849                "palace hydration summary: loaded {loaded}/{total} ({skipped} skipped due to errors)"
850            );
851            Ok(loaded)
852        })
853        .await
854        .map_err(|e| anyhow::anyhow!("join load_palaces_from_disk: {e}"))??;
855        Ok(count)
856    }
857
858    /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
859    /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
860    /// subscriber captures (issue #35).
861    ///
862    /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
863    /// constructing the `AppState`, then hands a clone here so the HTTP
864    /// handler and the tracing layer observe the same ring.
865    /// What: replaces the empty default buffer with the supplied one.
866    /// Test: `logs_tail_returns_recent_lines`.
867    #[must_use]
868    pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
869        self.log_buffer = buffer;
870        self
871    }
872
873    /// Send a `DaemonEvent` to all connected SSE subscribers and persist
874    /// it to the activity log when the variant carries a source.
875    ///
876    /// Why: Mutating handlers call this after a successful write so the
877    /// dashboard can update without polling. The send is best-effort —
878    /// `broadcast::Sender::send` returns `Err` only when there are no live
879    /// receivers, which is fine (no listeners == no work to do). Issue
880    /// #96 additionally writes the entry to the persistent activity log
881    /// so the feed can serve historical rows on page load and so MCP /
882    /// HTTP / Hook origins are visible to the operator. Persistence is
883    /// also best-effort — a write failure is logged but never blocks the
884    /// SSE broadcast.
885    ///
886    /// Issue #232: the activity-log append is a synchronous redb write +
887    /// fsync. Calling it directly on the async caller's task parked a tokio
888    /// worker thread on disk I/O for every SSE event. We now offload the
889    /// append to the blocking thread pool via `spawn_blocking` and return
890    /// immediately — `emit` stays synchronous so every existing caller
891    /// (including the sync `dispatch_hook_fired` JSON-RPC handler) keeps
892    /// compiling unchanged. The fire-and-forget pattern matches the
893    /// pre-fix semantics (best-effort, never blocks the SSE broadcast)
894    /// while freeing the async runtime to do real work during the write.
895    /// What: serialises the event for the log (skipping `StatusChanged`
896    /// which is a recomputed aggregate, not a mutation), spawns the redb
897    /// append on `tokio::task::spawn_blocking` keyed by a clone of the
898    /// `Arc<ActivityLog>` and the cloned event, then sends the event over
899    /// the broadcast channel. A `pending_activity_writes` counter is bumped
900    /// before the spawn and decremented inside the closure so
901    /// [`Self::flush_activity_writes`] can drain in tests.
902    /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
903    /// subscriber observes the emitted event;
904    /// `activity_endpoint_lists_recent_emits` confirms persistence via
905    /// `flush_activity_writes`.
906    pub fn emit(&self, event: DaemonEvent) {
907        if let Some(source) = event.source() {
908            let event_type = event.type_str();
909            let palace_id = event.palace_id().map(|s| s.to_string());
910            let log = Arc::clone(&self.activity_log);
911            let event_for_log = event.clone();
912            let pending = Arc::clone(&self.pending_activity_writes);
913            // Pre-allocate the sequence id in the emitting thread so the
914            // persisted order matches the emission order even when blocking-pool
915            // workers execute the writes concurrently (issue #247). Without
916            // this, four rapid emits would assign IDs inside their respective
917            // `spawn_blocking` closures in a non-deterministic order.
918            let id = log.alloc_id();
919            pending.fetch_add(1, Ordering::SeqCst);
920            // Why: the synchronous redb append + fsync must not park an
921            // async worker thread (issue #232). Spawn the write on the
922            // blocking pool; the JoinHandle is intentionally dropped —
923            // the write is best-effort and any failure is logged below.
924            tokio::task::spawn_blocking(move || {
925                let result = log.append_with_id(id, source, palace_id, event_type, &event_for_log);
926                if let Err(e) = result {
927                    tracing::warn!("activity_log.append failed for {event_type}: {e:#}");
928                }
929                pending.fetch_sub(1, Ordering::SeqCst);
930            });
931        }
932        let _ = self.events.send(event);
933    }
934
935    /// Block (asynchronously) until every in-flight activity-log write
936    /// spawned by [`Self::emit`] has settled.
937    ///
938    /// Why: `emit` offloads its redb append to `tokio::task::spawn_blocking`
939    /// and returns immediately (issue #232). Tests that observe the
940    /// activity log right after a burst of emits would otherwise race the
941    /// blocking-pool worker; this helper gives them a deterministic
942    /// synchronization point. Production code never needs to call this —
943    /// the dashboard reads through `GET /api/v1/activity`, which already
944    /// tolerates writes settling asynchronously.
945    /// What: spins on `pending_activity_writes` with a 1 ms yield until the
946    /// counter is zero. Cheap: tests typically emit a handful of events
947    /// and the loop exits within a single scheduler tick.
948    /// Test: covered indirectly by `emit_persists_mutations_but_skips_status_changed`
949    /// and `web::tests::activity_endpoint_lists_recent_emits`.
950    pub async fn flush_activity_writes(&self) {
951        while self.pending_activity_writes.load(Ordering::SeqCst) > 0 {
952            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
953        }
954    }
955
956    /// Open (or return cached) the chat-session store for a palace.
957    ///
958    /// Why: Chat session persistence lives in a dedicated SQLite file under
959    /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
960    /// with the KG's transactional load. The store is cheap to clone via
961    /// `Arc` but the underlying r2d2 pool should be reused, so cache by id.
962    /// What: Creates the palace data dir if missing, opens (or reuses) a
963    /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
964    /// Test: Indirectly via the session HTTP handlers in `web::tests`.
965    pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
966        if let Some(entry) = self.session_stores.get(palace_id) {
967            return Ok(entry.clone());
968        }
969        let dir = self.data_root.join(palace_id);
970        std::fs::create_dir_all(&dir)
971            .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
972        let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
973        self.session_stores
974            .insert(palace_id.to_string(), store.clone());
975        Ok(store)
976    }
977
978    /// Builder-style setter for the default palace name.
979    ///
980    /// Why: `serve --palace <name>` wants to bind every tool call to a
981    /// project-scoped namespace without forcing every MCP request to repeat
982    /// the palace argument.
983    /// What: Returns `self` with `default_palace = Some(name)`.
984    /// Test: `default_palace_used_when_arg_omitted` covers the resolution
985    /// path; this setter is exercised there.
986    pub fn with_default_palace(mut self, name: Option<String>) -> Self {
987        self.default_palace = name;
988        self
989    }
990
991    /// Resolve (or initialize) the shared embedder.
992    ///
993    /// Why: FastEmbedder load is expensive — we share one instance across all
994    /// tool calls; the `OnceCell` ensures concurrent first-use races collapse
995    /// to a single load.
996    /// What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
997    /// underlying ONNX load.
998    /// Test: Indirectly via `dispatch_remember_then_recall`.
999    /// Resolve the active chat provider, auto-detecting on first call.
1000    ///
1001    /// Why: Provider selection depends on filesystem-loaded config plus a
1002    /// network probe (Ollama liveness), so it must be lazily initialised at
1003    /// runtime. Caching the choice in a `OnceCell` keeps it stable across
1004    /// concurrent requests without re-probing on every chat call.
1005    /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
1006    /// auto-detected Ollama instance (when `local_model.enabled`), and falls
1007    /// back to OpenRouter when an API key is set. Returns `Ok(None)` when
1008    /// neither is available so the caller can emit a 412.
1009    /// Test: `web::tests::providers_endpoint_returns_payload` covers the
1010    /// detection path indirectly through `/api/v1/chat/providers`.
1011    pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
1012        self.chat_provider
1013            .get_or_init(|| async {
1014                // Why (issue #226): `service::load_user_config` is the
1015                //      axum-free home of the loader; the `web::load_user_config`
1016                //      re-export only exists for the HTTP handlers. Going
1017                //      direct to `service` keeps this method usable when
1018                //      the `axum-server` feature is disabled.
1019                let cfg = crate::service::load_user_config().unwrap_or_default();
1020                if cfg.local_model.enabled {
1021                    if let Some(mut p) =
1022                        trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
1023                    {
1024                        // auto_detect returns an empty model id; callers must
1025                        // set the configured model name themselves.
1026                        p.model = cfg.local_model.model.clone();
1027                        return Some(Arc::new(p) as Arc<dyn ChatProvider>);
1028                    }
1029                }
1030                if !cfg.openrouter_api_key.is_empty() {
1031                    return Some(Arc::new(trusty_common::OpenRouterProvider::new(
1032                        cfg.openrouter_api_key,
1033                        cfg.openrouter_model,
1034                    )) as Arc<dyn ChatProvider>);
1035                }
1036                None
1037            })
1038            .await
1039            .clone()
1040    }
1041
1042    /// Spawn a fire-and-forget background task that auto-discovers project
1043    /// aliases under `project_root` and asserts new ones into `palace`.
1044    ///
1045    /// Why (issue #42): Projects carry implicit shorthand — cargo package
1046    /// names that differ from their directory, binary names that differ
1047    /// from packages, first-letter abbreviations — that should be surfaced
1048    /// without a user ever calling `add_alias`. Running discovery as a
1049    /// detached task on palace-open keeps startup latency unchanged: the
1050    /// daemon binds and starts serving immediately while the discovery scan
1051    /// completes in the background, and any newly-asserted aliases land in
1052    /// the prompt cache before the model's next `get_prompt_context` call.
1053    /// What: clones `self` (cheap; `Arc`-backed), spawns a tokio task that
1054    /// invokes the `discover_aliases` tool handler directly so the
1055    /// dedup + cache-rebuild logic runs exactly the same path as the MCP
1056    /// tool call. Errors are logged at `warn!`; one failed discovery never
1057    /// destabilises the daemon.
1058    /// Test: not unit-tested (timing-dependent fire-and-forget); the
1059    /// underlying `discover_aliases` dispatch is covered by
1060    /// `dispatch_discover_aliases_inserts_new_and_dedupes` in `tools::tests`.
1061    pub fn spawn_alias_discovery(&self, palace: String, project_root: PathBuf) {
1062        let state = self.clone();
1063        tokio::spawn(async move {
1064            let args = serde_json::json!({
1065                "palace": palace,
1066                "project_root": project_root.to_string_lossy(),
1067            });
1068            match tools::dispatch_tool(&state, "discover_aliases", args).await {
1069                Ok(result) => tracing::info!(
1070                    new = ?result.get("new"),
1071                    already_known = ?result.get("already_known"),
1072                    "alias discovery complete"
1073                ),
1074                Err(e) => tracing::warn!("alias discovery failed: {e:#}"),
1075            }
1076        });
1077    }
1078
1079    pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
1080        let cell = self.embedder.clone();
1081        let embedder = cell
1082            .get_or_try_init(|| async {
1083                let e = FastEmbedder::new().await?;
1084                Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
1085            })
1086            .await?
1087            .clone();
1088        Ok(embedder)
1089    }
1090}
1091
1092impl std::fmt::Debug for AppState {
1093    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1094        f.debug_struct("AppState")
1095            .field("version", &self.version)
1096            .field("data_root", &self.data_root)
1097            .field("registry_len", &self.registry.len())
1098            .finish()
1099    }
1100}
1101
1102/// Handle a single MCP JSON-RPC message and produce its response.
1103///
1104/// Why: Pulled out of the stdio loop so unit tests can drive every method
1105/// without touching real stdin/stdout.
1106/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
1107/// `notifications/initialized` notification (which returns `Value::Null`).
1108/// Test: See unit tests below — initialize/list/call all return expected
1109/// JSON-RPC envelopes; notifications return `Null` (no response written).
1110pub async fn handle_message(state: &AppState, msg: Value) -> Value {
1111    let id = msg.get("id").cloned().unwrap_or(Value::Null);
1112    let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
1113
1114    match method {
1115        "initialize" => {
1116            let extra = state
1117                .default_palace
1118                .as_ref()
1119                .map(|dp| json!({ "default_palace": dp }));
1120            let result = initialize_response("trusty-memory", &state.version, extra);
1121            // Why (issue #42): prompt-facts now flow through the
1122            // per-message `get_prompt_context` tool rather than MCP
1123            // prompts, so we no longer advertise the `prompts` capability.
1124            json!({
1125                "jsonrpc": "2.0",
1126                "id": id,
1127                "result": result,
1128            })
1129        }
1130        // Notifications must NOT receive a response.
1131        "notifications/initialized" | "notifications/cancelled" => Value::Null,
1132        "tools/list" => json!({
1133            "jsonrpc": "2.0",
1134            "id": id,
1135            "result": tools::tool_definitions_with(state.default_palace.is_some())
1136        }),
1137        // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
1138        // service description so orchestrators (open-mpm, etc.) can
1139        // introspect every tool and its required `memory.read`/`memory.write`
1140        // scope without bespoke per-server adapters.
1141        "rpc.discover" => json!({
1142            "jsonrpc": "2.0",
1143            "id": id,
1144            "result": openrpc::build_discover_response(
1145                &state.version,
1146                state.default_palace.is_some(),
1147            ),
1148        }),
1149        "tools/call" => {
1150            let params = msg.get("params").cloned().unwrap_or_default();
1151            let tool_name = params
1152                .get("name")
1153                .and_then(|n| n.as_str())
1154                .unwrap_or("")
1155                .to_string();
1156            let args = params.get("arguments").cloned().unwrap_or_default();
1157            match tools::dispatch_tool(state, &tool_name, args).await {
1158                Ok(content) => {
1159                    // Why: tools that return a bare JSON string (e.g.
1160                    // `get_prompt_context` returning the formatted
1161                    // Markdown block) should surface as plain text in the
1162                    // MCP `content[0].text` field — wrapping in
1163                    // `Value::to_string()` would re-quote the payload and
1164                    // force every caller to strip outer quotes.
1165                    let text = match &content {
1166                        Value::String(s) => s.clone(),
1167                        other => other.to_string(),
1168                    };
1169                    json!({
1170                        "jsonrpc": "2.0",
1171                        "id": id,
1172                        "result": {
1173                            "content": [{"type": "text", "text": text}]
1174                        }
1175                    })
1176                }
1177                Err(e) => json!({
1178                    "jsonrpc": "2.0",
1179                    "id": id,
1180                    // Why: anyhow's `{:#}` alternate format walks the full
1181                    // `Caused by:` chain so MCP clients see actionable
1182                    // detail (e.g. "PalaceHandle::remember_with_options:
1183                    // filter rejected: too short") instead of just the
1184                    // outermost context label.
1185                    "error": {"code": -32603, "message": format!("{e:#}")}
1186                }),
1187            }
1188        }
1189        "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
1190        _ => json!({
1191            "jsonrpc": "2.0",
1192            "id": id,
1193            "error": {
1194                "code": -32601,
1195                "message": format!("Method not found: {method}")
1196            }
1197        }),
1198    }
1199}
1200
/// Well-known first port the trusty-memory HTTP daemon tries to bind.
///
/// Why: clients that hard-code `127.0.0.1:7070` keep working in the common
/// case, while [`bind_dynamic_port`] can still walk the `DYNAMIC_PORT_RANGE`
/// consecutive ports starting here when this one is already taken.
/// What: `7070` — the historic default, identical to the launchd plist's
/// previous value.
/// Test: covered indirectly by `bind_dynamic_port_returns_listener`.
pub const DEFAULT_HTTP_PORT: u16 = 7_070;

/// Size of the preferred port window: [`bind_dynamic_port`] tries this many
/// consecutive ports, beginning at [`DEFAULT_HTTP_PORT`], before asking the
/// OS for an arbitrary free port. Mirrors the trusty-search convention.
const DYNAMIC_PORT_RANGE: u16 = 10;
1213
1214/// Path to the canonical address-discovery file for the trusty-memory daemon.
1215///
1216/// Why: clients (CLI, MCP tools, dashboards) need to find the running daemon
1217/// without configuration when the port was selected dynamically. Using
1218/// `trusty_common::resolve_data_dir` aligns this path with the location
1219/// that `trusty_common::read_daemon_addr("trusty-memory")` reads from, so
1220/// `prompt-context`, `doctor`, and `start`'s probe all find the running daemon.
1221/// The old `~/.trusty-memory/http_addr` path and the new
1222/// `~/Library/Application Support/trusty-memory/http_addr` (macOS) path were
1223/// divergent — the daemon wrote one; readers expected the other.
1224/// What: returns `{resolve_data_dir("trusty-memory")}/http_addr`, or `None` if
1225/// the data dir cannot be resolved (locked-down container, no passwd entry).
1226/// Test: `http_addr_path_uses_resolve_data_dir`.
1227pub fn http_addr_path() -> Option<PathBuf> {
1228    trusty_common::resolve_data_dir("trusty-memory")
1229        .ok()
1230        .map(|d| d.join("http_addr"))
1231}
1232
1233/// Bind a `TcpListener` to `127.0.0.1`, dynamically selecting a port.
1234///
1235/// Why: the historic default `7070` is convenient for clients but a stale
1236/// process or a second daemon must not produce a noisy failure. Walking
1237/// `DEFAULT_HTTP_PORT..DEFAULT_HTTP_PORT+DYNAMIC_PORT_RANGE` first preserves
1238/// backwards compatibility for the common case; OS-assigned fallback (`:0`)
1239/// guarantees the daemon always comes up even when every preferred port is
1240/// busy.
1241/// What: returns the first successful `TcpListener`. Tries 7070..=7079
1242/// in order, then falls back to OS-assigned. Caller inspects
1243/// `local_addr()` to learn the chosen port.
1244/// Test: `bind_dynamic_port_returns_listener` confirms it always binds *some*
1245/// port even after another listener occupies the preferred one.
1246pub async fn bind_dynamic_port() -> Result<tokio::net::TcpListener> {
1247    let preferred: SocketAddr = SocketAddr::from(([127, 0, 0, 1], DEFAULT_HTTP_PORT));
1248    // First: walk the preferred range (7070..=7079).
1249    if let Ok(listener) =
1250        trusty_common::bind_with_auto_port(preferred, DYNAMIC_PORT_RANGE - 1).await
1251    {
1252        return Ok(listener);
1253    }
1254    // Last resort: ask the kernel for any free port. `bind_with_auto_port`
1255    // with `:0` resolves immediately to the OS-assigned port.
1256    tracing::warn!(
1257        "all ports {DEFAULT_HTTP_PORT}..{} in use; requesting OS-assigned port",
1258        DEFAULT_HTTP_PORT + DYNAMIC_PORT_RANGE - 1
1259    );
1260    let any: SocketAddr = SocketAddr::from(([127, 0, 0, 1], 0));
1261    trusty_common::bind_with_auto_port(any, 0).await
1262}
1263
1264/// Write the bound `host:port` to `~/.trusty-memory/http_addr` atomically.
1265///
1266/// Why: clients must read the file mid-write without observing a partial
1267/// value. Writing to a `.tmp` sibling and renaming over the target gives
1268/// POSIX atomicity, matching the trusty-search implementation.
1269/// What: creates `~/.trusty-memory/` if missing; writes `addr` followed by a
1270/// trailing newline (avoids the "no newline at end of file" warnings from
1271/// `cat`); renames `.tmp` → `http_addr`. Best-effort: I/O errors are
1272/// returned to the caller so `run_http_on` can log without panicking.
1273/// Test: `http_addr_file_round_trip_via_helpers`.
1274#[cfg(feature = "axum-server")]
1275fn write_http_addr_file(path: &Path, addr: &SocketAddr) -> std::io::Result<()> {
1276    use std::io::Write;
1277    if let Some(parent) = path.parent() {
1278        std::fs::create_dir_all(parent)?;
1279    }
1280    let tmp = path.with_extension("addr.tmp");
1281    {
1282        let mut f = std::fs::File::create(&tmp)?;
1283        writeln!(f, "{addr}")?;
1284        f.sync_all()?;
1285    }
1286    std::fs::rename(&tmp, path)?;
1287    Ok(())
1288}
1289
1290/// Run the optional HTTP/SSE + web admin server.
1291///
1292/// Why: A long-running daemon mode lets non-stdio clients (browsers, curl,
1293/// future remote agents) hit `/health`, the `/api/v1/*` REST surface, and the
1294/// embedded admin SPA.
1295/// What: axum router built from `web::router()` plus a `/sse` stub for the
1296/// existing MCP-over-SSE clients. Caller provides a pre-bound listener so
1297/// port auto-detection lives at the call site. Before accepting connections
1298/// the daemon stamps the bound `host:port` onto `AppState.bound_addr` and
1299/// writes `~/.trusty-memory/http_addr` so clients can discover the live port.
1300/// On shutdown the file is removed best-effort (a stale file with the wrong
1301/// port is worse than a missing one).
1302/// Test: `cargo test -p trusty-memory web::tests` exercises the router shape;
1303/// manual: `curl http://127.0.0.1:<port>/health` returns `ok` with `addr`.
1304#[cfg(feature = "axum-server")]
1305pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
1306    use axum::routing::get;
1307
1308    // Issue #35: recompute the `data_root` disk footprint every 10 s on a
1309    // background task so `GET /health` reports `disk_bytes` without doing a
1310    // recursive directory walk on the request path.
1311    spawn_disk_size_ticker(state.clone());
1312
1313    // Issue #228: emit aggregate `StatusChanged` on a fixed cadence rather
1314    // than on every drawer write. The previous design called
1315    // `aggregate_status_event` from every `memory_remember` / `memory_note`
1316    // / `memory_forget` (and the matching HTTP handlers), each of which
1317    // walked the data root + opened every palace handle. Coalescing the
1318    // emit to a 30 s ticker keeps dashboards live without dragging an
1319    // O(N palaces) recompute onto the write hot path.
1320    spawn_status_event_ticker(state.clone());
1321
1322    // Capture and advertise the bound address BEFORE serving so the first
1323    // request handler — and the http_addr discovery file — see the real port
1324    // even if `local_addr()` would otherwise be racy.
1325    let local = listener.local_addr().ok();
1326    let written_path = if let Some(a) = local {
1327        // Stash on state for handlers (e.g. /health) to surface.
1328        let _ = state.bound_addr.set(a);
1329        info!("HTTP server listening on http://{a}");
1330        eprintln!("HTTP server listening on http://{a}");
1331        // Best-effort: a missing $HOME or read-only fs is non-fatal — the
1332        // /health endpoint still advertises `addr`. Logging the failure
1333        // helps operators diagnose discovery problems.
1334        match http_addr_path() {
1335            Some(p) => match write_http_addr_file(&p, &a) {
1336                Ok(()) => {
1337                    info!("wrote daemon address to {}", p.display());
1338                    Some(p)
1339                }
1340                Err(e) => {
1341                    tracing::warn!("could not write {}: {e}", p.display());
1342                    None
1343                }
1344            },
1345            None => {
1346                tracing::warn!("no $HOME — skipping http_addr discovery file");
1347                None
1348            }
1349        }
1350    } else {
1351        None
1352    };
1353
1354    // Multi-transport refactor: bind the Unix domain socket alongside
1355    // the HTTP listener. The UDS serves NDJSON JSON-RPC 2.0 for the
1356    // `trusty-memory-mcp-bridge` binary (and any local CLI that wants
1357    // to skip HTTP overhead). Failures are logged but never block the
1358    // HTTP server from coming up — UDS is best-effort on hosts where
1359    // it's unsupported (e.g. some Docker overlays).
1360    let uds_sock_path = spawn_uds_listener(state.clone()).await;
1361
1362    // Keep a handle to the BM25 supervisor (if any) so we can call
1363    // `shutdown()` on the exit path. Cloning here is cheap (`Arc`) and
1364    // detaches the lifetime of the supervisor from the `state` move into
1365    // the router below.
1366    let bm25_supervisor = state.bm25_supervisor.clone();
1367
1368    let app = web::router()
1369        .route("/sse", get(sse_handler))
1370        .with_state(state);
1371
1372    let serve_result = axum::serve(listener, app).await;
1373
1374    // Best-effort cleanup: remove `http_addr` so stale clients fail fast
1375    // instead of timing out against a dead port.
1376    if let Some(p) = written_path.as_ref() {
1377        let _ = std::fs::remove_file(p);
1378    }
1379    if let Some(p) = uds_sock_path.as_ref() {
1380        let _ = std::fs::remove_file(p);
1381    }
1382
1383    // Issue #193: gracefully reap every spawned BM25 daemon before the
1384    // process exits so each one gets a chance to flush its snapshot and
1385    // unlink its socket. `kill_on_drop=true` on the children would
1386    // SIGKILL them on Drop anyway, but that skips the daemon's own
1387    // shutdown sequence and leaves stale sockets behind.
1388    if let Some(supervisor) = bm25_supervisor {
1389        supervisor.shutdown().await;
1390    }
1391
1392    serve_result?;
1393    Ok(())
1394}
1395
1396/// Spawn the UDS accept loop alongside the HTTP server.
1397///
1398/// Why: UDS is an additive transport — failing to bind it (unusual
1399/// $TMPDIR layout, permission error on macOS) should not block the
1400/// HTTP daemon from coming up. Logging the failure and returning
1401/// `None` lets the caller skip cleanup later.
1402/// What: resolves [`transport::uds::socket_path`], cleans any stale
1403/// file, binds, writes the `<data_root>/uds_addr` discovery file, and
1404/// spawns the accept loop on a background tokio task. Returns the
1405/// bound path so the caller can clean it up on shutdown.
1406/// Test: covered by `uds_ndjson_roundtrip` in the integration tests
1407/// and the unit tests in [`transport::uds`].
1408#[cfg(feature = "axum-server")]
1409async fn spawn_uds_listener(state: AppState) -> Option<PathBuf> {
1410    // Use a data-root-scoped socket path so multiple daemons (typical
1411    // in tests) don't collide on the shared `$TMPDIR/trusty-memory.sock`.
1412    // Production daemons (those rooted at the canonical data dir) still
1413    // get the canonical socket path so the bridge can find it without
1414    // reading the discovery file.
1415    let sock_path = transport::uds::socket_path_for(&state.data_root);
1416    let listener = match transport::uds::bind_uds(&sock_path).await {
1417        Ok(l) => l,
1418        Err(e) => {
1419            tracing::warn!(
1420                "UDS bind at {} failed: {e:#}; continuing without UDS transport",
1421                sock_path.display()
1422            );
1423            return None;
1424        }
1425    };
1426    info!("UDS listener bound at {}", sock_path.display());
1427    eprintln!("UDS listener bound at {}", sock_path.display());
1428    // Best-effort: write the address discovery file so the bridge can
1429    // find the live socket even when the daemon was started with an
1430    // unusual $TMPDIR.
1431    if let Err(e) = transport::uds::write_uds_addr_file(&state.data_root, &sock_path) {
1432        tracing::warn!(
1433            "could not write {}/{}: {e:#}",
1434            state.data_root.display(),
1435            transport::uds::UDS_ADDR_FILE
1436        );
1437    }
1438    let task_state = state.clone();
1439    tokio::spawn(async move {
1440        if let Err(e) = transport::uds::run_uds(task_state, listener).await {
1441            tracing::error!("UDS accept loop exited: {e:#}");
1442        }
1443    });
1444    Some(sock_path)
1445}
1446
1447/// Convenience: bind `addr` and serve via [`run_http_on`].
1448#[cfg(feature = "axum-server")]
1449pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
1450    let listener = tokio::net::TcpListener::bind(addr).await?;
1451    run_http_on(state, listener).await
1452}
1453
1454/// Convenience: bind dynamically (7070..=7079, OS fallback) and serve.
1455///
1456/// Why: `trusty-memory serve` with no `--http` flag is the canonical
1457/// launchd-managed daemon entry point. Dynamic binding lets a stale daemon
1458/// or a hand-spawned `serve --http 127.0.0.1:7070` coexist without breaking
1459/// the launchd-managed instance.
1460/// What: calls [`bind_dynamic_port`] then [`run_http_on`].
1461/// Test: integration via `trusty-memory serve` + `cat ~/.trusty-memory/http_addr`.
1462#[cfg(feature = "axum-server")]
1463pub async fn run_http_dynamic(state: AppState) -> Result<()> {
1464    let listener = bind_dynamic_port().await?;
1465    run_http_on(state, listener).await
1466}
1467
1468/// Spawn a background ticker that recomputes the `data_root` disk footprint
1469/// every 10 seconds and stores it in `state.disk_bytes` (issue #35).
1470///
1471/// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
1472/// every health request would turn a frequent health poll into unbounded
1473/// recursive I/O. Computing it off the request path on a fixed cadence keeps
1474/// `/health` cheap and bounds the staleness to ~10 s — fine for an
1475/// at-a-glance footprint figure.
1476/// What: spawns a detached tokio task. `AppState` is cheap to `Clone` (all
1477/// `Arc` fields), so the task holds a full clone; the daemon process lives
1478/// for the lifetime of the server anyway, so no `Weak` downgrade is needed.
1479/// Each tick runs the blocking directory walk on `spawn_blocking` so it never
1480/// stalls the async runtime, then stores the byte total atomically.
1481/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
1482/// the ticker cadence is not unit-tested (timing-dependent).
1483#[cfg(feature = "axum-server")]
1484fn spawn_disk_size_ticker(state: AppState) {
1485    tokio::spawn(async move {
1486        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
1487        loop {
1488            interval.tick().await;
1489            let dir = state.data_root.clone();
1490            // The directory walk is blocking filesystem I/O — run it on the
1491            // blocking pool so it never parks an async worker thread.
1492            let bytes = tokio::task::spawn_blocking(move || {
1493                trusty_common::sys_metrics::dir_size_bytes(&dir)
1494            })
1495            .await
1496            .unwrap_or(0);
1497            state
1498                .disk_bytes
1499                .store(bytes, std::sync::atomic::Ordering::Relaxed);
1500        }
1501    });
1502}
1503
1504/// Interval between aggregate-status snapshot emits on the SSE bus.
1505///
1506/// Why (issue #228): mutations used to fire `StatusChanged` synchronously on
1507/// the write path, which forced an O(N palaces) sum of drawer / vector / KG
1508/// counts on every `memory_remember`. Coalescing into a fixed-cadence ticker
1509/// lets dashboards stay current (a 30 s lag is invisible at human scale)
1510/// while keeping the write path free of aggregate work.
1511/// What: 30 seconds — short enough that the operator UI doesn't feel stale
1512/// between manual writes, long enough that the recompute cost (in-memory
1513/// registry walk plus the redb `count_active_triples` per palace) is a
1514/// rounding error on the daemon's CPU budget.
1515/// Test: covered indirectly — the math has not changed, only the cadence.
1516#[allow(dead_code)]
1517const STATUS_EVENT_TICK_SECS: u64 = 30;
1518
1519/// Spawn a background ticker that emits `DaemonEvent::StatusChanged` every
1520/// [`STATUS_EVENT_TICK_SECS`] seconds (issue #228).
1521///
1522/// Why: replaces the per-write `state.emit(self.aggregate_status_event())`
1523/// call sites that used to recompute the aggregate every time a drawer was
1524/// created or deleted. Walking N palaces on every write blocks the async
1525/// runtime; coalescing the emit onto a ticker keeps dashboards up-to-date
1526/// without that cost.
1527/// What: spawns a detached tokio task that holds a full `AppState` clone
1528/// (cheap — every field is `Arc`-backed) and ticks every
1529/// [`STATUS_EVENT_TICK_SECS`] seconds. Each tick computes
1530/// `MemoryService::aggregate_status_event` (which now iterates the
1531/// in-memory registry, not disk) and broadcasts it via `state.emit`. If
1532/// no SSE subscribers are connected the broadcast `send` is a cheap no-op,
1533/// so the ticker imposes no cost when nobody is listening.
1534/// Test: not unit-tested (timing-dependent fire-and-forget); the underlying
1535/// `aggregate_status_event` math is exercised by the existing
1536/// `status_endpoint_returns_payload` path.
1537#[allow(dead_code)]
1538fn spawn_status_event_ticker(state: AppState) {
1539    tokio::spawn(async move {
1540        let mut interval =
1541            tokio::time::interval(std::time::Duration::from_secs(STATUS_EVENT_TICK_SECS));
1542        // The first tick fires immediately, which is fine: it gives SSE
1543        // subscribers a baseline `StatusChanged` shortly after they connect.
1544        loop {
1545            interval.tick().await;
1546            let event = service::MemoryService::new(state.clone()).aggregate_status_event();
1547            state.emit(event);
1548        }
1549    });
1550}
1551
1552/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
1553///
1554/// Why: The dashboard subscribes once and reacts to live pushes (palace
1555/// created, drawer added/deleted, dream completed, status changed) instead of
1556/// polling `/api/v1/*` endpoints.
1557/// What: Subscribes to `state.events`, emits an initial `connected` frame,
1558/// then forwards every `DaemonEvent` as `data: <json>\n\n`. Lagged
1559/// subscribers receive a `lag` frame indicating skipped events; channel
1560/// closure ends the stream.
1561/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
1562/// emit + receive); manual: `curl -N http://.../sse`.
1563#[cfg(feature = "axum-server")]
1564pub(crate) async fn sse_handler(
1565    axum::extract::State(state): axum::extract::State<AppState>,
1566) -> impl axum::response::IntoResponse {
1567    use futures::StreamExt;
1568    use tokio_stream::wrappers::BroadcastStream;
1569
1570    let rx = state.events.subscribe();
1571    let initial = futures::stream::once(async {
1572        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
1573            "data: {\"type\":\"connected\"}\n\n",
1574        ))
1575    });
1576    let events = BroadcastStream::new(rx).map(|res| {
1577        let frame = match res {
1578            Ok(event) => match serde_json::to_string(&event) {
1579                Ok(json) => format!("data: {json}\n\n"),
1580                Err(e) => format!("data: {{\"type\":\"error\",\"message\":\"{e}\"}}\n\n"),
1581            },
1582            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
1583                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
1584            }
1585        };
1586        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
1587    });
1588    let stream = initial.chain(events);
1589
1590    axum::response::Response::builder()
1591        .header("Content-Type", "text/event-stream")
1592        .header("Cache-Control", "no-cache")
1593        .header("X-Accel-Buffering", "no")
1594        .body(axum::body::Body::from_stream(stream))
1595        .expect("valid SSE response")
1596}
1597
1598#[cfg(test)]
1599mod tests {
1600    use super::*;
1601
1602    /// Why: Issue #234 — previously we `mem::forget`ed the `TempDir` so tests
1603    /// could keep using `AppState` without juggling the directory handle, but
1604    /// that leaked one temp directory per test (262+ accumulated each run).
1605    /// What: Returns the `TempDir` alongside the `AppState` so the caller can
1606    /// bind it (`let (state, _tmp) = ...;`) and let drop semantics clean up
1607    /// when the test scope ends.
1608    /// Test: Every test in this module that constructs state.
1609    fn test_state() -> (AppState, tempfile::TempDir) {
1610        let tmp = tempfile::tempdir().expect("tempdir");
1611        let root = tmp.path().to_path_buf();
1612        // Issue #88: bypass palace-slug enforcement so lib tests that call
1613        // `palace_create` with arbitrary names keep passing.
1614        // SAFETY: constant idempotent write; safe across test threads.
1615        unsafe {
1616            std::env::set_var("TRUSTY_SKIP_PALACE_ENFORCEMENT", "1");
1617        }
1618        (AppState::new(root), tmp)
1619    }
1620
1621    #[tokio::test]
1622    async fn initialize_returns_protocol_version_and_capabilities() {
1623        let (state, _tmp) = test_state();
1624        let req = json!({
1625            "jsonrpc": "2.0",
1626            "id": 1,
1627            "method": "initialize",
1628            "params": {
1629                "protocolVersion": "2024-11-05",
1630                "capabilities": {},
1631                "clientInfo": {"name": "test", "version": "0"}
1632            }
1633        });
1634        let resp = handle_message(&state, req).await;
1635        assert_eq!(resp["jsonrpc"], "2.0");
1636        assert_eq!(resp["id"], 1);
1637        assert_eq!(resp["result"]["protocolVersion"], "2024-11-05");
1638        assert!(resp["result"]["capabilities"]["tools"].is_object());
1639        assert_eq!(resp["result"]["serverInfo"]["name"], "trusty-memory");
1640    }
1641
1642    #[tokio::test]
1643    async fn initialized_notification_returns_null() {
1644        let (state, _tmp) = test_state();
1645        let req = json!({
1646            "jsonrpc": "2.0",
1647            "method": "notifications/initialized",
1648            "params": {}
1649        });
1650        let resp = handle_message(&state, req).await;
1651        assert!(resp.is_null());
1652    }
1653
1654    #[tokio::test]
1655    async fn tools_list_returns_all_tools() {
1656        let (state, _tmp) = test_state();
1657        let req = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"});
1658        let resp = handle_message(&state, req).await;
1659        let tools = resp["result"]["tools"].as_array().expect("tools array");
1660        // Issue #99 added `memory_send_message`; issue #180 added
1661        // `palace_delete`; the #180 follow-up adds `palace_update` on top
1662        // of the 22-tool baseline.
1663        assert_eq!(tools.len(), 23);
1664    }
1665
1666    #[tokio::test]
1667    async fn unknown_method_returns_error() {
1668        let (state, _tmp) = test_state();
1669        let req = json!({"jsonrpc": "2.0", "id": 4, "method": "wat"});
1670        let resp = handle_message(&state, req).await;
1671        assert_eq!(resp["error"]["code"], -32601);
1672    }
1673
1674    #[tokio::test]
1675    async fn ping_returns_empty_result() {
1676        let (state, _tmp) = test_state();
1677        let req = json!({"jsonrpc": "2.0", "id": 5, "method": "ping"});
1678        let resp = handle_message(&state, req).await;
1679        assert!(resp["result"].is_object());
1680    }
1681
1682    #[tokio::test]
1683    async fn app_state_default_constructs() {
1684        let (s, _tmp) = test_state();
1685        assert!(!s.version.is_empty());
1686        assert!(s.registry.is_empty());
1687        assert!(s.default_palace.is_none());
1688    }
1689
1690    /// Why (issue #225): the previous implementation called `.expect()` on the
1691    /// tempdir fallback, which panicked the daemon at startup on hosts where
1692    /// neither the data root nor `std::env::temp_dir()` is writable
1693    /// (read-only Docker overlays, locked-down sandboxes). The activity log
1694    /// is documented as best-effort, so the fix returns a no-op `Discard`
1695    /// variant instead. This test forces both paths to fail and asserts the
1696    /// helper returns the discard variant rather than panicking.
1697    ///
1698    /// Skipped when running as root because `chmod 000` is a no-op for the
1699    /// root user — the kernel grants root access regardless of mode bits.
1700    /// CI typically runs as non-root, so coverage is preserved in the
1701    /// common case; local root invocations simply skip with a warning.
1702    #[test]
1703    #[cfg(unix)]
1704    fn open_activity_log_with_fallback_returns_discard_when_unwritable() {
1705        // Skip when running as root — chmod is ignored.
1706        // SAFETY: libc::geteuid is a thread-safe syscall with no preconditions.
1707        if unsafe { libc::geteuid() } == 0 {
1708            eprintln!(
1709                "skipping open_activity_log_with_fallback_returns_discard_when_unwritable: running as root"
1710            );
1711            return;
1712        }
1713
1714        use std::os::unix::fs::PermissionsExt;
1715
1716        // Build two unwritable directories: the primary "data root" and a
1717        // shadow "TMPDIR" so the tempdir fallback also fails.
1718        let outer = tempfile::tempdir().expect("outer tempdir");
1719        let primary = outer.path().join("primary");
1720        let tmpdir = outer.path().join("fake-tmp");
1721        std::fs::create_dir(&primary).expect("create primary");
1722        std::fs::create_dir(&tmpdir).expect("create tmpdir");
1723
1724        // chmod 000 on both — neither can be opened for write.
1725        std::fs::set_permissions(&primary, std::fs::Permissions::from_mode(0o000))
1726            .expect("chmod primary");
1727        std::fs::set_permissions(&tmpdir, std::fs::Permissions::from_mode(0o000))
1728            .expect("chmod tmpdir");
1729
1730        // Override the tempdir lookup so `open_activity_log_with_fallback`
1731        // hits our unwritable fake-tmp instead of the real system temp.
1732        // Note: env var mutation is process-global; this test is the only
1733        // accessor for `TMPDIR` in this test binary, and we restore the
1734        // previous value before returning.
1735        let prev_tmpdir = std::env::var_os("TMPDIR");
1736        std::env::set_var("TMPDIR", &tmpdir);
1737
1738        let log = open_activity_log_with_fallback(&primary);
1739
1740        // Restore TMPDIR ASAP so a panic later in the test doesn't leak it.
1741        match prev_tmpdir {
1742            Some(v) => std::env::set_var("TMPDIR", v),
1743            None => std::env::remove_var("TMPDIR"),
1744        }
1745
1746        // Restore permissions so the outer tempdir can clean up.
1747        let _ = std::fs::set_permissions(&primary, std::fs::Permissions::from_mode(0o700));
1748        let _ = std::fs::set_permissions(&tmpdir, std::fs::Permissions::from_mode(0o700));
1749
1750        assert!(
1751            log.is_discard(),
1752            "expected ActivityLog::Discard when both data root and tempdir are unwritable"
1753        );
1754
1755        // The Discard variant must still satisfy the public contract: no
1756        // panic on append/count/list.
1757        let id = log
1758            .append(
1759                ActivitySource::Http,
1760                None,
1761                "drawer_added",
1762                json!({"smoke": true}),
1763            )
1764            .expect("discard append must succeed");
1765        assert_eq!(id, 0);
1766        assert_eq!(log.count().expect("discard count"), 0);
1767        assert!(log
1768            .list(&ActivityFilter::default(), 10, 0)
1769            .expect("discard list")
1770            .is_empty());
1771    }
1772
1773    /// Why: Issue #26 — when `serve --palace <name>` is set, the MCP server
1774    /// must (a) report the default in the `initialize` `serverInfo`, (b)
1775    /// drop `palace` from the required schema in `tools/list`, and (c) let
1776    /// `tools/call` use the default when the caller omits `palace`.
1777    /// Test: Construct an AppState with a default palace, create that palace
1778    /// on disk via the registry, then call `memory_remember` without a
1779    /// `palace` argument and confirm it resolves to the default.
1780    #[tokio::test]
1781    async fn default_palace_used_when_arg_omitted() {
1782        let tmp = tempfile::tempdir().expect("tempdir");
1783        let root = tmp.path().to_path_buf();
1784
1785        // Pre-create the default palace so remember has somewhere to land.
1786        let registry = trusty_common::memory_core::PalaceRegistry::new();
1787        let palace = trusty_common::memory_core::Palace {
1788            id: trusty_common::memory_core::PalaceId::new("default-pal"),
1789            name: "default-pal".to_string(),
1790            description: None,
1791            created_at: chrono::Utc::now(),
1792            data_dir: root.join("default-pal"),
1793        };
1794        registry
1795            .create_palace(&root, palace)
1796            .expect("create_palace");
1797
1798        let state = AppState::new(root).with_default_palace(Some("default-pal".to_string()));
1799
1800        // (a) initialize advertises the default.
1801        let init = handle_message(
1802            &state,
1803            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
1804        )
1805        .await;
1806        assert_eq!(
1807            init["result"]["serverInfo"]["default_palace"], "default-pal",
1808            "initialize must echo default_palace in serverInfo"
1809        );
1810
1811        // (b) tools/list drops `palace` from required when default is set.
1812        let list = handle_message(
1813            &state,
1814            json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"}),
1815        )
1816        .await;
1817        let tools = list["result"]["tools"].as_array().expect("tools array");
1818        let remember = tools
1819            .iter()
1820            .find(|t| t["name"] == "memory_remember")
1821            .expect("memory_remember tool");
1822        let required: Vec<&str> = remember["inputSchema"]["required"]
1823            .as_array()
1824            .expect("required array")
1825            .iter()
1826            .filter_map(|v| v.as_str())
1827            .collect();
1828        assert!(
1829            !required.contains(&"palace"),
1830            "palace must not be required when default is configured; got {required:?}"
1831        );
1832        assert!(required.contains(&"text"));
1833
1834        // (c) tools/call resolves the default when arg is omitted.
1835        let call = handle_message(
1836            &state,
1837            json!({
1838                "jsonrpc": "2.0",
1839                "id": 3,
1840                "method": "tools/call",
1841                "params": {
1842                    "name": "memory_remember",
1843                    "arguments": {"text": "default palace test memory content with several tokens"},
1844                },
1845            }),
1846        )
1847        .await;
1848        // Successful dispatch returns `result.content[0].text` JSON.
1849        let text = call["result"]["content"][0]["text"]
1850            .as_str()
1851            .unwrap_or_else(|| panic!("expected success result, got {call}"));
1852        let parsed: Value = serde_json::from_str(text).expect("parse content json");
1853        assert_eq!(parsed["palace"], "default-pal");
1854        assert_eq!(parsed["status"], "stored");
1855        assert!(parsed["drawer_id"].as_str().is_some());
1856    }
1857
1858    /// Why: When no default is set, `tools/call` for a palace-bound tool
1859    /// without a `palace` argument should error helpfully rather than panic.
1860    #[tokio::test]
1861    async fn missing_palace_without_default_errors() {
1862        let (state, _tmp) = test_state();
1863        let resp = handle_message(
1864            &state,
1865            json!({
1866                "jsonrpc": "2.0",
1867                "id": 7,
1868                "method": "tools/call",
1869                "params": {
1870                    "name": "memory_recall",
1871                    "arguments": {"query": "anything"},
1872                },
1873            }),
1874        )
1875        .await;
1876        assert_eq!(resp["error"]["code"], -32603);
1877        let msg = resp["error"]["message"].as_str().unwrap_or("");
1878        assert!(
1879            msg.contains("missing 'palace'"),
1880            "expected helpful error, got: {msg}"
1881        );
1882    }
1883
1884    /// Why: regression for the "palaces lost on restart" bug — `AppState::new`
1885    /// builds an empty registry, so the daemon must call
1886    /// `load_palaces_from_disk` on startup to re-register palaces persisted by
1887    /// a previous run. Without that call the registry stays empty even though
1888    /// `palace.json` files exist on disk.
1889    /// What: persists two palaces under a tempdir (via the same
1890    /// `create_palace` path the `palace_create` tool uses), constructs a fresh
1891    /// `AppState` rooted there, calls `load_palaces_from_disk`, and asserts the
1892    /// returned count and registry contents.
1893    /// Test: this test itself.
1894    #[tokio::test]
1895    async fn load_palaces_from_disk_rehydrates_registry() {
1896        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
1897
1898        let tmp = tempfile::tempdir().expect("tempdir");
1899        let root = tmp.path().to_path_buf();
1900
1901        // Phase 1: persist two palaces to disk, then drop the writer registry
1902        // so nothing is held in memory — simulating a prior daemon run.
1903        {
1904            let writer = PalaceRegistry::new();
1905            for id in ["alpha", "beta"] {
1906                let palace = Palace {
1907                    id: PalaceId::new(id),
1908                    name: id.to_string(),
1909                    description: None,
1910                    created_at: chrono::Utc::now(),
1911                    data_dir: root.join(id),
1912                };
1913                writer
1914                    .create_palace(&root, palace)
1915                    .expect("persist palace to disk");
1916            }
1917        }
1918
1919        // Add a stray non-palace subdirectory; the walker must ignore it.
1920        std::fs::create_dir_all(root.join("not-a-palace")).expect("mkdir");
1921
1922        // Phase 2: fresh AppState starts with an empty registry (the bug).
1923        let state = AppState::new(root);
1924        assert!(
1925            state.registry.is_empty(),
1926            "AppState::new must start with an empty registry"
1927        );
1928
1929        // The fix: hydrate from disk.
1930        let count = state
1931            .load_palaces_from_disk()
1932            .await
1933            .expect("load_palaces_from_disk");
1934
1935        assert_eq!(count, 2, "both persisted palaces should be loaded");
1936        assert_eq!(state.registry.len(), 2, "registry should hold both palaces");
1937        let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
1938        assert!(ids.contains(&"alpha".to_string()));
1939        assert!(ids.contains(&"beta".to_string()));
1940    }
1941
1942    /// Why: existing installs (and the legacy standalone `trusty-memory` repo)
1943    /// nest palaces one level deeper under a `palaces/` subdirectory. When that
1944    /// subdirectory exists, `resolve_palace_registry_dir` must descend into it
1945    /// so the daemon scans the level that actually holds the `palace.json`
1946    /// files — otherwise it finds zero palaces, which is the restart bug.
1947    /// What: creates `<dir>/palaces/`, resolves, and asserts the nested path is
1948    /// returned.
1949    /// Test: this test itself.
1950    #[test]
1951    fn resolve_palace_registry_dir_prefers_palaces_subdir() {
1952        let tmp = tempfile::tempdir().expect("tempdir");
1953        let data_dir = tmp.path().to_path_buf();
1954        std::fs::create_dir_all(data_dir.join("palaces")).expect("mkdir palaces");
1955
1956        let resolved = resolve_palace_registry_dir(data_dir.clone());
1957        assert_eq!(resolved, data_dir.join("palaces"));
1958    }
1959
1960    /// Why: a fresh install with no `palaces/` subdirectory must fall back to
1961    /// the data dir itself (the current flat monorepo layout).
1962    #[test]
1963    fn resolve_palace_registry_dir_falls_back_to_data_dir() {
1964        let tmp = tempfile::tempdir().expect("tempdir");
1965        let data_dir = tmp.path().to_path_buf();
1966
1967        let resolved = resolve_palace_registry_dir(data_dir.clone());
1968        assert_eq!(resolved, data_dir);
1969    }
1970
1971    /// Why: end-to-end check that the nested-`palaces/` layout hydrates — the
1972    /// daemon resolves the registry dir via `resolve_palace_registry_dir`, so
1973    /// an `AppState` rooted there must load palaces persisted one level below
1974    /// the bare data dir.
1975    /// What: persists two palaces under `<root>/palaces/<id>/`, constructs an
1976    /// `AppState` rooted at the resolved registry dir, and asserts hydration
1977    /// finds both.
1978    /// Test: this test itself.
1979    #[tokio::test]
1980    async fn load_palaces_from_disk_handles_palaces_subdir() {
1981        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
1982
1983        let tmp = tempfile::tempdir().expect("tempdir");
1984        let root = tmp.path().to_path_buf();
1985        let nested = root.join("palaces");
1986
1987        {
1988            let writer = PalaceRegistry::new();
1989            for id in ["cto", "engineering"] {
1990                let palace = Palace {
1991                    id: PalaceId::new(id),
1992                    name: id.to_string(),
1993                    description: None,
1994                    created_at: chrono::Utc::now(),
1995                    data_dir: nested.join(id),
1996                };
1997                // create_palace anchors data_dir under the passed root, so
1998                // pass `nested` here to land palaces under `<root>/palaces/`.
1999                writer
2000                    .create_palace(&nested, palace)
2001                    .expect("persist palace under palaces/ subdir");
2002            }
2003        }
2004
2005        // Mirror main.rs: resolve the registry dir, then root AppState there.
2006        let registry_dir = resolve_palace_registry_dir(root);
2007        assert_eq!(registry_dir, nested, "must resolve into palaces/ subdir");
2008        let state = AppState::new(registry_dir);
2009        let count = state
2010            .load_palaces_from_disk()
2011            .await
2012            .expect("load_palaces_from_disk");
2013
2014        assert_eq!(count, 2, "both nested palaces should be loaded");
2015        assert_eq!(state.registry.len(), 2);
2016        let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
2017        assert!(ids.contains(&"cto".to_string()));
2018        assert!(ids.contains(&"engineering".to_string()));
2019    }
2020
2021    /// Why: an empty (or missing) palace registry directory must not error — a
2022    /// brand-new install has nothing to hydrate and should report zero.
2023    #[tokio::test]
2024    async fn load_palaces_from_disk_empty_root_returns_zero() {
2025        let (state, _tmp) = test_state();
2026        let count = state
2027            .load_palaces_from_disk()
2028            .await
2029            .expect("load_palaces_from_disk on empty root");
2030        assert_eq!(count, 0);
2031        assert!(state.registry.is_empty());
2032    }
2033
2034    /// Why (issue #228): hydration must seed `state.palace_names` so the
2035    /// MCP write hot path (`memory_remember` / `memory_note`) can resolve a
2036    /// friendly palace name without re-walking the data root on every call.
2037    /// Regression risk: a future refactor that forgets to populate the cache
2038    /// would silently degrade write latency.
2039    /// What: persists two palaces with distinct `name` values, constructs a
2040    /// fresh `AppState`, hydrates from disk, and asserts the cache holds the
2041    /// expected mappings.
2042    /// Test: this test itself.
2043    #[tokio::test]
2044    async fn palace_name_cache_populated_after_hydration() {
2045        use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
2046
2047        let tmp = tempfile::tempdir().expect("tempdir");
2048        let root = tmp.path().to_path_buf();
2049        {
2050            let writer = PalaceRegistry::new();
2051            for (id, name) in [("alpha", "Alpha Project"), ("beta", "Beta Project")] {
2052                let palace = Palace {
2053                    id: PalaceId::new(id),
2054                    name: name.to_string(),
2055                    description: None,
2056                    created_at: chrono::Utc::now(),
2057                    data_dir: root.join(id),
2058                };
2059                writer.create_palace(&root, palace).expect("persist palace");
2060            }
2061        }
2062
2063        let state = AppState::new(root);
2064        assert!(
2065            state.palace_names.is_empty(),
2066            "fresh AppState must start with an empty name cache"
2067        );
2068        state
2069            .load_palaces_from_disk()
2070            .await
2071            .expect("load_palaces_from_disk");
2072
2073        assert_eq!(state.palace_names.len(), 2, "cache must hold both palaces");
2074        assert_eq!(
2075            state.palace_names.get("alpha").map(|e| e.value().clone()),
2076            Some("Alpha Project".to_string()),
2077        );
2078        assert_eq!(
2079            state.palace_names.get("beta").map(|e| e.value().clone()),
2080            Some("Beta Project".to_string()),
2081        );
2082    }
2083
2084    /// Why (issue #228): `palace_create` (MCP tool) and `MemoryService::create_palace`
2085    /// (HTTP path) both insert into the name cache so a freshly-created palace
2086    /// is resolvable on the very next write — without waiting for the next
2087    /// hydration cycle.
2088    /// What: dispatches the `palace_create` MCP tool against a tempdir and
2089    /// asserts the cache row was written.
2090    /// Test: this test itself.
2091    #[tokio::test]
2092    async fn palace_name_cache_updates_on_create() {
2093        use serde_json::json;
2094
2095        let (state, _tmp) = test_state();
2096        let _ = tools::dispatch_tool(&state, "palace_create", json!({"name": "gamma"}))
2097            .await
2098            .expect("palace_create");
2099        assert_eq!(
2100            state.palace_names.get("gamma").map(|e| e.value().clone()),
2101            Some("gamma".to_string()),
2102            "palace_create must populate the in-memory name cache so writes \
2103             can resolve the friendly name without a disk walk"
2104        );
2105    }
2106
2107    /// Why: initialize without a default palace must omit `default_palace`
2108    /// from `serverInfo` so clients can detect the unbound mode.
2109    #[tokio::test]
2110    async fn initialize_without_default_palace_omits_field() {
2111        let (state, _tmp) = test_state();
2112        let init = handle_message(
2113            &state,
2114            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
2115        )
2116        .await;
2117        assert!(init["result"]["serverInfo"]["default_palace"].is_null());
2118    }
2119
2120    /// Why: every `~/.trusty-memory/http_addr` consumer (CLI, dashboard,
2121    /// future trusty-mpm wiring) must agree on the path. A regression that
2122    /// moves this file breaks every client relying on `read_daemon_addr`.
2123    /// What: under a stubbed data dir, the path ends in
2124    /// `trusty-memory/http_addr` — matching `trusty_common::read_daemon_addr`'s
2125    /// expected location.
2126    #[tokio::test]
2127    async fn http_addr_path_uses_resolve_data_dir() {
2128        // Hold the env_test_lock so this test does not race with
2129        // `prompt_context::tests::*` which spin a real daemon under
2130        // the same env override and would otherwise observe a
2131        // half-mutated $TRUSTY_DATA_DIR_OVERRIDE.
2132        let _guard = crate::commands::env_test_lock().lock().await;
2133        let tmp = tempfile::tempdir().unwrap();
2134        // SAFETY: test-only env mutation serialised by env_test_lock.
2135        unsafe {
2136            std::env::set_var(trusty_common::DATA_DIR_OVERRIDE_ENV, tmp.path());
2137        }
2138        let result = http_addr_path();
2139        unsafe {
2140            std::env::remove_var(trusty_common::DATA_DIR_OVERRIDE_ENV);
2141        }
2142        let p = result.expect("http_addr_path must return Some when data dir is resolvable");
2143        assert!(
2144            p.ends_with("trusty-memory/http_addr"),
2145            "unexpected http_addr path: {}",
2146            p.display()
2147        );
2148    }
2149
2150    /// Why: write+read round-trip pins the disk format: a single line of
2151    /// `host:port\n`. Clients (cat, sh `$(cat ...)`) trim whitespace, so the
2152    /// trailing newline is invisible — but anything else (extra whitespace,
2153    /// multi-line) would break callers.
2154    /// Note (issue #226): `write_http_addr_file` is part of the HTTP-serving
2155    /// surface gated behind `axum-server`; the test follows the same gate.
2156    #[cfg(feature = "axum-server")]
2157    #[test]
2158    fn http_addr_file_round_trip_via_helpers() {
2159        let dir = tempfile::tempdir().unwrap();
2160        let path = dir.path().join("http_addr");
2161        let addr: SocketAddr = "127.0.0.1:7073".parse().unwrap();
2162        write_http_addr_file(&path, &addr).unwrap();
2163        let raw = std::fs::read_to_string(&path).unwrap();
2164        assert_eq!(raw.trim(), "127.0.0.1:7073");
2165        // The trailing newline keeps `cat` and editors happy.
2166        assert!(raw.ends_with('\n'));
2167    }
2168
2169    /// Why: dynamic binding must succeed even when the preferred port is
2170    /// already in use. Walking 7070..=7079 + OS fallback guarantees the
2171    /// daemon never fails to come up just because another process holds 7070.
2172    /// What: pre-bind 7070 (best-effort — skip the test if it's already
2173    /// busy on the host), then call `bind_dynamic_port` and assert we got
2174    /// *some* listener back.
2175    #[tokio::test]
2176    async fn bind_dynamic_port_returns_listener() {
2177        let listener = bind_dynamic_port().await.expect("bind_dynamic_port");
2178        let addr = listener.local_addr().expect("local_addr");
2179        assert_eq!(addr.ip().to_string(), "127.0.0.1");
2180        assert!(addr.port() > 0, "port must be non-zero after bind");
2181    }
2182
2183    /// Why: Issue #42 — prompt-facts are now served by the per-message
2184    /// `get_prompt_context` tool rather than the MCP prompts surface, so the
2185    /// `initialize` handshake must NOT advertise a `prompts` capability and
2186    /// `prompts/list` / `prompts/get` must fall through to the "method not
2187    /// found" path.
2188    #[tokio::test]
2189    async fn initialize_does_not_advertise_prompts_capability() {
2190        let (state, _tmp) = test_state();
2191        let init = handle_message(
2192            &state,
2193            json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
2194        )
2195        .await;
2196        assert!(
2197            init["result"]["capabilities"]["prompts"].is_null(),
2198            "initialize must NOT advertise the prompts capability; got {init}"
2199        );
2200
2201        // Both prompts/* dispatchers should now report method-not-found.
2202        for method in ["prompts/list", "prompts/get"] {
2203            let resp =
2204                handle_message(&state, json!({"jsonrpc": "2.0", "id": 2, "method": method})).await;
2205            assert_eq!(
2206                resp["error"]["code"], -32601,
2207                "{method} should return method-not-found; got {resp}"
2208            );
2209        }
2210    }
2211
2212    /// Why: `AppState::new` must initialise `bound_addr` to an empty
2213    /// `OnceLock` so `/health` reports `addr: None` on the stdio path. A
2214    /// regression that pre-populates this field would advertise a bogus
2215    /// address from a stale clone.
2216    ///
2217    /// Note (issue #231): now async so it runs inside a Tokio runtime —
2218    /// `AppState::new` spawns the bounded BM25 index worker via
2219    /// `tokio::spawn`, which requires an active runtime.
2220    #[tokio::test]
2221    async fn app_state_starts_with_empty_bound_addr() {
2222        let (state, _tmp) = test_state();
2223        assert!(state.bound_addr.get().is_none());
2224    }
2225
2226    /// Why (issue #96): `DaemonEvent::type_str` underpins the persisted
2227    /// activity log's `event_type` column — every variant must map to the
2228    /// exact SSE `type` tag the UI already handles. A drift between the
2229    /// SSE wire format and the stored type would break the feed's icon /
2230    /// label rendering for historical rows.
2231    /// What: constructs one of each variant, serialises via serde, and
2232    /// confirms `type_str()` matches the JSON `type` field.
2233    /// Test: this test.
2234    #[test]
2235    fn daemon_event_type_str_matches_sse_tag() {
2236        let cases = [
2237            DaemonEvent::PalaceCreated {
2238                id: "p".into(),
2239                name: "p".into(),
2240                source: ActivitySource::Http,
2241            },
2242            DaemonEvent::DrawerAdded {
2243                palace_id: "p".into(),
2244                palace_name: "p".into(),
2245                drawer_count: 1,
2246                timestamp: chrono::Utc::now(),
2247                content_preview: String::new(),
2248                source: ActivitySource::Mcp,
2249            },
2250            DaemonEvent::DrawerDeleted {
2251                palace_id: "p".into(),
2252                drawer_count: 0,
2253                source: ActivitySource::Http,
2254            },
2255            DaemonEvent::DreamCompleted {
2256                palace_id: None,
2257                merged: 0,
2258                pruned: 0,
2259                compacted: 0,
2260                closets_updated: 0,
2261                duration_ms: 0,
2262                source: ActivitySource::Http,
2263            },
2264            DaemonEvent::StatusChanged {
2265                total_drawers: 0,
2266                total_vectors: 0,
2267                total_kg_triples: 0,
2268            },
2269            DaemonEvent::HookFired {
2270                palace_id: Some("p".into()),
2271                palace_name: Some("p".into()),
2272                hook_type: HookType::UserPromptSubmit,
2273                injection_kind: InjectionKind::PromptContext,
2274                injection_length: 12,
2275                trigger_prompt_excerpt: "hello".into(),
2276                timestamp: chrono::Utc::now(),
2277                duration_ms: 5,
2278                source: ActivitySource::Hook,
2279            },
2280        ];
2281        for ev in &cases {
2282            let json = serde_json::to_value(ev).unwrap();
2283            assert_eq!(json["type"].as_str(), Some(ev.type_str()));
2284        }
2285    }
2286
2287    /// Why: `HookType` is serialised on every `HookFired` activity row; its
2288    /// wire format must round-trip cleanly so dashboard / TUI consumers can
2289    /// safely parse historic entries written by an older daemon build.
2290    /// What: serde-encodes each variant, asserts the JSON matches the
2291    /// expected PascalCase label, then decodes back.
2292    /// Test: itself.
2293    #[test]
2294    fn hook_type_serde_round_trips() {
2295        let cases = [
2296            (HookType::UserPromptSubmit, "\"UserPromptSubmit\""),
2297            (HookType::SessionStart, "\"SessionStart\""),
2298        ];
2299        for (ht, expected) in cases {
2300            let s = serde_json::to_string(&ht).unwrap();
2301            assert_eq!(s, expected, "{ht:?} should serialise to {expected}");
2302            let back: HookType = serde_json::from_str(&s).unwrap();
2303            assert_eq!(back, ht);
2304            assert_eq!(ht.as_str(), expected.trim_matches('"'));
2305        }
2306    }
2307
2308    /// Why: same as `hook_type_serde_round_trips` but for `InjectionKind`.
2309    /// What: kebab-case round trip on every variant.
2310    /// Test: itself.
2311    #[test]
2312    fn injection_kind_serde_round_trips() {
2313        let cases = [
2314            (InjectionKind::PromptContext, "\"prompt-context\""),
2315            (InjectionKind::InboxCheck, "\"inbox-check\""),
2316        ];
2317        for (ik, expected) in cases {
2318            let s = serde_json::to_string(&ik).unwrap();
2319            assert_eq!(s, expected);
2320            let back: InjectionKind = serde_json::from_str(&s).unwrap();
2321            assert_eq!(back, ik);
2322            assert_eq!(ik.as_str(), expected.trim_matches('"'));
2323        }
2324    }
2325
2326    /// Why: the activity feed renders the trigger prompt excerpt directly;
2327    /// runaway prompts must be capped at [`HOOK_PROMPT_EXCERPT_CHARS`] with
2328    /// a `…` marker so the row stays readable.
2329    /// What: feeds a 200-character prompt and asserts the excerpt is
2330    /// bounded.
2331    /// Test: itself.
2332    #[test]
2333    fn hook_excerpt_truncates_long_prompts() {
2334        let long = "x".repeat(200);
2335        let excerpt = hook_prompt_excerpt(&long);
2336        assert!(excerpt.chars().count() <= HOOK_PROMPT_EXCERPT_CHARS);
2337        assert!(excerpt.ends_with('…'));
2338        assert_eq!(hook_prompt_excerpt(""), "");
2339    }
2340
2341    /// Why: multi-line prompts must collapse to a single line so the
2342    /// activity feed row doesn't blow out vertically.
2343    /// What: feeds a multi-line whitespace-heavy prompt and asserts the
2344    /// output is a single-spaced single line.
2345    /// Test: itself.
2346    #[test]
2347    fn hook_excerpt_collapses_whitespace() {
2348        let input = "hello\n\nworld\t\tfoo";
2349        let excerpt = hook_prompt_excerpt(input);
2350        assert_eq!(excerpt, "hello world foo");
2351    }
2352
2353    /// Why (issue #96): `palace_id()` and `source()` feed the persisted
2354    /// activity log's columns; they must extract the right field per
2355    /// variant. Sloppy refactors could swap two fields and the log would
2356    /// silently mis-attribute writes.
2357    /// What: builds each variant with known field values and asserts the
2358    /// extractor returns them.
2359    /// Test: this test.
2360    #[test]
2361    fn daemon_event_palace_id_and_source_extraction() {
2362        let ev = DaemonEvent::DrawerAdded {
2363            palace_id: "alpha".into(),
2364            palace_name: "alpha".into(),
2365            drawer_count: 1,
2366            timestamp: chrono::Utc::now(),
2367            content_preview: String::new(),
2368            source: ActivitySource::Mcp,
2369        };
2370        assert_eq!(ev.palace_id(), Some("alpha"));
2371        assert_eq!(ev.source(), Some(ActivitySource::Mcp));
2372
2373        let status = DaemonEvent::StatusChanged {
2374            total_drawers: 1,
2375            total_vectors: 2,
2376            total_kg_triples: 3,
2377        };
2378        assert_eq!(status.palace_id(), None);
2379        assert_eq!(status.source(), None);
2380
2381        let dream = DaemonEvent::DreamCompleted {
2382            palace_id: Some("p1".into()),
2383            merged: 0,
2384            pruned: 0,
2385            compacted: 0,
2386            closets_updated: 0,
2387            duration_ms: 10,
2388            source: ActivitySource::Http,
2389        };
2390        assert_eq!(dream.palace_id(), Some("p1"));
2391        assert_eq!(dream.source(), Some(ActivitySource::Http));
2392    }
2393
2394    /// Why (issue #96): `AppState::emit` must persist mutation events to
2395    /// the activity log while keeping `StatusChanged` (a recomputed
2396    /// aggregate, not a mutation) out of the persisted history.
2397    /// What: emits one of each variant under a fresh state and asserts
2398    /// the persisted count matches the number of mutation events.
2399    /// Test: this test.
2400    #[tokio::test]
2401    async fn emit_persists_mutations_but_skips_status_changed() {
2402        let (state, _tmp) = test_state();
2403        state.emit(DaemonEvent::PalaceCreated {
2404            id: "p".into(),
2405            name: "p".into(),
2406            source: ActivitySource::Http,
2407        });
2408        state.emit(DaemonEvent::StatusChanged {
2409            total_drawers: 1,
2410            total_vectors: 0,
2411            total_kg_triples: 0,
2412        });
2413        state.emit(DaemonEvent::DrawerAdded {
2414            palace_id: "p".into(),
2415            palace_name: "p".into(),
2416            drawer_count: 1,
2417            timestamp: chrono::Utc::now(),
2418            content_preview: "x".into(),
2419            source: ActivitySource::Mcp,
2420        });
2421        // Issue #232: `emit` now offloads the redb write to `spawn_blocking`,
2422        // so the test must wait for the background pool to drain before
2423        // asserting on the persisted count.
2424        state.flush_activity_writes().await;
2425        let count = state.activity_log.count().unwrap();
2426        assert_eq!(count, 2, "only PalaceCreated + DrawerAdded must persist");
2427    }
2428
2429    /// Why (issue #156): the BM25 lane must be opt-in — existing deployments
2430    /// that don't set `TRUSTY_BM25_DAEMON=1` must see `bm25_client = None`
2431    /// and the recall hot path must continue to behave exactly as before.
2432    /// What: builds an `AppState` with `with_bm25_client_from_env()` while
2433    /// the env var is unset; asserts the field stays `None`.
2434    /// Test: this test.
2435    #[tokio::test]
2436    async fn bm25_client_disabled_by_default() {
2437        // Serialise with the sibling `bm25_client_enabled_when_env_set` test
2438        // so they don't race on the shared `TRUSTY_BM25_DAEMON` env var.
2439        let _guard = crate::commands::env_test_lock().lock().await;
2440        // SAFETY: this test exercises std::env::remove_var which is unsafe
2441        // in 2024 edition because the global env is shared. We restore the
2442        // pre-test value at the end so neighbours are unaffected.
2443        let prev = std::env::var("TRUSTY_BM25_DAEMON").ok();
2444        unsafe {
2445            std::env::remove_var("TRUSTY_BM25_DAEMON");
2446        }
2447        let (state, _tmp) = test_state();
2448        let state = state.with_bm25_client_from_env();
2449        assert!(
2450            state.bm25_client.is_none(),
2451            "bm25_client must be None when TRUSTY_BM25_DAEMON is unset"
2452        );
2453        // Issue #193: the spawn supervisor is bound to the same env gate as
2454        // the client — opt-out parity matters so we never accidentally
2455        // spawn daemons in deployments that explicitly didn't opt in.
2456        assert!(
2457            state.bm25_supervisor.is_none(),
2458            "bm25_supervisor must be None when TRUSTY_BM25_DAEMON is unset"
2459        );
2460        if let Some(v) = prev {
2461            unsafe {
2462                std::env::set_var("TRUSTY_BM25_DAEMON", v);
2463            }
2464        }
2465    }
2466
2467    /// Why (issue #156): when the operator opts in via `TRUSTY_BM25_DAEMON=1`,
2468    /// the builder must construct a real `Bm25Client` pointed at the canonical
2469    /// per-palace socket path. We don't connect — no daemon need be running —
2470    /// we only assert the client field is populated.
2471    /// What: sets the env var, runs the builder, asserts `Some(_)`.
2472    /// Test: this test.
2473    #[tokio::test]
2474    async fn bm25_client_enabled_when_env_set() {
2475        let _guard = crate::commands::env_test_lock().lock().await;
2476        let prev = std::env::var("TRUSTY_BM25_DAEMON").ok();
2477        unsafe {
2478            std::env::set_var("TRUSTY_BM25_DAEMON", "1");
2479        }
2480        let (state, _tmp) = test_state();
2481        let state = state.with_bm25_client_from_env();
2482        assert!(
2483            state.bm25_client.is_some(),
2484            "bm25_client must be Some when TRUSTY_BM25_DAEMON=1"
2485        );
2486        // Issue #193: opting in to the client must also install the spawn
2487        // supervisor so the daemon is auto-started on first use.
2488        assert!(
2489            state.bm25_supervisor.is_some(),
2490            "bm25_supervisor must be Some when TRUSTY_BM25_DAEMON=1"
2491        );
2492        match prev {
2493            Some(v) => unsafe { std::env::set_var("TRUSTY_BM25_DAEMON", v) },
2494            None => unsafe { std::env::remove_var("TRUSTY_BM25_DAEMON") },
2495        }
2496    }
2497}