Skip to main content

trusty_memory/
lib.rs

1//! MCP server (HTTP/SSE + stdio) for trusty-memory.
2//!
3//! Why: Claude Code and other MCP-aware clients integrate with trusty-memory
4//! through the standardized Model Context Protocol; we expose memory + KG
5//! tools so they can be called by name. The canonical stdio integration is
6//! `trusty-memory serve --stdio` (PR1 #919 of the #914 cutover epic), a
7//! self-contained direct MCP server that binds no HTTP port or UDS socket.
8//! The former `trusty-memory-mcp-bridge` binary and Unix-domain-socket
9//! transport were removed in PR3 (#914) once `serve --stdio` made them dead
10//! code.
11//! What: Provides `run_http` / `run_http_dynamic` / `run_http_on` (axum
12//! HTTP/SSE + REST + UI) plus an `AppState` that carries the shared
13//! `PalaceRegistry`, on-disk data root, and a lazily-initialized embedder.
14//! Test: `cargo test -p trusty-memory` validates handshake + dispatch via
15//! the in-process `handle_message` unit tests and the
16//! `tests/serve_stdio_e2e.rs` end-to-end harness.
17
18use anyhow::Result;
19use serde_json::{json, Value};
20use std::net::SocketAddr;
21use std::path::{Path, PathBuf};
22use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
23use std::sync::{Arc, OnceLock};
24use tokio::sync::{broadcast, OnceCell, RwLock};
25use trusty_common::bm25_client::Bm25Client;
26use trusty_common::mcp::initialize_response;
27use trusty_common::memory_core::embed::FastEmbedder;
28use trusty_common::memory_core::{store::ChatSessionStore, PalaceRegistry};
29use trusty_common::ChatProvider;
30
31// Why: `tracing::info` is only used by the axum HTTP-serving helpers
32//      (`run_http_on`, `spawn_uds_listener`). Pulling it in unconditionally
33//      would trigger `unused_imports` warnings when the `axum-server`
34//      feature is disabled. `SocketAddr` is still used by `bound_addr` on
35//      `AppState` so it stays unconditional.
36#[cfg(feature = "axum-server")]
37use tracing::info;
38
/// Readiness phase advertised by the daemon while it boots (issues #910/#911).
///
/// Why: embedder cold-init (CoreML compile, 30-120 s) would otherwise stall
/// the first real `memory_remember`/`memory_recall` call that lands before
/// warm-up finishes. Publishing the phase lets handlers answer with a fast,
/// explicit error ("daemon is warming up, retry shortly") rather than
/// blocking for minutes.
/// What: two stable discriminants meant to be stored atomically. `Warming`
/// (0) is the initial phase; `Ready` (1) is set once `spawn_startup_tasks`
/// has successfully initialised the embedder. The transition is one-way and
/// lock-free: a single `AtomicU8` compare-and-swap.
/// Test: `daemon_readiness_transitions_warming_to_ready` in this module;
///       the end-to-end warming-error path is covered by
///       `tools::tests::remember_returns_warming_error_while_state_is_warming`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DaemonReadiness {
    /// Embedder cold-init (and/or pin scan) has not finished yet.
    Warming = 0,
    /// Embedder is initialised; every handler may proceed normally.
    Ready = 1,
}

impl DaemonReadiness {
    /// Turn a raw atomic byte back into a readiness phase.
    ///
    /// Why: keeps the `0 → Warming, else Ready` rule in one place, so callers
    /// work with a meaningful enum instead of comparing bare integers.
    /// What: `0` decodes to `Warming`; every other byte decodes to `Ready`
    /// (only `1` is ever written).
    /// Test: `daemon_readiness_from_u8` in this module.
    pub fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Warming,
            _ => Self::Ready,
        }
    }
}
77
78pub mod activity;
79pub mod attribution;
80pub mod bm25_supervisor;
81pub mod bootstrap;
82/// Autonomous Dreamer scheduler — spawns per-palace dream loops on daemon startup.
83///
84/// Why: issue #1529 — `Dreamer::start_with_shutdown()` was fully implemented
85/// but never called. This module wires it into the daemon so each palace gets
86/// a background dream loop that fires every 5 minutes of idle time.
87/// What: exports `spawn_dream_scheduler`, `make_shutdown_watch`, and
88/// `spawn_shutdown_bridge`. Disable with `TRUSTY_DREAM_DISABLED=1`.
89/// Test: see unit tests inside this module.
90pub mod dream_scheduler;
91/// File-descriptor usage and limit reporting for `/health`.
92///
93/// Why: expose `open_fds` / `fd_soft_limit` so operators can see the fd
94/// ceiling and current consumption without needing lsof or shell access.
95/// Test: `fd_metrics::tests::fd_metrics_returns_sane_values`.
96pub mod fd_metrics;
97// Why (issue #226): `chat` and `web` are pure axum HTTP/SSE handler
98//      surfaces. Gating them behind the `axum-server` feature is what lets
99//      library consumers (e.g. `open-mpm` linking only `MemoryMcpService`)
100//      drop axum + tower-http entirely from their build graph.
101#[cfg(feature = "axum-server")]
102pub mod chat;
103pub mod commands;
104pub mod console_metrics;
105pub mod discovery;
106/// Supervised `serve --foreground` entry point (issue #787).
107///
108/// Why: launchd supervisors need loud failure on port collision, not silent
109/// port-walking to 7071+. Extracted to stay under the 500-line ratchet cap.
110/// What: exports `bind_foreground_port` (Fix C — abort on EADDRINUSE) and
111/// `run_http_foreground` (Fix A lock + Fix B http_addr + Fix C combined).
112/// Test: `foreground::tests::bind_foreground_port_refuses_collision`;
113/// `daemon_lock` module tests cover the lock-file logic.
114pub mod foreground;
115pub mod hook_emit;
116pub mod kg_extract;
117pub mod mcp_service;
118pub mod messaging;
119pub mod openrpc;
120/// Issue #1217: default palace-ID derivation from project identity.
121///
122/// Why: the default palace ID should reflect the project's identity
123/// (git `owner/repo`, else `parent/dir`) rather than the bare directory
124/// basename, so the same repo resolves to the same palace across checkouts.
125/// What: exports the pure `derive_palace_id` core plus
126/// `owner_repo_from_git_remote`, `parent_dir_slug`, and the
127/// `TRUSTY_MEMORY_PALACE` env-override helpers.
128/// Test: see unit tests inside this module.
129pub mod palace_id_derive;
130/// Issue #88: project-root detection and palace-slug enforcement.
131///
132/// Why: prevents unbounded palace creation by anchoring palace names to the
133/// canonical slug of the project directory that contains the CWD, or to the
134/// `personal` sentinel for non-project contexts.
135/// What: exports `find_project_root`, `project_slug_at`, `project_slug`,
136/// `validate_palace_name`, `PERSONAL_PALACE`, and `PROJECT_MARKERS`.
137/// Test: see unit tests inside this module.
138pub mod project_root;
139pub mod prompt_facts;
140pub mod prompt_log;
141pub mod service;
142pub mod startup_scan;
143pub mod tools;
144pub mod transport;
145#[cfg(feature = "axum-server")]
146pub mod web;
147
148pub use activity::{ActivityEntry, ActivityFilter, ActivityLog, ActivitySource};
149pub use attribution::{CreatorInfo, CreatorSource};
150
/// Maximum number of characters kept in the trigger-prompt excerpt that is
/// embedded on a `HookFired` event.
///
/// Why: the full triggering prompt is sensitive and already lives in the
/// JSONL prompt log; the activity feed only needs a glanceable preview. A
/// single-line ~80 char excerpt matches the existing
/// `drawer_content_preview` convention, so dashboard rows render uniformly.
/// What: 80 characters; longer prompts are cut and end with a trailing `…`.
/// Test: `hook_excerpt_truncates_long_prompts`.
pub const HOOK_PROMPT_EXCERPT_CHARS: usize = 80;

/// Build the short, single-line excerpt of a triggering prompt that rides on
/// a `HookFired` activity event.
///
/// Why: see [`HOOK_PROMPT_EXCERPT_CHARS`]. Keeping the truncation rule in one
/// function means every emitter (HTTP, hook CLI handlers, future tests)
/// produces the same preview shape and the UI renders rows uniformly.
/// What: collapses every whitespace run in `prompt` to a single space (and
/// drops leading/trailing whitespace). A result of at most
/// [`HOOK_PROMPT_EXCERPT_CHARS`] characters is returned unchanged; anything
/// longer keeps the first `HOOK_PROMPT_EXCERPT_CHARS - 1` characters followed
/// by `…`. Empty (or whitespace-only) input yields an empty string.
/// Test: `hook_excerpt_truncates_long_prompts`,
/// `hook_excerpt_collapses_whitespace`.
pub fn hook_prompt_excerpt(prompt: &str) -> String {
    let words: Vec<&str> = prompt.split_whitespace().collect();
    let collapsed = words.join(" ");

    // A character at index LIMIT exists only when the text is longer than
    // LIMIT characters, i.e. only when it has to be cut.
    match collapsed.char_indices().nth(HOOK_PROMPT_EXCERPT_CHARS) {
        None => collapsed,
        Some(_) => {
            let keep = HOOK_PROMPT_EXCERPT_CHARS.saturating_sub(1);
            // Byte offset where the kept prefix ends; slicing there is always
            // on a char boundary because it comes from `char_indices`.
            let cut = collapsed
                .char_indices()
                .nth(keep)
                .map_or(collapsed.len(), |(at, _)| at);
            let mut excerpt = String::with_capacity(cut + '…'.len_utf8());
            excerpt.push_str(&collapsed[..cut]);
            excerpt.push('…');
            excerpt
        }
    }
}
185
186pub use mcp_service::MemoryMcpService;
187pub use tools::MemoryMcpServer;
188
/// Pick the directory that really contains the per-palace subdirectories.
///
/// Why: two on-disk layouts exist in the wild. The current monorepo code
/// treats the registry directory *itself* as the parent of the per-palace
/// dirs (`<dir>/<id>/palace.json`). The legacy standalone `trusty-memory`
/// repo nested everything one level deeper under a `palaces/` subdirectory
/// (`<data_dir>/palaces/<id>/palace.json`), and that is where existing
/// installs keep their data (e.g. 88 palaces under
/// `~/Library/Application Support/trusty-memory/palaces/`). A daemon that
/// uses the bare data dir as its registry root finds zero palaces, because
/// every `palace.json` sits one level below where it looked — the "palaces
/// lost on restart" bug.
/// What: given the standard data dir, returns `<data_dir>/palaces` when that
/// path is an existing directory, otherwise `<data_dir>` unchanged. Resolving
/// this once in `main.rs` and using the result as `AppState::data_root` keeps
/// every call site (`status`, `palace_list`, `open_palace`, `palace_create`,
/// `load_palaces_from_disk`) consistent without forcing a data migration.
/// Test: `tests::resolve_palace_registry_dir_prefers_palaces_subdir` and
/// `resolve_palace_registry_dir_falls_back_to_data_dir`.
pub fn resolve_palace_registry_dir(data_dir: PathBuf) -> PathBuf {
    let legacy_root = data_dir.join("palaces");
    // No legacy `palaces/` directory: the data dir is the registry root.
    if !legacy_root.is_dir() {
        return data_dir;
    }
    legacy_root
}
216
217/// Hook type — labels the Claude Code hook that triggered a submission.
218///
219/// Why: every hook firing produces an activity-feed entry tagged with the
220/// originating hook so operators can tell whether activity came from a user
221/// prompt (`UserPromptSubmit`), a new session (`SessionStart`), or a future
222/// hook variant. Threading this through `DaemonEvent::HookFired` lets the
223/// dashboard badge each row with the hook label.
224/// What: serde-serialised in PascalCase so the wire format matches Claude
225/// Code's own hook-name strings exactly (e.g. `"UserPromptSubmit"`).
226/// Test: `hook_type_serde_round_trips`.
227#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
228pub enum HookType {
229    /// Claude Code's `UserPromptSubmit` hook — fires on every user prompt.
230    UserPromptSubmit,
231    /// Claude Code's `SessionStart` hook — fires once at session open.
232    SessionStart,
233}
234
235impl HookType {
236    /// Stable string label used for the wire format.
237    pub fn as_str(&self) -> &'static str {
238        match self {
239            Self::UserPromptSubmit => "UserPromptSubmit",
240            Self::SessionStart => "SessionStart",
241        }
242    }
243}
244
245/// Injection kind — labels what the hook actually injected (or attempted).
246///
247/// Why: distinct from `HookType` because one hook could in principle render
248/// more than one kind of injection (e.g. SessionStart can deliver both an
249/// inbox check and bootstrap context). Tagging the rendered kind explicitly
250/// keeps the activity log searchable when that fan-out lands.
251/// What: serde-serialised as kebab-case so it matches the labels already
252/// used in the JSONL prompt log (`prompt-context-facts`,
253/// `inbox-check-messages`).
254/// Test: `injection_kind_serde_round_trips`.
255#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
256#[serde(rename_all = "kebab-case")]
257pub enum InjectionKind {
258    /// `prompt-context` hook rendered the prompt-facts block.
259    PromptContext,
260    /// `inbox-check` hook delivered unread messages.
261    InboxCheck,
262}
263
264impl InjectionKind {
265    /// Stable string label used for the wire format.
266    pub fn as_str(&self) -> &'static str {
267        match self {
268            Self::PromptContext => "prompt-context",
269            Self::InboxCheck => "inbox-check",
270        }
271    }
272}
273
/// Live daemon events broadcast to connected SSE subscribers.
///
/// Why: The dashboard needs push-driven updates so palace creation, drawer
/// add/delete, dream cycles, and aggregate status changes are visible without
/// polling. A single broadcast channel fans out to every connected browser.
/// What: Tagged enum serialized as `{"type": "...", ...fields}` over SSE.
/// The `type` tag is the snake_case variant name (`palace_created`,
/// `drawer_added`, `drawer_deleted`, `dream_completed`, `status_changed`,
/// `hook_fired`).
/// Test: `web::tests::sse_stream_emits_events` subscribes, triggers a
/// mutation, and asserts the frame arrives.
// NOTE(review): this enum derives only `Serialize`, and `#[serde(default)]`
// is a deserialisation attribute, so the `#[serde(default)]` markers below do
// not change what this type emits — presumably they document what consumer-side
// types should tolerate; confirm before removing.
#[derive(Clone, Debug, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DaemonEvent {
    /// A palace was created.
    PalaceCreated {
        /// Id of the new palace (this is what `DaemonEvent::palace_id`
        /// returns for the variant).
        id: String,
        /// Name of the new palace.
        name: String,
        /// Originating subsystem (HTTP, MCP, Hook). Why (issue #96): the
        /// UI badges each row with its source so operators can tell at a
        /// glance whether a write came from the dashboard form, an MCP
        /// tool call, or a hook-driven path. The wire-format key is
        /// `source` (lower-case strings via serde rename_all on
        /// `ActivitySource`).
        source: ActivitySource,
    },
    /// A drawer was added to a palace.
    DrawerAdded {
        /// Id of the palace that received the drawer.
        palace_id: String,
        /// Friendly palace name (Palace.name) at write time. Why: lets SSE
        /// consumers (the dashboard activity feed) render the human-readable
        /// label without a separate id→name lookup. Empty string if the
        /// emitter could not resolve the name.
        #[serde(default)]
        palace_name: String,
        /// Running drawer count reported with the event (presumably the
        /// palace's total after the add — confirm against the emitters).
        drawer_count: usize,
        /// Wall-clock timestamp when the drawer was added. Why: SSE
        /// receivers want to render "just now / 2m ago" relative to the
        /// daemon's clock, not the time the SSE frame happens to arrive.
        timestamp: chrono::DateTime<chrono::Utc>,
        /// Short preview of the drawer's content (whitespace-collapsed,
        /// truncated to ~80 chars with an ellipsis when cut). Why: the TUI
        /// activity feed and dashboard ticker want to show *what* was
        /// stored, not just the running drawer count. Empty when the
        /// emitter could not resolve the content (legacy clients tolerate
        /// the missing field via `#[serde(default)]`).
        #[serde(default)]
        content_preview: String,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// A drawer was deleted from a palace.
    DrawerDeleted {
        /// Id of the palace the drawer was removed from.
        palace_id: String,
        /// Drawer count reported with the event (presumably the palace's
        /// total after the delete — confirm against the emitters).
        drawer_count: usize,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// A dream cycle finished.
    DreamCompleted {
        /// Palace the cycle was scoped to; `None` for a dream that ran
        /// across all palaces.
        palace_id: Option<String>,
        // The four counters below are presumably per-cycle tallies of the
        // corresponding dream operations — TODO confirm against the Dreamer.
        merged: usize,
        pruned: usize,
        compacted: usize,
        closets_updated: usize,
        /// Cycle duration in milliseconds (per the `_ms` suffix).
        duration_ms: u64,
        /// Originating subsystem (issue #96).
        source: ActivitySource,
    },
    /// Aggregate daemon-wide totals changed. Carries no `source` and no
    /// palace id.
    StatusChanged {
        /// Total drawers (presumably summed across every palace — confirm).
        total_drawers: usize,
        /// Total vectors (presumably summed across every palace — confirm).
        total_vectors: usize,
        /// Total knowledge-graph triples (presumably summed across every
        /// palace — confirm).
        total_kg_triples: usize,
    },
    /// A Claude Code hook completed and rendered (or attempted to render) an
    /// injection block.
    ///
    /// Why: pre-#XXX the activity feed only fired on drawer / palace / dream
    /// writes, which meant a normal Claude Code session — whose only daemon
    /// traffic is hook invocations — left the feed empty. Surfacing every
    /// hook firing answers the user complaint "no activity in the TUI" and
    /// gives operators a way to see how often each project palace is
    /// actually picking up prompt-context / inbox-check work.
    /// What: carries the resolved palace (or `None` if cwd resolution
    /// failed), the [`HookType`] label, the [`InjectionKind`] label, the
    /// rendered injection byte length, a short excerpt of the triggering
    /// prompt (capped at ~80 chars; the full content stays in the JSONL
    /// prompt log only), the timestamp, the hook's wall-clock duration,
    /// and the [`ActivitySource`] tag (always `Hook` for this variant).
    /// Backwards-compatible: SSE clients that do not recognise the
    /// `hook_fired` `type` tag can safely ignore the frame.
    // NOTE(review): `#XXX` above is a placeholder issue number — fill in the
    // real reference.
    HookFired {
        /// Resolved palace id (slug) — `None` if cwd resolution failed.
        #[serde(default)]
        palace_id: Option<String>,
        /// Friendly palace name at hook time — `None` if the registry
        /// could not be consulted (HTTP path uses `palace_id` here when
        /// no separate name is known).
        #[serde(default)]
        palace_name: Option<String>,
        /// Which Claude Code hook fired.
        hook_type: HookType,
        /// What kind of injection the hook rendered (or attempted).
        injection_kind: InjectionKind,
        /// Rendered injection size in bytes (`0` when no injection was
        /// emitted, e.g. SessionStart with an empty inbox).
        injection_length: u64,
        /// Short excerpt of the triggering prompt for the activity feed
        /// display. Capped at ~80 chars with a trailing `…` when cut.
        /// Why: the activity feed renders this directly; full prompt
        /// content (which may be sensitive) stays in the JSONL log.
        #[serde(default)]
        trigger_prompt_excerpt: String,
        /// UTC timestamp carried with the event.
        timestamp: chrono::DateTime<chrono::Utc>,
        /// Hook wall-clock duration in milliseconds.
        duration_ms: u64,
        /// Always `ActivitySource::Hook` for this variant; encoded explicitly
        /// so the same dispatch path (`emit`) can persist + broadcast it.
        source: ActivitySource,
    },
}
386
387/// Open the activity log under `data_root`, falling back to a per-process
388/// tempdir and finally to a no-op `Discard` variant when no writable
389/// directory is available.
390///
391/// Why (issues #96, #225): the activity log is a best-effort feature — if
392/// the data root is on a read-only mount, missing, or locked by another
393/// process, the daemon should still come up and serve every other endpoint.
394/// The first fallback is a `std::env::temp_dir()`-anchored subdirectory
395/// keyed by the daemon's process id. Issue #225: a previous version called
396/// `expect()` on the tempdir fallback, which crashed the daemon on hosts
397/// where neither `data_root` nor `std::env::temp_dir()` is writable
398/// (read-only containers, locked-down sandboxes). The contract is
399/// "best-effort", so the final fallback is now `ActivityLog::discard()` —
400/// a no-op variant that drops every append and returns empty reads. The
401/// dashboard's activity feed simply shows up empty in that degraded state.
402/// What: tries `ActivityLog::open(data_root)`; on error logs a warning and
403/// retries against `<temp>/trusty-memory-activity-<pid>/`. If both fail,
404/// emits a final warning and returns `ActivityLog::discard()`.
405/// Test: `open_activity_log_with_fallback_returns_discard_when_unwritable`
406/// covers the discard branch; existing `AppState` construction tests cover
407/// the happy and tempdir-fallback paths.
408fn open_activity_log_with_fallback(data_root: &Path) -> Arc<ActivityLog> {
409    match ActivityLog::open(data_root) {
410        Ok(log) => Arc::new(log),
411        Err(primary_err) => {
412            tracing::warn!(
413                "could not open activity log at {}: {primary_err:#}; falling back to per-process tempdir",
414                data_root.display()
415            );
416            let fallback =
417                std::env::temp_dir().join(format!("trusty-memory-activity-{}", std::process::id()));
418            match ActivityLog::open(&fallback) {
419                Ok(log) => Arc::new(log),
420                Err(fallback_err) => {
421                    tracing::warn!(
422                        "activity log tempdir fallback at {} also failed: {fallback_err:#}; \
423                         activity feed disabled for this process (no-op log)",
424                        fallback.display()
425                    );
426                    Arc::new(ActivityLog::discard())
427                }
428            }
429        }
430    }
431}
432
433impl DaemonEvent {
434    /// Short discriminant label matching the SSE `type` field.
435    ///
436    /// Why: the persisted activity log stores `event_type` as a string so
437    /// the UI can render the row without re-parsing the payload. Sharing
438    /// the same labels the SSE serializer uses keeps the wire and the
439    /// stored history consistent.
440    /// What: returns one of `palace_created`, `drawer_added`,
441    /// `drawer_deleted`, `dream_completed`, `status_changed`.
442    /// Test: `daemon_event_type_str_matches_sse_tag` in the lib tests.
443    pub fn type_str(&self) -> &'static str {
444        match self {
445            Self::PalaceCreated { .. } => "palace_created",
446            Self::DrawerAdded { .. } => "drawer_added",
447            Self::DrawerDeleted { .. } => "drawer_deleted",
448            Self::DreamCompleted { .. } => "dream_completed",
449            Self::StatusChanged { .. } => "status_changed",
450            Self::HookFired { .. } => "hook_fired",
451        }
452    }
453
454    /// `palace_id` if the event is scoped to a single palace.
455    ///
456    /// Why: the activity log indexes entries by palace id so the UI can
457    /// filter by palace; daemon-wide events (`status_changed`,
458    /// dream-across-all-palaces) return `None`.
459    /// What: returns a borrowed string when the variant carries a palace
460    /// id, otherwise `None`.
461    /// Test: `daemon_event_palace_id_extraction`.
462    pub fn palace_id(&self) -> Option<&str> {
463        match self {
464            Self::PalaceCreated { id, .. } => Some(id),
465            Self::DrawerAdded { palace_id, .. } | Self::DrawerDeleted { palace_id, .. } => {
466                Some(palace_id)
467            }
468            Self::DreamCompleted { palace_id, .. } => palace_id.as_deref(),
469            Self::HookFired { palace_id, .. } => palace_id.as_deref(),
470            Self::StatusChanged { .. } => None,
471        }
472    }
473
474    /// Originating subsystem if the event carries one.
475    ///
476    /// Why: only mutation events carry a `source`; the aggregate
477    /// `StatusChanged` is recomputed by the daemon and has no caller, so
478    /// it returns `None`.
479    /// What: returns the variant's `source` field where present.
480    /// Test: `daemon_event_source_extraction`.
481    pub fn source(&self) -> Option<ActivitySource> {
482        match self {
483            Self::PalaceCreated { source, .. }
484            | Self::DrawerAdded { source, .. }
485            | Self::DrawerDeleted { source, .. }
486            | Self::DreamCompleted { source, .. }
487            | Self::HookFired { source, .. } => Some(*source),
488            Self::StatusChanged { .. } => None,
489        }
490    }
491}
492
493/// Shared application state passed to every request handler.
494///
495/// Why: The stdio loop and HTTP server need the same handles to the registry,
496/// data root, and embedder so MCP tools can perform real reads/writes against
497/// the live trusty-memory core. The embedder is heavy (loads ONNX weights) so
498/// we hold it behind a `OnceCell` and initialize lazily on first use.
499/// What: `Clone`-able via `Arc` fields. The registry / data root are eager;
500/// `embedder` is `Arc<OnceCell<Arc<FastEmbedder>>>` so concurrent first-use
501/// races resolve to a single shared instance.
502/// Test: `app_state_default_constructs` confirms construction without panic.
503#[derive(Clone)]
504pub struct AppState {
505    pub version: String,
506    pub registry: Arc<PalaceRegistry>,
507    pub data_root: PathBuf,
508    pub embedder: Arc<OnceCell<Arc<FastEmbedder>>>,
509    /// Optional default palace applied to MCP tool calls when the caller
510    /// omits the `palace` argument. Set via `trusty-memory serve --palace`.
511    pub default_palace: Option<String>,
512    /// Active chat provider selected at startup. `None` means no upstream is
513    /// configured (no Ollama detected and no OpenRouter key) — callers must
514    /// degrade gracefully (chat endpoint returns 412).
515    pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
516    /// Per-palace chat-session stores, opened lazily so cold-start cost is
517    /// paid only when chat-history endpoints are hit.
518    pub session_stores: Arc<dashmap::DashMap<String, Arc<ChatSessionStore>>>,
519    /// Broadcast sender for live `DaemonEvent` pushes to SSE subscribers.
520    ///
521    /// Why: Lets mutating handlers emit events that any connected dashboard
522    /// receives instantly. Cap of 128 buffers transient slow readers; if a
523    /// receiver lags it gets `RecvError::Lagged` and we emit a `lag` frame.
524    pub events: Arc<broadcast::Sender<DaemonEvent>>,
525    /// Instant the daemon started, used to compute `uptime_secs` on `/health`.
526    ///
527    /// Why (issue #35): `GET /health` reports how long the daemon has been
528    /// up. Capturing a monotonic `Instant` at `AppState` construction lets the
529    /// handler compute the elapsed seconds cheaply and without a clock-skew
530    /// hazard.
531    /// What: a wall-monotonic `Instant`; `AppState::new` stamps it at startup.
532    /// Test: `health_endpoint_includes_resource_fields`.
533    pub started_at: std::time::Instant,
534    /// In-memory ring buffer of recent tracing log lines (issue #35).
535    ///
536    /// Why: the `GET /api/v1/logs/tail` endpoint serves the last N log lines
537    /// so operators can inspect a running daemon without tailing a file. The
538    /// buffer is shared between the tracing `LogBufferLayer` (writer) and the
539    /// HTTP handler (reader).
540    /// What: a cheap `Arc`-backed clone of the buffer the subscriber writes
541    /// to. Defaults to an empty buffer for states that never install the
542    /// layer (tests, the stdio path).
543    /// Test: `logs_tail_returns_recent_lines`.
544    pub log_buffer: trusty_common::log_buffer::LogBuffer,
545    /// Bug-capture ERROR store (bug-reporting #478, Phase 1).
546    ///
547    /// Why: Phase 2 MCP / HTTP endpoints need to query captured errors; stashing
548    ///      the `ErrorStore` handle here lets any handler reach it cheaply without
549    ///      a second global or per-request construction.
550    /// What: populated by `run_serve` from the `init_tracing_with_buffer_and_capture`
551    ///      result; the layer writes to this store automatically so every
552    ///      `tracing::error!` call site contributes without any changes to call
553    ///      sites. `None` in states that do not install the layer (tests, the
554    ///      stdio path).
555    /// Test: compile-presence is verified by the `trusty-memory` build; Phase 2
556    ///      will add query tests in `web.rs`.
557    pub error_store: Option<trusty_common::error_capture::ErrorStore>,
558    /// Most recent on-disk footprint of `data_root`, in bytes (issue #35).
559    ///
560    /// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
561    /// every health request would make a frequent health poll do unbounded
562    /// I/O; a background task recomputes it every 10 s and stores it here so
563    /// the handler reads it lock-free.
564    /// What: an `AtomicU64` updated by the ticker spawned in `run_http_on`.
565    /// `0` until the first walk completes.
566    /// Test: `health_endpoint_includes_resource_fields`.
567    pub disk_bytes: Arc<std::sync::atomic::AtomicU64>,
568    /// Per-process RSS + CPU sampler, refreshed on each `/health` request
569    /// (issue #35).
570    ///
571    /// Why: CPU usage is a delta between two `sysinfo` refreshes, so the
572    /// sampler must persist between requests — hence the shared `Mutex`.
573    /// What: a `tokio::sync::Mutex<SysMetrics>` so the async health handler
574    /// can sample without blocking the runtime.
575    /// Test: `health_endpoint_includes_resource_fields`.
576    pub sys_metrics: Arc<tokio::sync::Mutex<trusty_common::sys_metrics::SysMetrics>>,
577    /// HTTP listener address the daemon bound to, once `run_http_on` is running.
578    ///
579    /// Why: clients (and `/health` responses) need to advertise the live
580    /// `host:port` even though port selection happens dynamically (7070–7079
581    /// walk + OS fallback). Stashing it on `AppState` lets request handlers
582    /// surface the discovery value without re-querying the listener.
583    /// What: a `OnceLock<SocketAddr>` so `run_http_on` writes it exactly once
584    /// at bind time and every handler reads it lock-free thereafter. Empty
585    /// (`None` from `get()`) on the stdio path where no listener exists.
586    /// Test: `health_endpoint_reports_bound_addr` (added below).
587    pub bound_addr: Arc<OnceLock<SocketAddr>>,
588    /// Cached prompt-facts surface served by the MCP `get_prompt_context`
589    /// tool (issue #42).
590    ///
591    /// Why: The original session-init `prompts/get` design loaded context
592    /// once per connection; switching to a per-message tool lets the model
593    /// pull fresh, query-filtered context on demand. The cache holds both
594    /// the raw triples (for filtered lookups) and a pre-formatted Markdown
595    /// block (for the unfiltered hot path) so neither code path re-walks
596    /// the KG. The cache is rebuilt by
597    /// `prompt_facts::rebuild_prompt_cache` after any write that touches a
598    /// hot predicate (`kg_assert`, `add_alias`, `remove_prompt_fact`).
599    /// What: An `Arc<tokio::sync::RwLock<PromptFactsCache>>` so the hot
600    /// read path takes a brief read lock and clones the cache; rebuilds
601    /// take a write lock for the assignment only. The async-aware lock
602    /// (issue #229) yields to the tokio runtime instead of blocking a
603    /// runtime thread for the rebuild duration. An empty `triples` vec ↔
604    /// "no context stored yet" (the tool handler renders a hint).
605    /// Test: `get_prompt_context_returns_cached_or_hint`,
606    /// `get_prompt_context_filters_by_query`.
607    pub prompt_context_cache: Arc<RwLock<prompt_facts::PromptFactsCache>>,
608    /// Persistent activity log (issue #96).
609    ///
610    /// Why: the dashboard activity feed used to be a pure live-stream over
611    /// `/sse` — opening the UI showed an empty feed and any mutation from
612    /// the MCP path was invisible. Holding an `ActivityLog` on `AppState`
613    /// lets `emit` record an entry on every push so the
614    /// `GET /api/v1/activity` handler can return historical rows on mount
615    /// and the live SSE stream can continue prepending events on top of
616    /// the loaded history. `None` on builds that opt out (tests that use
617    /// `AppState::new` get a real log under their tempdir so behaviour
618    /// matches production).
619    /// What: an `Arc<ActivityLog>` shared with every emitter.
620    /// Test: `web::tests::activity_endpoint_lists_recent_emits`.
621    pub activity_log: Arc<ActivityLog>,
622    /// Optional per-palace BM25 lexical search lane (issue #156).
623    ///
624    /// Why: in-process BM25 would serialise the recall hot path on disk
625    /// I/O during writes and contend with the redb/usearch locks. Delegating
626    /// to the `trusty-bm25-daemon` subprocess (one socket per palace) keeps
627    /// BM25 ingestion and search off the critical path while still feeding
628    /// hits into the recall RRF fusion.
629    /// What: `Some(client)` only when `TRUSTY_BM25_DAEMON=1` at startup —
630    /// every code path that uses this field is gated on `is_some()` and
631    /// falls back to vector-only behaviour otherwise so existing deployments
632    /// see zero behavioural change.
633    /// Test: `bm25_client_disabled_by_default`,
634    /// `bm25_client_enabled_when_env_set`.
635    pub bm25_client: Option<Arc<Bm25Client>>,
636    /// Optional per-palace BM25 daemon spawn supervisor (issue #193).
637    ///
638    /// Why: without an in-process supervisor the BM25 daemon must be
639    /// launched out-of-band (launchd, manual `trusty-bm25-daemon`), which
640    /// is the same UX trap PR #190 fixed for trusty-embedderd. Holding a
641    /// supervisor here lets us spawn the daemon on first BM25 use for a
642    /// palace, restart it if it dies, and reap it on clean shutdown.
643    /// `Some` only when `TRUSTY_BM25_DAEMON=1` at startup — the same gate
644    /// that enables `bm25_client`. When set but `TRUSTY_BM25_EXTERNAL=1`,
645    /// the supervisor's `ensure_running` becomes a no-op that just returns
646    /// the canonical socket path so operators can keep using their own
647    /// process manager.
648    /// Test: covered by `bm25_supervisor_present_when_env_set` and the
649    /// `bm25_supervisor::tests` unit tests.
650    pub bm25_supervisor: Option<Arc<bm25_supervisor::Bm25Supervisor>>,
651    /// Per-palace write serialisation locks (issue #230).
652    ///
653    /// Why: the dedup gate in `tools.rs` previously read a snapshot of
654    /// existing drawers, checked for near-duplicates via Jaro-Winkler, and
655    /// then issued the write — a classic time-of-check/time-of-use race.
656    /// Two concurrent `memory_remember` calls with the same content could
657    /// both see the pre-write snapshot, both pass the gate, and both land
658    /// duplicate drawers. Serialising the gate-then-write sequence per
659    /// palace closes the window: while one task holds the mutex, any
660    /// concurrent writer for the same palace blocks until the first write
661    /// finishes and is visible to `list_drawers`. The lock is **per
662    /// palace** (not global) so writes to different palaces continue to
663    /// run in parallel.
664    /// What: a `DashMap` keyed by palace id, where each entry is an
665    /// `Arc<tokio::sync::Mutex<()>>`. The mutex is constructed lazily by
666    /// `palace_write_lock` on first access. `Arc` lets callers hold a
667    /// clone of the lock past the lifetime of the `DashMap` entry so the
668    /// map never needs to be held across an `.await`.
669    /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
670    pub palace_write_locks: Arc<dashmap::DashMap<String, Arc<tokio::sync::Mutex<()>>>>,
671    /// Counter of in-flight activity-log writes spawned by `emit`
672    /// (issue #232).
673    ///
674    /// Why: `emit` offloads the synchronous redb append to the tokio blocking
675    /// pool via `spawn_blocking` so the async runtime is never parked waiting
676    /// on fsync. The write is fire-and-forget — `emit` returns immediately
677    /// after spawning. Tests that observe the activity log right after a
678    /// burst of `emit` calls need a deterministic synchronization point;
679    /// holding an in-flight counter lets `flush_activity_writes` poll until
680    /// every spawned append has settled, which keeps the assertions
681    /// race-free without forcing every caller to `.await`.
682    /// What: an `Arc<AtomicUsize>` incremented before each `spawn_blocking`
683    /// and decremented inside the closure (after the append completes, even
684    /// if it errored). The counter is cheap (one atomic add per emit) and
685    /// stays at zero in steady-state production traffic.
686    /// Test: `web::tests::activity_endpoint_lists_recent_emits` and
687    /// `tests::emit_persists_mutations_but_skips_status_changed` call
688    /// `flush_activity_writes` to drain the counter before reading the log.
689    pub pending_activity_writes: Arc<AtomicUsize>,
690    /// In-memory cache mapping palace id → `Palace.name` (issue #228).
691    ///
692    /// Why: every `memory_remember` / `memory_note` write used to call
693    /// `PalaceRegistry::list_palaces` (a synchronous filesystem walk of the
694    /// data root) just to resolve a friendly palace name for the SSE
695    /// `DrawerAdded` event. With N palaces on disk the cost was O(N) opendirs
696    /// plus `palace.json` reads on every write, blocking the async runtime.
697    /// Caching the name in-memory turns the lookup into a `DashMap::get`.
698    /// What: `DashMap<String, String>` populated by `create_palace` and
699    /// `load_palaces_from_disk`, kept in sync by rename / delete paths.
700    /// Missing entries are treated as "name unknown" so callers fall back to
701    /// the palace id and the emit path never fails.
702    /// Test: `palace_name_cache_populated_after_hydration` and
703    /// `palace_name_cache_updates_on_create`.
704    pub palace_names: Arc<dashmap::DashMap<String, String>>,
705    /// Single-pass startup pin-file map: palace id → project root path (issue #470).
706    ///
707    /// Why: after daemon startup we have no record of which on-disk project
708    /// directories correspond to which palace ids — that information only
709    /// existed inside the pin files on disk. Eager-opening every palace on
710    /// startup is too expensive. This field captures the scan-only result of
711    /// `startup_scan::scan_pin_map` so handlers that want to locate a project
712    /// by its palace id (e.g. future cwd-inference, project-health checks)
713    /// can do a single `DashMap::get` instead of a filesystem walk.
714    /// Populated once, shortly after `load_palaces_from_disk` returns, by
715    /// `spawn_startup_tasks`. Never mutated after population — it is a
716    /// snapshot of what the filesystem looked like at startup.
717    /// What: `DashMap<String (palace_id), PathBuf (project root)>`.
718    /// The outer `Arc` lets `spawn_startup_tasks` (which holds only a clone
719    /// of `AppState`) write to the same backing map that request handlers
720    /// read. Population is asynchronous so callers must treat an absent entry
721    /// as "not yet scanned" (or "no pin found"), never as "palace unknown".
722    /// Test: `startup_scan::tests::scan_pin_map_*` validate the underlying
723    /// scanner function; the wiring in `spawn_startup_tasks` is covered by
724    /// the integration-test daemon start path.
725    pub pin_project_map: Arc<dashmap::DashMap<String, PathBuf>>,
726    /// Bounded sender for the BM25 index worker (issue #231).
727    ///
728    /// Why: the previous fire-and-forget design `tokio::spawn`ed one task per
729    /// `memory_remember` / `memory_note` call, so a write burst against a slow
730    /// or unreachable BM25 daemon grew an unbounded in-flight task queue. A
731    /// single long-lived worker draining a bounded mpsc channel caps that
732    /// back-pressure: writers `try_send` (never block), full-queue requests
733    /// are dropped with a `warn!`, and the worker exits cleanly when the last
734    /// sender is dropped on shutdown.
735    /// What: an `mpsc::Sender` cloned to every `AppState` clone (cheap). The
736    /// matching receiver is consumed by the worker spawned in
737    /// [`AppState::new`] via [`tools::spawn_bm25_index_worker`]. Capacity is
738    /// [`tools::BM25_INDEX_QUEUE_CAPACITY`] (256).
739    /// Test: `bm25_index_queue_drops_when_full` exercises the full-queue
740    /// branch via `bm25_index_enqueue`.
741    pub bm25_index_tx: tokio::sync::mpsc::Sender<tools::Bm25IndexRequest>,
742    /// Cached result of the startup update check (issue #537).
743    ///
744    /// Why: `/health` should report `update_available` without hitting crates.io
745    /// on every probe. A single background check at daemon startup stores the
746    /// result here; the health handler reads it lock-free (well, a brief mutex
747    /// lock) without a network call.
748    /// What: `None` = up-to-date or check not yet done; `Some("x.y.z")` = newer
749    /// version available. The field is populated by a `tokio::spawn` in
750    /// `spawn_startup_tasks` (main.rs) after the daemon binds.
751    /// Test: indirectly by the `/health` endpoint tests in `web.rs`.
752    pub update_available: Arc<std::sync::Mutex<Option<String>>>,
753    /// Two-phase readiness state — `Warming` until the embedder is initialised,
754    /// then `Ready` (issues #910 / #911).
755    ///
756    /// Why: `AppState::embedder()` used to call `FastEmbedder::new()` without
757    /// any timeout, so the first `memory_recall`/`memory_remember` that arrived
758    /// before CoreML finished compiling would block for 5–11 hours until the
759    /// OnceCell resolved (issue #910). Exposing this state lets the preflight
760    /// guards in `tools.rs` return an explicit fast error immediately —
761    /// `"trusty-memory is warming up, retry shortly"` — instead of queueing
762    /// behind an open-ended init.
763    /// What: An `AtomicU8` starting at `DaemonReadiness::Warming` (0) and flipped
764    /// to `DaemonReadiness::Ready` (1) by `spawn_startup_tasks` after the embedder
765    /// warm-up succeeds.  The transition is one-way and lock-free.
766    /// Test: `daemon_readiness_transitions_warming_to_ready`.
767    pub daemon_readiness: Arc<AtomicU8>,
768}
769
770impl AppState {
771    /// Construct an `AppState` rooted at the given on-disk data directory.
772    ///
773    /// Why: The CLI (`serve`) and integration tests need to point the MCP
774    /// server at different roots — production at `dirs::data_dir`, tests at a
775    /// `tempfile::tempdir()`.
776    /// What: Builds an empty `PalaceRegistry`, captures the version, and
777    /// allocates an empty `OnceCell` for the embedder. `default_palace` is
778    /// `None`; use `with_default_palace` to set it.
779    /// Test: `tools::tests::dispatch_palace_create_persists` constructs an
780    /// AppState pointed at a tempdir and round-trips a palace through it.
781    pub fn new(data_root: PathBuf) -> Self {
782        let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
783        // Issue #96: open (or create) the persistent activity log under the
784        // daemon data root. Open failure is logged but never crashes the
785        // daemon — we fall back to a per-process tempdir so emits remain
786        // best-effort and the rest of the daemon keeps working.
787        let activity_log = open_activity_log_with_fallback(&data_root);
788        // Issue #231: bounded mpsc channel + single long-lived worker
789        // replaces the per-write `tokio::spawn` fire-and-forget pattern so
790        // BM25 indexing back-pressure is capped. The worker is spawned here
791        // unconditionally so the channel always has a drain — even when
792        // `bm25_client` is `None`, the worker just consumes and discards
793        // each request so senders never block on a full queue.
794        let (bm25_index_tx, bm25_index_rx) =
795            tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(tools::BM25_INDEX_QUEUE_CAPACITY);
796        // `bm25_client` / `bm25_supervisor` start as `None`; the builder
797        // `with_bm25_client_from_env` rebuilds the worker with the real
798        // client + supervisor once env-gated opt-in is resolved.
799        tools::spawn_bm25_index_worker(bm25_index_rx, None, None);
800        Self {
801            version: env!("CARGO_PKG_VERSION").to_string(),
802            registry: Arc::new(PalaceRegistry::new()),
803            data_root,
804            embedder: Arc::new(OnceCell::new()),
805            default_palace: None,
806            chat_provider: Arc::new(OnceCell::new()),
807            session_stores: Arc::new(dashmap::DashMap::new()),
808            events: Arc::new(events_tx),
809            started_at: std::time::Instant::now(),
810            // Default to an empty buffer — `with_log_buffer` overrides this
811            // when the daemon installs the `LogBufferLayer` (HTTP mode).
812            log_buffer: trusty_common::log_buffer::LogBuffer::new(
813                trusty_common::log_buffer::DEFAULT_LOG_CAPACITY,
814            ),
815            // Bug-reporting #478: `None` until `with_error_store` is called
816            // during daemon startup (HTTP mode). Tests keep `None` so no
817            // unexpected files are written to the OS data dir.
818            error_store: None,
819            disk_bytes: Arc::new(std::sync::atomic::AtomicU64::new(0)),
820            sys_metrics: Arc::new(tokio::sync::Mutex::new(
821                trusty_common::sys_metrics::SysMetrics::new(),
822            )),
823            bound_addr: Arc::new(OnceLock::new()),
824            prompt_context_cache: Arc::new(RwLock::new(prompt_facts::PromptFactsCache::default())),
825            activity_log,
826            bm25_client: None,
827            bm25_supervisor: None,
828            palace_write_locks: Arc::new(dashmap::DashMap::new()),
829            pending_activity_writes: Arc::new(AtomicUsize::new(0)),
830            palace_names: Arc::new(dashmap::DashMap::new()),
831            pin_project_map: Arc::new(dashmap::DashMap::new()),
832            bm25_index_tx,
833            update_available: Arc::new(std::sync::Mutex::new(None)),
834            // Start in Warming state; flipped to Ready by spawn_startup_tasks
835            // once the embedder warm-up succeeds (issues #910/#911).
836            daemon_readiness: Arc::new(AtomicU8::new(DaemonReadiness::Warming as u8)),
837        }
838    }
839
840    /// Acquire (lazily, then clone) the per-palace write mutex.
841    ///
842    /// Why (issue #230): the dedup-check + `remember_with_options` write
843    /// sequence in `tools.rs` must be atomic per palace to prevent two
844    /// concurrent identical writes from both passing the dedup gate.
845    /// Callers hold the returned `Arc<Mutex<()>>`'s guard across the gate
846    /// check and the write so the second writer blocks until the first
847    /// write is visible to `list_drawers`. Returning a clone of the `Arc`
848    /// rather than a borrow into the `DashMap` lets the caller `.await`
849    /// while holding the lock without risking a deadlock against any
850    /// future map mutation (DashMap shards are sync mutexes).
851    /// What: looks up the palace id in `palace_write_locks` and returns
852    /// a clone of the existing mutex; on the first call for a palace,
853    /// inserts a freshly-constructed `tokio::sync::Mutex<()>` first. The
854    /// `DashMap::entry().or_insert_with` API guarantees the lazy
855    /// construction is racy-safe — only one mutex is ever inserted per
856    /// palace id.
857    /// Test: `tools::tests::dedup_gate_blocks_concurrent_duplicate_writes`.
858    pub fn palace_write_lock(&self, palace_id: &str) -> Arc<tokio::sync::Mutex<()>> {
859        if let Some(existing) = self.palace_write_locks.get(palace_id) {
860            return existing.clone();
861        }
862        self.palace_write_locks
863            .entry(palace_id.to_string())
864            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
865            .clone()
866    }
867
868    /// Look up a project root path by palace id in the startup pin-scan map.
869    ///
870    /// Why: provides a stable, cheap accessor so handlers do not reach directly
871    /// into the `DashMap` field and so the accessor can be mocked in future
872    /// tests without touching `AppState` internals. The map is populated
873    /// asynchronously by `spawn_startup_tasks` — an absent entry means either
874    /// the scan has not completed yet or no pin file claimed that id.
875    /// What: returns `Some(project_path)` when the palace id was found during
876    /// startup scan; `None` otherwise.
877    /// Test: covered indirectly via the startup-scan integration path; the
878    /// underlying map data is validated by `startup_scan::tests`.
879    pub fn pinned_project_path(&self, palace_id: &str) -> Option<PathBuf> {
880        self.pin_project_map.get(palace_id).map(|e| e.clone())
881    }
882
883    /// Builder-style: opt-in to the BM25 lexical lane (issue #156).
884    ///
885    /// Why: the BM25 subprocess is gated behind `TRUSTY_BM25_DAEMON=1` so
886    /// the default `cargo install trusty-memory` / launchd plist deployment
887    /// stays vector-only and existing test fixtures keep passing without
888    /// having to provision a daemon. Reading the env var here keeps the
889    /// gating logic in one place (the helper in `main.rs` just plumbs the
890    /// result through).
891    /// What: when `TRUSTY_BM25_DAEMON=1`, constructs one `Bm25Client` per
892    /// palace by lazy-resolving the socket path the first time the palace
893    /// id is observed. Currently we install a shared `default` client up
894    /// front and re-key on the palace id at the call site — palaces with no
895    /// daemon socket simply see search/index errors which we log + ignore.
896    /// Returns `self` unchanged when the env var is unset or set to anything
897    /// other than `1`.
898    /// Test: `bm25_client_disabled_by_default`,
899    /// `bm25_client_enabled_when_env_set`.
900    #[must_use]
901    pub fn with_bm25_client_from_env(mut self) -> Self {
902        if std::env::var("TRUSTY_BM25_DAEMON").as_deref() == Ok("1") {
903            // Install the default-palace client; per-palace clients are
904            // constructed on demand via `Bm25Client::for_palace`.
905            let default_palace = self.default_palace.as_deref().unwrap_or("default");
906            self.bm25_client = Some(Arc::new(Bm25Client::for_palace(default_palace)));
907            // Issue #193: hand-in-hand with the client, attach a spawn
908            // supervisor so the BM25 daemon is auto-started on first use
909            // for any palace. Operators who want to manage daemons
910            // out-of-band (launchd, systemd, manual) set
911            // TRUSTY_BM25_EXTERNAL=1 which makes the supervisor a no-op.
912            self.bm25_supervisor = Some(Arc::new(bm25_supervisor::Bm25Supervisor::new()));
913            // Issue #231: rebuild the bounded indexer channel + worker so
914            // the worker holds the now-populated client + supervisor. The
915            // placeholder worker installed by `AppState::new` (with `None`
916            // / `None`) drained the channel into the void — replacing the
917            // sender here closes the placeholder receiver and the
918            // placeholder worker exits cleanly. The new worker takes over
919            // as the sole drain for the indexer queue.
920            let (tx, rx) = tokio::sync::mpsc::channel::<tools::Bm25IndexRequest>(
921                tools::BM25_INDEX_QUEUE_CAPACITY,
922            );
923            tools::spawn_bm25_index_worker(
924                rx,
925                self.bm25_client.clone(),
926                self.bm25_supervisor.clone(),
927            );
928            self.bm25_index_tx = tx;
929            tracing::info!(
930                palace = default_palace,
931                "BM25 daemon client + spawn supervisor enabled (TRUSTY_BM25_DAEMON=1)"
932            );
933        }
934        self
935    }
936
937    /// Scan the palace registry directory and re-register every persisted
938    /// palace into the in-memory [`PalaceRegistry`].
939    ///
940    /// Why: `AppState::new` builds an *empty* registry, so after a daemon
941    /// restart `palace_list` / the dashboard reported zero palaces even though
942    /// dozens existed on disk — palace metadata was persisted by
943    /// `palace_create` but never re-hydrated on startup. This method closes
944    /// that gap by walking the on-disk layout (each subdirectory holding a
945    /// `palace.json` is one palace) and rebuilding a live `PalaceHandle` for
946    /// each, so recall paths see the full set immediately after a restart.
947    /// What: runs the blocking filesystem walk + per-palace `PalaceHandle::open`
948    /// on a `spawn_blocking` thread (so it never stalls the async runtime),
949    /// registers each successfully opened palace via `register_arc`, logs every
950    /// load at `debug!`, and returns the count loaded. A palace that fails to
951    /// open (corrupt index, unreadable `kg.db`, etc.) is logged at `warn!` and
952    /// skipped — one bad palace must not abort startup or crash the daemon.
953    /// `data_root` is expected to already be the palace registry directory —
954    /// `main.rs` resolves it via [`resolve_palace_registry_dir`] before
955    /// constructing the `AppState`, so the flat / legacy-`palaces/` layout
956    /// difference is handled exactly once.
957    /// Test: `tests::load_palaces_from_disk_rehydrates_registry` writes two
958    /// palaces into a tempdir, constructs an `AppState`, calls this method, and
959    /// asserts the returned count and registry contents.
960    pub async fn load_palaces_from_disk(&self) -> Result<usize> {
961        let registry_dir = self.data_root.clone();
962        let registry = self.registry.clone();
963        let palace_names = self.palace_names.clone();
964        // The directory walk and each `PalaceHandle::open` perform blocking
965        // filesystem + redb/usearch I/O — run the whole hydration on the
966        // blocking pool so it never parks an async worker thread.
967        let count = tokio::task::spawn_blocking(move || -> Result<usize> {
968            let palaces = PalaceRegistry::list_palaces(&registry_dir)?;
969            let total = palaces.len();
970            let mut loaded = 0usize;
971            let mut skipped = 0usize;
972            for palace in palaces {
973                match trusty_common::memory_core::PalaceHandle::open(&palace) {
974                    Ok(handle) => {
975                        tracing::debug!(
976                            palace = %palace.id,
977                            data_dir = %palace.data_dir.display(),
978                            "loaded palace from disk"
979                        );
980                        // Issue #228: seed the in-memory name cache so write
981                        // hot paths (memory_remember / memory_note) can resolve
982                        // the friendly palace name without re-walking the data
983                        // root. Insert here (during hydration) is the single
984                        // point of truth for restart-time population.
985                        palace_names.insert(palace.id.0.clone(), palace.name.clone());
986                        registry.register_arc(handle);
987                        loaded += 1;
988                    }
989                    Err(e) => {
990                        // Why (issue #467): a single bad palace (corrupt kg.db,
991                        // stale WAL, EMFILE — "Too many open files", permissions)
992                        // must never abort startup or block the HTTP server from
993                        // binding. Log per-palace and keep going; the summary
994                        // below tells operators how many were skipped without
995                        // trawling the log.
996                        // The palace is NOT registered in the in-memory registry,
997                        // so the next `open_palace` call for this id will attempt
998                        // a fresh open from disk — the lazy-reopen path. If the
999                        // root cause was EMFILE and the fd-limit fix (#462) raised
1000                        // the soft limit to 8192, that first request will succeed.
1001                        tracing::warn!(
1002                            palace = %palace.id,
1003                            data_dir = %palace.data_dir.display(),
1004                            "skipping palace during startup hydration: {e:#}; \
1005                             will retry lazily on first access"
1006                        );
1007                        skipped += 1;
1008                    }
1009                }
1010            }
1011            tracing::info!(
1012                "palace hydration summary: loaded {loaded}/{total} ({skipped} skipped due to errors)"
1013            );
1014            Ok(loaded)
1015        })
1016        .await
1017        .map_err(|e| anyhow::anyhow!("join load_palaces_from_disk: {e}"))??;
1018        Ok(count)
1019    }
1020
1021    /// Builder-style: attach the daemon's shared [`LogBuffer`] so the
1022    /// `GET /api/v1/logs/tail` endpoint serves the same lines the tracing
1023    /// subscriber captures (issue #35).
1024    ///
1025    /// Why: `main` builds the buffer (via `init_tracing_with_buffer`) before
1026    /// constructing the `AppState`, then hands a clone here so the HTTP
1027    /// handler and the tracing layer observe the same ring.
1028    /// What: replaces the empty default buffer with the supplied one.
1029    /// Test: `logs_tail_returns_recent_lines`.
1030    #[must_use]
1031    pub fn with_log_buffer(mut self, buffer: trusty_common::log_buffer::LogBuffer) -> Self {
1032        self.log_buffer = buffer;
1033        self
1034    }
1035
1036    /// Builder-style: mark this daemon as the sole palace writer so palace
1037    /// redb files open with `OpenIntent::Writer` (issue #1487).
1038    ///
1039    /// Why: The HTTP daemon owns the write lock on every palace's `kg.redb`
1040    /// and `index.usearch.redb`. Before this fix, when a *second* daemon
1041    /// instance opened the same store it silently degraded to a read-only
1042    /// snapshot and rejected every `memory_remember` for its lifetime —
1043    /// effectively silent data loss when an MCP client routed a write to the
1044    /// rogue instance. Opening as `Writer` makes the second instance fail
1045    /// loud (after a short handoff-retry window that absorbs a graceful
1046    /// launchd `bootout`→`bootstrap` overlap) instead of serving broken
1047    /// reads-only. CLI, stdio-proxy, and test code paths never call this, so
1048    /// they keep the snapshot read-fallback (issue #59).
1049    /// What: Replaces `self.registry` with a fresh `PalaceRegistry` carrying
1050    /// `OpenIntent::Writer`.
1051    ///
1052    /// Invariant: MUST be called on a fresh, unhydrated, unshared registry —
1053    /// during startup, before `spawn_startup_tasks`/`load_palaces_from_disk`
1054    /// registers any `PalaceHandle` and before the `AppState` (hence its
1055    /// `Arc<PalaceRegistry>`) is cloned to a handler. Replacing the registry
1056    /// discards the prior `Arc`; doing so after hydration would silently drop
1057    /// live handles (data loss), and doing so after the state is shared would
1058    /// leave other clones on the stale read-only registry. The guard is a
1059    /// `debug_assert!` on the strongest cheap signals the registry exposes —
1060    /// `is_empty()` (no handles hydrated) and `Arc::strong_count == 1` (not yet
1061    /// shared) — so an ordering violation fails fast as the programmer error it
1062    /// is (the call site is startup-only and fixed). Release builds elide the
1063    /// assert; the real call site (`run_serve`) always satisfies it.
1064    /// Test: `with_writer_intent_marks_registry_writer` and
1065    /// `with_writer_intent_panics_on_hydrated_registry` in `lib_tests`.
1066    #[must_use]
1067    pub fn with_writer_intent(mut self) -> Self {
1068        // Fail fast on an ordering bug: a hydrated registry (`!is_empty`) or a
1069        // shared one (`strong_count > 1`) would silently drop live handles or
1070        // strand other clones on the stale read-only registry (issue #1487).
1071        debug_assert!(self.registry.is_empty() && Arc::strong_count(&self.registry) == 1);
1072        self.registry = Arc::new(PalaceRegistry::new().with_writer_intent());
1073        self
1074    }
1075
1076    /// Builder-style: attach the bug-capture `ErrorStore` handle (bug-reporting #478).
1077    ///
1078    /// Why: Phase 2 MCP / HTTP endpoints need a handle to the in-memory error
1079    ///      ring so they can serve `recent_errors` / `errors_by_fingerprint`
1080    ///      without disk I/O on the hot path. Installing it here — rather than
1081    ///      adding it as a separate global — keeps the state graph explicit and
1082    ///      lets tests skip it by never calling this method.
1083    /// What: stores `Some(store)` in `AppState::error_store`; the `BugCaptureLayer`
1084    ///      that writes to this store is already installed in the tracing
1085    ///      subscriber by `init_tracing_with_buffer_and_capture`. The store is
1086    ///      `Clone` (cheap `Arc` clone internally) so both the layer and this
1087    ///      field share the same underlying ring.
1088    /// Test: Phase 2 will add `error_store_captures_and_queries` in `web.rs`.
1089    #[must_use]
1090    pub fn with_error_store(mut self, store: trusty_common::error_capture::ErrorStore) -> Self {
1091        self.error_store = Some(store);
1092        self
1093    }
1094
1095    /// Send a `DaemonEvent` to all connected SSE subscribers and persist
1096    /// it to the activity log when the variant carries a source.
1097    ///
1098    /// Why: Mutating handlers call this after a successful write so the
1099    /// dashboard can update without polling. The send is best-effort —
1100    /// `broadcast::Sender::send` returns `Err` only when there are no live
1101    /// receivers, which is fine (no listeners == no work to do). Issue
1102    /// #96 additionally writes the entry to the persistent activity log
1103    /// so the feed can serve historical rows on page load and so MCP /
1104    /// HTTP / Hook origins are visible to the operator. Persistence is
1105    /// also best-effort — a write failure is logged but never blocks the
1106    /// SSE broadcast.
1107    ///
1108    /// Issue #232: the activity-log append is a synchronous redb write +
1109    /// fsync. Calling it directly on the async caller's task parked a tokio
1110    /// worker thread on disk I/O for every SSE event. We now offload the
1111    /// append to the blocking thread pool via `spawn_blocking` and return
1112    /// immediately — `emit` stays synchronous so every existing caller
1113    /// (including the sync `dispatch_hook_fired` JSON-RPC handler) keeps
1114    /// compiling unchanged. The fire-and-forget pattern matches the
1115    /// pre-fix semantics (best-effort, never blocks the SSE broadcast)
1116    /// while freeing the async runtime to do real work during the write.
1117    /// What: serialises the event for the log (skipping `StatusChanged`
1118    /// which is a recomputed aggregate, not a mutation), spawns the redb
1119    /// append on `tokio::task::spawn_blocking` keyed by a clone of the
1120    /// `Arc<ActivityLog>` and the cloned event, then sends the event over
1121    /// the broadcast channel. A `pending_activity_writes` counter is bumped
1122    /// before the spawn and decremented inside the closure so
1123    /// [`Self::flush_activity_writes`] can drain in tests.
1124    /// Test: `web::tests::sse_stream_receives_palace_created` confirms a
1125    /// subscriber observes the emitted event;
1126    /// `activity_endpoint_lists_recent_emits` confirms persistence via
1127    /// `flush_activity_writes`.
1128    pub fn emit(&self, event: DaemonEvent) {
1129        if let Some(source) = event.source() {
1130            let event_type = event.type_str();
1131            let palace_id = event.palace_id().map(|s| s.to_string());
1132            let log = Arc::clone(&self.activity_log);
1133            let event_for_log = event.clone();
1134            let pending = Arc::clone(&self.pending_activity_writes);
1135            // Pre-allocate the sequence id in the emitting thread so the
1136            // persisted order matches the emission order even when blocking-pool
1137            // workers execute the writes concurrently (issue #247). Without
1138            // this, four rapid emits would assign IDs inside their respective
1139            // `spawn_blocking` closures in a non-deterministic order.
1140            let id = log.alloc_id();
1141            pending.fetch_add(1, Ordering::SeqCst);
1142            // Why: the synchronous redb append + fsync must not park an
1143            // async worker thread (issue #232). Spawn the write on the
1144            // blocking pool; the JoinHandle is intentionally dropped —
1145            // the write is best-effort and any failure is logged below.
1146            tokio::task::spawn_blocking(move || {
1147                let result = log.append_with_id(id, source, palace_id, event_type, &event_for_log);
1148                if let Err(e) = result {
1149                    tracing::warn!("activity_log.append failed for {event_type}: {e:#}");
1150                }
1151                pending.fetch_sub(1, Ordering::SeqCst);
1152            });
1153        }
1154        let _ = self.events.send(event);
1155    }
1156
1157    /// Block (asynchronously) until every in-flight activity-log write
1158    /// spawned by [`Self::emit`] has settled.
1159    ///
1160    /// Why: `emit` offloads its redb append to `tokio::task::spawn_blocking`
1161    /// and returns immediately (issue #232). Tests that observe the
1162    /// activity log right after a burst of emits would otherwise race the
1163    /// blocking-pool worker; this helper gives them a deterministic
1164    /// synchronization point. Production code never needs to call this —
1165    /// the dashboard reads through `GET /api/v1/activity`, which already
1166    /// tolerates writes settling asynchronously.
1167    /// What: spins on `pending_activity_writes` with a 1 ms yield until the
1168    /// counter is zero. Cheap: tests typically emit a handful of events
1169    /// and the loop exits within a single scheduler tick.
1170    /// Test: covered indirectly by `emit_persists_mutations_but_skips_status_changed`
1171    /// and `web::tests::activity_endpoint_lists_recent_emits`.
1172    pub async fn flush_activity_writes(&self) {
1173        while self.pending_activity_writes.load(Ordering::SeqCst) > 0 {
1174            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1175        }
1176    }
1177
1178    /// Open (or return cached) the chat-session store for a palace.
1179    ///
1180    /// Why: Chat session persistence lives in a dedicated redb file under
    /// the palace's data dir (`chat_sessions.db`) so it doesn't intermingle
1182    /// with the KG's transactional load. The store is cheap to clone via
1183    /// `Arc` but the underlying connection should be reused, so cache by id.
1184    /// What: Creates the palace data dir if missing, opens (or reuses) a
1185    /// `ChatSessionStore` and stashes an `Arc` in the DashMap.
1186    /// Test: Indirectly via the session HTTP handlers in `web::tests`.
1187    pub fn session_store(&self, palace_id: &str) -> Result<Arc<ChatSessionStore>> {
1188        if let Some(entry) = self.session_stores.get(palace_id) {
1189            return Ok(entry.clone());
1190        }
1191        let dir = self.data_root.join(palace_id);
1192        std::fs::create_dir_all(&dir)
1193            .map_err(|e| anyhow::anyhow!("create palace dir {}: {e}", dir.display()))?;
1194        let store = Arc::new(ChatSessionStore::open(&dir.join("chat_sessions.db"))?);
1195        self.session_stores
1196            .insert(palace_id.to_string(), store.clone());
1197        Ok(store)
1198    }
1199
1200    /// Builder-style setter for the default palace name.
1201    ///
1202    /// Why: `serve --palace <name>` wants to bind every tool call to a
1203    /// project-scoped namespace without forcing every MCP request to repeat
1204    /// the palace argument.
1205    /// What: Returns `self` with `default_palace = Some(name)`.
1206    /// Test: `default_palace_used_when_arg_omitted` covers the resolution
1207    /// path; this setter is exercised there.
1208    pub fn with_default_palace(mut self, name: Option<String>) -> Self {
1209        self.default_palace = name;
1210        self
1211    }
1212
    // NOTE: the paragraph below describes the shared embedder (see
    // `embedder()`), not `chat_provider`. It is kept as a plain comment so
    // it no longer leaks into the rustdoc of the method that follows.
    //
    // Resolve (or initialize) the shared embedder.
    //
    // Why: FastEmbedder load is expensive — we share one instance across all
    // tool calls; the `OnceCell` ensures concurrent first-use races collapse
    // to a single load.
    // What: Returns `Arc<FastEmbedder>` on success. Errors propagate from the
    // underlying ONNX load.
    // Test: Indirectly via `dispatch_remember_then_recall`.
1221    /// Resolve the active chat provider, auto-detecting on first call.
1222    ///
1223    /// Why: Provider selection depends on filesystem-loaded config plus a
1224    /// network probe (Ollama liveness), so it must be lazily initialised at
1225    /// runtime. Caching the choice in a `OnceCell` keeps it stable across
1226    /// concurrent requests without re-probing on every chat call.
1227    /// What: On first use loads `~/.trusty-memory/config.toml`, prefers an
1228    /// auto-detected Ollama instance (when `local_model.enabled`), and falls
    /// back to OpenRouter when an API key is set. Returns `None` when
1230    /// neither is available so the caller can emit a 412.
1231    /// Test: `web::tests::providers_endpoint_returns_payload` covers the
1232    /// detection path indirectly through `/api/v1/chat/providers`.
1233    pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
1234        self.chat_provider
1235            .get_or_init(|| async {
1236                // Why (issue #226): `service::load_user_config` is the
1237                //      axum-free home of the loader; the `web::load_user_config`
1238                //      re-export only exists for the HTTP handlers. Going
1239                //      direct to `service` keeps this method usable when
1240                //      the `axum-server` feature is disabled.
1241                let cfg = crate::service::load_user_config().unwrap_or_default();
1242                if cfg.local_model.enabled {
1243                    if let Some(mut p) =
1244                        trusty_common::auto_detect_local_provider(&cfg.local_model.base_url).await
1245                    {
1246                        // auto_detect returns an empty model id; callers must
1247                        // set the configured model name themselves.
1248                        p.model = cfg.local_model.model.clone();
1249                        return Some(Arc::new(p) as Arc<dyn ChatProvider>);
1250                    }
1251                }
1252                if !cfg.openrouter_api_key.is_empty() {
1253                    return Some(Arc::new(trusty_common::OpenRouterProvider::new(
1254                        cfg.openrouter_api_key,
1255                        cfg.openrouter_model,
1256                    )) as Arc<dyn ChatProvider>);
1257                }
1258                None
1259            })
1260            .await
1261            .clone()
1262    }
1263
1264    /// Spawn a fire-and-forget background task that auto-discovers project
1265    /// aliases under `project_root` and asserts new ones into `palace`.
1266    ///
1267    /// Why (issue #42): Projects carry implicit shorthand — cargo package
1268    /// names that differ from their directory, binary names that differ
1269    /// from packages, first-letter abbreviations — that should be surfaced
1270    /// without a user ever calling `add_alias`. Running discovery as a
1271    /// detached task on palace-open keeps startup latency unchanged: the
1272    /// daemon binds and starts serving immediately while the discovery scan
1273    /// completes in the background, and any newly-asserted aliases land in
1274    /// the prompt cache before the model's next `get_prompt_context` call.
1275    /// What: clones `self` (cheap; `Arc`-backed), spawns a tokio task that
1276    /// invokes the `discover_aliases` tool handler directly so the
1277    /// dedup + cache-rebuild logic runs exactly the same path as the MCP
1278    /// tool call. Errors are logged at `warn!`; one failed discovery never
1279    /// destabilises the daemon.
1280    /// Test: not unit-tested (timing-dependent fire-and-forget); the
1281    /// underlying `discover_aliases` dispatch is covered by
1282    /// `dispatch_discover_aliases_inserts_new_and_dedupes` in `tools::tests`.
1283    pub fn spawn_alias_discovery(&self, palace: String, project_root: PathBuf) {
1284        let state = self.clone();
1285        tokio::spawn(async move {
1286            let args = serde_json::json!({
1287                "palace": palace,
1288                "project_root": project_root.to_string_lossy(),
1289            });
1290            match tools::dispatch_tool(&state, "discover_aliases", args).await {
1291                Ok(result) => tracing::info!(
1292                    new = ?result.get("new"),
1293                    already_known = ?result.get("already_known"),
1294                    "alias discovery complete"
1295                ),
1296                Err(e) => tracing::warn!("alias discovery failed: {e:#}"),
1297            }
1298        });
1299    }
1300
1301    /// Return the current readiness state.
1302    ///
1303    /// Why: tool handlers and the `/health` endpoint need a cheap, lock-free
1304    /// way to check whether the embedder has been initialised yet.
1305    /// What: loads `daemon_readiness` with `Acquire` ordering so the caller
1306    /// sees all writes the startup task made before setting the state.
1307    /// Test: `daemon_readiness_transitions_warming_to_ready`.
1308    pub fn readiness(&self) -> DaemonReadiness {
1309        DaemonReadiness::from_u8(self.daemon_readiness.load(Ordering::Acquire))
1310    }
1311
1312    /// Flip the readiness state from `Warming` to `Ready`.
1313    ///
1314    /// Why: called by `spawn_startup_tasks` in `main.rs` once the embedder
1315    /// warm-up succeeds — this is the single state-transition site.
1316    /// What: `store(Ready, Release)` so subsequent `Acquire` loads in handlers
1317    /// observe a consistent state.  Idempotent: calling it multiple times is
1318    /// harmless.
1319    /// Test: `daemon_readiness_transitions_warming_to_ready`.
1320    pub fn set_ready(&self) {
1321        self.daemon_readiness
1322            .store(DaemonReadiness::Ready as u8, Ordering::Release);
1323    }
1324
1325    /// Return `Ok(())` when `Ready`, or an explicit `Err` with the warming
1326    /// message when still `Warming`.
1327    ///
1328    /// Why: the preflight in every bounded handler calls this and returns the
1329    /// error immediately so no embedding / redb I/O is attempted while the
1330    /// daemon is still initialising (tracks #911 internally).
1331    /// What: cheaply reads `daemon_readiness`; returns the fast error string
1332    /// on `Warming`.  Zero allocation on the happy path.
1333    /// Test: covered by `tools::tests::remember_returns_warming_error_while_state_is_warming`.
1334    pub fn readiness_check(&self) -> Result<()> {
1335        if self.readiness() == DaemonReadiness::Warming {
1336            return Err(anyhow::anyhow!(
1337                "trusty-memory is warming up (embedder initialising); \
1338                 please retry in a few seconds"
1339            ));
1340        }
1341        Ok(())
1342    }
1343
1344    /// Obtain the shared `FastEmbedder` instance, initialising it on first call.
1345    ///
1346    /// Why: centralises lazy embedder access so every tool handler goes through
1347    /// one bounded init path (tracks #910 internally).
1348    /// What: wraps `OnceCell::get_or_try_init` with a timeout so a slow
1349    /// CoreML/CUDA first-compile cannot block a handler indefinitely.  On
1350    /// timeout the `OnceCell` is left unresolved and the next caller retries.
1351    ///
1352    /// **Callers on the request path MUST call `readiness_check()` before
1353    /// this method.**  The four guarded handlers (`memory_remember`,
1354    /// `memory_recall`, `memory_recall_deep`, `memory_note`) do so; any new
1355    /// handler that calls `embedder()` must follow the same pattern.
1356    /// Reaching this method while still `Warming` is not a bug — the warm-up
1357    /// task itself calls `embedder()` while in `Warming` state — but request
1358    /// handlers should have short-circuited before here via `readiness_check()`.
1359    ///
1360    /// The `readiness_check()` preflight is the PRIMARY guard (fast rejection
1361    /// with no I/O).  This timeout is the last-resort backstop in case a
1362    /// handler bypasses the preflight or the warm-up task itself hits a
1363    /// pathological init delay.  If this timeout fires the `OnceCell` is left
1364    /// in the unresolved state and the next call retries from scratch.
1365    pub async fn embedder(&self) -> Result<Arc<FastEmbedder>> {
1366        use trusty_common::memory_core::timeouts;
1367        let cell = self.embedder.clone();
1368        let timeout = timeouts::embedder_init_timeout();
1369        // `readiness_check()` is the PRIMARY guard — handlers return a fast
1370        // warming error before reaching here.  This timeout is the last-resort
1371        // backstop: if the embedder init races past the preflight (e.g. in the
1372        // warm-up task itself, which calls embedder() while still Warming) or
1373        // the CoreML/CUDA compile stalls, we fail fast rather than blocking
1374        // indefinitely.  On timeout the OnceCell stays unresolved; the next
1375        // caller will retry the init from scratch.
1376        let embedder = tokio::time::timeout(
1377            timeout,
1378            cell.get_or_try_init(|| async {
1379                let e = FastEmbedder::new().await?;
1380                Ok::<Arc<FastEmbedder>, anyhow::Error>(Arc::new(e))
1381            }),
1382        )
1383        .await
1384        .map_err(|_| {
1385            anyhow::anyhow!(
1386                "AppState::embedder() timed out after {:?}; \
1387                 the CoreML/CUDA model is taking unusually long to compile — \
1388                 increase TRUSTY_EMBEDDER_INIT_TIMEOUT_SECS if needed",
1389                timeout
1390            )
1391        })??
1392        .clone();
1393        Ok(embedder)
1394    }
1395}
1396
1397impl std::fmt::Debug for AppState {
1398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1399        f.debug_struct("AppState")
1400            .field("version", &self.version)
1401            .field("data_root", &self.data_root)
1402            .field("registry_len", &self.registry.len())
1403            .finish()
1404    }
1405}
1406
1407/// Handle a single MCP JSON-RPC message and produce its response.
1408///
1409/// Why: Pulled out of the stdio loop so unit tests can drive every method
1410/// without touching real stdin/stdout.
1411/// What: Routes `initialize`, `tools/list`, `tools/call`, `ping`, and the
1412/// `notifications/initialized` notification (which returns `Value::Null`).
1413/// Test: See unit tests below — initialize/list/call all return expected
1414/// JSON-RPC envelopes; notifications return `Null` (no response written).
1415pub async fn handle_message(state: &AppState, msg: Value) -> Value {
1416    let id = msg.get("id").cloned().unwrap_or(Value::Null);
1417    let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
1418
1419    match method {
1420        "initialize" => {
1421            let extra = state
1422                .default_palace
1423                .as_ref()
1424                .map(|dp| json!({ "default_palace": dp }));
1425            let result = initialize_response("trusty-memory", &state.version, extra);
1426            // Why (issue #42): prompt-facts now flow through the
1427            // per-message `get_prompt_context` tool rather than MCP
1428            // prompts, so we no longer advertise the `prompts` capability.
1429            json!({
1430                "jsonrpc": "2.0",
1431                "id": id,
1432                "result": result,
1433            })
1434        }
1435        // Notifications must NOT receive a response.
1436        "notifications/initialized" | "notifications/cancelled" => Value::Null,
1437        "tools/list" => json!({
1438            "jsonrpc": "2.0",
1439            "id": id,
1440            "result": tools::tool_definitions_with(state.default_palace.is_some())
1441        }),
1442        // OpenRPC 1.3.2 discovery — see `openrpc.rs`. Returns the full
1443        // service description so orchestrators (open-mpm, etc.) can
1444        // introspect every tool and its required `memory.read`/`memory.write`
1445        // scope without bespoke per-server adapters.
1446        "rpc.discover" => json!({
1447            "jsonrpc": "2.0",
1448            "id": id,
1449            "result": openrpc::build_discover_response(
1450                &state.version,
1451                state.default_palace.is_some(),
1452            ),
1453        }),
1454        "tools/call" => {
1455            let params = msg.get("params").cloned().unwrap_or_default();
1456            let tool_name = params
1457                .get("name")
1458                .and_then(|n| n.as_str())
1459                .unwrap_or("")
1460                .to_string();
1461            let args = params.get("arguments").cloned().unwrap_or_default();
1462            match tools::dispatch_tool(state, &tool_name, args).await {
1463                Ok(content) => {
1464                    // Why: tools that return a bare JSON string (e.g.
1465                    // `get_prompt_context` returning the formatted
1466                    // Markdown block) should surface as plain text in the
1467                    // MCP `content[0].text` field — wrapping in
1468                    // `Value::to_string()` would re-quote the payload and
1469                    // force every caller to strip outer quotes.
1470                    let text = match &content {
1471                        Value::String(s) => s.clone(),
1472                        other => other.to_string(),
1473                    };
1474                    json!({
1475                        "jsonrpc": "2.0",
1476                        "id": id,
1477                        "result": {
1478                            "content": [{"type": "text", "text": text}]
1479                        }
1480                    })
1481                }
1482                Err(e) => json!({
1483                    "jsonrpc": "2.0",
1484                    "id": id,
1485                    // Why: anyhow's `{:#}` alternate format walks the full
1486                    // `Caused by:` chain so MCP clients see actionable
1487                    // detail (e.g. "PalaceHandle::remember_with_options:
1488                    // filter rejected: too short") instead of just the
1489                    // outermost context label.
1490                    "error": {"code": -32603, "message": format!("{e:#}")}
1491                }),
1492            }
1493        }
1494        "ping" => json!({"jsonrpc": "2.0", "id": id, "result": {}}),
1495        _ => json!({
1496            "jsonrpc": "2.0",
1497            "id": id,
1498            "error": {
1499                "code": -32601,
1500                "message": format!("Method not found: {method}")
1501            }
1502        }),
1503    }
1504}
1505
/// Preferred starting port for the trusty-memory HTTP daemon.
///
/// Why: keeps the well-known default stable for clients that have hard-coded
/// `127.0.0.1:7070` in their configuration, while still allowing dynamic
/// walking when the port is in use (`DYNAMIC_PORT_RANGE` ports starting here).
/// What: `7070` — historic default, matches the launchd plist's prior value.
/// `bind_dynamic_port` tries this port on `127.0.0.1` first.
/// Test: covered indirectly by `bind_dynamic_port_returns_listener`.
pub const DEFAULT_HTTP_PORT: u16 = 7070;
1514
/// Number of consecutive ports `bind_dynamic_port` walks before falling back
/// to the OS-assigned port. Matches the trusty-search convention.
///
/// What: `10` — together with `DEFAULT_HTTP_PORT` this yields the preferred
/// range 7070..=7079; `bind_dynamic_port` passes `DYNAMIC_PORT_RANGE - 1` to
/// `bind_with_auto_port` (presumably the count of extra ports to try after
/// the first — confirm against that helper).
const DYNAMIC_PORT_RANGE: u16 = 10;
1518
1519/// Path to the canonical address-discovery file for the trusty-memory daemon.
1520///
1521/// Why: clients (CLI, MCP tools, dashboards) need to find the running daemon
1522/// without configuration when the port was selected dynamically. Using
1523/// `trusty_common::resolve_data_dir` aligns this path with the location
1524/// that `trusty_common::read_daemon_addr("trusty-memory")` reads from, so
1525/// `prompt-context`, `doctor`, and `start`'s probe all find the running daemon.
1526/// The old `~/.trusty-memory/http_addr` path and the new
1527/// `~/Library/Application Support/trusty-memory/http_addr` (macOS) path were
1528/// divergent — the daemon wrote one; readers expected the other.
1529/// What: returns `{resolve_data_dir("trusty-memory")}/http_addr`, or `None` if
1530/// the data dir cannot be resolved (locked-down container, no passwd entry).
1531/// Test: `http_addr_path_uses_resolve_data_dir`.
1532pub fn http_addr_path() -> Option<PathBuf> {
1533    trusty_common::resolve_data_dir("trusty-memory")
1534        .ok()
1535        .map(|d| d.join("http_addr"))
1536}
1537
1538/// Bind a `TcpListener` to `127.0.0.1`, dynamically selecting a port.
1539///
1540/// Why: the historic default `7070` is convenient for clients but a stale
1541/// process or a second daemon must not produce a noisy failure. Walking
1542/// `DEFAULT_HTTP_PORT..DEFAULT_HTTP_PORT+DYNAMIC_PORT_RANGE` first preserves
1543/// backwards compatibility for the common case; OS-assigned fallback (`:0`)
1544/// guarantees the daemon always comes up even when every preferred port is
1545/// busy.
1546/// What: returns the first successful `TcpListener` (7070..=7079, then
1547/// OS-assigned); caller inspects `local_addr()` to learn the chosen port.
1548/// Test: `bind_dynamic_port_returns_listener` confirms it always binds *some*
1549/// port even after another listener occupies the preferred one.
1550pub async fn bind_dynamic_port() -> Result<tokio::net::TcpListener> {
1551    let preferred: SocketAddr = SocketAddr::from(([127, 0, 0, 1], DEFAULT_HTTP_PORT));
1552    // First: walk the preferred range (7070..=7079).
1553    if let Ok(listener) =
1554        trusty_common::bind_with_auto_port(preferred, DYNAMIC_PORT_RANGE - 1).await
1555    {
1556        return Ok(listener);
1557    }
1558    // Last resort: ask the kernel for any free port. `bind_with_auto_port`
1559    // with `:0` resolves immediately to the OS-assigned port.
1560    tracing::warn!(
1561        "all ports {DEFAULT_HTTP_PORT}..{} in use; requesting OS-assigned port",
1562        DEFAULT_HTTP_PORT + DYNAMIC_PORT_RANGE - 1
1563    );
1564    let any: SocketAddr = SocketAddr::from(([127, 0, 0, 1], 0));
1565    trusty_common::bind_with_auto_port(any, 0).await
1566}
1567
/// Write the bound `host:port` to the discovery file at `path` atomically.
1569///
1570/// Why: clients must read the file mid-write without observing a partial
1571/// value. Writing to a `.tmp` sibling and renaming over the target gives
1572/// POSIX atomicity, matching the trusty-search implementation.
1573/// What: creates the parent directory if missing; writes `addr` followed by a
1574/// trailing newline (avoids the "no newline at end of file" warnings from
1575/// `cat`); renames `.tmp` → `http_addr`. Best-effort: I/O errors are
1576/// returned to the caller so `run_http_on` can log without panicking.
1577/// Test: `http_addr_file_round_trip_via_helpers`.
1578#[cfg(feature = "axum-server")]
fn write_http_addr_file(path: &Path, addr: &SocketAddr) -> std::io::Result<()> {
    use std::io::Write;
    // Make sure the directory that will hold the file exists.
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    // Stage the content in a sibling file, then rename it over the target so
    // readers never observe a half-written address.
    let tmp = path.with_extension("addr.tmp");
    let staged = std::fs::File::create(&tmp)
        .and_then(|mut f| {
            writeln!(f, "{addr}")?;
            // Flush to disk before the rename publishes the file. The handle
            // is closed when this closure returns, i.e. before the rename.
            f.sync_all()
        })
        .and_then(|()| std::fs::rename(&tmp, path));
    if staged.is_err() {
        // Why: a failed write/fsync/rename used to leave the `.addr.tmp`
        // staging file behind. Removal is best-effort — the original error
        // is what the caller needs to see.
        let _ = std::fs::remove_file(&tmp);
    }
    staged
}
1593
1594/// Return `true` when a non-default data directory is in effect.
1595///
1596/// Why (issue #880): two startup side-effects must be suppressed when the
1597/// daemon runs with an isolated/overridden data root:
1598/// 1. The legacy `~/.trusty-memory/http_addr` dotfile write — it would
1599///    overwrite the real production daemon's discovery file with the isolated
1600///    instance's throwaway address.
1601/// 2. The startup pin-scan — it reads project pin files from the **real**
1602///    user environment (~/Projects, ~/Developer, …) and imports palaces from
1603///    the real environment into the isolated data root, defeating isolation.
1604///
1605/// A "non-default data dir" means `TRUSTY_DATA_DIR_OVERRIDE` is set to a
1606/// non-empty, non-whitespace value. Empty or whitespace-only values are
1607/// treated as unset (same rule as `resolve_data_dir`), so an accidental blank
1608/// env var does not suppress the dotfile write on real production instances.
1609/// What: reads `TRUSTY_DATA_DIR_OVERRIDE`; returns `true` when it contains a
1610/// non-empty, non-whitespace string. Returns `false` otherwise.
1611/// Test: `is_data_dir_override_active_when_set`,
1612///       `is_data_dir_override_inactive_when_unset`,
1613///       `is_data_dir_override_inactive_when_blank`.
1614#[inline]
1615pub fn is_data_dir_override_active() -> bool {
1616    matches!(
1617        std::env::var(trusty_common::DATA_DIR_OVERRIDE_ENV),
1618        Ok(v) if !v.trim().is_empty()
1619    )
1620}
1621
1622/// Resolve the dotfile discovery path `~/.trusty-memory/http_addr`.
1623///
1624/// Why (issue #498): external tooling such as claude-mpm's `migrate_trusty_autodetect`
1625/// reads `~/.trusty-memory/http_addr` to find the running daemon's port. On
1626/// macOS, `resolve_data_dir("trusty-memory")` returns
1627/// `~/Library/Application Support/trusty-memory/`, not `~/.trusty-memory/`,
1628/// so the daemon was writing to the OS-standard location while readers expected
1629/// the dotfile location. Writing to both locations keeps every reader happy
1630/// regardless of which convention they follow.
1631///
1632/// Fix #880: returns `None` when `TRUSTY_DATA_DIR_OVERRIDE` is active so an
1633/// isolated instance (test rig, CI, parallel run) never overwrites the real
1634/// production daemon's discovery dotfile.
1635///
1636/// What: returns `$HOME/.trusty-memory/http_addr` in the default (production)
1637/// case, or `None` when `dirs::home_dir()` is unavailable OR when a data-dir
1638/// override is active (see `is_data_dir_override_active`).
1639/// Test: `dotfile_http_addr_path_uses_home_dir`,
1640///       `dotfile_suppressed_when_override_active`.
1641#[cfg(feature = "axum-server")]
1642fn dotfile_http_addr_path() -> Option<PathBuf> {
1643    // Fix #880: never write to the shared dotfile when an override is active.
1644    if is_data_dir_override_active() {
1645        return None;
1646    }
1647    dirs::home_dir().map(|h| h.join(".trusty-memory").join("http_addr"))
1648}
1649
1650/// Run the optional HTTP/SSE + web admin server.
1651///
1652/// Why: A long-running daemon mode lets non-stdio clients (browsers, curl,
1653/// future remote agents) hit `/health`, the `/api/v1/*` REST surface, and the
1654/// embedded admin SPA. The Unix-domain-socket transport and the
1655/// `trusty-memory-mcp-bridge` binary were removed in PR3 of the #914
1656/// stdio-cutover epic; the canonical MCP integration is now
1657/// `trusty-memory serve --stdio` (PR1 #919).
1658/// What: axum router built from `web::router()` plus a `/sse` stub for the
1659/// existing MCP-over-SSE clients. Caller provides a pre-bound listener so
1660/// port auto-detection lives at the call site. Before accepting connections
1661/// the daemon stamps the bound `host:port` onto `AppState.bound_addr` and
1662/// writes `~/.trusty-memory/http_addr` so clients can discover the live port.
1663/// On shutdown the file is removed best-effort (a stale file with the wrong
1664/// port is worse than a missing one).
1665/// Test: `cargo test -p trusty-memory web::tests` exercises the router shape;
1666/// manual: `curl http://127.0.0.1:<port>/health` returns `ok` with `addr`.
1667#[cfg(feature = "axum-server")]
1668pub async fn run_http_on(state: AppState, listener: tokio::net::TcpListener) -> Result<()> {
1669    use axum::routing::get;
1670
1671    // Issue #35: recompute the `data_root` disk footprint every 10 s on a
1672    // background task so `GET /health` reports `disk_bytes` without doing a
1673    // recursive directory walk on the request path.
1674    spawn_disk_size_ticker(state.clone());
1675
1676    // Issue #228: emit aggregate `StatusChanged` on a fixed cadence rather
1677    // than on every drawer write. The previous design called
1678    // `aggregate_status_event` from every `memory_remember` / `memory_note`
1679    // / `memory_forget` (and the matching HTTP handlers), each of which
1680    // walked the data root + opened every palace handle. Coalescing the
1681    // emit to a 30 s ticker keeps dashboards live without dragging an
1682    // O(N palaces) recompute onto the write hot path.
1683    spawn_status_event_ticker(state.clone());
1684
1685    // Capture and advertise the bound address BEFORE serving so the first
1686    // request handler — and the http_addr discovery file — see the real port
1687    // even if `local_addr()` would otherwise be racy.
1688    let local = listener.local_addr().ok();
1689    let (written_path, written_dotfile_path) = if let Some(a) = local {
1690        // Stash on state for handlers (e.g. /health) to surface.
1691        let _ = state.bound_addr.set(a);
1692        info!("HTTP server listening on http://{a}");
1693        eprintln!("HTTP server listening on http://{a}");
1694        // Primary: write to the OS-standard data dir (`~/Library/Application
1695        // Support/trusty-memory/http_addr` on macOS, `~/.local/share/…` on
1696        // Linux). This is what `trusty_common::read_daemon_addr` reads.
1697        // Best-effort: a missing $HOME or read-only fs is non-fatal.
1698        let primary = match http_addr_path() {
1699            Some(p) => match write_http_addr_file(&p, &a) {
1700                Ok(()) => {
1701                    info!("wrote daemon address to {}", p.display());
1702                    Some(p)
1703                }
1704                Err(e) => {
1705                    tracing::warn!("could not write {}: {e}", p.display());
1706                    None
1707                }
1708            },
1709            None => {
1710                tracing::warn!("no $HOME — skipping http_addr discovery file");
1711                None
1712            }
1713        };
1714        // Issue #498: also write to `~/.trusty-memory/http_addr` so external
1715        // tools (e.g. claude-mpm's `migrate_trusty_autodetect`) that read the
1716        // dotfile path can discover the daemon's port. On macOS the OS-standard
1717        // path differs from the dotfile path; writing both ensures consumers
1718        // using either convention find the file. Best-effort: failures are
1719        // logged but do not block startup.
1720        let dotfile = match dotfile_http_addr_path() {
1721            Some(p) => match write_http_addr_file(&p, &a) {
1722                Ok(()) => {
1723                    info!("wrote daemon address to dotfile {}", p.display());
1724                    Some(p)
1725                }
1726                Err(e) => {
1727                    tracing::warn!("could not write dotfile {}: {e}", p.display());
1728                    None
1729                }
1730            },
1731            None => None,
1732        };
1733        (primary, dotfile)
1734    } else {
1735        (None, None)
1736    };
1737
1738    // Keep a handle to the BM25 supervisor (if any) so we can call
1739    // `shutdown()` on the exit path. Cloning here is cheap (`Arc`) and
1740    // detaches the lifetime of the supervisor from the `state` move into
1741    // the router below.
1742    let bm25_supervisor = state.bm25_supervisor.clone();
1743
1744    let app = web::router()
1745        .route("/sse", get(sse_handler))
1746        .with_state(state);
1747
1748    // Why (issue #534): bare axum::serve exits only on an internal error; SIGTERM
1749    // (launchctl bootout) would kill the process before the cleanup below had a
1750    // chance to run, leaving stale addr/socket files behind and dropping any
1751    // in-flight request without draining. `with_graceful_shutdown` installs a
1752    // SIGTERM + SIGINT watcher; when either fires axum stops accepting new
1753    // connections, drains active requests, then returns here so cleanup runs.
1754    let serve_result = axum::serve(listener, app)
1755        .with_graceful_shutdown(trusty_common::shutdown_signal())
1756        .await;
1757
1758    // Best-effort cleanup: remove `http_addr` files so stale clients fail fast
1759    // instead of timing out against a dead port. Remove both the OS-standard
1760    // path and the dotfile path (#498).
1761    if let Some(p) = written_path.as_ref() {
1762        let _ = std::fs::remove_file(p);
1763    }
1764    if let Some(p) = written_dotfile_path.as_ref() {
1765        let _ = std::fs::remove_file(p);
1766    }
1767
1768    // Issue #193: gracefully reap every spawned BM25 daemon before the
1769    // process exits so each one gets a chance to flush its snapshot and
1770    // unlink its socket. `kill_on_drop=true` on the children would
1771    // SIGKILL them on Drop anyway, but that skips the daemon's own
1772    // shutdown sequence and leaves stale sockets behind.
1773    if let Some(supervisor) = bm25_supervisor {
1774        supervisor.shutdown().await;
1775    }
1776
1777    serve_result?;
1778    Ok(())
1779}
1780
/// Bind a TCP listener on `addr` and serve on it via [`run_http_on`].
///
/// Why: callers that already know the exact socket address they want should
/// not have to build the listener themselves.
/// What: awaits `tokio::net::TcpListener::bind(addr)`, propagating a bind
/// failure with `?`, then returns whatever [`run_http_on`] returns.
#[cfg(feature = "axum-server")]
pub async fn run_http(state: AppState, addr: std::net::SocketAddr) -> Result<()> {
    run_http_on(state, tokio::net::TcpListener::bind(addr).await?).await
}
1787
/// Pick a port dynamically (7070..=7079, then OS fallback) and serve on it.
///
/// Why: `trusty-memory serve` without an `--http` flag is the canonical
/// launchd-managed daemon entry point. Binding dynamically means a stale
/// daemon, or a hand-spawned `serve --http 127.0.0.1:7070`, can coexist with
/// the launchd-managed instance instead of breaking it.
/// What: obtains a listener from [`bind_dynamic_port`] (errors propagate via
/// `?`) and passes it straight to [`run_http_on`].
/// Test: integration via `trusty-memory serve` + `cat ~/.trusty-memory/http_addr`.
#[cfg(feature = "axum-server")]
pub async fn run_http_dynamic(state: AppState) -> Result<()> {
    run_http_on(state, bind_dynamic_port().await?).await
}
1801
/// Spawn a background ticker that recomputes the `data_root` disk footprint
/// every 10 seconds and stores it in `state.disk_bytes` (issue #35).
///
/// Why: `GET /health` reports `disk_bytes`. Walking the data directory on
/// every health request would turn a frequent health poll into unbounded
/// recursive I/O. Computing it off the request path on a fixed cadence keeps
/// `/health` cheap and bounds the staleness to roughly one period.
/// What: spawns a detached tokio task that owns the `AppState` passed in.
/// Each tick runs the directory walk on `spawn_blocking` so it never stalls an
/// async worker, then stores the byte total atomically. If the blocking task
/// fails to join (it panicked or was cancelled) the previously stored value is
/// kept and a warning is logged, rather than publishing a misleading `0`.
/// Missed ticks are delayed, not bursted, so a walk that takes longer than the
/// period is never followed by back-to-back catch-up walks.
/// Test: `health_endpoint_includes_resource_fields` asserts the field shape;
/// the ticker cadence is not unit-tested (timing-dependent).
#[cfg(feature = "axum-server")]
fn spawn_disk_size_ticker(state: AppState) {
    // Seconds between two recomputations of the data-root footprint.
    const DISK_SIZE_TICK_SECS: u64 = 10;

    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(DISK_SIZE_TICK_SECS));
        // Why: tokio's default `Burst` policy fires every missed tick as fast
        // as possible. A slow walk over a large data dir would then trigger
        // several immediate re-walks; `Delay` simply restarts the period from
        // the late tick instead.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            interval.tick().await;
            let dir = state.data_root.clone();
            // The directory walk is blocking filesystem I/O — run it on the
            // blocking pool so it never parks an async worker thread.
            let walk = tokio::task::spawn_blocking(move || {
                trusty_common::sys_metrics::dir_size_bytes(&dir)
            })
            .await;
            match walk {
                Ok(bytes) => state
                    .disk_bytes
                    .store(bytes, std::sync::atomic::Ordering::Relaxed),
                // Why: a join error means the walk never produced a number.
                // Keep the last good figure instead of reporting an empty dir.
                Err(e) => {
                    tracing::warn!("disk size walk failed; keeping previous disk_bytes: {e}");
                }
            }
        }
    });
}
1837
/// Interval, in seconds, between aggregate-status snapshot emits on the SSE
/// bus.
///
/// Why (issue #228): mutations used to fire `StatusChanged` synchronously on
/// the write path, which forced an O(N palaces) sum of drawer / vector / KG
/// counts on every `memory_remember`. Coalescing into a fixed-cadence ticker
/// lets dashboards stay current (a 30 s lag is invisible at human scale)
/// while keeping the write path free of aggregate work.
/// What: 30 seconds — short enough that the operator UI doesn't feel stale
/// between manual writes, long enough that the recompute cost (in-memory
/// registry walk plus the redb `count_active_triples` per palace) is a
/// rounding error on the daemon's CPU budget.
/// Test: covered indirectly — the math has not changed, only the cadence.
// NOTE(review): `allow(dead_code)` is presumably needed because the only
// consumer, `spawn_status_event_ticker`, is itself unused when the
// `axum-server` feature is disabled — confirm before removing.
#[allow(dead_code)]
const STATUS_EVENT_TICK_SECS: u64 = 30;
1852
1853/// Spawn a background ticker that emits `DaemonEvent::StatusChanged` every
1854/// [`STATUS_EVENT_TICK_SECS`] seconds (issue #228).
1855///
1856/// Why: replaces the per-write `state.emit(self.aggregate_status_event())`
1857/// call sites that used to recompute the aggregate every time a drawer was
1858/// created or deleted. Walking N palaces on every write blocks the async
1859/// runtime; coalescing the emit onto a ticker keeps dashboards up-to-date
1860/// without that cost.
1861/// What: spawns a detached tokio task that holds a full `AppState` clone
1862/// (cheap — every field is `Arc`-backed) and ticks every
1863/// [`STATUS_EVENT_TICK_SECS`] seconds. Each tick computes
1864/// `MemoryService::aggregate_status_event` (which now iterates the
1865/// in-memory registry, not disk) and broadcasts it via `state.emit`. If
1866/// no SSE subscribers are connected the broadcast `send` is a cheap no-op,
1867/// so the ticker imposes no cost when nobody is listening.
1868/// Test: not unit-tested (timing-dependent fire-and-forget); the underlying
1869/// `aggregate_status_event` math is exercised by the existing
1870/// `status_endpoint_returns_payload` path.
1871#[allow(dead_code)]
1872fn spawn_status_event_ticker(state: AppState) {
1873    tokio::spawn(async move {
1874        let mut interval =
1875            tokio::time::interval(std::time::Duration::from_secs(STATUS_EVENT_TICK_SECS));
1876        // The first tick fires immediately, which is fine: it gives SSE
1877        // subscribers a baseline `StatusChanged` shortly after they connect.
1878        loop {
1879            interval.tick().await;
1880            let event = service::MemoryService::new(state.clone()).aggregate_status_event();
1881            state.emit(event);
1882        }
1883    });
1884}
1885
/// Live SSE event stream — pushes `DaemonEvent` frames to dashboard clients.
///
/// Why: The dashboard subscribes once and reacts to live pushes (palace
/// created, drawer added/deleted, dream completed, status changed) instead of
/// polling `/api/v1/*` endpoints.
/// What: Subscribes to `state.events`, emits an initial `connected` frame,
/// then forwards every received event as `data: <json>\n\n`. An event that
/// fails to serialize is replaced by an `error` frame whose `message` is
/// JSON-escaped, so the frame is always valid JSON. Lagged subscribers
/// receive a `lag` frame carrying the number of skipped events; the stream
/// ends when the underlying broadcast stream ends.
/// Test: `web::tests::sse_stream_emits_palace_created` (covers subscribe +
/// emit + receive); manual: `curl -N http://.../sse`.
#[cfg(feature = "axum-server")]
pub(crate) async fn sse_handler(
    axum::extract::State(state): axum::extract::State<AppState>,
) -> impl axum::response::IntoResponse {
    use futures::StreamExt;
    use tokio_stream::wrappers::BroadcastStream;

    let rx = state.events.subscribe();

    // First frame: lets the client confirm the stream is open before any
    // real event arrives.
    let initial = futures::stream::once(async {
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
            "data: {\"type\":\"connected\"}\n\n",
        ))
    });

    let events = BroadcastStream::new(rx).map(|res| {
        let frame = match res {
            Ok(event) => match serde_json::to_string(&event) {
                Ok(json) => format!("data: {json}\n\n"),
                Err(e) => {
                    // Why: interpolating the raw error text into a JSON string
                    // literal breaks the frame whenever the message contains
                    // `"` or `\`. `Value::String` displays as a quoted,
                    // escaped JSON string, so the frame always parses.
                    let message = serde_json::Value::String(e.to_string());
                    format!("data: {{\"type\":\"error\",\"message\":{message}}}\n\n")
                }
            },
            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
            }
        };
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
    });
    let stream = initial.chain(events);

    axum::response::Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        // Asks buffering reverse proxies (nginx convention) to pass frames
        // through immediately.
        .header("X-Accel-Buffering", "no")
        .body(axum::body::Body::from_stream(stream))
        .expect("valid SSE response") // Why: invariant — SSE headers are compile-time constants; builder cannot fail
}
1931
1932#[cfg(test)]
1933mod lib_tests;