trusty_memory/lib.rs
1//! MCP server (HTTP/SSE + UDS) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name. Claude Code itself speaks stdio,
6//! but the in-process `serve --stdio` path was removed in issue #150
7//! because it deadlocked on the redb exclusive write lock whenever a
8//! long-lived daemon was already running — the canonical stdio integration
9//! is now the `trusty-memory-mcp-bridge` binary (PR #149), which pipes
10//! Claude Code's stdio over a Unix domain socket to the daemon.
11//! What: Provides `run_http` / `run_http_dynamic` / `run_http_on` (axum
12//! HTTP/SSE + REST + UI) and the `transport::uds` module (Unix-domain
13//! socket transport for the MCP bridge), plus an `AppState` that carries
14//! the shared `PalaceRegistry`, on-disk data root, and a lazily-initialized
15//! embedder.
16//! Test: `cargo test -p trusty-memory` validates handshake + dispatch via
17//! the in-process `handle_message` unit tests and the `tests/uds_roundtrip.rs`
18//! end-to-end harness.
19
20use anyhow::Result;
21use serde_json::{json, Value};
22use std::net::SocketAddr;
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::{Arc, OnceLock};
26use tokio::sync::{broadcast, OnceCell, RwLock};
27use trusty_common::bm25_client::Bm25Client;
28use trusty_common::mcp::initialize_response;
29use trusty_common::memory_core::embed::FastEmbedder;
30use trusty_common::memory_core::store::ChatSessionStore;
31use trusty_common::memory_core::PalaceRegistry;
32use trusty_common::ChatProvider;
33
34// Why: `tracing::info` is only used by the axum HTTP-serving helpers
35// (`run_http_on`, `spawn_uds_listener`). Pulling it in unconditionally
36// would trigger `unused_imports` warnings when the `axum-server`
37// feature is disabled. `SocketAddr` is still used by `bound_addr` on
38// `AppState` so it stays unconditional.
39#[cfg(feature = "axum-server")]
40use tracing::info;
41
42pub mod activity;
43pub mod attribution;
44pub mod bm25_supervisor;
45pub mod bootstrap;
46// Why (issue #226): `chat` and `web` are pure axum HTTP/SSE handler
47// surfaces. Gating them behind the `axum-server` feature is what lets
48// library consumers (e.g. `open-mpm` linking only `MemoryMcpService`)
49// drop axum + tower-http entirely from their build graph.
50#[cfg(feature = "axum-server")]
51pub mod chat;
52pub mod commands;
53pub mod discovery;
54pub mod hook_emit;
55pub mod kg_extract;
56pub mod mcp_service;
57pub mod messaging;
58pub mod openrpc;
59pub mod prompt_facts;
60pub mod prompt_log;
61pub mod service;
62pub mod tools;
63pub mod transport;
64#[cfg(feature = "axum-server")]
65pub mod web;
66
67pub use activity::{ActivityEntry, ActivityFilter, ActivityLog, ActivitySource};
68pub use attribution::{CreatorInfo, CreatorSource};
69
/// Upper bound, in characters, on the trigger-prompt excerpt carried by a
/// `HookFired` event.
///
/// Why: the complete prompt may be sensitive and is already recorded in the
/// JSONL prompt log; the activity feed only wants a one-line glance. Roughly
/// 80 characters lines up with the `drawer_content_preview` convention so
/// dashboard rows look alike.
/// What: 80, measured in `char`s (Unicode scalar values), not bytes. The
/// trailing `…` added on truncation counts towards this limit.
/// Test: `hook_excerpt_truncates_long_prompts`.
pub const HOOK_PROMPT_EXCERPT_CHARS: usize = 80;

/// Build the short, single-line preview of a triggering prompt that is
/// attached to a `HookFired` activity event.
///
/// Why: see [`HOOK_PROMPT_EXCERPT_CHARS`]. Keeping the rule in one function
/// means every emitter produces the same preview shape.
/// What: every run of whitespace (including newlines and tabs) becomes a
/// single space and leading/trailing whitespace is dropped. If the result is
/// at most [`HOOK_PROMPT_EXCERPT_CHARS`] characters it is returned as-is;
/// otherwise the first `HOOK_PROMPT_EXCERPT_CHARS - 1` characters are kept
/// and `…` is appended. Empty (or all-whitespace) input yields `""`.
/// Test: `hook_excerpt_truncates_long_prompts`,
/// `hook_excerpt_collapses_whitespace`.
pub fn hook_prompt_excerpt(prompt: &str) -> String {
    // Collapse whitespace in a single pass, without an intermediate Vec.
    let mut collapsed = String::with_capacity(prompt.len());
    for word in prompt.split_whitespace() {
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }

    // A character at index `HOOK_PROMPT_EXCERPT_CHARS` exists only when the
    // text is strictly longer than the limit.
    if collapsed
        .char_indices()
        .nth(HOOK_PROMPT_EXCERPT_CHARS)
        .is_none()
    {
        return collapsed;
    }

    // Reserve one character of the budget for the ellipsis and cut on a
    // char boundary so multi-byte text is never split mid-codepoint.
    let keep = HOOK_PROMPT_EXCERPT_CHARS.saturating_sub(1);
    let cut = collapsed
        .char_indices()
        .nth(keep)
        .map_or(collapsed.len(), |(offset, _)| offset);
    collapsed.truncate(cut);
    collapsed.push('…');
    collapsed
}
104
105pub use mcp_service::MemoryMcpService;
106pub use tools::MemoryMcpServer;
107
/// Pick the directory whose children are the per-palace subdirectories.
///
/// Why: two on-disk layouts exist. Current code expects palaces directly
/// under the registry directory (`<dir>/<id>/palace.json`), while the legacy
/// standalone `trusty-memory` layout nests them under a `palaces/`
/// subdirectory (`<data_dir>/palaces/<id>/palace.json`). Pointing the
/// registry at the bare data dir of a legacy install finds no palaces at
/// all — the "palaces lost on restart" bug.
/// What: returns `<data_dir>/palaces` when that path exists and is a
/// directory; in every other case (missing, or present but not a directory)
/// returns `data_dir` unchanged. Only the filesystem is inspected; nothing
/// is created or migrated.
/// Test: `tests::resolve_palace_registry_dir_prefers_palaces_subdir` and
/// `resolve_palace_registry_dir_falls_back_to_data_dir`.
pub fn resolve_palace_registry_dir(data_dir: PathBuf) -> PathBuf {
    let legacy_root = data_dir.join("palaces");
    if !legacy_root.is_dir() {
        // No legacy `palaces/` directory: the data dir itself is the root.
        return data_dir;
    }
    legacy_root
}
135
136/// Hook type — labels the Claude Code hook that triggered a submission.
137///
138/// Why: every hook firing produces an activity-feed entry tagged with the
139/// originating hook so operators can tell whether activity came from a user
140/// prompt (`UserPromptSubmit`), a new session (`SessionStart`), or a future
141/// hook variant. Threading this through `DaemonEvent::HookFired` lets the
142/// dashboard badge each row with the hook label.
143/// What: serde-serialised in PascalCase so the wire format matches Claude
144/// Code's own hook-name strings exactly (e.g. `"UserPromptSubmit"`).
145/// Test: `hook_type_serde_round_trips`.
146#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
147pub enum HookType {
148 /// Claude Code's `UserPromptSubmit` hook — fires on every user prompt.
149 UserPromptSubmit,
150 /// Claude Code's `SessionStart` hook — fires once at session open.
151 SessionStart,
152}
153
154impl HookType {
155 /// Stable string label used for the wire format.
156 pub fn as_str(&self) -> &'static str {
157 match self {
158 Self::UserPromptSubmit => "UserPromptSubmit",
159 Self::SessionStart => "SessionStart",
160 }
161 }
162}
163
164/// Injection kind — labels what the hook actually injected (or attempted).
165///
166/// Why: distinct from `HookType` because one hook could in principle render
167/// more than one kind of injection (e.g. SessionStart can deliver both an
168/// inbox check and bootstrap context). Tagging the rendered kind explicitly
169/// keeps the activity log searchable when that fan-out lands.
170/// What: serde-serialised as kebab-case so it matches the labels already
171/// used in the JSONL prompt log (`prompt-context-facts`,
172/// `inbox-check-messages`).
173/// Test: `injection_kind_serde_round_trips`.
174#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
175#[serde(rename_all = "kebab-case")]
176pub enum InjectionKind {
177 /// `prompt-context` hook rendered the prompt-facts block.
178 PromptContext,
179 /// `inbox-check` hook delivered unread messages.
180 InboxCheck,
181}
182
183impl InjectionKind {
184 /// Stable string label used for the wire format.
185 pub fn as_str(&self) -> &'static str {
186 match self {
187 Self::PromptContext => "prompt-context",
188 Self::InboxCheck => "inbox-check",
189 }
190 }
191}
192
/// Live daemon events broadcast to connected SSE subscribers.
///
/// Why: The dashboard needs push-driven updates so palace creation, drawer
/// add/delete, dream cycles, and aggregate status changes are visible without
/// polling. A single broadcast channel fans out to every connected browser.
/// What: Internally tagged enum serialized as `{"type": "...", ...fields}`;
/// the `type` value is the snake_case variant name (`palace_created`,
/// `drawer_added`, `drawer_deleted`, `dream_completed`, `status_changed`,
/// `hook_fired`). Fields are emitted in declaration order, so reordering
/// fields changes the JSON key order.
/// NOTE(review): several fields carry `#[serde(default)]`, which serde only
/// consults when deserializing. This enum derives only `Serialize` here, so
/// those attributes look inert unless a `Deserialize` impl exists elsewhere
/// — confirm before relying on them.
/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
/// mutation, and asserts the frame arrives.
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DaemonEvent {
    /// A palace was created (`type` = `palace_created`).
    PalaceCreated {
        /// Palace id; this is what `DaemonEvent::palace_id` returns for
        /// this variant.
        id: String,
        /// Palace name as supplied at creation time.
        name: String,
        /// Originating subsystem (HTTP, MCP, Hook). Why (issue #96): the
        /// UI badges each row with its source so operators can tell at a
        /// glance whether a write came from the dashboard form, an MCP
        /// tool call, or a hook-driven path. The wire-format key is
        /// `source` (lower-case strings via serde rename_all on
        /// `ActivitySource`).
        source: ActivitySource,
    },
    /// A drawer was added to a palace (`type` = `drawer_added`).
    DrawerAdded {
        /// Id of the palace that received the drawer.
        palace_id: String,
        /// Friendly palace name (Palace.name) at write time. Why: lets SSE
        /// consumers (the dashboard activity feed) render the human-readable
        /// label without a separate id→name lookup. Empty string if the
        /// emitter could not resolve the name.
        #[serde(default)]
        palace_name: String,
        /// Drawer count reported by the emitter (presumably the palace's
        /// total after the add — confirm against the emit site).
        drawer_count: usize,
        /// Wall-clock timestamp when the drawer was added. Why: SSE
        /// receivers want to render "just now / 2m ago" relative to the
        /// daemon's clock, not the time the SSE frame happens to arrive.
        timestamp: chrono::DateTime<chrono::Utc>,
        /// Short preview of the drawer's content (whitespace-collapsed,
        /// truncated to ~80 chars with an ellipsis when cut). Why: the TUI
        /// activity feed and dashboard ticker want to show *what* was
        /// stored, not just the running drawer count. Empty when the
        /// emitter could not resolve the content (legacy clients tolerate
        /// the missing field via `#[serde(default)]`).
        #[serde(default)]
        content_preview: String,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// A drawer was removed from a palace (`type` = `drawer_deleted`).
    DrawerDeleted {
        /// Id of the palace the drawer was removed from.
        palace_id: String,
        /// Drawer count reported by the emitter (presumably the palace's
        /// total after the delete — confirm against the emit site).
        drawer_count: usize,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// A dream cycle finished (`type` = `dream_completed`).
    DreamCompleted {
        /// Palace the cycle was scoped to; `None` for a cycle that is not
        /// tied to a single palace (`DaemonEvent::palace_id` then returns
        /// `None`).
        palace_id: Option<String>,
        // The four counters below are reported by the dream cycle; their
        // exact semantics are defined by that implementation, which is not
        // visible in this file.
        merged: usize,
        pruned: usize,
        compacted: usize,
        closets_updated: usize,
        /// Cycle duration in milliseconds.
        duration_ms: u64,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// Aggregate totals changed (`type` = `status_changed`). This is the
    /// only variant without a `source` or a palace id.
    StatusChanged {
        total_drawers: usize,
        total_vectors: usize,
        total_kg_triples: usize,
    },
    /// A Claude Code hook completed and rendered (or attempted to render) an
    /// injection block.
    ///
    /// Why: before this variant existed the activity feed only fired on
    /// drawer / palace / dream writes, which meant a normal Claude Code
    /// session — whose only daemon traffic is hook invocations — left the
    /// feed empty. Surfacing every hook firing answers the user complaint
    /// "no activity in the TUI" and gives operators a way to see how often
    /// each project palace is actually picking up prompt-context /
    /// inbox-check work. (The original note cited a placeholder issue
    /// number, `#XXX`; the real number is not recorded here.)
    /// What: carries the resolved palace (or `None` if cwd resolution
    /// failed), the [`HookType`] label, the [`InjectionKind`] label, the
    /// rendered injection byte length, a short excerpt of the triggering
    /// prompt (capped at ~80 chars; the full content stays in the JSONL
    /// prompt log only), the timestamp, the hook's wall-clock duration,
    /// and the [`ActivitySource`] tag (always `Hook` for this variant).
    /// Backwards-compatible: SSE clients that do not recognise the
    /// `hook_fired` `type` tag can safely ignore the frame.
    HookFired {
        /// Resolved palace id (slug) — `None` if cwd resolution failed.
        #[serde(default)]
        palace_id: Option<String>,
        /// Friendly palace name at hook time — `None` if the registry
        /// could not be consulted (HTTP path uses `palace_id` here when
        /// no separate name is known).
        #[serde(default)]
        palace_name: Option<String>,
        /// Which hook fired.
        hook_type: HookType,
        /// What the hook rendered (or attempted to render).
        injection_kind: InjectionKind,
        /// Rendered injection size in bytes (`0` when no injection was
        /// emitted, e.g. SessionStart with an empty inbox).
        injection_length: u64,
        /// Short excerpt of the triggering prompt for the activity feed
        /// display. Capped at ~80 chars with a trailing `…` when cut.
        /// Why: the activity feed renders this directly; full prompt
        /// content (which may be sensitive) stays in the JSONL log.
        #[serde(default)]
        trigger_prompt_excerpt: String,
        /// Timestamp attached by the emitter.
        timestamp: chrono::DateTime<chrono::Utc>,
        /// Hook wall-clock duration in milliseconds.
        duration_ms: u64,
        /// Always `ActivitySource::Hook` for this variant; encoded explicitly
        /// so the same dispatch path (`emit`) can persist + broadcast it.
        source: ActivitySource,
    },
}
305
306/// Open the activity log under `data_root`, falling back to a per-process
307/// tempdir and finally to a no-op `Discard` variant when no writable
308/// directory is available.
309///
310/// Why (issues #96, #225): the activity log is a best-effort feature — if
311/// the data root is on a read-only mount, missing, or locked by another
312/// process, the daemon should still come up and serve every other endpoint.
313/// The first fallback is a `std::env::temp_dir()`-anchored subdirectory
314/// keyed by the daemon's process id. Issue #225: a previous version called
315/// `expect()` on the tempdir fallback, which crashed the daemon on hosts
316/// where neither `data_root` nor `std::env::temp_dir()` is writable
317/// (read-only containers, locked-down sandboxes). The contract is
318/// "best-effort", so the final fallback is now `ActivityLog::discard()` —
319/// a no-op variant that drops every append and returns empty reads. The
320/// dashboard's activity feed simply shows up empty in that degraded state.
321/// What: tries `ActivityLog::open(data_root)`; on error logs a warning and
322/// retries against `<temp>/trusty-memory-activity-<pid>/`. If both fail,
323/// emits a final warning and returns `ActivityLog::discard()`.
324/// Test: `open_activity_log_with_fallback_returns_discard_when_unwritable`
325/// covers the discard branch; existing `AppState` construction tests cover
326/// the happy and tempdir-fallback paths.
327fn open_activity_log_with_fallback(data_root: &Path) -> Arc<ActivityLog> {
328 match ActivityLog::open(data_root) {
329 Ok(log) => Arc::new(log),
330 Err(primary_err) => {
331 tracing::warn!(
332 "could not open activity log at {}: {primary_err:#}; falling back to per-process tempdir",
333 data_root.display()
334 );
335 let fallback =
336 std::env::temp_dir().join(format!("trusty-memory-activity-{}", std::process::id()));
337 match ActivityLog::open(&fallback) {
338 Ok(log) => Arc::new(log),
339 Err(fallback_err) => {
340 tracing::warn!(
341 "activity log tempdir fallback at {} also failed: {fallback_err:#}; \
342 activity feed disabled for this process (no-op log)",
343 fallback.display()
344 );
345 Arc::new(ActivityLog::discard())
346 }
347 }
348 }
349 }
350}
351
352impl DaemonEvent {
353 /// Short discriminant label matching the SSE `type` field.
354 ///
355 /// Why: the persisted activity log stores `event_type` as a string so
356 /// the UI can render the row without re-parsing the payload. Sharing
357 /// the same labels the SSE serializer uses keeps the wire and the
358 /// stored history consistent.
359 /// What: returns one of `palace_created`, `drawer_added`,
360 /// `drawer_deleted`, `dream_completed`, `status_changed`.
361 /// Test: `daemon_event_type_str_matches_sse_tag` in the lib tests.
362 pub fn type_str(&self) -> &'static str {
363 match self {
364 Self::PalaceCreated { .. } => "palace_created",
365 Self::DrawerAdded { .. } => "drawer_added",
366 Self::DrawerDeleted { .. } => "drawer_deleted",
367 Self::DreamCompleted { .. } => "dream_completed",
368 Self::StatusChanged { .. } => "status_changed",
369 Self::HookFired { .. } => "hook_fired",
370 }
371 }
372
373 /// `palace_id` if the event is scoped to a single palace.
374 ///
375 /// Why: the activity log indexes entries by palace id so the UI can
376 /// filter by palace; daemon-wide events (`status_changed`,
377 /// dream-across-all-palaces) return `None`.
378 /// What: returns a borrowed string when the variant carries a palace
379 /// id, otherwise `None`.
380 /// Test: `daemon_event_palace_id_extraction`.
381 pub fn palace_id(&self) -> Option<&str> {
382 match self {
383 Self::PalaceCreated { id, .. } => Some(id),
384 Self::DrawerAdded { palace_id, .. } | Self::DrawerDeleted { palace_id, .. } => {
385 Some(palace_id)
386 }
387 Self::DreamCompleted { palace_id, .. } => palace_id.as_deref(),
388 Self::HookFired { palace_id, .. } => palace_id.as_deref(),
389 Self::StatusChanged { .. } => None,
390 }
391 }
392
393 /// Originating subsystem if the event carries one.
394 ///
395 /// Why: only mutation events carry a `source`; the aggregate
396 /// `StatusChanged` is recomputed by the daemon and has no caller, so
397 /// it returns `None`.
398 /// What: returns the variant's `source` field where present.
399 /// Test: `daemon_event_source_extraction`.
400 pub fn source(&self) -> Option<ActivitySource> {
401 match self {
402 Self::PalaceCreated { source, .. }
403 | Self::DrawerAdded { source, .. }
404 | Self::DrawerDeleted { source, .. }
405 | Self::DreamCompleted { source, .. }
406 | Self::HookFired { source, .. } => Some(*source),
407 Self::StatusChanged { .. } => None,
408 }
409 }
410}
411
/// Shared application state passed to every request handler.
///
/// Why: every transport needs the same handles to the registry, data root,
/// and embedder so MCP tools can perform real reads/writes against the live
/// trusty-memory core. The embedder is heavy (loads ONNX weights) so we hold
/// it behind a `OnceCell` and initialize lazily on first use.
/// What: derives `Clone`; most fields are `Arc`-wrapped so clones share
/// state (`version`, `data_root`, and `default_palace` are plain owned
/// values and are copied). The registry / data root are eager; `embedder` is
/// `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use races resolve
/// to a single shared instance. Fields drop in declaration order, so
/// reordering them changes teardown order.
/// NOTE(review): several field docs below still mention a "stdio path"; the
/// module docs say the in-process `serve --stdio` path was removed in issue
/// #150, so those mentions may be stale — confirm.
/// Test: `app_state_default_constructs` confirms construction without panic.
#[derive(Clone)]
pub struct AppState {
    /// Crate version string; `AppState::new` fills it from
    /// `CARGO_PKG_VERSION` at compile time.
    pub version: String,
    /// Shared palace registry; `AppState::new` starts it from
    /// `PalaceRegistry::new()`.
    pub registry: Arc<PalaceRegistry>,
    /// On-disk data directory this state is rooted at (the argument given
    /// to `AppState::new`).
    pub data_root: PathBuf,
    /// Lazily-initialized embedder; the cell is empty until first use.
    pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
    /// Optional default palace applied to MCP tool calls when the caller
    /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
    pub default_palace: Option<String>,
    /// Active chat provider selected at startup. `None` means no upstream is
    /// configured (no Ollama detected and no OpenRouter key) — callers must
    /// degrade gracefully (chat endpoint returns 412).
    pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
    /// Per-palace chat-session stores, opened lazily so cold-start cost is
    /// paid only when chat-history endpoints are hit.
    pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
    /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
    ///
    /// Why: Lets mutating handlers emit events that any connected dashboard
    /// receives instantly. Cap of 128 buffers transient slow readers; if a
    /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
    pub events: Arc<broadcast::Sender<DaemonEvent>>,
    /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
    ///
    /// Why (issue #35): `GET /health` reports how long the daemon has been
    /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
    /// handler compute the elapsed seconds cheaply and without a clock-skew
    /// hazard.
    /// What: a monotonic `std::time::Instant`; `AppState::new` stamps it with
    /// `Instant::now()`.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub started_at: std::time::Instant,
    /// In-memory ring buffer of recent tracing log lines (issue #35).
    ///
    /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
    /// so operators can inspect a running daemon without tailing a file. The
    /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
    /// HTTP handler (reader).
    /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
    /// to. Defaults to an empty buffer for states that never install the
    /// layer (tests, the stdio path).
    /// Test: `logs_tail_returns_recent_lines`.
    pub log_buffer: trusty_common::log_buffer::LogBuffer,
    /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
    ///
    /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
    /// every health request would make a frequent health poll do unbounded
    /// I/O; a background task recomputes it every 10 s and stores it here so
    /// the handler reads it lock-free.
    /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
    /// `0` until the first walk completes.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
    /// Per-process RSS + CPU sampler, refreshed on each `/health` request
    /// (issue #35).
    ///
    /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
    /// sampler must persist between requests — hence the shared `Mutex`.
    /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
    /// can sample without blocking the runtime.
    /// Test: `health_endpoint_includes_resource_fields`.
    pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
    /// HTTP listener address the daemon bound to, once `run_http_on` is running.
    ///
    /// Why: clients (and `/health` responses) need to advertise the live
    /// `host:port` even though port selection happens dynamically (7070–7079
    /// walk + OS fallback). Stashing it on `AppState` lets request handlers
    /// surface the discovery value without re-querying the listener.
    /// What: a `OnceLock<SocketAddr>` so `run_http_on` writes it exactly once
    /// at bind time and every handler reads it lock-free thereafter. Empty
    /// (`None` from `get()`) on the stdio path where no listener exists.
    /// Test: `health_endpoint_reports_bound_addr` (described by the original
    /// note as "added below"; not visible in this part of the file).
    pub bound_addr: Arc<OnceLock<SocketAddr>>,
    /// Cached prompt-facts surface served by the MCP `get_prompt_context`
    /// tool (issue #42).
    ///
    /// Why: The original session-init `prompts/get` design loaded context
    /// once per connection; switching to a per-message tool lets the model
    /// pull fresh, query-filtered context on demand. The cache holds both
    /// the raw triples (for filtered lookups) and a pre-formatted Markdown
    /// block (for the unfiltered hot path) so neither code path re-walks
    /// the KG. The cache is rebuilt by
    /// `prompt_facts::rebuild_prompt_cache` after any write that touches a
    /// hot predicate (`kg_assert`, `add_alias`, `remove_prompt_fact`).
    /// What: An `Arc<tokio::sync::RwLock<PromptFactsCache>>` so the hot
    /// read path takes a brief read lock and clones the cache; rebuilds
    /// take a write lock for the assignment only. The async-aware lock
    /// (issue #229) yields to the tokio runtime instead of blocking a
    /// runtime thread for the rebuild duration. An empty `triples` vec ↔
    /// "no context stored yet" (the tool handler renders a hint).
    /// Test: `get_prompt_context_returns_cached_or_hint`,
    /// `get_prompt_context_filters_by_query`.
    pub prompt_context_cache: Arc<RwLock<prompt_facts::PromptFactsCache>>,
    /// Persistent activity log (issue #96).
    ///
    /// Why: the dashboard activity feed used to be a pure live-stream over
    /// `/sse` — opening the UI showed an empty feed and any mutation from
    /// the MCP path was invisible. Holding an `ActivityLog` on `AppState`
    /// lets `emit` record an entry on every push so the
    /// `GET /api/v1/activity` handler can return historical rows on mount
    /// and the live SSE stream can continue prepending events on top of
    /// the loaded history.
    /// What: an `Arc<ActivityLog>` shared with every emitter. The field is
    /// not optional: `AppState::new` always fills it via
    /// `open_activity_log_with_fallback`, which degrades to a per-process
    /// tempdir log and finally to the no-op `ActivityLog::discard()`.
    /// NOTE(review): earlier wording said "`None` on builds that opt out",
    /// which does not match the non-`Option` type — treated as stale.
    /// Test: `web::tests::activity_endpoint_lists_recent_emits`.
    pub activity_log: Arc<ActivityLog>,
    /// Optional per-palace BM25 lexical search lane (issue #156).
    ///
    /// Why: in-process BM25 would serialise the recall hot path on disk
    /// I/O during writes and contend with the redb/usearch locks. Delegating
    /// to the `trusty-bm25-daemon` subprocess (one socket per palace) keeps
    /// BM25 ingestion and search off the critical path while still feeding
    /// hits into the recall RRF fusion.
    /// What: `Some(client)` only when `TRUSTY_BM25_DAEMON=1` at startup —
    /// every code path that uses this field is gated on `is_some()` and
    /// falls back to vector-only behaviour otherwise so existing deployments
    /// see zero behavioural change.
    /// Test: `bm25_client_disabled_by_default`,
    /// `bm25_client_enabled_when_env_set`.
    pub bm25_client: Option<Arc<Bm25Client>>,
    /// Optional per-palace BM25 daemon spawn supervisor (issue #193).
    ///
    /// Why: without an in-process supervisor the BM25 daemon must be
    /// launched out-of-band (launchd, manual `trusty-bm25-daemon`), which
    /// is the same UX trap PR #190 fixed for trusty-embedderd. Holding a
    /// supervisor here lets us spawn the daemon on first BM25 use for a
    /// palace, restart it if it dies, and reap it on clean shutdown.
    /// What: `Some` only when `TRUSTY_BM25_DAEMON=1` at startup — the same
    /// gate that enables `bm25_client`. When set but
    /// `TRUSTY_BM25_EXTERNAL=1`, the supervisor's `ensure_running` becomes a
    /// no-op that just returns the canonical socket path so operators can
    /// keep using their own process manager.
    /// Test: covered by `bm25_supervisor_present_when_env_set` and the
    /// `bm25_supervisor::tests` unit tests.
    pub bm25_supervisor: Option<Arc<bm25_supervisor::Bm25Supervisor>>,
    /// Per-palace write serialisation locks (issue #230).
    ///
    /// Why: the dedup gate in `tools.rs` previously read a snapshot of
    /// existing drawers, checked for near-duplicates via Jaro-Winkler, and
    /// then issued the write — a classic time-of-check/time-of-use race.
    /// Two concurrent `memory_remember` calls with the same content could
    /// both see the pre-write snapshot, both pass the gate, and both land
    /// duplicate drawers. Serialising the gate-then-write sequence per
    /// palace closes the window: while one task holds the mutex, any
    /// concurrent writer for the same palace blocks until the first write
    /// finishes and is visible to `list_drawers`. The lock is **per
    /// palace** (not global) so writes to different palaces continue to
    /// run in parallel.
    /// What: a `DashMap` keyed by palace id, where each entry is an
    /// `Arc<tokio::sync::Mutex<()>>`. The mutex is constructed lazily by
    /// `palace_write_lock` on first access. `Arc` lets callers hold a
    /// clone of the lock past the lifetime of the `DashMap` entry so the
    /// map never needs to be held across an `.await`.
    /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
    pub palace_write_locks: Arc<dashmap::DashMap<String, Arc<tokio::sync::Mutex<()>>>>,
    /// Counter of in-flight activity-log writes spawned by `emit`
    /// (issue #232).
    ///
    /// Why: `emit` offloads the synchronous redb append to the tokio blocking
    /// pool via `spawn_blocking` so the async runtime is never parked waiting
    /// on fsync. The write is fire-and-forget — `emit` returns immediately
    /// after spawning. Tests that observe the activity log right after a
    /// burst of `emit` calls need a deterministic synchronization point;
    /// holding an in-flight counter lets `flush_activity_writes` poll until
    /// every spawned append has settled, which keeps the assertions
    /// race-free without forcing every caller to `.await`.
    /// What: an `Arc<AtomicUsize>` incremented before each `spawn_blocking`
    /// and decremented inside the closure (after the append completes, even
    /// if it errored). The counter is cheap (one atomic add per emit) and
    /// stays at zero in steady-state production traffic.
    /// Test: `web::tests::activity_endpoint_lists_recent_emits` and
    /// `tests::emit_persists_mutations_but_skips_status_changed` call
    /// `flush_activity_writes` to drain the counter before reading the log.
    pub pending_activity_writes: Arc<AtomicUsize>,
    /// In-memory cache mapping palace id → `Palace.name` (issue #228).
    ///
    /// Why: every `memory_remember` / `memory_note` write used to call
    /// `PalaceRegistry::list_palaces` (a synchronous filesystem walk of the
    /// data root) just to resolve a friendly palace name for the SSE
    /// `DrawerAdded` event. With N palaces on disk the cost was O(N) opendirs
    /// plus `palace.json` reads on every write, blocking the async runtime.
    /// Caching the name in-memory turns the lookup into a `DashMap::get`.
    /// What: `DashMap<String, String>` populated by `create_palace` and
    /// `load_palaces_from_disk`, kept in sync by rename / delete paths.
    /// Missing entries are treated as "name unknown" so callers fall back to
    /// the palace id and the emit path never fails.
    /// Test: `palace_name_cache_populated_after_hydration` and
    /// `palace_name_cache_updates_on_create`.
    pub palace_names: Arc<dashmap::DashMap<String, String>>,
    /// Bounded sender for the BM25 index worker (issue #231).
    ///
    /// Why: the previous fire-and-forget design `tokio::spawn`ed one task per
    /// `memory_remember` / `memory_note` call, so a write burst against a slow
    /// or unreachable BM25 daemon grew an unbounded in-flight task queue. A
    /// single long-lived worker draining a bounded mpsc channel caps that
    /// back-pressure: writers `try_send` (never block), full-queue requests
    /// are dropped with a `warn!`, and the worker exits cleanly when the last
    /// sender is dropped on shutdown.
    /// What: an `mpsc::Sender` cloned to every `AppState` clone (cheap). The
    /// matching receiver is consumed by the worker spawned in
    /// [`AppState::new`] via [`tools::spawn_bm25_index_worker`]. Capacity is
    /// [`tools::BM25_INDEX_QUEUE_CAPACITY`] (256).
    /// Test: `bm25_index_queue_drops_when_full` exercises the full-queue
    /// branch via `bm25_index_enqueue`.
    pub bm25_index_tx: tokio::sync::mpsc::Sender<tools::Bm25IndexRequest>,
}
628
629impl AppState {
630 /// Construct an `AppState` rooted at the given on-disk data directory.
631 ///
632 /// Why: The CLI (`serve`) and integration tests need to point the MCP
633 /// server at different roots — production at `dirs::data_dir`, tests at a
634 /// `tempfile::tempdir()`.
635 /// What: Builds an empty `PalaceRegistry`, captures the version, and
636 /// allocates an empty `OnceCell` for the embedder. `default_palace` is
637 /// `None`; use `with_default_palace` to set it.
638 /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
639 /// AppState pointed at a tempdir and round-trips a palace through it.
640 pub fn new(data_root: PathBuf) -> Self {
641 let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
642 // Issue #96: open (or create) the persistent activity log under the
643 // daemon data root. Open failure is logged but never crashes the
644 // daemon — we fall back to a per-process tempdir so emits remain
645 // best-effort and the rest of the daemon keeps working.
646 let activity_log = open_activity_log_with_fallback(&data_root);
647 // Issue #231: bounded mpsc channel + single long-lived worker
648 // replaces the per-write `tokio::spawn` fire-and-forget pattern so
649 // BM25 indexing back-pressure is capped. The worker is spawned here
650 // unconditionally so the channel always has a drain — even when
651 // `bm25_client` is `None`, the worker just consumes and discards
652 // each request so senders never block on a full queue.
653 let (bm25_index_tx, bm25_index_rx) =
654 tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(tools::BM25_INDEX_QUEUE_CAPACITY);
655 // `bm25_client` / `bm25_supervisor` start as `None`; the builder
656 // `with_bm25_client_from_env` rebuilds the worker with the real
657 // client + supervisor once env-gated opt-in is resolved.
658 tools::spawn_bm25_index_worker(bm25_index_rx, None, None);
659 Self {
660 version: env!("CARGO_PKG_VERSION").to_string(),
661 registry: Arc::new(PalaceRegistry::new()),
662 data_root,
663 embedder: Arc::new(OnceCell::new()),
664 default_palace: None,
665 chat_provider: Arc::new(OnceCell::new()),
666 session_stores: Arc::new(dashmap::DashMap::new()),
667 events: Arc::new(events_tx),
668 started_at: std::time::Instant::now(),
669 // Default to an empty buffer — `with_log_buffer` overrides this
670 // when the daemon installs the `LogBufferLayer` (HTTP mode).
671 log_buffer: trusty_common::log_buffer::LogBuffer::new(
672 trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
673 ),
674 disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
675 sys_metrics: Arc::new(tokio::sync::Mutex::new(
676 trusty_common::sys_metrics::SysMetrics::new(),
677 )),
678 bound_addr: Arc::new(OnceLock::new()),
679 prompt_context_cache: Arc::new(RwLock::new(prompt_facts::PromptFactsCache::default())),
680 activity_log,
681 bm25_client: None,
682 bm25_supervisor: None,
683 palace_write_locks: Arc::new(dashmap::DashMap::new()),
684 pending_activity_writes: Arc::new(AtomicUsize::new(0)),
685 palace_names: Arc::new(dashmap::DashMap::new()),
686 bm25_index_tx,
687 }
688 }
689
690 /// Acquire (lazily, then clone) the per-palace write mutex.
691 ///
692 /// Why (issue #230): the dedup-check + `remember_with_options` write
693 /// sequence in `tools.rs` must be atomic per palace to prevent two
694 /// concurrent identical writes from both passing the dedup gate.
695 /// Callers hold the returned `Arc<Mutex<()>>`'s guard across the gate
696 /// check and the write so the second writer blocks until the first
697 /// write is visible to `list_drawers`. Returning a clone of the `Arc`
698 /// rather than a borrow into the `DashMap` lets the caller `.await`
699 /// while holding the lock without risking a deadlock against any
700 /// future map mutation (DashMap shards are sync mutexes).
701 /// What: looks up the palace id in `palace_write_locks` and returns
702 /// a clone of the existing mutex; on the first call for a palace,
703 /// inserts a freshly-constructed `tokio::sync::Mutex<()>` first. The
704 /// `DashMap::entry().or_insert_with` API guarantees the lazy
705 /// construction is racy-safe — only one mutex is ever inserted per
706 /// palace id.
707 /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
708 pub fn palace_write_lock(&self, palace_id: &str) -> Arc<tokio::sync::Mutex<()>> {
709 if let Some(existing) = self.palace_write_locks.get(palace_id) {
710 return existing.clone();
711 }
712 self.palace_write_locks
713 .entry(palace_id.to_string())
714 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
715 .clone()
716 }
717
718 /// Builder-style: opt-in to the BM25 lexical lane (issue #156).
719 ///
720 /// Why: the BM25 subprocess is gated behind `TRUSTY_BM25_DAEMON=1` so
721 /// the default `cargo install trusty-memory` / launchd plist deployment
722 /// stays vector-only and existing test fixtures keep passing without
723 /// having to provision a daemon. Reading the env var here keeps the
724 /// gating logic in one place (the helper in `main.rs` just plumbs the
725 /// result through).
726 /// What: when `TRUSTY_BM25_DAEMON=1`, constructs one `Bm25Client` per
727 /// palace by lazy-resolving the socket path the first time the palace
728 /// id is observed. Currently we install a shared `default` client up
729 /// front and re-key on the palace id at the call site — palaces with no
730 /// daemon socket simply see search/index errors which we log + ignore.
731 /// Returns `self` unchanged when the env var is unset or set to anything
732 /// other than `1`.
733 /// Test: `bm25_client_disabled_by_default`,
734 /// `bm25_client_enabled_when_env_set`.
735 #[must_use]
736 pub fn with_bm25_client_from_env(mut self) -> Self {
737 if std::env::var("TRUSTY_BM25_DAEMON").as_deref() == Ok("1") {
738 // Install the default-palace client; per-palace clients are
739 // constructed on demand via `Bm25Client::for_palace`.
740 let default_palace = self.default_palace.as_deref().unwrap_or("default");
741 self.bm25_client = Some(Arc::new(Bm25Client::for_palace(default_palace)));
742 // Issue #193: hand-in-hand with the client, attach a spawn
743 // supervisor so the BM25 daemon is auto-started on first use
744 // for any palace. Operators who want to manage daemons
745 // out-of-band (launchd, systemd, manual) set
746 // TRUSTY_BM25_EXTERNAL=1 which makes the supervisor a no-op.
747 self.bm25_supervisor = Some(Arc::new(bm25_supervisor::Bm25Supervisor::new()));
748 // Issue #231: rebuild the bounded indexer channel + worker so
749 // the worker holds the now-populated client + supervisor. The
750 // placeholder worker installed by `AppState::new` (with `None`
751 // / `None`) drained the channel into the void — replacing the
752 // sender here closes the placeholder receiver and the
753 // placeholder worker exits cleanly. The new worker takes over
754 // as the sole drain for the indexer queue.
755 let (tx, rx) = tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(
756 tools::BM25_INDEX_QUEUE_CAPACITY,
757 );
758 tools::spawn_bm25_index_worker(
759 rx,
760 self.bm25_client.clone(),
761 self.bm25_supervisor.clone(),
762 );
763 self.bm25_index_tx = tx;
764 tracing::info!(
765 palace = default_palace,
766 "BM25 daemon client + spawn supervisor enabled (TRUSTY_BM25_DAEMON=1)"
767 );
768 }
769 self
770 }
771
772 /// Scan the palace registry directory and re-register every persisted
773 /// palace into the in-memory [`PalaceRegistry`].
774 ///
775 /// Why: `AppState::new` builds an *empty* registry, so after a daemon
776 /// restart `palace_list` / the dashboard reported zero palaces even though
777 /// dozens existed on disk — palace metadata was persisted by
778 /// `palace_create` but never re-hydrated on startup. This method closes
779 /// that gap by walking the on-disk layout (each subdirectory holding a
780 /// `palace.json` is one palace) and rebuilding a live `PalaceHandle` for
781 /// each, so recall paths see the full set immediately after a restart.
782 /// What: runs the blocking filesystem walk + per-palace `PalaceHandle::open`
783 /// on a `spawn_blocking` thread (so it never stalls the async runtime),
784 /// registers each successfully opened palace via `register_arc`, logs every
785 /// load at `debug!`, and returns the count loaded. A palace that fails to
786 /// open (corrupt index, unreadable `kg.db`, etc.) is logged at `warn!` and
787 /// skipped — one bad palace must not abort startup or crash the daemon.
788 /// `data_root` is expected to already be the palace registry directory —
789 /// `main.rs` resolves it via [`resolve_palace_registry_dir`] before
790 /// constructing the `AppState`, so the flat / legacy-`palaces/` layout
791 /// difference is handled exactly once.
792 /// Test: `tests::load_palaces_from_disk_rehydrates_registry` writes two
793 /// palaces into a tempdir, constructs an `AppState`, calls this method, and
794 /// asserts the returned count and registry contents.
795 pub async fn load_palaces_from_disk(&self) -> Result<usize> {
796 let registry_dir = self.data_root.clone();
797 let registry = self.registry.clone();
798 let palace_names = self.palace_names.clone();
799 // The directory walk and each `PalaceHandle::open` perform blocking
800 // filesystem + redb/usearch I/O — run the whole hydration on the
801 // blocking pool so it never parks an async worker thread.
802 let count = tokio::task::spawn_blocking(move || -> Result<usize> {
803 let palaces = PalaceRegistry::list_palaces(®istry_dir)?;
804 let total = palaces.len();
805 let mut loaded = 0usize;
806 let mut skipped = 0usize;
807 for palace in palaces {
808 match trusty_common::memory_core::PalaceHandle::open(&palace) {
809 Ok(handle) => {
810 tracing::debug!(
811 palace = %palace.id,
812 data_dir = %palace.data_dir.display(),
813 "loaded palace from disk"
814 );
815 // Issue #228: seed the in-memory name cache so write
816 // hot paths (memory_remember / memory_note) can resolve
817 // the friendly palace name without re-walking the data
818 // root. Insert here (during hydration) is the single
819 // point of truth for restart-time population.
820 palace_names.insert(palace.id.0.clone(), palace.name.clone());
821 registry.register_arc(handle);
822 loaded += 1;
823 }
824 Err(e) => {
825 // Why: a single bad palace (corrupt kg.db, stale WAL,
826 // permissions) must never abort startup or block the
827 // HTTP server from binding. Log per-palace and keep
828 // going; the summary below tells operators how many
829 // were skipped without trawling the log.
830 tracing::warn!(
831 palace = %palace.id,
832 data_dir = %palace.data_dir.display(),
833 "skipping palace during startup hydration: {e:#}"
834 );
835 skipped += 1;
836 }
837 }
838 }
839 tracing::info!(
840 "palace hydration summary: loaded {loaded}/{total} ({skipped} skipped due to errors)"
841 );
842 Ok(loaded)
843 })
844 .await
845 .map_err(|e| anyhow::anyhow!("join load_palaces_from_disk: {e}"))??;
846 Ok(count)
847 }
848
849 /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
850 /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
851 /// subscriber captures (issue #35).
852 ///
853 /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
854 /// constructing the `AppState`, then hands a clone here so the HTTP
855 /// handler and the tracing layer observe the same ring.
856 /// What: replaces the empty default buffer with the supplied one.
857 /// Test: `logs_tail_returns_recent_lines`.
858 #[must_use]
859 pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
860 self.log_buffer = buffer;
861 self
862 }
863
864 /// Send a `DaemonEvent` to all connected SSE subscribers and persist
865 /// it to the activity log when the variant carries a source.
866 ///
867 /// Why: Mutating handlers call this after a successful write so the
868 /// dashboard can update without polling. The send is best-effort —
869 /// `broadcast::Sender::send` returns `Err` only when there are no live
870 /// receivers, which is fine (no listeners == no work to do). Issue
871 /// #96 additionally writes the entry to the persistent activity log
872 /// so the feed can serve historical rows on page load and so MCP /
873 /// HTTP / Hook origins are visible to the operator. Persistence is
874 /// also best-effort — a write failure is logged but never blocks the
875 /// SSE broadcast.
876 ///
877 /// Issue #232: the activity-log append is a synchronous redb write +
878 /// fsync. Calling it directly on the async caller's task parked a tokio
879 /// worker thread on disk I/O for every SSE event. We now offload the
880 /// append to the blocking thread pool via `spawn_blocking` and return
881 /// immediately — `emit` stays synchronous so every existing caller
882 /// (including the sync `dispatch_hook_fired` JSON-RPC handler) keeps
883 /// compiling unchanged. The fire-and-forget pattern matches the
884 /// pre-fix semantics (best-effort, never blocks the SSE broadcast)
885 /// while freeing the async runtime to do real work during the write.
886 /// What: serialises the event for the log (skipping `StatusChanged`
887 /// which is a recomputed aggregate, not a mutation), spawns the redb
888 /// append on `tokio::task::spawn_blocking` keyed by a clone of the
889 /// `Arc<ActivityLog>` and the cloned event, then sends the event over
890 /// the broadcast channel. A `pending_activity_writes` counter is bumped
891 /// before the spawn and decremented inside the closure so
892 /// [`Self::flush_activity_writes`] can drain in tests.
893 /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
894 /// subscriber observes the emitted event;
895 /// `activity_endpoint_lists_recent_emits` confirms persistence via
896 /// `flush_activity_writes`.
897 pub fn emit(&self, event: DaemonEvent) {
898 if let Some(source) = event.source() {
899 let event_type = event.type_str();
900 let palace_id = event.palace_id().map(|s| s.to_string());
901 let log = Arc::clone(&self.activity_log);
902 let event_for_log = event.clone();
903 let pending = Arc::clone(&self.pending_activity_writes);
904 // Pre-allocate the sequence id in the emitting thread so the
905 // persisted order matches the emission order even when blocking-pool
906 // workers execute the writes concurrently (issue #247). Without
907 // this, four rapid emits would assign IDs inside their respective
908 // `spawn_blocking` closures in a non-deterministic order.
909 let id = log.alloc_id();
910 pending.fetch_add(1, Ordering::SeqCst);
911 // Why: the synchronous redb append + fsync must not park an
912 // async worker thread (issue #232). Spawn the write on the
913 // blocking pool; the JoinHandle is intentionally dropped —
914 // the write is best-effort and any failure is logged below.
915 tokio::task::spawn_blocking(move || {
916 let result = log.append_with_id(id, source, palace_id, event_type, &event_for_log);
917 if let Err(e) = result {
918 tracing::warn!("activity_log.append failed for {event_type}: {e:#}");
919 }
920 pending.fetch_sub(1, Ordering::SeqCst);
921 });
922 }
923 let _ = self.events.send(event);
924 }
925
926 /// Block (asynchronously) until every in-flight activity-log write
927 /// spawned by [`Self::emit`] has settled.
928 ///
929 /// Why: `emit` offloads its redb append to `tokio::task::spawn_blocking`
930 /// and returns immediately (issue #232). Tests that observe the
931 /// activity log right after a burst of emits would otherwise race the
932 /// blocking-pool worker; this helper gives them a deterministic
933 /// synchronization point. Production code never needs to call this —
934 /// the dashboard reads through `GET /api/v1/activity`, which already
935 /// tolerates writes settling asynchronously.
936 /// What: spins on `pending_activity_writes` with a 1 ms yield until the
937 /// counter is zero. Cheap: tests typically emit a handful of events
938 /// and the loop exits within a single scheduler tick.
939 /// Test: covered indirectly by `emit_persists_mutations_but_skips_status_changed`
940 /// and `web::tests::activity_endpoint_lists_recent_emits`.
941 pub async fn flush_activity_writes(&self) {
942 while self.pending_activity_writes.load(Ordering::SeqCst) > 0 {
943 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
944 }
945 }
946
947 /// Open (or return cached) the chat-session store for a palace.
948 ///
949 /// Why: Chat session persistence lives in a dedicated SQLite file under
950 /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
951 /// with the KG's transactional load. The store is cheap to clone via
952 /// `Arc` but the underlying r2d2 pool should be reused, so cache by id.
953 /// What: Creates the palace data dir if missing, opens (or reuses) a
954 /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
955 /// Test: Indirectly via the session HTTP handlers in `web::tests`.
956 pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
957 if let Some(entry) = self.session_stores.get(palace_id) {
958 return Ok(entry.clone());
959 }
960 let dir = self.data_root.join(palace_id);
961 std::fs::create_dir_all(&dir)
962 .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
963 let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
964 self.session_stores
965 .insert(palace_id.to_string(), store.clone());
966 Ok(store)
967 }
968
969 /// Builder-style setter for the default palace name.
970 ///
971 /// Why: `serve --palace <name>` wants to bind every tool call to a
972 /// project-scoped namespace without forcing every MCP request to repeat
973 /// the palace argument.
974 /// What: Returns `self` with `default_palace = Some(name)`.
975 /// Test: `default_palace_used_when_arg_omitted` covers the resolution
976 /// path; this setter is exercised there.
977 pub fn with_default_palace(mut self, name: Option<String>) -> Self {
978 self.default_palace = name;
979 self
980 }
981
982 /// Resolve (or initialize) the shared embedder.
983 ///
984 /// Why: FastEmbedder load is expensive — we share one instance across all
985 /// tool calls; the `OnceCell` ensures concurrent first-use races collapse
986 /// to a single load.
987 /// What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
988 /// underlying ONNX load.
989 /// Test: Indirectly via `dispatch_remember_then_recall`.
990 /// Resolve the active chat provider, auto-detecting on first call.
991 ///
992 /// Why: Provider selection depends on filesystem-loaded config plus a
993 /// network probe (Ollama liveness), so it must be lazily initialised at
994 /// runtime. Caching the choice in a `OnceCell` keeps it stable across
995 /// concurrent requests without re-probing on every chat call.
996 /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
997 /// auto-detected Ollama instance (when `local_model.enabled`), and falls
998 /// back to OpenRouter when an API key is set. Returns `Ok(None)` when
999 /// neither is available so the caller can emit a 412.
1000 /// Test: `web::tests::providers_endpoint_returns_payload` covers the
1001 /// detection path indirectly through `/api/v1/chat/providers`.
1002 pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
1003 self.chat_provider
1004 .get_or_init(|| async {
1005 // Why (issue #226): `service::load_user_config` is the
1006 // axum-free home of the loader; the `web::load_user_config`
1007 // re-export only exists for the HTTP handlers. Going
1008 // direct to `service` keeps this method usable when
1009 // the `axum-server` feature is disabled.
1010 let cfg = crate::service::load_user_config().unwrap_or_default();
1011 if cfg.local_model.enabled {
1012 if let Some(mut p) =
1013 trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
1014 {
1015 // auto_detect returns an empty model id; callers must
1016 // set the configured model name themselves.
1017 p.model = cfg.local_model.model.clone();
1018 return Some(Arc::new(p) as Arc<dyn ChatProvider>);
1019 }
1020 }
1021 if !cfg.openrouter_api_key.is_empty() {
1022 return Some(Arc::new(trusty_common::OpenRouterProvider::new(
1023 cfg.openrouter_api_key,
1024 cfg.openrouter_model,
1025 )) as Arc<dyn ChatProvider>);
1026 }
1027 None
1028 })
1029 .await
1030 .clone()
1031 }
1032
1033 /// Spawn a fire-and-forget background task that auto-discovers project
1034 /// aliases under `project_root` and asserts new ones into `palace`.
1035 ///
1036 /// Why (issue #42): Projects carry implicit shorthand — cargo package
1037 /// names that differ from their directory, binary names that differ
1038 /// from packages, first-letter abbreviations — that should be surfaced
1039 /// without a user ever calling `add_alias`. Running discovery as a
1040 /// detached task on palace-open keeps startup latency unchanged: the
1041 /// daemon binds and starts serving immediately while the discovery scan
1042 /// completes in the background, and any newly-asserted aliases land in
1043 /// the prompt cache before the model's next `get_prompt_context` call.
1044 /// What: clones `self` (cheap; `Arc`-backed), spawns a tokio task that
1045 /// invokes the `discover_aliases` tool handler directly so the
1046 /// dedup + cache-rebuild logic runs exactly the same path as the MCP
1047 /// tool call. Errors are logged at `warn!`; one failed discovery never
1048 /// destabilises the daemon.
1049 /// Test: not unit-tested (timing-dependent fire-and-forget); the
1050 /// underlying `discover_aliases` dispatch is covered by
1051 /// `dispatch_discover_aliases_inserts_new_and_dedupes` in `tools::tests`.
1052 pub fn spawn_alias_discovery(&self, palace: String, project_root: PathBuf) {
1053 let state = self.clone();
1054 tokio::spawn(async move {
1055 let args = serde_json::json!({
1056 "palace": palace,
1057 "project_root": project_root.to_string_lossy(),
1058 });
1059 match tools::dispatch_tool(&state, "discover_aliases", args).await {
1060 Ok(result) => tracing::info!(
1061 new = ?result.get("new"),
1062 already_known = ?result.get("already_known"),
1063 "alias discovery complete"
1064 ),
1065 Err(e) => tracing::warn!("alias discovery failed: {e:#}"),
1066 }
1067 });
1068 }
1069
1070 pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
1071 let cell = self.embedder.clone();
1072 let embedder = cell
1073 .get_or_try_init(|| async {
1074 let e = FastEmbedder::new().await?;
1075 Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
1076 })
1077 .await?
1078 .clone();
1079 Ok(embedder)
1080 }
1081}
1082
1083impl std::fmt::Debug for AppState {
1084 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1085 f.debug_struct("AppState")
1086 .field("version", &self.version)
1087 .field("data_root", &self.data_root)
1088 .field("registry_len", &self.registry.len())
1089 .finish()
1090 }
1091}
1092
1093/// Handle a single MCP JSON-RPC message and produce its response.
1094///
1095/// Why: Pulled out of the stdio loop so unit tests can drive every method
1096/// without touching real stdin/stdout.
1097/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
1098/// `notifications/initialized` notification (which returns `Value::Null`).
1099/// Test: See unit tests below — initialize/list/call all return expected
1100/// JSON-RPC envelopes; notifications return `Null` (no response written).
1101pub async fn handle_message(state: &AppState, msg: Value) -> Value {
1102 let id = msg.get("id").cloned().unwrap_or(Value::Null);
1103 let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
1104
1105 match method {
1106 "initialize" => {
1107 let extra = state
1108 .default_palace
1109 .as_ref()
1110 .map(|dp| json!({ "default_palace": dp }));
1111 let result = initialize_response("trusty-memory", &state.version, extra);
1112 // Why (issue #42): prompt-facts now flow through the
1113 // per-message `get_prompt_context` tool rather than MCP
1114 // prompts, so we no longer advertise the `prompts` capability.
1115 json!({
1116 "jsonrpc": "2.0",
1117 "id": id,
1118 "result": result,
1119 })
1120 }
1121 // Notifications must NOT receive a response.
1122 "notifications/initialized" | "notifications/cancelled" => Value::Null,
1123 "tools/list" => json!({
1124 "jsonrpc": "2.0",
1125 "id": id,
1126 "result": tools::tool_definitions_with(state.default_palace.is_some())
1127 }),
1128 // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
1129 // service description so orchestrators (open-mpm, etc.) can
1130 // introspect every tool and its required `memory.read`/`memory.write`
1131 // scope without bespoke per-server adapters.
1132 "rpc.discover" => json!({
1133 "jsonrpc": "2.0",
1134 "id": id,
1135 "result": openrpc::build_discover_response(
1136 &state.version,
1137 state.default_palace.is_some(),
1138 ),
1139 }),
1140 "tools/call" => {
1141 let params = msg.get("params").cloned().unwrap_or_default();
1142 let tool_name = params
1143 .get("name")
1144 .and_then(|n| n.as_str())
1145 .unwrap_or("")
1146 .to_string();
1147 let args = params.get("arguments").cloned().unwrap_or_default();
1148 match tools::dispatch_tool(state, &tool_name, args).await {
1149 Ok(content) => {
1150 // Why: tools that return a bare JSON string (e.g.
1151 // `get_prompt_context` returning the formatted
1152 // Markdown block) should surface as plain text in the
1153 // MCP `content[0].text` field — wrapping in
1154 // `Value::to_string()` would re-quote the payload and
1155 // force every caller to strip outer quotes.
1156 let text = match &content {
1157 Value::String(s) => s.clone(),
1158 other => other.to_string(),
1159 };
1160 json!({
1161 "jsonrpc": "2.0",
1162 "id": id,
1163 "result": {
1164 "content": [{"type": "text", "text": text}]
1165 }
1166 })
1167 }
1168 Err(e) => json!({
1169 "jsonrpc": "2.0",
1170 "id": id,
1171 // Why: anyhow's `{:#}` alternate format walks the full
1172 // `Caused by:` chain so MCP clients see actionable
1173 // detail (e.g. "PalaceHandle::remember_with_options:
1174 // filter rejected: too short") instead of just the
1175 // outermost context label.
1176 "error": {"code": -32603, "message": format!("{e:#}")}
1177 }),
1178 }
1179 }
1180 "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
1181 _ => json!({
1182 "jsonrpc": "2.0",
1183 "id": id,
1184 "error": {
1185 "code": -32601,
1186 "message": format!("Method not found: {method}")
1187 }
1188 }),
1189 }
1190}
1191
/// Preferred starting port for the trusty-memory HTTP daemon.
///
/// Why: keeps the well-known default stable for clients that have hard-coded
/// `127.0.0.1:7070` in their configuration, while still allowing dynamic
/// walking when the port is in use (`DYNAMIC_PORT_RANGE` ports starting here).
/// What: `7070` — historic default, matches the launchd plist's prior value.
/// Test: covered indirectly by `bind_dynamic_port_returns_listener`.
pub const DEFAULT_HTTP_PORT: u16 = 7070;

/// Number of consecutive ports `bind_dynamic_port` walks before falling back
/// to the OS-assigned port. Matches the trusty-search convention.
///
/// What: `10` — `bind_dynamic_port` starts at `DEFAULT_HTTP_PORT` and passes
/// `DYNAMIC_PORT_RANGE - 1` to `trusty_common::bind_with_auto_port`, which
/// its own comments describe as covering ports 7070..=7079.
const DYNAMIC_PORT_RANGE: u16 = 10;
1204
1205/// Path to the canonical address-discovery file for the trusty-memory daemon.
1206///
1207/// Why: clients (CLI, MCP tools, dashboards) need to find the running daemon
1208/// without configuration when the port was selected dynamically. Using
1209/// `trusty_common::resolve_data_dir` aligns this path with the location
1210/// that `trusty_common::read_daemon_addr("trusty-memory")` reads from, so
1211/// `prompt-context`, `doctor`, and `start`'s probe all find the running daemon.
1212/// The old `~/.trusty-memory/http_addr` path and the new
1213/// `~/Library/Application Support/trusty-memory/http_addr` (macOS) path were
1214/// divergent — the daemon wrote one; readers expected the other.
1215/// What: returns `{resolve_data_dir("trusty-memory")}/http_addr`, or `None` if
1216/// the data dir cannot be resolved (locked-down container, no passwd entry).
1217/// Test: `http_addr_path_uses_resolve_data_dir`.
1218pub fn http_addr_path() -> Option<PathBuf> {
1219 trusty_common::resolve_data_dir("trusty-memory")
1220 .ok()
1221 .map(|d| d.join("http_addr"))
1222}
1223
1224/// Bind a `TcpListener` to `127.0.0.1`, dynamically selecting a port.
1225///
1226/// Why: the historic default `7070` is convenient for clients but a stale
1227/// process or a second daemon must not produce a noisy failure. Walking
1228/// `DEFAULT_HTTP_PORT..DEFAULT_HTTP_PORT+DYNAMIC_PORT_RANGE` first preserves
1229/// backwards compatibility for the common case; OS-assigned fallback (`:0`)
1230/// guarantees the daemon always comes up even when every preferred port is
1231/// busy.
1232/// What: returns the first successful `TcpListener`. Tries 7070..=7079
1233/// in order, then falls back to OS-assigned. Caller inspects
1234/// `local_addr()` to learn the chosen port.
1235/// Test: `bind_dynamic_port_returns_listener` confirms it always binds *some*
1236/// port even after another listener occupies the preferred one.
1237pub async fn bind_dynamic_port() -> Result<tokio::net::TcpListener> {
1238 let preferred: SocketAddr = SocketAddr::from(([127, 0, 0, 1], DEFAULT_HTTP_PORT));
1239 // First: walk the preferred range (7070..=7079).
1240 if let Ok(listener) =
1241 trusty_common::bind_with_auto_port(preferred, DYNAMIC_PORT_RANGE - 1).await
1242 {
1243 return Ok(listener);
1244 }
1245 // Last resort: ask the kernel for any free port. `bind_with_auto_port`
1246 // with `:0` resolves immediately to the OS-assigned port.
1247 tracing::warn!(
1248 "all ports {DEFAULT_HTTP_PORT}..{} in use; requesting OS-assigned port",
1249 DEFAULT_HTTP_PORT + DYNAMIC_PORT_RANGE - 1
1250 );
1251 let any: SocketAddr = SocketAddr::from(([127, 0, 0, 1], 0));
1252 trusty_common::bind_with_auto_port(any, 0).await
1253}
1254
/// Write the bound `host:port` to the discovery file at `path` atomically.
///
/// Why: clients must be able to read the file mid-write without observing a
/// partial value. Writing to a temporary sibling and renaming it over the
/// target gives POSIX atomicity, matching the trusty-search implementation.
/// What: creates the parent directory of `path` if missing; writes `addr`
/// followed by a trailing newline (avoids the "no newline at end of file"
/// warnings from `cat`) to `path.with_extension("addr.tmp")`, syncs it, and
/// renames it onto `path`. I/O errors are returned to the caller so
/// `run_http_on` can log them without panicking.
/// Test: `http_addr_file_round_trip_via_helpers`.
1265#[cfg(feature = "axum-server")]
fn write_http_addr_file(path: &Path, addr: &SocketAddr) -> std::io::Result<()> {
    use std::io::Write;

    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let tmp = path.with_extension("addr.tmp");

    // Stage the content in the temp sibling, then rename it onto the target
    // so readers only ever observe a complete file.
    let staged = (|| -> std::io::Result<()> {
        let mut f = std::fs::File::create(&tmp)?;
        writeln!(f, "{addr}")?;
        f.sync_all()?;
        // Close the handle before renaming.
        drop(f);
        std::fs::rename(&tmp, path)
    })();

    // On any failure, remove the temp file (best-effort) so a failed write
    // does not leave a stray `*.addr.tmp` next to the discovery file.
    if staged.is_err() {
        let _ = std::fs::remove_file(&tmp);
    }
    staged
}
1280
/// Serve the HTTP/SSE + web admin surface on an already-bound listener.
///
/// Why: non-stdio clients (browsers, curl, future remote agents) need a
/// long-running daemon endpoint for `/health`, the REST surface, and the
/// admin SPA; accepting a pre-bound listener keeps port selection at the
/// call site.
/// What: starts the disk-size and status-event tickers, stamps the
/// listener's local address onto `AppState.bound_addr`, best-effort writes it
/// to the discovery file resolved by `http_addr_path()`, brings up the UDS
/// transport, and then serves `web::router()` plus a `/sse` route. Once
/// `axum::serve` returns, the discovery file and the UDS socket are removed
/// best-effort and the BM25 supervisor (if any) is shut down before the serve
/// result is handed back to the caller.
/// Test: `cargo test -p trusty-memory web::tests` exercises the router shape;
/// manual: `curl http://127.0.0.1:<port>/health` returns `ok` with `addr`.
#[cfg(feature = "axum-server")]
pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
    use axum::routing::get;

    // Issue #35: the `data_root` footprint is recomputed on a background
    // cadence so `GET /health` never walks the directory tree itself.
    spawn_disk_size_ticker(state.clone());

    // Issue #228: aggregate `StatusChanged` is emitted from a ticker rather
    // than from every drawer write, keeping the O(N palaces) recompute off
    // the write hot path.
    spawn_status_event_ticker(state.clone());

    // Advertise the bound address BEFORE serving so the first request
    // handler and the discovery file both see the real port.
    let mut written_path = None;
    if let Ok(a) = listener.local_addr() {
        // Handlers such as /health read this back from the state.
        let _ = state.bound_addr.set(a);
        info!("HTTP server listening on http://{a}");
        eprintln!("HTTP server listening on http://{a}");
        // The discovery file is best-effort: failures are logged for
        // operators but never stop the server from coming up.
        match http_addr_path() {
            None => tracing::warn!("no $HOME — skipping http_addr discovery file"),
            Some(p) => match write_http_addr_file(&p, &a) {
                Err(e) => tracing::warn!("could not write {}: {e}", p.display()),
                Ok(()) => {
                    info!("wrote daemon address to {}", p.display());
                    written_path = Some(p);
                }
            },
        }
    }

    // The UDS transport (NDJSON JSON-RPC for the MCP bridge) comes up next to
    // HTTP. `None` means it could not be bound; HTTP proceeds regardless.
    let uds_sock_path = spawn_uds_listener(state.clone()).await;

    // Grab the supervisor handle now: `state` is moved into the router below
    // and the exit path still needs to call `shutdown()` on it.
    let bm25_supervisor = state.bm25_supervisor.clone();

    let router = web::router().route("/sse", get(sse_handler));
    let serve_result = axum::serve(listener, router.with_state(state)).await;

    // Best-effort cleanup so stale clients fail fast instead of timing out
    // against a dead port or socket.
    if let Some(p) = written_path.as_ref() {
        let _ = std::fs::remove_file(p);
    }
    if let Some(p) = uds_sock_path.as_ref() {
        let _ = std::fs::remove_file(p);
    }

    // Issue #193: give every spawned BM25 daemon a graceful shutdown so it
    // can flush its snapshot and unlink its socket before the process exits.
    if let Some(supervisor) = bm25_supervisor {
        supervisor.shutdown().await;
    }

    serve_result.map_err(Into::into)
}
1386
/// Bring up the Unix-domain-socket transport next to the HTTP server.
///
/// Why: UDS is an additive transport, so a bind failure is logged and
/// reported as `None` instead of aborting daemon startup; `None` also tells
/// the caller there is no socket file to clean up later.
/// What: derives the socket path from `state.data_root` via
/// [`transport::uds::socket_path_for`], binds it, best-effort writes the UDS
/// address discovery file under the data root, and runs the accept loop on a
/// detached tokio task. Returns the bound socket path on success.
/// Test: covered by `uds_ndjson_roundtrip` in the integration tests and the
/// unit tests in [`transport::uds`].
#[cfg(feature = "axum-server")]
async fn spawn_uds_listener(state: AppState) -> Option<PathBuf> {
    // The path is scoped to the data root so several daemons (typical in
    // tests) do not collide on one shared socket file.
    let sock_path = transport::uds::socket_path_for(&state.data_root);

    let listener = transport::uds::bind_uds(&sock_path)
        .await
        .map_err(|e| {
            tracing::warn!(
                "UDS bind at {} failed: {e:#}; continuing without UDS transport",
                sock_path.display()
            );
        })
        .ok()?;

    info!("UDS listener bound at {}", sock_path.display());
    eprintln!("UDS listener bound at {}", sock_path.display());

    // Best-effort discovery file so the bridge can locate the live socket
    // even when the daemon runs with an unusual $TMPDIR.
    let addr_file = transport::uds::write_uds_addr_file(&state.data_root, &sock_path);
    if let Err(e) = addr_file {
        tracing::warn!(
            "could not write {}/{}: {e:#}",
            state.data_root.display(),
            transport::uds::UDS_ADDR_FILE
        );
    }

    // The accept loop owns the state from here on; this function only hands
    // the socket path back to the caller.
    tokio::spawn(async move {
        if let Err(e) = transport::uds::run_uds(state, listener).await {
            tracing::error!("UDS accept loop exited: {e:#}");
        }
    });

    Some(sock_path)
}
1437
/// Bind a TCP listener on `addr` and hand it to [`run_http_on`].
///
/// What: a bind failure is returned to the caller before anything is served;
/// otherwise the result of [`run_http_on`] is returned as-is.
#[cfg(feature = "axum-server")]
pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
    match tokio::net::TcpListener::bind(addr).await {
        Ok(listener) => run_http_on(state, listener).await,
        Err(bind_err) => Err(bind_err.into()),
    }
}
1444
/// Bind a dynamically chosen port and serve on it.
///
/// Why: `trusty-memory serve` without `--http` is the canonical
/// launchd-managed entry point; picking the port dynamically lets it coexist
/// with a stale daemon or a hand-spawned `serve --http 127.0.0.1:7070`.
/// What: obtains a listener from [`bind_dynamic_port`] (7070..=7079, then OS
/// fallback) and forwards it to [`run_http_on`]; a bind error is returned
/// before anything is served.
/// Test: integration via `trusty-memory serve` + `cat ~/.trusty-memory/http_addr`.
#[cfg(feature = "axum-server")]
pub async fn run_http_dynamic(state: AppState) -> Result<()> {
    run_http_on(state, bind_dynamic_port().await?).await
}
1458
/// Start the background task that keeps `state.disk_bytes` fresh (issue #35).
///
/// Why: `GET /health` reports `disk_bytes`; measuring the data directory on
/// every health poll would put a recursive directory walk on the request
/// path. Refreshing on a fixed cadence keeps `/health` cheap and bounds the
/// staleness to roughly ten seconds.
/// What: spawns a detached tokio task that owns the given `AppState`. Every
/// 10 s it runs `dir_size_bytes` over `data_root` on the blocking pool and
/// stores the total with a relaxed atomic store. If the blocking task fails
/// to join, `0` is stored for that tick.
/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
/// the cadence itself is not unit-tested (timing-dependent).
#[cfg(feature = "axum-server")]
fn spawn_disk_size_ticker(state: AppState) {
    let period = std::time::Duration::from_secs(10);
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            let root = state.data_root.clone();
            // Filesystem walking blocks, so it runs on the blocking pool and
            // never parks an async worker thread.
            let walk = tokio::task::spawn_blocking(move || {
                trusty_common::sys_metrics::dir_size_bytes(&root)
            });
            let total = match walk.await {
                Ok(measured) => measured,
                Err(_) => 0,
            };
            state
                .disk_bytes
                .store(total, std::sync::atomic::Ordering::Relaxed);
        }
    });
}
1494
/// Cadence, in seconds, of the aggregate-status snapshot emitted on the SSE
/// bus by `spawn_status_event_ticker`.
///
/// Why (issue #228): mutations used to fire `StatusChanged` synchronously on
/// the write path, which forced an O(N palaces) sum of drawer / vector / KG
/// counts on every `memory_remember`. Coalescing into a fixed-cadence ticker
/// lets dashboards stay current (a 30 s lag is invisible at human scale)
/// while keeping the write path free of aggregate work.
/// What: 30 seconds — short enough that the operator UI does not feel stale
/// between manual writes, long enough that the recompute cost (in-memory
/// registry walk plus the redb `count_active_triples` per palace) is a
/// rounding error on the daemon's CPU budget.
/// Test: covered indirectly — the math has not changed, only the cadence.
const STATUS_EVENT_TICK_SECS: u64 = 30;
1508
1509/// Spawn a background ticker that emits `DaemonEvent::StatusChanged` every
1510/// [`STATUS_EVENT_TICK_SECS`] seconds (issue #228).
1511///
1512/// Why: replaces the per-write `state.emit(self.aggregate_status_event())`
1513/// call sites that used to recompute the aggregate every time a drawer was
1514/// created or deleted. Walking N palaces on every write blocks the async
1515/// runtime; coalescing the emit onto a ticker keeps dashboards up-to-date
1516/// without that cost.
1517/// What: spawns a detached tokio task that holds a full `AppState` clone
1518/// (cheap — every field is `Arc`-backed) and ticks every
1519/// [`STATUS_EVENT_TICK_SECS`] seconds. Each tick computes
1520/// `MemoryService::aggregate_status_event` (which now iterates the
1521/// in-memory registry, not disk) and broadcasts it via `state.emit`. If
1522/// no SSE subscribers are connected the broadcast `send` is a cheap no-op,
1523/// so the ticker imposes no cost when nobody is listening.
1524/// Test: not unit-tested (timing-dependent fire-and-forget); the underlying
1525/// `aggregate_status_event` math is exercised by the existing
1526/// `status_endpoint_returns_payload` path.
1527fn spawn_status_event_ticker(state: AppState) {
1528 tokio::spawn(async move {
1529 let mut interval =
1530 tokio::time::interval(std::time::Duration::from_secs(STATUS_EVENT_TICK_SECS));
1531 // The first tick fires immediately, which is fine: it gives SSE
1532 // subscribers a baseline `StatusChanged` shortly after they connect.
1533 loop {
1534 interval.tick().await;
1535 let event = service::MemoryService::new(state.clone()).aggregate_status_event();
1536 state.emit(event);
1537 }
1538 });
1539}
1540
/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
///
/// Why: the dashboard subscribes once and reacts to live pushes instead of
/// polling `/api/v1/*` endpoints.
/// What: subscribes to `state.events`, emits an initial
/// `{"type":"connected"}` frame, then forwards every event as
/// `data: <json>\n\n`. A lagged subscriber receives a
/// `{"type":"lag","skipped":N}` frame; channel closure ends the stream. If an
/// event fails to serialize, an `{"type":"error","message":...}` frame is
/// sent instead. That frame is built with `json!`, so the error text is
/// JSON-escaped and cannot break the JSON payload or the SSE framing.
/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
/// emit + receive); manual: `curl -N http://.../sse`.
#[cfg(feature = "axum-server")]
pub(crate) async fn sse_handler(
    axum::extract::State(state): axum::extract::State<AppState>,
) -> impl axum::response::IntoResponse {
    use futures::StreamExt;
    use tokio_stream::wrappers::BroadcastStream;

    let rx = state.events.subscribe();
    let initial = futures::stream::once(async {
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
            "data: {\"type\":\"connected\"}\n\n",
        ))
    });
    let events = BroadcastStream::new(rx).map(|res| {
        let frame = match res {
            Ok(event) => match serde_json::to_string(&event) {
                Ok(payload) => format!("data: {payload}\n\n"),
                Err(e) => {
                    // Serialize the error through serde_json rather than
                    // splicing `{e}` into a hand-written JSON string: quotes,
                    // backslashes, or newlines in the message would otherwise
                    // produce invalid JSON or split the SSE frame.
                    let body = json!({"type": "error", "message": e.to_string()});
                    format!("data: {body}\n\n")
                }
            },
            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
            }
        };
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
    });
    let stream = initial.chain(events);

    axum::response::Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        // Asks buffering reverse proxies to pass frames through immediately.
        .header("X-Accel-Buffering", "no")
        .body(axum::body::Body::from_stream(stream))
        .expect("valid SSE response")
}
1586
1587#[cfg(test)]
1588mod tests {
1589 use super::*;
1590
1591 /// Why: Issue #234 — previously we `mem::forget`ed the `TempDir` so tests
1592 /// could keep using `AppState` without juggling the directory handle, but
1593 /// that leaked one temp directory per test (262+ accumulated each run).
1594 /// What: Returns the `TempDir` alongside the `AppState` so the caller can
1595 /// bind it (`let (state, _tmp) = ...;`) and let drop semantics clean up
1596 /// when the test scope ends.
1597 /// Test: Every test in this module that constructs state.
1598 fn test_state() -> (AppState, tempfile::TempDir) {
1599 let tmp = tempfile::tempdir().expect("tempdir");
1600 let root = tmp.path().to_path_buf();
1601 (AppState::new(root), tmp)
1602 }
1603
1604 #[tokio::test]
1605 async fn initialize_returns_protocol_version_and_capabilities() {
1606 let (state, _tmp) = test_state();
1607 let req = json!({
1608 "jsonrpc": "2.0",
1609 "id": 1,
1610 "method": "initialize",
1611 "params": {
1612 "protocolVersion": "2024-11-05",
1613 "capabilities": {},
1614 "clientInfo": {"name": "test", "version": "0"}
1615 }
1616 });
1617 let resp = handle_message(&state, req).await;
1618 assert_eq!(resp["jsonrpc"], "2.0");
1619 assert_eq!(resp["id"], 1);
1620 assert_eq!(resp["result"]["protocolVersion"], "2024-11-05");
1621 assert!(resp["result"]["capabilities"]["tools"].is_object());
1622 assert_eq!(resp["result"]["serverInfo"]["name"], "trusty-memory");
1623 }
1624
1625 #[tokio::test]
1626 async fn initialized_notification_returns_null() {
1627 let (state, _tmp) = test_state();
1628 let req = json!({
1629 "jsonrpc": "2.0",
1630 "method": "notifications/initialized",
1631 "params": {}
1632 });
1633 let resp = handle_message(&state, req).await;
1634 assert!(resp.is_null());
1635 }
1636
1637 #[tokio::test]
1638 async fn tools_list_returns_all_tools() {
1639 let (state, _tmp) = test_state();
1640 let req = json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"});
1641 let resp = handle_message(&state, req).await;
1642 let tools = resp["result"]["tools"].as_array().expect("tools array");
1643 // Issue #99 added `memory_send_message`; issue #180 added
1644 // `palace_delete`; the #180 follow-up adds `palace_update` on top
1645 // of the 22-tool baseline.
1646 assert_eq!(tools.len(), 23);
1647 }
1648
1649 #[tokio::test]
1650 async fn unknown_method_returns_error() {
1651 let (state, _tmp) = test_state();
1652 let req = json!({"jsonrpc": "2.0", "id": 4, "method": "wat"});
1653 let resp = handle_message(&state, req).await;
1654 assert_eq!(resp["error"]["code"], -32601);
1655 }
1656
1657 #[tokio::test]
1658 async fn ping_returns_empty_result() {
1659 let (state, _tmp) = test_state();
1660 let req = json!({"jsonrpc": "2.0", "id": 5, "method": "ping"});
1661 let resp = handle_message(&state, req).await;
1662 assert!(resp["result"].is_object());
1663 }
1664
1665 #[tokio::test]
1666 async fn app_state_default_constructs() {
1667 let (s, _tmp) = test_state();
1668 assert!(!s.version.is_empty());
1669 assert!(s.registry.is_empty());
1670 assert!(s.default_palace.is_none());
1671 }
1672
1673 /// Why (issue #225): the previous implementation called `.expect()` on the
1674 /// tempdir fallback, which panicked the daemon at startup on hosts where
1675 /// neither the data root nor `std::env::temp_dir()` is writable
1676 /// (read-only Docker overlays, locked-down sandboxes). The activity log
1677 /// is documented as best-effort, so the fix returns a no-op `Discard`
1678 /// variant instead. This test forces both paths to fail and asserts the
1679 /// helper returns the discard variant rather than panicking.
1680 ///
1681 /// Skipped when running as root because `chmod 000` is a no-op for the
1682 /// root user — the kernel grants root access regardless of mode bits.
1683 /// CI typically runs as non-root, so coverage is preserved in the
1684 /// common case; local root invocations simply skip with a warning.
1685 #[test]
1686 #[cfg(unix)]
1687 fn open_activity_log_with_fallback_returns_discard_when_unwritable() {
1688 // Skip when running as root — chmod is ignored.
1689 // SAFETY: libc::geteuid is a thread-safe syscall with no preconditions.
1690 if unsafe { libc::geteuid() } == 0 {
1691 eprintln!(
1692 "skipping open_activity_log_with_fallback_returns_discard_when_unwritable: running as root"
1693 );
1694 return;
1695 }
1696
1697 use std::os::unix::fs::PermissionsExt;
1698
1699 // Build two unwritable directories: the primary "data root" and a
1700 // shadow "TMPDIR" so the tempdir fallback also fails.
1701 let outer = tempfile::tempdir().expect("outer tempdir");
1702 let primary = outer.path().join("primary");
1703 let tmpdir = outer.path().join("fake-tmp");
1704 std::fs::create_dir(&primary).expect("create primary");
1705 std::fs::create_dir(&tmpdir).expect("create tmpdir");
1706
1707 // chmod 000 on both — neither can be opened for write.
1708 std::fs::set_permissions(&primary, std::fs::Permissions::from_mode(0o000))
1709 .expect("chmod primary");
1710 std::fs::set_permissions(&tmpdir, std::fs::Permissions::from_mode(0o000))
1711 .expect("chmod tmpdir");
1712
1713 // Override the tempdir lookup so `open_activity_log_with_fallback`
1714 // hits our unwritable fake-tmp instead of the real system temp.
1715 // Note: env var mutation is process-global; this test is the only
1716 // accessor for `TMPDIR` in this test binary, and we restore the
1717 // previous value before returning.
1718 let prev_tmpdir = std::env::var_os("TMPDIR");
1719 std::env::set_var("TMPDIR", &tmpdir);
1720
1721 let log = open_activity_log_with_fallback(&primary);
1722
1723 // Restore TMPDIR ASAP so a panic later in the test doesn't leak it.
1724 match prev_tmpdir {
1725 Some(v) => std::env::set_var("TMPDIR", v),
1726 None => std::env::remove_var("TMPDIR"),
1727 }
1728
1729 // Restore permissions so the outer tempdir can clean up.
1730 let _ = std::fs::set_permissions(&primary, std::fs::Permissions::from_mode(0o700));
1731 let _ = std::fs::set_permissions(&tmpdir, std::fs::Permissions::from_mode(0o700));
1732
1733 assert!(
1734 log.is_discard(),
1735 "expected ActivityLog::Discard when both data root and tempdir are unwritable"
1736 );
1737
1738 // The Discard variant must still satisfy the public contract: no
1739 // panic on append/count/list.
1740 let id = log
1741 .append(
1742 ActivitySource::Http,
1743 None,
1744 "drawer_added",
1745 json!({"smoke": true}),
1746 )
1747 .expect("discard append must succeed");
1748 assert_eq!(id, 0);
1749 assert_eq!(log.count().expect("discard count"), 0);
1750 assert!(log
1751 .list(&ActivityFilter::default(), 10, 0)
1752 .expect("discard list")
1753 .is_empty());
1754 }
1755
1756 /// Why: Issue #26 — when `serve --palace <name>` is set, the MCP server
1757 /// must (a) report the default in the `initialize` `serverInfo`, (b)
1758 /// drop `palace` from the required schema in `tools/list`, and (c) let
1759 /// `tools/call` use the default when the caller omits `palace`.
1760 /// Test: Construct an AppState with a default palace, create that palace
1761 /// on disk via the registry, then call `memory_remember` without a
1762 /// `palace` argument and confirm it resolves to the default.
1763 #[tokio::test]
1764 async fn default_palace_used_when_arg_omitted() {
1765 let tmp = tempfile::tempdir().expect("tempdir");
1766 let root = tmp.path().to_path_buf();
1767
1768 // Pre-create the default palace so remember has somewhere to land.
1769 let registry = trusty_common::memory_core::PalaceRegistry::new();
1770 let palace = trusty_common::memory_core::Palace {
1771 id: trusty_common::memory_core::PalaceId::new("default-pal"),
1772 name: "default-pal".to_string(),
1773 description: None,
1774 created_at: chrono::Utc::now(),
1775 data_dir: root.join("default-pal"),
1776 };
1777 registry
1778 .create_palace(&root, palace)
1779 .expect("create_palace");
1780
1781 let state = AppState::new(root).with_default_palace(Some("default-pal".to_string()));
1782
1783 // (a) initialize advertises the default.
1784 let init = handle_message(
1785 &state,
1786 json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
1787 )
1788 .await;
1789 assert_eq!(
1790 init["result"]["serverInfo"]["default_palace"], "default-pal",
1791 "initialize must echo default_palace in serverInfo"
1792 );
1793
1794 // (b) tools/list drops `palace` from required when default is set.
1795 let list = handle_message(
1796 &state,
1797 json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list"}),
1798 )
1799 .await;
1800 let tools = list["result"]["tools"].as_array().expect("tools array");
1801 let remember = tools
1802 .iter()
1803 .find(|t| t["name"] == "memory_remember")
1804 .expect("memory_remember tool");
1805 let required: Vec<&str> = remember["inputSchema"]["required"]
1806 .as_array()
1807 .expect("required array")
1808 .iter()
1809 .filter_map(|v| v.as_str())
1810 .collect();
1811 assert!(
1812 !required.contains(&"palace"),
1813 "palace must not be required when default is configured; got {required:?}"
1814 );
1815 assert!(required.contains(&"text"));
1816
1817 // (c) tools/call resolves the default when arg is omitted.
1818 let call = handle_message(
1819 &state,
1820 json!({
1821 "jsonrpc": "2.0",
1822 "id": 3,
1823 "method": "tools/call",
1824 "params": {
1825 "name": "memory_remember",
1826 "arguments": {"text": "default palace test memory content with several tokens"},
1827 },
1828 }),
1829 )
1830 .await;
1831 // Successful dispatch returns `result.content[0].text` JSON.
1832 let text = call["result"]["content"][0]["text"]
1833 .as_str()
1834 .unwrap_or_else(|| panic!("expected success result, got {call}"));
1835 let parsed: Value = serde_json::from_str(text).expect("parse content json");
1836 assert_eq!(parsed["palace"], "default-pal");
1837 assert_eq!(parsed["status"], "stored");
1838 assert!(parsed["drawer_id"].as_str().is_some());
1839 }
1840
1841 /// Why: When no default is set, `tools/call` for a palace-bound tool
1842 /// without a `palace` argument should error helpfully rather than panic.
1843 #[tokio::test]
1844 async fn missing_palace_without_default_errors() {
1845 let (state, _tmp) = test_state();
1846 let resp = handle_message(
1847 &state,
1848 json!({
1849 "jsonrpc": "2.0",
1850 "id": 7,
1851 "method": "tools/call",
1852 "params": {
1853 "name": "memory_recall",
1854 "arguments": {"query": "anything"},
1855 },
1856 }),
1857 )
1858 .await;
1859 assert_eq!(resp["error"]["code"], -32603);
1860 let msg = resp["error"]["message"].as_str().unwrap_or("");
1861 assert!(
1862 msg.contains("missing 'palace'"),
1863 "expected helpful error, got: {msg}"
1864 );
1865 }
1866
1867 /// Why: regression for the "palaces lost on restart" bug — `AppState::new`
1868 /// builds an empty registry, so the daemon must call
1869 /// `load_palaces_from_disk` on startup to re-register palaces persisted by
1870 /// a previous run. Without that call the registry stays empty even though
1871 /// `palace.json` files exist on disk.
1872 /// What: persists two palaces under a tempdir (via the same
1873 /// `create_palace` path the `palace_create` tool uses), constructs a fresh
1874 /// `AppState` rooted there, calls `load_palaces_from_disk`, and asserts the
1875 /// returned count and registry contents.
1876 /// Test: this test itself.
1877 #[tokio::test]
1878 async fn load_palaces_from_disk_rehydrates_registry() {
1879 use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
1880
1881 let tmp = tempfile::tempdir().expect("tempdir");
1882 let root = tmp.path().to_path_buf();
1883
1884 // Phase 1: persist two palaces to disk, then drop the writer registry
1885 // so nothing is held in memory — simulating a prior daemon run.
1886 {
1887 let writer = PalaceRegistry::new();
1888 for id in ["alpha", "beta"] {
1889 let palace = Palace {
1890 id: PalaceId::new(id),
1891 name: id.to_string(),
1892 description: None,
1893 created_at: chrono::Utc::now(),
1894 data_dir: root.join(id),
1895 };
1896 writer
1897 .create_palace(&root, palace)
1898 .expect("persist palace to disk");
1899 }
1900 }
1901
1902 // Add a stray non-palace subdirectory; the walker must ignore it.
1903 std::fs::create_dir_all(root.join("not-a-palace")).expect("mkdir");
1904
1905 // Phase 2: fresh AppState starts with an empty registry (the bug).
1906 let state = AppState::new(root);
1907 assert!(
1908 state.registry.is_empty(),
1909 "AppState::new must start with an empty registry"
1910 );
1911
1912 // The fix: hydrate from disk.
1913 let count = state
1914 .load_palaces_from_disk()
1915 .await
1916 .expect("load_palaces_from_disk");
1917
1918 assert_eq!(count, 2, "both persisted palaces should be loaded");
1919 assert_eq!(state.registry.len(), 2, "registry should hold both palaces");
1920 let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
1921 assert!(ids.contains(&"alpha".to_string()));
1922 assert!(ids.contains(&"beta".to_string()));
1923 }
1924
1925 /// Why: existing installs (and the legacy standalone `trusty-memory` repo)
1926 /// nest palaces one level deeper under a `palaces/` subdirectory. When that
1927 /// subdirectory exists, `resolve_palace_registry_dir` must descend into it
1928 /// so the daemon scans the level that actually holds the `palace.json`
1929 /// files — otherwise it finds zero palaces, which is the restart bug.
1930 /// What: creates `<dir>/palaces/`, resolves, and asserts the nested path is
1931 /// returned.
1932 /// Test: this test itself.
1933 #[test]
1934 fn resolve_palace_registry_dir_prefers_palaces_subdir() {
1935 let tmp = tempfile::tempdir().expect("tempdir");
1936 let data_dir = tmp.path().to_path_buf();
1937 std::fs::create_dir_all(data_dir.join("palaces")).expect("mkdir palaces");
1938
1939 let resolved = resolve_palace_registry_dir(data_dir.clone());
1940 assert_eq!(resolved, data_dir.join("palaces"));
1941 }
1942
1943 /// Why: a fresh install with no `palaces/` subdirectory must fall back to
1944 /// the data dir itself (the current flat monorepo layout).
1945 #[test]
1946 fn resolve_palace_registry_dir_falls_back_to_data_dir() {
1947 let tmp = tempfile::tempdir().expect("tempdir");
1948 let data_dir = tmp.path().to_path_buf();
1949
1950 let resolved = resolve_palace_registry_dir(data_dir.clone());
1951 assert_eq!(resolved, data_dir);
1952 }
1953
1954 /// Why: end-to-end check that the nested-`palaces/` layout hydrates — the
1955 /// daemon resolves the registry dir via `resolve_palace_registry_dir`, so
1956 /// an `AppState` rooted there must load palaces persisted one level below
1957 /// the bare data dir.
1958 /// What: persists two palaces under `<root>/palaces/<id>/`, constructs an
1959 /// `AppState` rooted at the resolved registry dir, and asserts hydration
1960 /// finds both.
1961 /// Test: this test itself.
1962 #[tokio::test]
1963 async fn load_palaces_from_disk_handles_palaces_subdir() {
1964 use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
1965
1966 let tmp = tempfile::tempdir().expect("tempdir");
1967 let root = tmp.path().to_path_buf();
1968 let nested = root.join("palaces");
1969
1970 {
1971 let writer = PalaceRegistry::new();
1972 for id in ["cto", "engineering"] {
1973 let palace = Palace {
1974 id: PalaceId::new(id),
1975 name: id.to_string(),
1976 description: None,
1977 created_at: chrono::Utc::now(),
1978 data_dir: nested.join(id),
1979 };
1980 // create_palace anchors data_dir under the passed root, so
1981 // pass `nested` here to land palaces under `<root>/palaces/`.
1982 writer
1983 .create_palace(&nested, palace)
1984 .expect("persist palace under palaces/ subdir");
1985 }
1986 }
1987
1988 // Mirror main.rs: resolve the registry dir, then root AppState there.
1989 let registry_dir = resolve_palace_registry_dir(root);
1990 assert_eq!(registry_dir, nested, "must resolve into palaces/ subdir");
1991 let state = AppState::new(registry_dir);
1992 let count = state
1993 .load_palaces_from_disk()
1994 .await
1995 .expect("load_palaces_from_disk");
1996
1997 assert_eq!(count, 2, "both nested palaces should be loaded");
1998 assert_eq!(state.registry.len(), 2);
1999 let ids: Vec<String> = state.registry.list().into_iter().map(|p| p.0).collect();
2000 assert!(ids.contains(&"cto".to_string()));
2001 assert!(ids.contains(&"engineering".to_string()));
2002 }
2003
2004 /// Why: an empty (or missing) palace registry directory must not error — a
2005 /// brand-new install has nothing to hydrate and should report zero.
2006 #[tokio::test]
2007 async fn load_palaces_from_disk_empty_root_returns_zero() {
2008 let (state, _tmp) = test_state();
2009 let count = state
2010 .load_palaces_from_disk()
2011 .await
2012 .expect("load_palaces_from_disk on empty root");
2013 assert_eq!(count, 0);
2014 assert!(state.registry.is_empty());
2015 }
2016
2017 /// Why (issue #228): hydration must seed `state.palace_names` so the
2018 /// MCP write hot path (`memory_remember` / `memory_note`) can resolve a
2019 /// friendly palace name without re-walking the data root on every call.
2020 /// Regression risk: a future refactor that forgets to populate the cache
2021 /// would silently degrade write latency.
2022 /// What: persists two palaces with distinct `name` values, constructs a
2023 /// fresh `AppState`, hydrates from disk, and asserts the cache holds the
2024 /// expected mappings.
2025 /// Test: this test itself.
2026 #[tokio::test]
2027 async fn palace_name_cache_populated_after_hydration() {
2028 use trusty_common::memory_core::{Palace, PalaceId, PalaceRegistry};
2029
2030 let tmp = tempfile::tempdir().expect("tempdir");
2031 let root = tmp.path().to_path_buf();
2032 {
2033 let writer = PalaceRegistry::new();
2034 for (id, name) in [("alpha", "Alpha Project"), ("beta", "Beta Project")] {
2035 let palace = Palace {
2036 id: PalaceId::new(id),
2037 name: name.to_string(),
2038 description: None,
2039 created_at: chrono::Utc::now(),
2040 data_dir: root.join(id),
2041 };
2042 writer.create_palace(&root, palace).expect("persist palace");
2043 }
2044 }
2045
2046 let state = AppState::new(root);
2047 assert!(
2048 state.palace_names.is_empty(),
2049 "fresh AppState must start with an empty name cache"
2050 );
2051 state
2052 .load_palaces_from_disk()
2053 .await
2054 .expect("load_palaces_from_disk");
2055
2056 assert_eq!(state.palace_names.len(), 2, "cache must hold both palaces");
2057 assert_eq!(
2058 state.palace_names.get("alpha").map(|e| e.value().clone()),
2059 Some("Alpha Project".to_string()),
2060 );
2061 assert_eq!(
2062 state.palace_names.get("beta").map(|e| e.value().clone()),
2063 Some("Beta Project".to_string()),
2064 );
2065 }
2066
2067 /// Why (issue #228): `palace_create` (MCP tool) and `MemoryService::create_palace`
2068 /// (HTTP path) both insert into the name cache so a freshly-created palace
2069 /// is resolvable on the very next write — without waiting for the next
2070 /// hydration cycle.
2071 /// What: dispatches the `palace_create` MCP tool against a tempdir and
2072 /// asserts the cache row was written.
2073 /// Test: this test itself.
2074 #[tokio::test]
2075 async fn palace_name_cache_updates_on_create() {
2076 use serde_json::json;
2077
2078 let (state, _tmp) = test_state();
2079 let _ = tools::dispatch_tool(&state, "palace_create", json!({"name": "gamma"}))
2080 .await
2081 .expect("palace_create");
2082 assert_eq!(
2083 state.palace_names.get("gamma").map(|e| e.value().clone()),
2084 Some("gamma".to_string()),
2085 "palace_create must populate the in-memory name cache so writes \
2086 can resolve the friendly name without a disk walk"
2087 );
2088 }
2089
2090 /// Why: initialize without a default palace must omit `default_palace`
2091 /// from `serverInfo` so clients can detect the unbound mode.
2092 #[tokio::test]
2093 async fn initialize_without_default_palace_omits_field() {
2094 let (state, _tmp) = test_state();
2095 let init = handle_message(
2096 &state,
2097 json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
2098 )
2099 .await;
2100 assert!(init["result"]["serverInfo"]["default_palace"].is_null());
2101 }
2102
2103 /// Why: every `~/.trusty-memory/http_addr` consumer (CLI, dashboard,
2104 /// future trusty-mpm wiring) must agree on the path. A regression that
2105 /// moves this file breaks every client relying on `read_daemon_addr`.
2106 /// What: under a stubbed data dir, the path ends in
2107 /// `trusty-memory/http_addr` — matching `trusty_common::read_daemon_addr`'s
2108 /// expected location.
2109 #[tokio::test]
2110 async fn http_addr_path_uses_resolve_data_dir() {
2111 // Hold the env_test_lock so this test does not race with
2112 // `prompt_context::tests::*` which spin a real daemon under
2113 // the same env override and would otherwise observe a
2114 // half-mutated $TRUSTY_DATA_DIR_OVERRIDE.
2115 let _guard = crate::commands::env_test_lock().lock().await;
2116 let tmp = tempfile::tempdir().unwrap();
2117 // SAFETY: test-only env mutation serialised by env_test_lock.
2118 unsafe {
2119 std::env::set_var(trusty_common::DATA_DIR_OVERRIDE_ENV, tmp.path());
2120 }
2121 let result = http_addr_path();
2122 unsafe {
2123 std::env::remove_var(trusty_common::DATA_DIR_OVERRIDE_ENV);
2124 }
2125 let p = result.expect("http_addr_path must return Some when data dir is resolvable");
2126 assert!(
2127 p.ends_with("trusty-memory/http_addr"),
2128 "unexpected http_addr path: {}",
2129 p.display()
2130 );
2131 }
2132
/// Why: pins the on-disk format of the address file to a single
/// `host:port\n` line. Consumers such as `cat` or sh `$(cat ...)` trim
/// whitespace, so the trailing newline is harmless, but extra whitespace or
/// multiple lines would break them.
/// What: writes a known address with `write_http_addr_file`, reads the file
/// back, and checks both the trimmed content and the trailing newline.
/// Note (issue #226): `write_http_addr_file` belongs to the HTTP-serving
/// surface gated behind `axum-server`, so this test carries the same gate.
#[cfg(feature = "axum-server")]
#[test]
fn http_addr_file_round_trip_via_helpers() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("http_addr");
    let addr = "127.0.0.1:7073".parse::<SocketAddr>().unwrap();

    write_http_addr_file(&path, &addr).unwrap();

    let contents = std::fs::read_to_string(&path).unwrap();
    assert_eq!(contents.trim(), "127.0.0.1:7073");
    // The trailing newline keeps `cat` and editors happy.
    assert!(contents.ends_with('\n'));
}
2151
2152 /// Why: dynamic binding must succeed even when the preferred port is
2153 /// already in use. Walking 7070..=7079 + OS fallback guarantees the
2154 /// daemon never fails to come up just because another process holds 7070.
2155 /// What: pre-bind 7070 (best-effort — skip the test if it's already
2156 /// busy on the host), then call `bind_dynamic_port` and assert we got
2157 /// *some* listener back.
2158 #[tokio::test]
2159 async fn bind_dynamic_port_returns_listener() {
2160 let listener = bind_dynamic_port().await.expect("bind_dynamic_port");
2161 let addr = listener.local_addr().expect("local_addr");
2162 assert_eq!(addr.ip().to_string(), "127.0.0.1");
2163 assert!(addr.port() > 0, "port must be non-zero after bind");
2164 }
2165
2166 /// Why: Issue #42 — prompt-facts are now served by the per-message
2167 /// `get_prompt_context` tool rather than the MCP prompts surface, so the
2168 /// `initialize` handshake must NOT advertise a `prompts` capability and
2169 /// `prompts/list` / `prompts/get` must fall through to the "method not
2170 /// found" path.
2171 #[tokio::test]
2172 async fn initialize_does_not_advertise_prompts_capability() {
2173 let (state, _tmp) = test_state();
2174 let init = handle_message(
2175 &state,
2176 json!({"jsonrpc": "2.0", "id": 1, "method": "initialize"}),
2177 )
2178 .await;
2179 assert!(
2180 init["result"]["capabilities"]["prompts"].is_null(),
2181 "initialize must NOT advertise the prompts capability; got {init}"
2182 );
2183
2184 // Both prompts/* dispatchers should now report method-not-found.
2185 for method in ["prompts/list", "prompts/get"] {
2186 let resp =
2187 handle_message(&state, json!({"jsonrpc": "2.0", "id": 2, "method": method})).await;
2188 assert_eq!(
2189 resp["error"]["code"], -32601,
2190 "{method} should return method-not-found; got {resp}"
2191 );
2192 }
2193 }
2194
2195 /// Why: `AppState::new` must initialise `bound_addr` to an empty
2196 /// `OnceLock` so `/health` reports `addr: None` on the stdio path. A
2197 /// regression that pre-populates this field would advertise a bogus
2198 /// address from a stale clone.
2199 ///
2200 /// Note (issue #231): now async so it runs inside a Tokio runtime —
2201 /// `AppState::new` spawns the bounded BM25 index worker via
2202 /// `tokio::spawn`, which requires an active runtime.
2203 #[tokio::test]
2204 async fn app_state_starts_with_empty_bound_addr() {
2205 let (state, _tmp) = test_state();
2206 assert!(state.bound_addr.get().is_none());
2207 }
2208
/// Why (issue #96): the persisted activity log stores
/// `DaemonEvent::type_str` in its `event_type` column, and the UI keys its
/// icon / label rendering off the SSE `type` tag. If the two ever drift,
/// historical rows stop rendering correctly.
/// What: builds one instance of each variant listed below, serialises it
/// with serde, and checks that the JSON `type` field equals `type_str()`.
/// Test: this test.
#[test]
fn daemon_event_type_str_matches_sse_tag() {
    let samples = [
        DaemonEvent::PalaceCreated {
            id: "p".into(),
            name: "p".into(),
            source: ActivitySource::Http,
        },
        DaemonEvent::DrawerAdded {
            palace_id: "p".into(),
            palace_name: "p".into(),
            drawer_count: 1,
            timestamp: chrono::Utc::now(),
            content_preview: String::new(),
            source: ActivitySource::Mcp,
        },
        DaemonEvent::DrawerDeleted {
            palace_id: "p".into(),
            drawer_count: 0,
            source: ActivitySource::Http,
        },
        DaemonEvent::DreamCompleted {
            palace_id: None,
            merged: 0,
            pruned: 0,
            compacted: 0,
            closets_updated: 0,
            duration_ms: 0,
            source: ActivitySource::Http,
        },
        DaemonEvent::StatusChanged {
            total_drawers: 0,
            total_vectors: 0,
            total_kg_triples: 0,
        },
        DaemonEvent::HookFired {
            palace_id: Some("p".into()),
            palace_name: Some("p".into()),
            hook_type: HookType::UserPromptSubmit,
            injection_kind: InjectionKind::PromptContext,
            injection_length: 12,
            trigger_prompt_excerpt: "hello".into(),
            timestamp: chrono::Utc::now(),
            duration_ms: 5,
            source: ActivitySource::Hook,
        },
    ];

    for event in samples.iter() {
        // The serde-emitted `type` tag is the SSE wire value.
        let encoded = serde_json::to_value(event).unwrap();
        let wire_tag = encoded["type"].as_str();
        assert_eq!(wire_tag, Some(event.type_str()));
    }
}
2269
/// Why: every `HookFired` activity row carries a serialised `HookType`, so
/// its wire format has to round-trip cleanly for dashboard / TUI consumers
/// that parse historic entries written by an older daemon build.
/// What: for each variant, checks the serde JSON equals the expected
/// PascalCase label, decodes it back to the same variant, and checks that
/// `as_str()` agrees with the label.
/// Test: itself.
#[test]
fn hook_type_serde_round_trips() {
    for (ht, expected) in [
        (HookType::UserPromptSubmit, "\"UserPromptSubmit\""),
        (HookType::SessionStart, "\"SessionStart\""),
    ] {
        let encoded = serde_json::to_string(&ht).unwrap();
        assert_eq!(encoded, expected, "{ht:?} should serialise to {expected}");

        let decoded: HookType = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, ht);

        // `expected` is a JSON string literal; strip its quotes for `as_str`.
        assert_eq!(ht.as_str(), expected.trim_matches('"'));
    }
}
2290
/// Why: the same wire-format stability argument as
/// `hook_type_serde_round_trips`, applied to `InjectionKind`.
/// What: for each variant, checks the serde JSON equals the expected
/// kebab-case label, decodes it back to the same variant, and checks that
/// `as_str()` agrees with the label.
/// Test: itself.
#[test]
fn injection_kind_serde_round_trips() {
    for (ik, expected) in [
        (InjectionKind::PromptContext, "\"prompt-context\""),
        (InjectionKind::InboxCheck, "\"inbox-check\""),
    ] {
        let encoded = serde_json::to_string(&ik).unwrap();
        assert_eq!(encoded, expected);

        let decoded: InjectionKind = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, ik);

        // `expected` is a JSON string literal; strip its quotes for `as_str`.
        assert_eq!(ik.as_str(), expected.trim_matches('"'));
    }
}
2308
/// Why: the activity feed shows the trigger prompt excerpt verbatim, so a
/// runaway prompt has to be capped at [`HOOK_PROMPT_EXCERPT_CHARS`] and
/// end in a `…` marker to keep the row readable.
/// What: passes a 200-character prompt and checks the excerpt's character
/// count and trailing marker; also checks that an empty prompt yields an
/// empty excerpt.
/// Test: itself.
#[test]
fn hook_excerpt_truncates_long_prompts() {
    let oversized: String = std::iter::repeat('x').take(200).collect();
    let excerpt = hook_prompt_excerpt(&oversized);

    let excerpt_len = excerpt.chars().count();
    assert!(excerpt_len <= HOOK_PROMPT_EXCERPT_CHARS);
    assert!(excerpt.ends_with('…'));

    assert_eq!(hook_prompt_excerpt(""), "");
}
2323
/// Why: a multi-line prompt must be flattened to one line, otherwise the
/// activity feed row blows out vertically.
/// What: passes a prompt containing blank lines and repeated tabs and
/// expects a single line with single spaces between the words.
/// Test: itself.
#[test]
fn hook_excerpt_collapses_whitespace() {
    assert_eq!(
        hook_prompt_excerpt("hello\n\nworld\t\tfoo"),
        "hello world foo"
    );
}
2335
/// Why (issue #96): the persisted activity log fills its columns from
/// `palace_id()` and `source()`. If a refactor swapped two fields, the log
/// would silently mis-attribute writes.
/// What: builds a `DrawerAdded`, a `StatusChanged`, and a `DreamCompleted`
/// event with known field values and checks what each extractor returns —
/// including `None` from both extractors for `StatusChanged`.
/// Test: this test.
#[test]
fn daemon_event_palace_id_and_source_extraction() {
    // Variant with a mandatory palace id and an MCP source.
    let drawer_added = DaemonEvent::DrawerAdded {
        palace_id: "alpha".into(),
        palace_name: "alpha".into(),
        drawer_count: 1,
        timestamp: chrono::Utc::now(),
        content_preview: String::new(),
        source: ActivitySource::Mcp,
    };
    assert_eq!(drawer_added.palace_id(), Some("alpha"));
    assert_eq!(drawer_added.source(), Some(ActivitySource::Mcp));

    // Variant that carries neither a palace id nor a source.
    let status_changed = DaemonEvent::StatusChanged {
        total_drawers: 1,
        total_vectors: 2,
        total_kg_triples: 3,
    };
    assert_eq!(status_changed.palace_id(), None);
    assert_eq!(status_changed.source(), None);

    // Variant with an optional palace id, populated here.
    let dream_completed = DaemonEvent::DreamCompleted {
        palace_id: Some("p1".into()),
        merged: 0,
        pruned: 0,
        compacted: 0,
        closets_updated: 0,
        duration_ms: 10,
        source: ActivitySource::Http,
    };
    assert_eq!(dream_completed.palace_id(), Some("p1"));
    assert_eq!(dream_completed.source(), Some(ActivitySource::Http));
}
2376
2377 /// Why (issue #96): `AppState::emit` must persist mutation events to
2378 /// the activity log while keeping `StatusChanged` (a recomputed
2379 /// aggregate, not a mutation) out of the persisted history.
2380 /// What: emits one of each variant under a fresh state and asserts
2381 /// the persisted count matches the number of mutation events.
2382 /// Test: this test.
2383 #[tokio::test]
2384 async fn emit_persists_mutations_but_skips_status_changed() {
2385 let (state, _tmp) = test_state();
2386 state.emit(DaemonEvent::PalaceCreated {
2387 id: "p".into(),
2388 name: "p".into(),
2389 source: ActivitySource::Http,
2390 });
2391 state.emit(DaemonEvent::StatusChanged {
2392 total_drawers: 1,
2393 total_vectors: 0,
2394 total_kg_triples: 0,
2395 });
2396 state.emit(DaemonEvent::DrawerAdded {
2397 palace_id: "p".into(),
2398 palace_name: "p".into(),
2399 drawer_count: 1,
2400 timestamp: chrono::Utc::now(),
2401 content_preview: "x".into(),
2402 source: ActivitySource::Mcp,
2403 });
2404 // Issue #232: `emit` now offloads the redb write to `spawn_blocking`,
2405 // so the test must wait for the background pool to drain before
2406 // asserting on the persisted count.
2407 state.flush_activity_writes().await;
2408 let count = state.activity_log.count().unwrap();
2409 assert_eq!(count, 2, "only PalaceCreated + DrawerAdded must persist");
2410 }
2411
2412 /// Why (issue #156): the BM25 lane must be opt-in — existing deployments
2413 /// that don't set `TRUSTY_BM25_DAEMON=1` must see `bm25_client = None`
2414 /// and the recall hot path must continue to behave exactly as before.
2415 /// What: builds an `AppState` with `with_bm25_client_from_env()` while
2416 /// the env var is unset; asserts the field stays `None`.
2417 /// Test: this test.
2418 #[tokio::test]
2419 async fn bm25_client_disabled_by_default() {
2420 // Serialise with the sibling `bm25_client_enabled_when_env_set` test
2421 // so they don't race on the shared `TRUSTY_BM25_DAEMON` env var.
2422 let _guard = crate::commands::env_test_lock().lock().await;
2423 // SAFETY: this test exercises std::env::remove_var which is unsafe
2424 // in 2024 edition because the global env is shared. We restore the
2425 // pre-test value at the end so neighbours are unaffected.
2426 let prev = std::env::var("TRUSTY_BM25_DAEMON").ok();
2427 unsafe {
2428 std::env::remove_var("TRUSTY_BM25_DAEMON");
2429 }
2430 let (state, _tmp) = test_state();
2431 let state = state.with_bm25_client_from_env();
2432 assert!(
2433 state.bm25_client.is_none(),
2434 "bm25_client must be None when TRUSTY_BM25_DAEMON is unset"
2435 );
2436 // Issue #193: the spawn supervisor is bound to the same env gate as
2437 // the client — opt-out parity matters so we never accidentally
2438 // spawn daemons in deployments that explicitly didn't opt in.
2439 assert!(
2440 state.bm25_supervisor.is_none(),
2441 "bm25_supervisor must be None when TRUSTY_BM25_DAEMON is unset"
2442 );
2443 if let Some(v) = prev {
2444 unsafe {
2445 std::env::set_var("TRUSTY_BM25_DAEMON", v);
2446 }
2447 }
2448 }
2449
2450 /// Why (issue #156): when the operator opts in via `TRUSTY_BM25_DAEMON=1`,
2451 /// the builder must construct a real `Bm25Client` pointed at the canonical
2452 /// per-palace socket path. We don't connect — no daemon need be running —
2453 /// we only assert the client field is populated.
2454 /// What: sets the env var, runs the builder, asserts `Some(_)`.
2455 /// Test: this test.
2456 #[tokio::test]
2457 async fn bm25_client_enabled_when_env_set() {
2458 let _guard = crate::commands::env_test_lock().lock().await;
2459 let prev = std::env::var("TRUSTY_BM25_DAEMON").ok();
2460 unsafe {
2461 std::env::set_var("TRUSTY_BM25_DAEMON", "1");
2462 }
2463 let (state, _tmp) = test_state();
2464 let state = state.with_bm25_client_from_env();
2465 assert!(
2466 state.bm25_client.is_some(),
2467 "bm25_client must be Some when TRUSTY_BM25_DAEMON=1"
2468 );
2469 // Issue #193: opting in to the client must also install the spawn
2470 // supervisor so the daemon is auto-started on first use.
2471 assert!(
2472 state.bm25_supervisor.is_some(),
2473 "bm25_supervisor must be Some when TRUSTY_BM25_DAEMON=1"
2474 );
2475 match prev {
2476 Some(v) => unsafe { std::env::set_var("TRUSTY_BM25_DAEMON", v) },
2477 None => unsafe { std::env::remove_var("TRUSTY_BM25_DAEMON") },
2478 }
2479 }
2480}