Skip to main content

mati_core/mcp/
server.rs

1//! MCP stdio server entry point (M-07).
2//!
//! `serve()` is the entry point. It resolves `~/.mati/<slug>/`, ensures a
//! daemon is running (auto-spawning one if necessary), and runs the rmcp
//! stdio transport as a thin proxy that forwards every tool call to the
//! daemon over the Unix socket. When the client disconnects, the proxy
//! exits; the daemon is a separate process and keeps running. Lifecycle
//! events are recorded throughout. The daemon — not this proxy — owns the
//! store, the graph, the idle-shutdown loop, signal handling, and the
//! boot-time auto-drain (bounded by `AUTO_DRAIN_TIMEOUT`) that runs the
//! dirty gotcha-index repair.
//!
//! The daemon binds the Unix socket (`~/.mati/<slug>/mati.sock`) so that hook
//! scripts using `mati get`/`mati ping` can route through the daemon protocol
//! instead of trying to open the SurrealKV store directly (which would fail with
//! a lock error while the daemon holds the exclusive handle). The socket
//! task is supervised: a watcher signals graceful shutdown if it dies, and
//! a `SHUTDOWN_DRAIN_TIMEOUT` ceiling falls back to `abort_handle` so a
//! wedged handler can never block exit.
18//!
19//! Public surface: `serve`, `socket_handle_connection`, `Shutdown` (+
20//! methods), and the policy constants `AUTO_DRAIN_TIMEOUT`,
21//! `MAX_CONCURRENT_CONNECTIONS`, `IDLE_SHUTDOWN_SECS`,
22//! `IDLE_CHECK_INTERVAL_SECS`, `UNIX_SOCK_PATH_MAX` — all shared with
23//! `cli::daemon` so both daemon paths use identical operational policy.
24
25use std::path::Path;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::time::Duration;
29
30use anyhow::Result;
31use rmcp::model::{ServerCapabilities, ServerInfo};
32use rmcp::{tool_handler, ServerHandler, ServiceExt};
33use serde::{Deserialize, Serialize};
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
35use tokio::net::UnixStream;
36
37use crate::graph::edges::EdgeKind;
38use crate::graph::Graph;
39
40use super::tools::MatiServer;
41use super::types::{MemBootstrapParams, MemGetParams, MemQueryParams, MemSetParams};
42
43#[derive(Debug)]
44pub(crate) enum ProxyDaemonResult {
45    Ok(serde_json::Value),
46    NotRunning,
47    StaleSocket,
48    Unresponsive,
49}
50
51#[tool_handler(router = self.tool_router)]
52impl ServerHandler for MatiServer {
53    fn get_info(&self) -> ServerInfo {
54        ServerInfo::new(
55            ServerCapabilities::builder()
56                .enable_tools()
57                .enable_tool_list_changed()
58                .build(),
59        )
60        .with_instructions(
61            "mati is a persistent engineering knowledge store for the current \
62                 codebase. Use mem_get for direct record lookup, mem_query for \
63                 search and graph traversal, mem_bootstrap for session context, \
64                 and mem_set for writing knowledge records.",
65        )
66    }
67}
68
69/// Start the MCP stdio proxy for the project rooted at `repo_root`.
70///
71/// After γ-C4, `mati serve` is a thin MCP-stdio ↔ UDS forwarder: every
72/// tool call is proxied over the Unix domain socket to a separate daemon
73/// process which owns the store, the graph, the socket listener, the
74/// idle-shutdown loop, signal handling, and the auto-drain pipeline.
75///
76/// On startup:
77/// 1. Resolve `~/.mati/<slug>/` from `repo_root`.
78/// 2. Ensure a daemon is running (auto-spawning one if necessary via the
79///    state-aware readiness machinery in `daemon_lifecycle::ensure_daemon`).
80/// 3. Bind the rmcp stdio transport and forward every request to the
81///    daemon via `MatiServer::with_socket_root`.
82///
83/// On client disconnect, this process exits cleanly — the daemon (separate
84/// process) is unaffected and remains available for the next `mati serve`
85/// invocation that Codex / Claude Code spawns.
86///
87/// Lifecycle events (`serve_start`, `serve_failed`, `serve_shutdown`,
88/// `startup`) are appended throughout so `mati doctor` can observe the
89/// proxy's cold-start path.
90pub async fn serve(repo_root: &Path) -> Result<()> {
91    let startup_t0 = std::time::Instant::now();
92
93    // Resolve the daemon root so we can emit lifecycle events even before
94    // the daemon is reachable.
95    let mati_root: PathBuf = dirs::home_dir()
96        .map(|h| h.join(".mati").join(crate::store::derive_slug(repo_root)))
97        .ok_or_else(|| anyhow::anyhow!("cannot resolve home directory for mati_root"))?;
98
99    super::metadata::record_lifecycle_event(&mati_root, "startup", "phase=ensure_daemon");
100
101    // The daemon owns the store. `ensure_daemon` spawns a daemon if needed
102    // and waits for it to be ready via the state-aware readiness machinery
103    // (`daemon_lifecycle::wait_for_ready`).
104    if !super::daemon_lifecycle::ensure_daemon(&mati_root).await {
105        super::metadata::record_lifecycle_event(
106            &mati_root,
107            "serve_failed",
108            "daemon unreachable after auto-spawn",
109        );
110        anyhow::bail!(
111            "mati serve: daemon unreachable. \
112             Run `mati daemon start` manually and check the lifecycle.log."
113        );
114    }
115
116    super::metadata::record_lifecycle_event(
117        &mati_root,
118        "serve_start",
119        &format!("pid={} owner=proxy", std::process::id()),
120    );
121
122    // Initialize the metrics handle so any local recording is no-op rather
123    // than panicking. The daemon owns the authoritative metrics surface.
124    super::metrics::init();
125
126    super::metadata::record_lifecycle_event(
127        &mati_root,
128        "startup",
129        &format!(
130            "phase=ready elapsed_ms={}",
131            startup_t0.elapsed().as_millis()
132        ),
133    );
134
135    // MCP stdio proxy: every tool call forwards over UDS to the daemon.
136    let transport = rmcp::transport::io::stdio();
137    let service = MatiServer::with_socket_root(mati_root.clone())
138        .serve(transport)
139        .await
140        .map_err(|e| anyhow::anyhow!("MCP proxy initialization failed: {e}"))
141        .inspect_err(|e| {
142            super::metadata::record_lifecycle_event(
143                &mati_root,
144                "serve_failed",
145                &format!("proxy init: {e:#}"),
146            )
147        })?;
148
149    let shutdown_reason: &'static str = match service.waiting().await {
150        Ok(_) => "client_disconnect",
151        Err(e) => {
152            super::metadata::record_lifecycle_event(
153                &mati_root,
154                "serve_failed",
155                &format!("proxy waiting: {e}"),
156            );
157            "mcp_waiting_error"
158        }
159    };
160    super::metadata::record_lifecycle_event(
161        &mati_root,
162        "serve_shutdown",
163        &format!("reason={shutdown_reason}"),
164    );
165    Ok(())
166}
167
168pub(crate) async fn proxy_daemon_result(
169    root: &Path,
170    cmd: &str,
171    args: serde_json::Value,
172) -> ProxyDaemonResult {
173    // Daemon-restart resilience: when `mati daemon stop` followed by
174    // `mati daemon start` happens during an active MCP-stdio session, the
175    // first call after the restart can fail in three ways:
176    //   1. Socket file transiently absent (NotRunning)
177    //   2. Connection refused before the new daemon's accept loop is up
178    //      (StaleSocket / Unresponsive depending on metadata state)
179    //   3. Connection succeeds but the request carries a stale session UUID
180    //      (cached by the rmcp tool dispatcher) → daemon returns
181    //      "session_mismatch" via the v2 fence in `dispatch_v2`.
182    //
183    // Without retry, every subsequent MCP tool call returns a structured
184    // error to Claude/Codex — a P9 violation in spirit since the agent's
185    // entire MCP session becomes unusable until restart.
186    //
187    // The retry is bounded: at most one re-connect after a brief delay,
188    // re-reading daemon metadata so the new session UUID is picked up.
189    // We do NOT retry indefinitely — a hard-down daemon must surface an
190    // error eventually so the caller can fall back.
191    let result = proxy_daemon_result_no_spawn(root, cmd, &args).await;
192
193    // Pass-33: if both retries failed because the daemon is gone (not
194    // because of a session mismatch or a transient stall), auto-spawn a
195    // fresh daemon and try one final time. Phase 3's `mati daemon stop`
196    // cycles for repair/init left the daemon unrun, breaking every MCP
197    // tool call until manual restart — this closes that hole.
198    //
199    // Only NotRunning/StaleSocket are eligible: Unresponsive means
200    // ensure_daemon has its own SIGTERM-and-cleanup recovery path that
201    // would conflict with our retry, and Ok / session-mismatch don't
202    // need a spawn.
203    if matches!(
204        &result,
205        ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket
206    ) && super::daemon_lifecycle::ensure_daemon(root).await
207    {
208        match proxy_daemon_result_once(root, cmd, &args).await {
209            AttemptOutcome::Final(r) | AttemptOutcome::Retryable(r) => return r,
210        }
211    }
212
213    result
214}
215
216/// Inner: the original two-attempt retry without auto-spawn. Extracted so
217/// `daemon_lifecycle::ensure_daemon`'s probe can call this without
218/// triggering its own auto-spawn (which would loop indefinitely).
219pub(crate) async fn proxy_daemon_result_no_spawn(
220    root: &Path,
221    cmd: &str,
222    args: &serde_json::Value,
223) -> ProxyDaemonResult {
224    match proxy_daemon_result_once(root, cmd, args).await {
225        AttemptOutcome::Final(result) => result,
226        AttemptOutcome::Retryable(_) => {
227            // Brief settle — give the new daemon time to bind socket and
228            // publish metadata. 100ms is generous; daemon startup is ~50ms.
229            tokio::time::sleep(Duration::from_millis(100)).await;
230            match proxy_daemon_result_once(root, cmd, args).await {
231                AttemptOutcome::Final(result) | AttemptOutcome::Retryable(result) => result,
232            }
233        }
234    }
235}
236
237/// Outcome of a single `proxy_daemon_result` attempt.
238///
239/// `Retryable` carries the result the caller would have returned if no
240/// retry were attempted — used as the fallback if the second attempt also
241/// fails. This keeps the original error shape stable for callers that
242/// distinguish StaleSocket vs Unresponsive vs structured session_mismatch.
243enum AttemptOutcome {
244    Final(ProxyDaemonResult),
245    Retryable(ProxyDaemonResult),
246}
247
248async fn proxy_daemon_result_once(
249    root: &Path,
250    cmd: &str,
251    args: &serde_json::Value,
252) -> AttemptOutcome {
253    // Build v2 request from v1-style (cmd, args) using the same mapping
254    // as cli::daemon::daemon_result. Pure-reads only — mutating callers
255    // must use [`proxy_daemon_v2`] with a typed Command (see pass-29).
256    let v2_cmd = super::protocol::v1_to_v2_command(cmd, args);
257    proxy_daemon_send_v2(root, v2_cmd).await
258}
259
260/// Send a typed v2 [`super::protocol::Command`] to the daemon socket.
261///
262/// Mirrors [`proxy_daemon_result`] for callers (currently the MCP Socket-
263/// backend `mem_set` path) that have moved to typed commands and would
264/// otherwise have to round-trip through the legacy v1 mapper, which has
265/// no entries for mutating commands and panics on them.
266///
267/// Bounded auto-reconnect mirrors `proxy_daemon_result` so a daemon
268/// restart during an active session is recovered transparently.
269pub(crate) async fn proxy_daemon_v2(
270    root: &Path,
271    cmd: super::protocol::Command,
272) -> ProxyDaemonResult {
273    // Serialize once — every retry uses the same wire bytes.
274    let v2_cmd = match serde_json::to_value(&cmd) {
275        Ok(v) => v,
276        Err(_) => return ProxyDaemonResult::Unresponsive,
277    };
278
279    let result = match proxy_daemon_send_v2(root, v2_cmd.clone()).await {
280        AttemptOutcome::Final(result) => result,
281        AttemptOutcome::Retryable(_) => {
282            tokio::time::sleep(Duration::from_millis(100)).await;
283            match proxy_daemon_send_v2(root, v2_cmd.clone()).await {
284                AttemptOutcome::Final(result) | AttemptOutcome::Retryable(result) => result,
285            }
286        }
287    };
288
289    // Pass-33: parallel auto-spawn for the typed-Command path. Same
290    // policy as `proxy_daemon_result`: if the two retries failed because
291    // the daemon is gone, ensure_daemon spawns one and we try once more.
292    if matches!(
293        &result,
294        ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket
295    ) && super::daemon_lifecycle::ensure_daemon(root).await
296    {
297        match proxy_daemon_send_v2(root, v2_cmd).await {
298            AttemptOutcome::Final(r) | AttemptOutcome::Retryable(r) => return r,
299        }
300    }
301
302    result
303}
304
305/// Inner socket transaction: connect, send a pre-built v2 command JSON,
306/// read the response. Shared between v1-style and typed-Command callers
307/// so the connect/refused/session-mismatch policy stays identical.
308async fn proxy_daemon_send_v2(root: &Path, v2_cmd: serde_json::Value) -> AttemptOutcome {
309    let sock_path = root.join("mati.sock");
310
311    if sock_path.as_os_str().len() > UNIX_SOCK_PATH_MAX {
312        tracing::warn!(
313            path = %sock_path.display(),
314            "mcp proxy: socket path exceeds Unix limit"
315        );
316        // Path-length violation is not transient — never retry.
317        return AttemptOutcome::Final(ProxyDaemonResult::NotRunning);
318    }
319
320    if !sock_path.exists() {
321        // Socket missing — daemon may be mid-restart. Retry once.
322        return AttemptOutcome::Retryable(ProxyDaemonResult::NotRunning);
323    }
324
325    let stream = match UnixStream::connect(&sock_path).await {
326        Ok(s) => s,
327        Err(e) => {
328            let is_refused = e.kind() == std::io::ErrorKind::ConnectionRefused;
329            if is_refused {
330                // Socket refused — use the metadata + PID liveness protocol
331                // to decide whether to clean up. Never blindly remove.
332                use super::metadata::{self as meta, StaleCheckResult};
333                match meta::check_and_cleanup_stale(root) {
334                    StaleCheckResult::StaleRemoved | StaleCheckResult::Clean => {
335                        return AttemptOutcome::Retryable(ProxyDaemonResult::StaleSocket);
336                    }
337                    StaleCheckResult::OrphanSocket => {
338                        // No metadata + ECONNREFUSED → stale
339                        let _ = std::fs::remove_file(&sock_path);
340                        return AttemptOutcome::Retryable(ProxyDaemonResult::StaleSocket);
341                    }
342                    StaleCheckResult::LiveDaemon { .. } => {
343                        // PID alive but socket refused — daemon is starting or broken
344                        return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
345                    }
346                }
347            }
348            return AttemptOutcome::Retryable(ProxyDaemonResult::NotRunning);
349        }
350    };
351
352    // Read daemon metadata fresh per attempt so a session UUID rotated by
353    // a daemon restart between attempt 1 and attempt 2 is picked up.
354    let daemon_session = super::metadata::read_metadata(root)
355        .map(|m| m.session)
356        .unwrap_or_else(uuid::Uuid::nil);
357    let request = serde_json::json!({
358        "v": super::protocol::PROTOCOL_VERSION,
359        "id": uuid::Uuid::new_v4(),
360        "session": daemon_session,
361        "cmd": v2_cmd,
362    });
363
364    let (reader, mut writer) = stream.into_split();
365    let mut bytes = match serde_json::to_vec(&request) {
366        Ok(b) => b,
367        Err(_) => return AttemptOutcome::Final(ProxyDaemonResult::Unresponsive),
368    };
369    bytes.push(b'\n');
370
371    if writer.write_all(&bytes).await.is_err() {
372        return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
373    }
374    if writer.shutdown().await.is_err() {
375        return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
376    }
377
378    let mut buf_reader = BufReader::new(reader);
379    let mut line = String::new();
380    match tokio::time::timeout(Duration::from_secs(2), buf_reader.read_line(&mut line)).await {
381        Ok(Ok(n)) if n > 0 => {}
382        _ => return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive),
383    }
384
385    // Parse v2 Response and convert to v1-compatible envelope for callers.
386    let resp: serde_json::Value = match serde_json::from_str(line.trim()) {
387        Ok(v) => v,
388        Err(_) => return AttemptOutcome::Final(ProxyDaemonResult::Unresponsive),
389    };
390
391    match resp.get("status").and_then(|s| s.as_str()) {
392        Some("ok") => {
393            let data = resp.get("data").cloned().unwrap_or(serde_json::Value::Null);
394            AttemptOutcome::Final(ProxyDaemonResult::Ok(
395                serde_json::json!({"ok": true, "v": 2, "data": data}),
396            ))
397        }
398        Some("err") => {
399            let code = resp
400                .get("code")
401                .and_then(|c| c.as_str())
402                .unwrap_or("internal");
403            let message = resp
404                .get("message")
405                .and_then(|m| m.as_str())
406                .unwrap_or("unknown error");
407            let envelope = serde_json::json!({
408                "ok": false, "v": 2, "error": message, "code": code
409            });
410            // Session mismatch is the canonical "daemon restarted, your
411            // cached session is stale" signal — see dispatch_v2.rs's fence
412            // and the symmetric handling in cli::daemon::send_v2_raw. The
413            // retry will re-read metadata and pick up the new session UUID.
414            if code == "session_mismatch" {
415                tracing::debug!(
416                    "mcp proxy: session mismatch — daemon may have restarted, will retry"
417                );
418                AttemptOutcome::Retryable(ProxyDaemonResult::Ok(envelope))
419            } else {
420                AttemptOutcome::Final(ProxyDaemonResult::Ok(envelope))
421            }
422        }
423        _ => AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive),
424    }
425}
426
427// cleanup_stale_pid and local is_pid_alive removed — callers now use
428// metadata::check_and_cleanup_stale which centralizes PID liveness checks.
429
430// ── Daemon socket — hook script bridge ───────────────────────────────────────
431
/// Upper bound, in bytes, on the daemon socket path (macOS-compatible).
///
/// The proxy refuses to connect when `<root>/mati.sock` is longer than this.
///
/// Public so the parallel daemon path in `cli::daemon` uses the same value
/// and the two bounds cannot drift apart.
pub const UNIX_SOCK_PATH_MAX: usize = 104;
437
/// How long a daemon-socket connection may take to deliver its complete
/// request line before the handler gives up with a "read timeout" error.
const READ_TIMEOUT: Duration = Duration::from_secs(3);
440
/// Ceiling on daemon-socket connections handled at the same time.
///
/// Connections beyond this limit wait at `accept` (the socket's listen
/// backlog absorbs the surplus), which gives natural backpressure instead of
/// unbounded memory use. 64 is generous for a per-user daemon — typical hook
/// traffic is O(1) concurrent. Public so `cli::daemon` shares the same bound.
pub const MAX_CONCURRENT_CONNECTIONS: usize = 64;
448
/// Time budget for the boot-time auto-drain of the dirty-marker queue.
///
/// Once it is exhausted the drain is abandoned and startup proceeds, so a
/// pathological dirty queue cannot block the daemon from serving. The dirty
/// marker stays set; the user can run `mati repair` manually.
///
/// Public so `cli::daemon::run_daemon_start` can share the same ceiling.
pub const AUTO_DRAIN_TIMEOUT: Duration = Duration::from_secs(10);
456
457/// Race-free shutdown signal for daemon-socket loops.
458///
459/// `signal()` is idempotent and `wait()` resolves immediately if the signal
460/// has already fired. The `enable()` pattern on `Notify::notified()`
461/// registers the future before the flag check, so a `signal()` race between
462/// flag-set and notify-fire cannot strand a waiter.
463///
464/// Shared with `cli::daemon` so both the embedded MCP-server socket loop
465/// and the headless `mati daemon start` loop use identical shutdown
466/// semantics.
467#[derive(Default)]
468pub struct Shutdown {
469    flag: std::sync::atomic::AtomicBool,
470    notify: tokio::sync::Notify,
471}
472
473impl Shutdown {
474    pub fn new() -> Self {
475        Self::default()
476    }
477
478    /// Idempotent — safe to call multiple times. Wakes every active waiter.
479    pub fn signal(&self) {
480        self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
481        self.notify.notify_waiters();
482    }
483
484    pub fn is_set(&self) -> bool {
485        // SeqCst (matching the store): defense-in-depth correctness on
486        // weakly-ordered architectures (ARM/POWER). Without it, the load
487        // would rely on Notify's internal mutex acquire to synchronize
488        // with `signal()`'s store — which is the pattern in our `wait()`
489        // body and works in practice, but depends on Notify's
490        // implementation detail. Explicit SC pairing is cheap (one
491        // memory barrier at most) and removes the implicit dependency.
492        self.flag.load(std::sync::atomic::Ordering::SeqCst)
493    }
494
495    /// Future resolves once `signal()` has been called. Safe to call
496    /// repeatedly; safe to race with concurrent `signal()`.
497    pub async fn wait(&self) {
498        let notified = self.notify.notified();
499        tokio::pin!(notified);
500        // Register the receiver BEFORE the flag check so a `signal()` that
501        // fires between check and notify cannot be missed.
502        notified.as_mut().enable();
503        if self.is_set() {
504            return;
505        }
506        notified.await;
507    }
508}
509
/// Version stamped into the `v` field of every v1 [`SocketResponse`]
/// envelope (must match `cli::daemon::PROTOCOL_VERSION`).
///
/// This is a separate constant from `super::protocol::PROTOCOL_VERSION`,
/// which the v2 proxy requests in this file use.
const PROTOCOL_VERSION: u32 = 1;
512
513#[derive(Debug, Deserialize)]
514pub(crate) struct SocketRequest {
515    pub cmd: String,
516    #[allow(dead_code)] // Wire protocol field — must exist for deserialization
517    #[serde(default, rename = "v")]
518    pub version: Option<u32>,
519    #[serde(default)]
520    pub args: serde_json::Value,
521}
522
523#[derive(Debug, Serialize)]
524pub(crate) struct SocketResponse {
525    pub(crate) ok: bool,
526    #[serde(rename = "v")]
527    version: u32,
528    #[serde(skip_serializing_if = "Option::is_none")]
529    pub(crate) data: Option<serde_json::Value>,
530    #[serde(skip_serializing_if = "Option::is_none")]
531    pub(crate) error: Option<String>,
532}
533
534impl SocketResponse {
535    pub(crate) fn ok(data: serde_json::Value) -> Self {
536        Self {
537            ok: true,
538            version: PROTOCOL_VERSION,
539            data: Some(data),
540            error: None,
541        }
542    }
543    pub(crate) fn err(msg: impl Into<String>) -> Self {
544        Self {
545            ok: false,
546            version: PROTOCOL_VERSION,
547            data: None,
548            error: Some(msg.into()),
549        }
550    }
551}
552
553pub async fn socket_handle_connection(
554    graph: Arc<tokio::sync::RwLock<Graph>>,
555    repo_root: &Path,
556    stream: UnixStream,
557    peer: super::metadata::PeerContext,
558    daemon_session: uuid::Uuid,
559) -> Result<()> {
560    use super::protocol::MAX_FRAME_SIZE;
561    use tokio::io::AsyncReadExt;
562
563    let (reader, mut writer) = stream.into_split();
564    let mut buf = String::new();
565
566    // Cap the read at MAX_FRAME_SIZE + 1 bytes so the allocation is bounded
567    // before any JSON parsing occurs. If the client sends more data than
568    // MAX_FRAME_SIZE before the newline delimiter, `read_line` will stop at
569    // the take limit and the size check below will reject the request.
570    let limited = reader.take(MAX_FRAME_SIZE as u64 + 1);
571    let mut buf_reader = BufReader::new(limited);
572    match tokio::time::timeout(READ_TIMEOUT, buf_reader.read_line(&mut buf)).await {
573        Ok(Ok(0)) => return Ok(()),
574        Ok(Ok(_)) => {}
575        Ok(Err(e)) => anyhow::bail!("read error: {e}"),
576        Err(_) => anyhow::bail!("read timeout"),
577    }
578
579    if buf.len() > MAX_FRAME_SIZE {
580        let resp = super::protocol::Response::err(
581            uuid::Uuid::nil(),
582            super::protocol::ErrorCode::FrameTooLarge,
583            format!("request exceeds {MAX_FRAME_SIZE} byte limit"),
584        );
585        let json = serde_json::to_string(&resp)?;
586        writer.write_all(json.as_bytes()).await?;
587        writer.write_all(b"\n").await?;
588        writer.flush().await?;
589        return Ok(());
590    }
591
592    let trimmed = buf.trim();
593
594    // V2 protocol ONLY — no v1 fallback on the public wire.
595    // The v2 format requires `id` (UUID), `session` (UUID), and `cmd` as
596    // a tagged object with `type`. If decode fails, the request is rejected
597    // with a protocol error — there is no legacy v1 dispatch path.
598    let v2_req = match serde_json::from_str::<super::protocol::Request>(trimmed) {
599        Ok(r) => r,
600        Err(e) => {
601            // Return a v2-shaped error. Use nil UUID since we can't extract
602            // the request ID from a malformed payload.
603            let resp = super::protocol::Response::err(
604                uuid::Uuid::nil(),
605                super::protocol::ErrorCode::MalformedRequest,
606                format!("invalid v2 request: {e}"),
607            );
608            let json = serde_json::to_string(&resp)?;
609            writer.write_all(json.as_bytes()).await?;
610            writer.write_all(b"\n").await?;
611            writer.flush().await?;
612            return Ok(());
613        }
614    };
615
616    let ctx = super::dispatch_v2::RequestContext {
617        peer,
618        daemon_session,
619        repo_root: repo_root.to_path_buf(),
620    };
621    let resp = super::dispatch_v2::dispatch_v2(&graph, &ctx, v2_req).await;
622    let json = serde_json::to_string(&resp)?;
623    writer.write_all(json.as_bytes()).await?;
624    writer.write_all(b"\n").await?;
625    writer.flush().await?;
626    Ok(())
627}
628
629/// Build a `RequestContext` for the in-process v1 socket_dispatch path.
630///
631/// The wire layer (`socket_handle_connection`) carries authentic peer
632/// credentials and the daemon session UUID; v1 callers are in-process
633/// (e.g. tests), so they synthesize a context with the current process'
634/// identity. Used by the mem_* arms which now delegate to native handlers.
635fn build_v1_dispatch_ctx(repo_root: &Path) -> super::dispatch_v2::RequestContext {
636    super::dispatch_v2::RequestContext {
637        peer: super::metadata::PeerContext {
638            uid: super::metadata::current_euid(),
639            pid: Some(std::process::id()),
640        },
641        daemon_session: uuid::Uuid::nil(),
642        repo_root: repo_root.to_path_buf(),
643    }
644}
645
646pub(crate) async fn socket_dispatch(
647    graph: &Arc<tokio::sync::RwLock<Graph>>,
648    repo_root: &Path,
649    req: &SocketRequest,
650) -> SocketResponse {
651    use crate::store::session as sess;
652
653    match req.cmd.as_str() {
654        "ping" => SocketResponse::ok(serde_json::Value::String("pong".into())),
655
656        // Live daemon metrics snapshot — per-command counters, error rates,
657        // and p50/p95/p99 latencies. Pure read, no side effects, no audit.
658        // Returns `null` if the global metrics handle was never initialized
659        // (which only happens in tests that bypass `serve`).
660        "metrics" => match super::metrics::snapshot() {
661            Some(snap) => match serde_json::to_value(&snap) {
662                Ok(v) => SocketResponse::ok(v),
663                Err(e) => SocketResponse::err(format!("metrics serialize: {e}")),
664            },
665            None => SocketResponse::ok(serde_json::Value::Null),
666        },
667
668        // ── MCP tool commands ────────────────────────────────────────────
669        //
670        // γ-C4: the wire layer (`socket_handle_connection`) accepts only v2
671        // requests, which route MemGet / MemQuery / MemBootstrap / MemSet
672        // through `dispatch_v2` to the native handlers in `mcp::handlers`.
673        // These v1 arms are reachable only via in-process callers — they
674        // dispatch directly to the same canonical handlers so v1 and v2
675        // paths cannot drift.
676        "mem_get" => {
677            let params = match serde_json::from_value::<MemGetParams>(req.args.clone()) {
678                Ok(p) => p,
679                Err(e) => return SocketResponse::err(format!("invalid mem_get args: {e}")),
680            };
681            let input = super::protocol::MemGetInput { key: params.key };
682            let ctx = build_v1_dispatch_ctx(repo_root);
683            let g = graph.read().await;
684            match super::handlers::handle_mem_get(
685                g.store(),
686                graph,
687                &ctx,
688                uuid::Uuid::new_v4(),
689                &input,
690            )
691            .await
692            {
693                Ok(v) => SocketResponse::ok(serde_json::Value::String(
694                    serde_json::to_string_pretty(&v).unwrap_or_else(|_| "{}".into()),
695                )),
696                Err((_code, msg)) => SocketResponse::err(msg),
697            }
698        }
699
700        "mem_query" => {
701            let params = match serde_json::from_value::<MemQueryParams>(req.args.clone()) {
702                Ok(p) => p,
703                Err(e) => return SocketResponse::err(format!("invalid mem_query args: {e}")),
704            };
705            let mode = match params.mode.as_str() {
706                "text" => super::protocol::QueryMode::Text,
707                "tag" => super::protocol::QueryMode::Tag,
708                "graph" => super::protocol::QueryMode::Graph,
709                "semantic" => super::protocol::QueryMode::Semantic,
710                other => {
711                    return SocketResponse::err(format!(
712                        "unknown mode: {other}. Valid modes: text, tag, graph, semantic"
713                    ));
714                }
715            };
716            let input = super::protocol::MemQueryInput {
717                query: params.query,
718                mode,
719                limit: params.limit as u32,
720            };
721            let g = graph.read().await;
722            match super::handlers::handle_mem_query(g.store(), &g, &input).await {
723                Ok(v) => SocketResponse::ok(serde_json::Value::String(
724                    serde_json::to_string_pretty(&v).unwrap_or_else(|_| "{}".into()),
725                )),
726                Err((_code, msg)) => SocketResponse::err(msg),
727            }
728        }
729
730        "mem_bootstrap" => {
731            let params = match serde_json::from_value::<MemBootstrapParams>(req.args.clone()) {
732                Ok(p) => p,
733                Err(e) => return SocketResponse::err(format!("invalid mem_bootstrap args: {e}")),
734            };
735            let input = super::protocol::MemBootstrapInput {
736                context_files: params.context_files,
737            };
738            let ctx = build_v1_dispatch_ctx(repo_root);
739            let g = graph.read().await;
740            match super::handlers::handle_mem_bootstrap(
741                g.store(),
742                &g,
743                graph,
744                &ctx,
745                uuid::Uuid::new_v4(),
746                &input,
747            )
748            .await
749            {
750                Ok(s) => SocketResponse::ok(serde_json::Value::String(s)),
751                Err((_code, msg)) => SocketResponse::err(msg),
752            }
753        }
754
755        "mem_set" => {
756            let params = match serde_json::from_value::<MemSetParams>(req.args.clone()) {
757                Ok(p) => p,
758                Err(e) => return SocketResponse::err(format!("invalid mem_set args: {e}")),
759            };
760            let ctx = build_v1_dispatch_ctx(repo_root);
761            let response =
762                super::handlers::handle_mem_set(graph, &ctx, uuid::Uuid::new_v4(), &params).await;
763            SocketResponse::ok(serde_json::Value::String(response))
764        }
765
766        // ── Hook commands (store-only) ─────────────────────────────────
767        // Acquire a short-lived read lock for store access. The lock is
768        // released at the end of each arm — no risk of deadlock.
769        "get" => {
770            let key = match req.args.get("key").and_then(|v| v.as_str()) {
771                Some(k) => k,
772                None => return SocketResponse::err("missing args.key"),
773            };
774            let g = graph.read().await;
775            let store = g.store();
776            match store.get(key).await {
777                Ok(Some(record)) => {
778                    let confirmed = record
779                        .payload_as::<crate::store::GotchaRecord>()
780                        .map(|g| g.confirmed)
781                        .unwrap_or(false);
782                    match serde_json::to_value(&record) {
783                        Ok(mut val) => {
784                            if let Some(obj) = val.as_object_mut() {
785                                obj.insert(
786                                    "confirmed".to_string(),
787                                    serde_json::Value::Bool(confirmed),
788                                );
789                            }
790                            SocketResponse::ok(val)
791                        }
792                        Err(e) => SocketResponse::err(format!("serialize: {e}")),
793                    }
794                }
795                Ok(None) => SocketResponse::ok(serde_json::Value::Null),
796                Err(e) => SocketResponse::err(format!("store: {e}")),
797            }
798        }
799
800        // ── Internal hook-decide bulk command ────────────────────────────
801        // Returns file record + all linked gotcha records + consultation
802        // status in a single round-trip. NOT an MCP tool.
803        "hook_evaluate" => {
804            let file_key = match req.args.get("file_key").and_then(|v| v.as_str()) {
805                Some(k) => k,
806                None => return SocketResponse::err("missing args.file_key"),
807            };
808            let include_recent = req
809                .args
810                .get("include_recent")
811                .and_then(|v| v.as_bool())
812                .unwrap_or(false);
813            let actor = req.args.get("actor").and_then(|v| v.as_str());
814
815            let g = graph.read().await;
816            let store = g.store();
817
818            // 1. Fetch file record. Distinguish Ok(None) from Err.
819            let (file_record, store_error) = match store.get(file_key).await {
820                Ok(Some(r)) => (serde_json::to_value(&r).ok(), false),
821                Ok(None) => (None, false),
822                Err(e) => {
823                    tracing::warn!("hook_evaluate: store.get({file_key}) failed: {e}");
824                    (None, true)
825                }
826            };
827
828            // 2. Fetch all linked gotcha records.
829            //
830            // The canonical link is `file_record.payload.gotcha_keys`, written
831            // atomically by `compute_file_link_updates`. But CLAUDE.md flags
832            // this field as a *derived* index that can drift from the
833            // canonical `gotcha:*` records (e.g. if a gotcha was created
834            // before the file record existed, or if a partial-write left the
835            // file link stale). To make enforcement robust against that
836            // drift, we union three sources:
837            //   (a) `file_record.payload.gotcha_keys`               (primary)
838            //   (b) in-memory graph edges `HasGotcha` from file_key  (secondary)
839            //   (c) reverse scan of `gotcha:*` records whose
840            //       `affected_files` contains the relative path     (fallback)
841            // Source (c) scans the `gotcha:` prefix, skipping non-active
842            // records, and is skipped entirely when (a) or (b) already
843            // produced keys, so it does not add cost on the hot path.
844            let mut gotcha_records = serde_json::Map::new();
845            let mut gotcha_error = false;
846            let mut linked_keys: std::collections::BTreeSet<String> =
847                std::collections::BTreeSet::new();
848
849            if let Some(ref fr) = file_record {
850                if let Some(keys) = fr
851                    .pointer("/payload/gotcha_keys")
852                    .and_then(|v| v.as_array())
853                {
854                    for gk in keys {
855                        if let Some(key_str) = gk.as_str() {
856                            linked_keys.insert(key_str.to_string());
857                        }
858                    }
859                }
860            }
861
862            // (b) Graph-edge fallback. Loaded at boot from `graph:edge:*`,
863            // independent of the file record's denormalized list.
864            for nkey in g.neighbors(file_key, &crate::graph::EdgeKind::HasGotcha) {
865                linked_keys.insert(nkey);
866            }
867
868            // (c) Canonical reverse-lookup fallback. Only run when both
869            // derived indexes were empty AND a file record exists — covers
870            // the "file record present but gotcha_keys never synced" drift
871            // path that CLAUDE.md documents as a known trap. The bounded scan
872            // derives the relative path once by stripping `file:` from file_key.
873            if linked_keys.is_empty() && file_record.is_some() {
874                let rel_path = file_key.strip_prefix("file:").unwrap_or(file_key);
875                if let Ok(all_gotchas) = store.scan_prefix("gotcha:").await {
876                    for r in all_gotchas {
877                        if !matches!(r.lifecycle, crate::store::RecordLifecycle::Active) {
878                            continue;
879                        }
880                        if let Some(g) = r.payload_as::<crate::store::GotchaRecord>() {
881                            if g.affected_files.iter().any(|af| af == rel_path) {
882                                linked_keys.insert(r.key.clone());
883                            }
884                        }
885                    }
886                }
887            }
888
889            for key_str in &linked_keys {
890                match store.get(key_str).await {
891                    Ok(Some(grec)) => {
892                        // Skip tombstoned gotchas so they never feed into enforcement.
893                        if !matches!(grec.lifecycle, crate::store::RecordLifecycle::Active) {
894                            continue;
895                        }
896                        // Inline confirmed flag (same as "get" handler).
897                        let confirmed = grec
898                            .payload_as::<crate::store::GotchaRecord>()
899                            .map(|g| g.confirmed)
900                            .unwrap_or(false);
901                        if let Ok(mut val) = serde_json::to_value(&grec) {
902                            if let Some(obj) = val.as_object_mut() {
903                                obj.insert(
904                                    "confirmed".to_string(),
905                                    serde_json::Value::Bool(confirmed),
906                                );
907                            }
908                            gotcha_records.insert(key_str.clone(), val);
909                        }
910                    }
911                    Ok(None) => {}
912                    Err(e) => {
913                        tracing::warn!("hook_evaluate: store.get({key_str}) failed: {e}");
914                        gotcha_error = true;
915                    }
916                }
917            }
918
919            // Project the loaded gotcha keys back into the returned file_record
920            // (only when at least one was loaded) so decide.rs::evaluate, which
921            // iterates `payload.gotcha_keys`, sees every active gotcha from the
922            // union — not just the ones the canonical link recorded.
923            let file_record = if let Some(mut fr) = file_record {
924                if !gotcha_records.is_empty() {
925                    if let Some(payload) = fr.pointer_mut("/payload") {
926                        if let Some(obj) = payload.as_object_mut() {
927                            let keys: Vec<serde_json::Value> = gotcha_records
928                                .keys()
929                                .map(|k| serde_json::Value::String(k.clone()))
930                                .collect();
931                            obj.insert("gotcha_keys".to_string(), serde_json::Value::Array(keys));
932                        }
933                    }
934                }
935                Some(fr)
936            } else {
937                None
938            };
939
940            // 3. Consultation status.
941            let consulted = sess::check_consulted(store, file_key, actor)
942                .await
943                .unwrap_or(false);
944            let consulted_recent = if include_recent {
945                sess::check_consulted_recent(store, file_key, 900, actor)
946                    .await
947                    .unwrap_or(false)
948            } else {
949                false
950            };
951
952            SocketResponse::ok(serde_json::json!({
953                "file_key": file_key,
954                "file_record": file_record,
955                "gotcha_records": gotcha_records,
956                "consulted": consulted,
957                "consulted_recent": consulted_recent,
958                "store_error": store_error,
959                "gotcha_error": gotcha_error,
960            }))
961        }
962
963        "log_hit" => {
964            let key = match req.args.get("key").and_then(|v| v.as_str()) {
965                Some(k) => k,
966                None => return SocketResponse::err("missing args.key"),
967            };
968            let g = graph.read().await;
969            if let Err(e) = sess::log_hit(g.store(), key).await {
970                tracing::warn!("daemon socket log_hit: {e}");
971            }
972            SocketResponse::ok(serde_json::Value::Null)
973        }
974
975        "log_miss" => {
976            let key = match req.args.get("key").and_then(|v| v.as_str()) {
977                Some(k) => k,
978                None => return SocketResponse::err("missing args.key"),
979            };
980            let g = graph.read().await;
981            if let Err(e) = sess::log_miss(g.store(), key).await {
982                tracing::warn!("daemon socket log_miss: {e}");
983            }
984            SocketResponse::ok(serde_json::Value::Null)
985        }
986
987        "log_compliance_miss" => {
988            let key = match req.args.get("key").and_then(|v| v.as_str()) {
989                Some(k) => k,
990                None => return SocketResponse::err("missing args.key"),
991            };
992            let g = graph.read().await;
993            let store = g.store();
994            if let Err(e) = sess::log_compliance_miss(store, key).await {
995                tracing::warn!("daemon socket log_compliance_miss: {e}");
996            }
997            // Record Deny enforcement event — best-effort
998            let _ = crate::store::enforcement::record_event(
999                store,
1000                crate::store::enforcement::EnforcementEventType::Deny,
1001                crate::store::enforcement::SubjectKind::File,
1002                key.to_string(),
1003                "claude".to_string(),
1004                None,
1005                "gotcha_above_threshold".to_string(),
1006                None,
1007            )
1008            .await;
1009            SocketResponse::ok(serde_json::Value::Null)
1010        }
1011
1012        "log_compliance_hit" => {
1013            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1014                Some(k) => k,
1015                None => return SocketResponse::err("missing args.key"),
1016            };
1017            let g = graph.read().await;
1018            let store = g.store();
1019            if let Err(e) = sess::log_compliance_hit(store, key).await {
1020                tracing::warn!("daemon socket log_compliance_hit: {e}");
1021            }
1022            // Record AllowAfterReceipt enforcement event — best-effort
1023            let _ = crate::store::enforcement::record_event(
1024                store,
1025                crate::store::enforcement::EnforcementEventType::AllowAfterReceipt,
1026                crate::store::enforcement::SubjectKind::File,
1027                key.to_string(),
1028                "claude".to_string(),
1029                None,
1030                "receipt_valid".to_string(),
1031                None,
1032            )
1033            .await;
1034            SocketResponse::ok(serde_json::Value::Null)
1035        }
1036
1037        "log_codex_shell_miss" => {
1038            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1039                Some(k) => k,
1040                None => return SocketResponse::err("missing args.key"),
1041            };
1042            let g = graph.read().await;
1043            if let Err(e) = sess::log_codex_shell_miss(g.store(), key).await {
1044                tracing::warn!("daemon socket log_codex_shell_miss: {e}");
1045            }
1046            SocketResponse::ok(serde_json::Value::Null)
1047        }
1048
1049        "log_bootstrap" => {
1050            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1051                Some(k) => k,
1052                None => return SocketResponse::err("missing args.key"),
1053            };
1054            let g = graph.read().await;
1055            if let Err(e) = sess::log_bootstrap(g.store(), key).await {
1056                tracing::warn!("daemon socket log_bootstrap: {e}");
1057            }
1058            SocketResponse::ok(serde_json::Value::Null)
1059        }
1060
1061        "log_prompt_nudge" => {
1062            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1063                Some(k) => k,
1064                None => return SocketResponse::err("missing args.key"),
1065            };
1066            let g = graph.read().await;
1067            if let Err(e) = sess::log_prompt_nudge(g.store(), key).await {
1068                tracing::warn!("daemon socket log_prompt_nudge: {e}");
1069            }
1070            SocketResponse::ok(serde_json::Value::Null)
1071        }
1072
1073        "session_check_consulted" => {
1074            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1075                Some(k) => k,
1076                None => return SocketResponse::err("missing args.key"),
1077            };
1078            let g = graph.read().await;
1079            match sess::check_consulted(g.store(), key, None).await {
1080                Ok(found) => SocketResponse::ok(serde_json::Value::Bool(found)),
1081                Err(e) => SocketResponse::err(format!("store: {e}")),
1082            }
1083        }
1084
1085        "session_check_consulted_recent" => {
1086            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1087                Some(k) => k,
1088                None => return SocketResponse::err("missing args.key"),
1089            };
1090            let ttl_secs = req
1091                .args
1092                .get("ttl_secs")
1093                .and_then(|v| v.as_u64())
1094                .unwrap_or(900);
1095            let g = graph.read().await;
1096            match sess::check_consulted_recent(g.store(), key, ttl_secs, None).await {
1097                Ok(found) => SocketResponse::ok(serde_json::Value::Bool(found)),
1098                Err(e) => SocketResponse::err(format!("store: {e}")),
1099            }
1100        }
1101
1102        "session_flush" => {
1103            let g = graph.read().await;
1104            if let Err(e) = sess::session_flush(g.store()).await {
1105                tracing::warn!("daemon socket session_flush: {e}");
1106            }
1107            SocketResponse::ok(serde_json::Value::Null)
1108        }
1109
1110        "session_harvest" => {
1111            // Note: uses no-staleness variant because StalenessAnalyzer (git2) is !Send.
1112            // Git-based staleness analysis runs on the next CLI-path harvest.
1113            let g = graph.read().await;
1114            if let Err(e) = sess::session_harvest_no_staleness(g.store()).await {
1115                tracing::warn!("daemon socket session_harvest: {e}");
1116            }
1117            SocketResponse::ok(serde_json::Value::Null)
1118        }
1119
1120        "reparse" => {
1121            let path = match req.args.get("path").and_then(|v| v.as_str()) {
1122                Some(p) => p,
1123                None => return SocketResponse::err("missing args.path"),
1124            };
1125            let g = graph.read().await;
1126            if let Err(e) = crate::analysis::reparse::reparse_impl(g.store(), repo_root, path).await
1127            {
1128                tracing::warn!("daemon socket reparse: {e}");
1129            }
1130            SocketResponse::ok(serde_json::Value::Null)
1131        }
1132
1133        "edit_hook" => {
1134            let path = match req.args.get("path").and_then(|v| v.as_str()) {
1135                Some(p) => p,
1136                None => return SocketResponse::err("missing args.path"),
1137            };
1138            let file_key = format!("file:{path}");
1139            let g = graph.read().await;
1140            let store = g.store();
1141            if let Err(e) = sess::log_hit(store, &file_key).await {
1142                tracing::warn!("daemon socket edit_hook: log_hit failed: {e}");
1143            }
1144            if let Err(e) = crate::analysis::reparse::reparse_impl(store, repo_root, path).await {
1145                tracing::warn!("daemon socket edit_hook: reparse failed (non-fatal): {e}");
1146            }
1147
1148            // Incremental blast radius update: recompute for the modified file,
1149            // its direct importers, and the files it imports.
1150            {
1151                use crate::analysis::blast_radius::BlastRadius;
1152                use crate::graph::edges::EdgeKind;
1153
1154                let mut keys_to_update = vec![file_key.clone()];
1155                // Files that import this file (their blast radius may change if
1156                // this file's import list changed).
1157                keys_to_update.extend(g.neighbors_incoming(&file_key, &EdgeKind::Imports));
1158                // Files this file imports (this file now counts as an importer).
1159                keys_to_update.extend(g.neighbors(&file_key, &EdgeKind::Imports));
1160
1161                for key in keys_to_update {
1162                    let br = BlastRadius::compute(&key, &g);
1163                    if let Ok(Some(mut rec)) = store.get(&key).await {
1164                        if let Some(mut fr) = rec.payload_as::<crate::store::record::FileRecord>() {
1165                            fr.blast_radius = Some(br);
1166                            rec.payload = serde_json::to_value(&fr).ok();
1167                            let _ = store.put(&key, &rec).await;
1168                        }
1169                    }
1170                }
1171            }
1172
1173            // Incremental staleness propagation: recompute for the edited
1174            // file's direct importers and their importers (depth 2 only).
1175            // Does NOT recompute the full repo — keeps the hook fast.
1176            {
1177                let mut affected_keys = vec![file_key.clone()];
1178                let d1 = g.neighbors_incoming(&file_key, &EdgeKind::Imports);
1179                for d1k in &d1 {
1180                    affected_keys.push(d1k.clone());
1181                    affected_keys.extend(g.neighbors_incoming(d1k, &EdgeKind::Imports));
1182                }
1183                // Collect records for just the affected neighborhood
1184                let mut neighborhood_recs = Vec::new();
1185                for key in &affected_keys {
1186                    if let Ok(Some(rec)) = store.get(key).await {
1187                        neighborhood_recs.push(rec);
1188                    }
1189                }
1190                // Also include the edited file itself as a potential source
1191                if let Ok(Some(rec)) = store.get(&file_key).await {
1192                    if !neighborhood_recs.iter().any(|r| r.key == file_key) {
1193                        neighborhood_recs.push(rec);
1194                    }
1195                }
1196                let propagation =
1197                    crate::analysis::propagation::compute_propagation(&neighborhood_recs, &g);
1198                for (key, prop) in &propagation {
1199                    if let Ok(Some(mut rec)) = store.get(key).await {
1200                        if let Some(mut fr) = rec.payload_as::<crate::store::record::FileRecord>() {
1201                            fr.propagated_staleness = Some(prop.clone());
1202                            rec.payload = serde_json::to_value(&fr).ok();
1203                            let _ = store.put(key, &rec).await;
1204                        }
1205                    }
1206                }
1207            }
1208
1209            SocketResponse::ok(serde_json::Value::Null)
1210        }
1211
1212        "doc_capture" => {
1213            let path = match req.args.get("path").and_then(|v| v.as_str()) {
1214                Some(p) => p,
1215                None => return SocketResponse::err("missing args.path"),
1216            };
1217            let content = req
1218                .args
1219                .get("content")
1220                .and_then(|v| v.as_str())
1221                .unwrap_or("");
1222            let g = graph.read().await;
1223            if let Err(e) = sess::doc_capture(g.store(), path, content).await {
1224                tracing::warn!("daemon socket doc_capture: {e}");
1225            }
1226            SocketResponse::ok(serde_json::Value::Null)
1227        }
1228
1229        "scan_prefix" => {
1230            let prefix = match req.args.get("prefix").and_then(|v| v.as_str()) {
1231                Some(p) => p,
1232                None => return SocketResponse::err("missing args.prefix"),
1233            };
1234            let g = graph.read().await;
1235            match g.store().scan_prefix(prefix).await {
1236                Ok(records) => match serde_json::to_value(&records) {
1237                    Ok(val) => SocketResponse::ok(val),
1238                    Err(e) => SocketResponse::err(format!("serialize: {e}")),
1239                },
1240                Err(e) => SocketResponse::err(format!("store: {e}")),
1241            }
1242        }
1243
1244        "scan_enforcement_events" => {
1245            let since_seq = req
1246                .args
1247                .get("since_seq")
1248                .and_then(|v| v.as_u64())
1249                .unwrap_or(0);
1250            let until_seq = req
1251                .args
1252                .get("until_seq")
1253                .and_then(|v| v.as_u64())
1254                .unwrap_or(u64::MAX);
1255            let g = graph.read().await;
1256            match crate::store::enforcement::scan_enforcement_events(
1257                g.store(),
1258                since_seq,
1259                until_seq,
1260            )
1261            .await
1262            {
1263                Ok(events) => match serde_json::to_value(&events) {
1264                    Ok(val) => SocketResponse::ok(val),
1265                    Err(e) => SocketResponse::err(format!("serialize: {e}")),
1266                },
1267                Err(e) => SocketResponse::err(format!("store: {e}")),
1268            }
1269        }
1270
1271        "put" => {
1272            use crate::store::Record;
1273            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1274                Some(k) => k,
1275                None => return SocketResponse::err("missing args.key"),
1276            };
1277            let record: Record = match req
1278                .args
1279                .get("record")
1280                .and_then(|v| serde_json::from_value(v.clone()).ok())
1281            {
1282                Some(r) => r,
1283                None => return SocketResponse::err("put: invalid record"),
1284            };
1285            let g = graph.read().await;
1286            match g.store().put(key, &record).await {
1287                Ok(()) => SocketResponse::ok(serde_json::Value::Null),
1288                Err(e) => SocketResponse::err(format!("store put: {e}")),
1289            }
1290        }
1291
1292        "delete" => {
1293            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1294                Some(k) => k,
1295                None => return SocketResponse::err("missing args.key"),
1296            };
1297            let g = graph.read().await;
1298            match g.store().delete(key).await {
1299                Ok(()) => SocketResponse::ok(serde_json::Value::Null),
1300                Err(e) => SocketResponse::err(format!("delete: {e}")),
1301            }
1302        }
1303
1304        "history" => {
1305            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1306                Some(k) => k,
1307                None => return SocketResponse::err("missing args.key"),
1308            };
1309            let limit = req.args.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize;
1310            let g = graph.read().await;
1311            match g.store().history(key, limit) {
1312                Ok(entries) => match serde_json::to_value(&entries) {
1313                    Ok(val) => SocketResponse::ok(val),
1314                    Err(e) => SocketResponse::err(format!("serialize: {e}")),
1315                },
1316                Err(e) => SocketResponse::err(format!("history: {e}")),
1317            }
1318        }
1319
1320        "history_since" => {
1321            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1322                Some(k) => k,
1323                None => return SocketResponse::err("missing args.key"),
1324            };
1325            let since_ts = req
1326                .args
1327                .get("since_ts")
1328                .and_then(|v| v.as_u64())
1329                .unwrap_or(0);
1330            let limit = req.args.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize;
1331            let g = graph.read().await;
1332            match g.store().history_since(key, since_ts, limit) {
1333                Ok(entries) => match serde_json::to_value(&entries) {
1334                    Ok(val) => SocketResponse::ok(val),
1335                    Err(e) => SocketResponse::err(format!("serialize: {e}")),
1336                },
1337                Err(e) => SocketResponse::err(format!("history_since: {e}")),
1338            }
1339        }
1340
1341        "gotcha_write" => {
1342            use crate::store::gotcha_ops::apply_gotcha_write;
1343            use crate::store::Record;
1344
1345            let record: Record = match req
1346                .args
1347                .get("record")
1348                .and_then(|v| serde_json::from_value(v.clone()).ok())
1349            {
1350                Some(r) => r,
1351                None => return SocketResponse::err("missing or invalid args.record"),
1352            };
1353            let new_files: Vec<String> = req
1354                .args
1355                .get("new_files")
1356                .and_then(|v| serde_json::from_value(v.clone()).ok())
1357                .unwrap_or_default();
1358            let old_files: Vec<String> = req
1359                .args
1360                .get("old_files")
1361                .and_then(|v| serde_json::from_value(v.clone()).ok())
1362                .unwrap_or_default();
1363            let is_new = req
1364                .args
1365                .get("is_new")
1366                .and_then(|v| v.as_bool())
1367                .unwrap_or(false);
1368
1369            {
1370                let g = graph.read().await;
1371                match apply_gotcha_write(g.store(), &record, &old_files, &new_files, is_new).await {
1372                    Ok(()) => {}
1373                    Err(e) => return SocketResponse::err(format!("{e}")),
1374                }
1375            }
1376
1377            // Sync the in-memory graph: add HasGotcha edges for newly-affected files,
1378            // remove edges for files no longer affected. The persistent store was already
1379            // updated by apply_gotcha_write above; this keeps the in-memory adjacency list
1380            // in sync so that assemble_context_packet (bootstrap) sees the edges immediately
1381            // without requiring a daemon restart.
1382            let record_key = record.key.clone();
1383            let old_set: std::collections::HashSet<&str> =
1384                old_files.iter().map(String::as_str).collect();
1385            let new_set: std::collections::HashSet<&str> =
1386                new_files.iter().map(String::as_str).collect();
1387            {
1388                let mut g = graph.write().await;
1389                for file_path in new_set.difference(&old_set) {
1390                    let file_key = format!("file:{file_path}");
1391                    let _ = g
1392                        .add_edge(&file_key, EdgeKind::HasGotcha, &record_key)
1393                        .await;
1394                }
1395                for file_path in old_set.difference(&new_set) {
1396                    let file_key = format!("file:{file_path}");
1397                    let _ = g
1398                        .remove_edge(&file_key, &EdgeKind::HasGotcha, &record_key)
1399                        .await;
1400                }
1401            }
1402
1403            SocketResponse::ok(serde_json::Value::String("written".into()))
1404        }
1405
1406        "gotcha_tombstone" => {
1407            use crate::store::gotcha_ops::apply_gotcha_tombstone;
1408
1409            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1410                Some(k) => k,
1411                None => return SocketResponse::err("missing args.key"),
1412            };
1413            if !key.starts_with("gotcha:") {
1414                return SocketResponse::err("delete action only applies to gotcha: keys");
1415            }
1416            // Read affected_files from args if provided, otherwise look up the
1417            // record to get them. The MCP proxy sends delete without affected_files.
1418            let mut affected_files: Vec<String> = req
1419                .args
1420                .get("affected_files")
1421                .and_then(|v| serde_json::from_value(v.clone()).ok())
1422                .unwrap_or_default();
1423
1424            let g = graph.read().await;
1425            if affected_files.is_empty() {
1426                if let Ok(Some(record)) = g.store().get(key).await {
1427                    if let Some(gotcha) = record.payload_as::<crate::store::GotchaRecord>() {
1428                        affected_files = gotcha.affected_files;
1429                    }
1430                }
1431            }
1432            match apply_gotcha_tombstone(g.store(), key, &affected_files).await {
1433                Ok(()) => SocketResponse::ok(serde_json::Value::String("tombstoned".into())),
1434                Err(e) => SocketResponse::err(format!("{e}")),
1435            }
1436        }
1437
1438        "gotcha_confirm" => {
1439            let key = match req.args.get("key").and_then(|v| v.as_str()) {
1440                Some(k) => k,
1441                None => return SocketResponse::err("missing args.key"),
1442            };
1443
1444            // Read record
1445            let g = graph.read().await;
1446            let store = g.store();
1447            let mut record = match store.get(key).await {
1448                Ok(Some(r)) => r,
1449                Ok(None) => return SocketResponse::err(format!("record not found: {key}")),
1450                Err(e) => return SocketResponse::err(format!("store get: {e}")),
1451            };
1452
1453            if record.category != crate::store::record::Category::Gotcha {
1454                return SocketResponse::err(format!("{key} is not a gotcha record"));
1455            }
1456
1457            if !matches!(
1458                record.lifecycle,
1459                crate::store::record::RecordLifecycle::Active
1460            ) {
1461                return SocketResponse::err(format!(
1462                    "{key} is tombstoned — cannot confirm a deleted record"
1463                ));
1464            }
1465
1466            // Set confirmed + normalize severity
1467            if let Some(ref mut payload) = record.payload {
1468                if let Some(obj) = payload.as_object_mut() {
1469                    if let Some(sev) = obj
1470                        .get("severity")
1471                        .and_then(|v| v.as_str())
1472                        .map(|s| s.to_lowercase())
1473                    {
1474                        obj.insert("severity".to_string(), serde_json::Value::String(sev));
1475                    }
1476                    obj.insert("confirmed".to_string(), serde_json::Value::Bool(true));
1477                }
1478            }
1479
1480            record.source = crate::store::record::RecordSource::DeveloperManual;
1481            record.confidence.value = crate::store::record::ConfidenceScore::base_for_source(
1482                &crate::store::record::RecordSource::DeveloperManual,
1483            );
1484            record.confidence.confirmation_count += 1;
1485            record.quality = crate::health::quality::analyze(&record);
1486
1487            let now = std::time::SystemTime::now()
1488                .duration_since(std::time::UNIX_EPOCH)
1489                .unwrap_or_default()
1490                .as_secs();
1491            record.updated_at = now;
1492            record.version.logical_clock += 1;
1493            record.version.wall_clock = now;
1494
1495            // Extract affected_files for file-link sync
1496            let affected_files: Vec<String> = record
1497                .payload_as::<crate::store::record::GotchaRecord>()
1498                .map(|g| g.affected_files)
1499                .unwrap_or_default();
1500
1501            if let Err(e) = store.put(key, &record).await {
1502                return SocketResponse::err(format!("store put: {e}"));
1503            }
1504
1505            // Sync file:*.gotcha_keys — best-effort
1506            for file_path in &affected_files {
1507                let file_key = format!("file:{file_path}");
1508                if let Ok(Some(mut file_record)) = store.get(&file_key).await {
1509                    let needs_link = file_record
1510                        .payload
1511                        .as_ref()
1512                        .and_then(|p| p.get("gotcha_keys"))
1513                        .and_then(|v| v.as_array())
1514                        .map(|arr| !arr.iter().any(|v| v.as_str() == Some(key)))
1515                        .unwrap_or(true);
1516                    if needs_link {
1517                        if let Some(ref mut payload) = file_record.payload {
1518                            if let Some(obj) = payload.as_object_mut() {
1519                                let arr = obj.entry("gotcha_keys").or_insert(serde_json::json!([]));
1520                                if let Some(arr) = arr.as_array_mut() {
1521                                    arr.push(serde_json::Value::String(key.to_string()));
1522                                }
1523                            }
1524                        }
1525                        let _ = store.put(&file_key, &file_record).await;
1526                    }
1527                }
1528            }
1529
1530            // Propagate confirmation_count to linked file records
1531            crate::store::gotcha_ops::propagate_confirmation_to_files(store, &affected_files).await;
1532
1533            // Record ControlChanged::Confirmed enforcement event — best-effort.
1534            let _ = crate::store::enforcement::record_event(
1535                store,
1536                crate::store::enforcement::EnforcementEventType::ControlChanged {
1537                    change_kind: crate::store::enforcement::ControlChangeKind::Confirmed,
1538                },
1539                crate::store::enforcement::SubjectKind::Control,
1540                key.to_string(),
1541                "developer".to_string(),
1542                None,
1543                "control_confirmed".to_string(),
1544                None,
1545            )
1546            .await;
1547
1548            SocketResponse::ok(serde_json::json!({"confirmed": true, "key": key}))
1549        }
1550
1551        other => SocketResponse::err(format!("unknown command: {other}")),
1552    }
1553}
1554
1555// ── Auto-promotion: MCP server → headless daemon ─────────────────────────────
1556
/// Idle shutdown threshold — wall-clock seconds with no daemon socket requests.
///
/// The value is 30 minutes expressed in seconds (`30 * 60`).
///
/// Shared with `cli::daemon` so both daemon paths use the same idle policy.
pub const IDLE_SHUTDOWN_SECS: u64 = 30 * 60; // 30 min
1561
/// How often to check wall-clock idle time. Shared with `cli::daemon`.
///
/// The value is 5 minutes expressed in seconds (`5 * 60`).
pub const IDLE_CHECK_INTERVAL_SECS: u64 = 5 * 60; // 5 min
1564
1565// ── Tests ────────────────────────────────────────────────────────────────────
1566
#[cfg(test)]
mod shutdown_tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;

    /// Polls `join_next` until the set reports that no tasks remain.
    async fn drain_join_set(set: &mut tokio::task::JoinSet<()>) {
        while set.join_next().await.is_some() {}
    }

    /// A `wait()` issued after `signal()` must complete without blocking.
    #[tokio::test]
    async fn shutdown_signal_before_wait_returns_immediately() {
        let shutdown = Shutdown::new();
        shutdown.signal();

        // The 100 ms ceiling is deliberately generous so slow CI machines
        // do not turn this into a flaky test.
        let outcome = tokio::time::timeout(Duration::from_millis(100), shutdown.wait()).await;
        outcome.expect("wait must return immediately when already signaled");

        assert!(shutdown.is_set());
    }

    /// A waiter parked before `signal()` must be woken by it.
    #[tokio::test]
    async fn shutdown_wait_then_signal_wakes_waiter() {
        let shutdown = Arc::new(Shutdown::new());
        let waiter = {
            let shutdown = Arc::clone(&shutdown);
            tokio::spawn(async move { shutdown.wait().await })
        };

        // Short pause so the spawned task can start waiting first.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(!shutdown.is_set());

        shutdown.signal();

        let joined = tokio::time::timeout(Duration::from_millis(200), waiter)
            .await
            .expect("waiter must wake within timeout");
        joined.expect("waiter task should not panic");
        assert!(shutdown.is_set());
    }

    /// One `signal()` must release every task currently blocked in `wait()`.
    #[tokio::test]
    async fn shutdown_multiple_concurrent_waiters_all_wake() {
        let shutdown = Arc::new(Shutdown::new());
        let waiters: Vec<_> = (0..16)
            .map(|_| {
                let shutdown = Arc::clone(&shutdown);
                tokio::spawn(async move { shutdown.wait().await })
            })
            .collect();

        // Give all sixteen tasks a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(20)).await;

        shutdown.signal();

        for waiter in waiters {
            let joined = tokio::time::timeout(Duration::from_millis(200), waiter)
                .await
                .expect("each waiter must wake within timeout");
            joined.expect("waiter task should not panic");
        }
    }

    /// Repeated `signal()` calls are harmless; a later `wait()` still returns.
    #[tokio::test]
    async fn shutdown_signal_is_idempotent() {
        let shutdown = Shutdown::new();
        for _ in 0..3 {
            shutdown.signal();
        }

        let outcome = tokio::time::timeout(Duration::from_millis(100), shutdown.wait()).await;
        outcome.expect("wait must still return on idempotent re-signal");
    }

    /// Contract test for tokio: after `JoinSet::abort_all()`, a
    /// `join_next` drain loop must terminate. The bounded-drain fallback
    /// in `serve_loop_graceful` (and the caller-side hammer for
    /// `serve_daemon_socket`) depends on this; if tokio's behavior ever
    /// changed, that fallback would degrade to waiting forever after the
    /// abort.
    #[tokio::test]
    async fn joinset_abort_all_makes_drain_finite() {
        let mut tasks = tokio::task::JoinSet::<()>::new();
        // A task that would keep the set non-empty for a full minute.
        tasks.spawn(async {
            tokio::time::sleep(Duration::from_secs(60)).await;
        });

        // Draining without aborting cannot finish: the task is still asleep.
        let before_abort =
            tokio::time::timeout(Duration::from_millis(100), drain_join_set(&mut tasks)).await;
        assert!(
            before_abort.is_err(),
            "primary drain should time out while task is still sleeping"
        );

        // After the abort, the same drain must finish promptly.
        tasks.abort_all();
        let after_abort =
            tokio::time::timeout(Duration::from_millis(500), drain_join_set(&mut tasks)).await;
        assert!(
            after_abort.is_ok(),
            "drain after abort_all must complete quickly"
        );
        assert!(tasks.is_empty(), "JoinSet should be empty after drain");
    }

    /// Contract test for tokio: a task that panics inside a `JoinSet`
    /// must surface through `try_join_next()` as `Some(Err(e))` with
    /// `e.is_panic()`. The panic detection in `serve_daemon_socket` (and
    /// `cli::daemon::serve_loop_graceful`) is built on that guarantee.
    #[tokio::test]
    async fn joinset_panics_are_observable_via_try_join_next() {
        let mut tasks = tokio::task::JoinSet::<()>::new();
        tasks.spawn(async {
            panic!("simulated handler panic");
        });

        // The panicked task takes a moment to show up as a completed
        // entry, so poll every 10 ms for at most 500 ms.
        let deadline = std::time::Instant::now() + Duration::from_millis(500);
        loop {
            match tasks.try_join_next() {
                Some(outcome) => {
                    let err = outcome.expect_err("panicked task should yield Err");
                    assert!(
                        err.is_panic(),
                        "JoinError must report is_panic for panicking task; got: {err:?}"
                    );
                    break;
                }
                None => {
                    if std::time::Instant::now() >= deadline {
                        panic!("try_join_next never reported the panic within 500ms");
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }
    }

    /// Race contract: a `signal()` fired while a waiter is only just
    /// entering `wait()` must not be lost.
    ///
    /// Probabilistic — repeats the race 50 times and requires every
    /// waiter to wake within its timeout.
    #[tokio::test]
    async fn shutdown_no_lost_signal_under_race() {
        for trial in 0..50 {
            let shutdown = Arc::new(Shutdown::new());
            let waiter = {
                let shutdown = Arc::clone(&shutdown);
                tokio::spawn(async move { shutdown.wait().await })
            };

            // A single yield lets the waiter begin `wait()` without
            // guaranteeing it has finished registering.
            tokio::task::yield_now().await;

            // Fire the signal right in that window.
            shutdown.signal();

            match tokio::time::timeout(Duration::from_millis(500), waiter).await {
                Ok(joined) => joined.expect("waiter task should not panic"),
                Err(_) => panic!("trial {trial}: waiter stranded by lost signal"),
            };
        }
    }
}
1736
1737#[cfg(test)]
1738mod tests {
1739    use super::*;
1740    use crate::store::record::{
1741        Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
1742        RecordLifecycle, RecordSource, RecordVersion, StalenessScore,
1743    };
1744    use crate::store::Store;
1745
1746    fn make_gotcha_record(key: &str, files: &[&str]) -> Record {
1747        let gotcha = GotchaRecord {
1748            rule: "test rule".into(),
1749            reason: "test reason".into(),
1750            severity: Priority::High,
1751            affected_files: files.iter().map(|s| s.to_string()).collect(),
1752            ref_url: None,
1753            discovered_session: 1_000_000,
1754            confirmed: true,
1755        };
1756        Record {
1757            key: key.to_string(),
1758            value: "test rule because test reason".into(),
1759            payload: serde_json::to_value(&gotcha).ok(),
1760            category: Category::Gotcha,
1761            priority: Priority::High,
1762            tags: vec![],
1763            created_at: 1_000_000,
1764            updated_at: 1_000_000,
1765            ref_url: None,
1766            staleness: StalenessScore::fresh(),
1767            lifecycle: RecordLifecycle::Active,
1768            version: RecordVersion {
1769                device_id: uuid::Uuid::new_v4(),
1770                logical_clock: 1,
1771                wall_clock: 1_000_000,
1772            },
1773            quality: QualityScore::layer0_default(),
1774            access_count: 0,
1775            last_accessed: 0,
1776            source: RecordSource::DeveloperManual,
1777            confidence: ConfidenceScore::for_new_record(&RecordSource::DeveloperManual),
1778            gap_analysis_score: 0.0,
1779        }
1780    }
1781
1782    fn make_file_record(path: &str) -> Record {
1783        let file = FileRecord {
1784            path: path.to_string(),
1785            purpose: String::new(),
1786            entry_points: vec![],
1787            imports: vec![],
1788            gotcha_keys: vec![],
1789            decision_keys: vec![],
1790            todos: vec![],
1791            unsafe_count: 0,
1792            unwrap_count: 0,
1793            change_frequency: 0,
1794            last_author: None,
1795            is_hotspot: false,
1796            token_cost_estimate: 0,
1797            last_modified_session: 0,
1798            content_hash: None,
1799            line_count: 0,
1800            blast_radius: None,
1801            propagated_staleness: None,
1802        };
1803        Record {
1804            key: format!("file:{path}"),
1805            value: String::new(),
1806            payload: serde_json::to_value(&file).ok(),
1807            category: Category::File,
1808            priority: Priority::Normal,
1809            tags: vec![],
1810            created_at: 1_000_000,
1811            updated_at: 1_000_000,
1812            ref_url: None,
1813            staleness: StalenessScore::fresh(),
1814            lifecycle: RecordLifecycle::Active,
1815            version: RecordVersion {
1816                device_id: uuid::Uuid::new_v4(),
1817                logical_clock: 1,
1818                wall_clock: 1_000_000,
1819            },
1820            quality: QualityScore::layer0_default(),
1821            access_count: 0,
1822            last_accessed: 0,
1823            source: RecordSource::StaticAnalysis,
1824            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1825            gap_analysis_score: 0.0,
1826        }
1827    }
1828
1829    fn file_gotcha_keys(record: &Record) -> Vec<String> {
1830        record
1831            .payload
1832            .as_ref()
1833            .and_then(|p| p.get("gotcha_keys"))
1834            .and_then(|v| v.as_array())
1835            .map(|arr| {
1836                arr.iter()
1837                    .filter_map(|v| v.as_str().map(String::from))
1838                    .collect()
1839            })
1840            .unwrap_or_default()
1841    }
1842
1843    /// Test helper: wraps a Store in a Graph + Arc for socket_dispatch.
1844    ///
1845    /// Consumes the Store (Graph owns it). Returns the Arc and a reference
1846    /// to access the store through the graph for assertions.
1847    async fn make_test_graph(store: Store) -> Arc<tokio::sync::RwLock<Graph>> {
1848        let graph = Graph::load(store).await.expect("failed to load test graph");
1849        Arc::new(tokio::sync::RwLock::new(graph))
1850    }
1851
1852    async fn dispatch_with_graph(
1853        graph: &Arc<tokio::sync::RwLock<Graph>>,
1854        cmd: &str,
1855        args: serde_json::Value,
1856    ) -> SocketResponse {
1857        let req = SocketRequest {
1858            cmd: cmd.to_string(),
1859            version: Some(PROTOCOL_VERSION),
1860            args,
1861        };
1862        socket_dispatch(graph, Path::new("/tmp/mati-test"), &req).await
1863    }
1864
1865    // ── Regression: gotcha_write via socket syncs file links ─────────────
1866
1867    #[tokio::test]
1868    async fn socket_gotcha_write_adds_keys_to_file_records() {
1869        let dir = tempfile::TempDir::new().unwrap();
1870        let store = Store::open(dir.path()).await.unwrap();
1871        store
1872            .put("file:src/a.rs", &make_file_record("src/a.rs"))
1873            .await
1874            .unwrap();
1875        store
1876            .put("file:src/b.rs", &make_file_record("src/b.rs"))
1877            .await
1878            .unwrap();
1879        let graph = make_test_graph(store).await;
1880
1881        let record = make_gotcha_record("gotcha:socket-test", &["src/a.rs", "src/b.rs"]);
1882        let resp = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1883            "record": record, "new_files": ["src/a.rs", "src/b.rs"], "old_files": [], "is_new": true,
1884        })).await;
1885        assert!(resp.ok, "gotcha_write failed: {:?}", resp.error);
1886
1887        let g = graph.read().await;
1888        let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1889        let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1890        assert!(file_gotcha_keys(&a).contains(&"gotcha:socket-test".into()));
1891        assert!(file_gotcha_keys(&b).contains(&"gotcha:socket-test".into()));
1892    }
1893
1894    #[tokio::test]
1895    async fn socket_gotcha_write_edit_removes_key_from_old_file() {
1896        let dir = tempfile::TempDir::new().unwrap();
1897        let store = Store::open(dir.path()).await.unwrap();
1898        store
1899            .put("file:src/a.rs", &make_file_record("src/a.rs"))
1900            .await
1901            .unwrap();
1902        store
1903            .put("file:src/b.rs", &make_file_record("src/b.rs"))
1904            .await
1905            .unwrap();
1906        let graph = make_test_graph(store).await;
1907
1908        let record = make_gotcha_record("gotcha:edit-socket", &["src/a.rs"]);
1909        let resp = dispatch_with_graph(
1910            &graph,
1911            "gotcha_write",
1912            serde_json::json!({
1913                "record": record, "new_files": ["src/a.rs"], "old_files": [], "is_new": true,
1914            }),
1915        )
1916        .await;
1917        assert!(resp.ok);
1918
1919        let record2 = make_gotcha_record("gotcha:edit-socket", &["src/b.rs"]);
1920        let resp2 = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1921            "record": record2, "new_files": ["src/b.rs"], "old_files": ["src/a.rs"], "is_new": false,
1922        })).await;
1923        assert!(resp2.ok);
1924
1925        let g = graph.read().await;
1926        let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1927        let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1928        assert!(!file_gotcha_keys(&a).contains(&"gotcha:edit-socket".into()));
1929        assert!(file_gotcha_keys(&b).contains(&"gotcha:edit-socket".into()));
1930    }
1931
1932    #[tokio::test]
1933    async fn socket_gotcha_tombstone_removes_keys_from_file_records() {
1934        let dir = tempfile::TempDir::new().unwrap();
1935        let store = Store::open(dir.path()).await.unwrap();
1936        store
1937            .put("file:src/a.rs", &make_file_record("src/a.rs"))
1938            .await
1939            .unwrap();
1940        store
1941            .put("file:src/b.rs", &make_file_record("src/b.rs"))
1942            .await
1943            .unwrap();
1944        let graph = make_test_graph(store).await;
1945
1946        let record = make_gotcha_record("gotcha:tomb-socket", &["src/a.rs", "src/b.rs"]);
1947        let resp = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1948            "record": record, "new_files": ["src/a.rs", "src/b.rs"], "old_files": [], "is_new": true,
1949        })).await;
1950        assert!(resp.ok);
1951
1952        let resp2 = dispatch_with_graph(
1953            &graph,
1954            "gotcha_tombstone",
1955            serde_json::json!({
1956                "key": "gotcha:tomb-socket", "affected_files": ["src/a.rs", "src/b.rs"],
1957            }),
1958        )
1959        .await;
1960        assert!(resp2.ok, "gotcha_tombstone failed: {:?}", resp2.error);
1961
1962        let g = graph.read().await;
1963        let rec = g.store().get("gotcha:tomb-socket").await.unwrap().unwrap();
1964        assert!(matches!(rec.lifecycle, RecordLifecycle::Tombstoned { .. }));
1965        let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1966        let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1967        assert!(file_gotcha_keys(&a).is_empty());
1968        assert!(file_gotcha_keys(&b).is_empty());
1969    }
1970
1971    #[tokio::test]
1972    async fn socket_gotcha_write_rejects_duplicate_key() {
1973        let dir = tempfile::TempDir::new().unwrap();
1974        let store = Store::open(dir.path()).await.unwrap();
1975        let record1 = make_gotcha_record("gotcha:dup-socket", &["src/a.rs"]);
1976        store.put("gotcha:dup-socket", &record1).await.unwrap();
1977        let graph = make_test_graph(store).await;
1978
1979        let record2 = make_gotcha_record("gotcha:dup-socket", &["src/b.rs"]);
1980        let resp = dispatch_with_graph(
1981            &graph,
1982            "gotcha_write",
1983            serde_json::json!({
1984                "record": record2, "new_files": ["src/b.rs"], "old_files": [], "is_new": true,
1985            }),
1986        )
1987        .await;
1988        assert!(!resp.ok, "duplicate key should be rejected");
1989        assert!(resp
1990            .error
1991            .as_deref()
1992            .unwrap_or("")
1993            .contains("already exists"));
1994
1995        let g = graph.read().await;
1996        let original = g.store().get("gotcha:dup-socket").await.unwrap().unwrap();
1997        let payload = original.payload_as::<GotchaRecord>().unwrap();
1998        assert_eq!(payload.affected_files, vec!["src/a.rs"]);
1999    }
2000
2001    // ── Wire-level size enforcement ────────────────────────────────────
2002
2003    #[tokio::test]
2004    async fn oversized_request_returns_frame_too_large_with_response() {
2005        use super::super::protocol::MAX_FRAME_SIZE;
2006        use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
2007
2008        let dir = tempfile::TempDir::new().unwrap();
2009        let store = Store::open(dir.path()).await.unwrap();
2010        let graph = make_test_graph(store).await;
2011
2012        let (client, server) = UnixStream::pair().unwrap();
2013        let peer = super::super::metadata::PeerContext {
2014            uid: 501,
2015            pid: None,
2016        };
2017
2018        // Payload larger than MAX_FRAME_SIZE.
2019        let oversized = "x".repeat(MAX_FRAME_SIZE + 100);
2020        let payload = format!("{oversized}\n");
2021
2022        // Split client: write oversized request, then read response.
2023        let (client_read, client_write) = client.into_split();
2024
2025        let write_handle = tokio::spawn(async move {
2026            let mut w = client_write;
2027            w.write_all(payload.as_bytes()).await.unwrap();
2028            w.shutdown().await.unwrap();
2029        });
2030
2031        let handle_result =
2032            socket_handle_connection(graph, dir.path(), server, peer, uuid::Uuid::nil()).await;
2033        assert!(handle_result.is_ok());
2034
2035        write_handle.await.unwrap();
2036
2037        // Read the error response from the server.
2038        let mut reader = tokio::io::BufReader::new(client_read);
2039        let mut line = String::new();
2040        reader.read_line(&mut line).await.unwrap();
2041        let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
2042
2043        assert_eq!(resp["status"], "err");
2044        assert_eq!(resp["code"], "frame_too_large");
2045        assert!(
2046            resp["message"]
2047                .as_str()
2048                .unwrap()
2049                .contains(&MAX_FRAME_SIZE.to_string()),
2050            "error message should mention the size limit"
2051        );
2052    }
2053
2054    #[tokio::test]
2055    async fn normal_sized_request_is_not_rejected_by_size_check() {
2056        use super::super::protocol::MAX_FRAME_SIZE;
2057        use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
2058
2059        let dir = tempfile::TempDir::new().unwrap();
2060        let store = Store::open(dir.path()).await.unwrap();
2061        let graph = make_test_graph(store).await;
2062
2063        let (client, server) = UnixStream::pair().unwrap();
2064        let peer = super::super::metadata::PeerContext {
2065            uid: 501,
2066            pid: None,
2067        };
2068
2069        // A valid v2 ping request — well under MAX_FRAME_SIZE.
2070        let request = serde_json::json!({
2071            "v": 2,
2072            "id": uuid::Uuid::new_v4(),
2073            "session": uuid::Uuid::nil(),
2074            "cmd": { "type": "ping" }
2075        });
2076        let payload = format!("{}\n", serde_json::to_string(&request).unwrap());
2077        assert!(
2078            payload.len() < MAX_FRAME_SIZE,
2079            "test payload should be small"
2080        );
2081
2082        let (client_read, client_write) = client.into_split();
2083
2084        let write_handle = tokio::spawn(async move {
2085            let mut w = client_write;
2086            w.write_all(payload.as_bytes()).await.unwrap();
2087            w.shutdown().await.unwrap();
2088        });
2089
2090        let handle_result =
2091            socket_handle_connection(graph, dir.path(), server, peer, uuid::Uuid::nil()).await;
2092        assert!(handle_result.is_ok());
2093
2094        write_handle.await.unwrap();
2095
2096        // Read response — should be a successful pong, not FrameTooLarge.
2097        let mut reader = tokio::io::BufReader::new(client_read);
2098        let mut line = String::new();
2099        reader.read_line(&mut line).await.unwrap();
2100        let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
2101
2102        assert_eq!(resp["status"], "ok", "ping should succeed, got: {resp}");
2103    }
2104
2105    // ── Daemon-restart resilience ──────────────────────────────────────
2106    //
2107    // Regression for the smoke-test failure: after a daemon stop+start,
2108    // the MCP-stdio bridge sees `session_mismatch` (or transient
2109    // `Unresponsive`) on the first call because its cached daemon session
2110    // UUID predates the restart. Without retry, every subsequent
2111    // mem_get/mem_query/mem_bootstrap/mem_set returns a structured error
2112    // that effectively wedges the agent's MCP session.
2113    //
2114    // The fix in `proxy_daemon_result` is one bounded auto-reconnect: the
2115    // helper re-reads daemon metadata fresh (picking up the new session
2116    // UUID) and re-issues the request. This test asserts the reconnect
2117    // succeeds end-to-end and DOES NOT propagate the session_mismatch
2118    // error envelope to the caller.
2119
2120    /// Spawn a tiny daemon-substitute that binds the given socket and
2121    /// answers each connection with the supplied JSON response (one line),
2122    /// then closes the connection. Returns the JoinHandle so the test can
2123    /// await it.
2124    async fn spawn_canned_responder(
2125        sock_path: std::path::PathBuf,
2126        responses: Vec<serde_json::Value>,
2127    ) -> tokio::task::JoinHandle<()> {
2128        // Bind in this task synchronously so the caller can issue
2129        // requests immediately without a sleep race.
2130        let listener = tokio::net::UnixListener::bind(&sock_path).expect("bind responder socket");
2131        tokio::spawn(async move {
2132            for resp in responses {
2133                let (stream, _) = match listener.accept().await {
2134                    Ok(s) => s,
2135                    Err(_) => return,
2136                };
2137                let (reader, mut writer) = stream.into_split();
2138                // Drain the request line so the peer's `shutdown()` returns Ok.
2139                let mut buf_reader = tokio::io::BufReader::new(reader);
2140                let mut line = String::new();
2141                let _ = tokio::io::AsyncBufReadExt::read_line(&mut buf_reader, &mut line).await;
2142                let mut bytes = serde_json::to_vec(&resp).unwrap();
2143                bytes.push(b'\n');
2144                let _ = tokio::io::AsyncWriteExt::write_all(&mut writer, &bytes).await;
2145                let _ = tokio::io::AsyncWriteExt::shutdown(&mut writer).await;
2146            }
2147        })
2148    }
2149
2150    #[tokio::test]
2151    async fn mcp_call_after_daemon_restart_does_not_kill_transport() {
2152        // Scenario: the proxy's first attempt hits a daemon whose session
2153        // UUID does not match (simulating a daemon restart between two
2154        // tool calls). The fix retries once, re-reads metadata, and the
2155        // second attempt succeeds.
2156
2157        let dir = tempfile::TempDir::new().unwrap();
2158        let root = dir.path().to_path_buf();
2159        let sock_path = root.join("mati.sock");
2160
2161        // Initial daemon session "before restart". The proxy will read
2162        // this UUID, but our canned responder pretends not to recognize
2163        // it (returning session_mismatch). After the retry delay, we
2164        // rotate metadata to a new UUID — exactly what `mati daemon stop`
2165        // + `mati daemon start` would do in production.
2166        let session_before = uuid::Uuid::new_v4();
2167        let session_after = uuid::Uuid::new_v4();
2168
2169        let meta_before = super::super::metadata::DaemonMetadata {
2170            pid: std::process::id(),
2171            session: session_before,
2172            owner: super::super::metadata::DaemonOwner::Daemon,
2173        };
2174        super::super::metadata::publish_metadata(&root, &meta_before).unwrap();
2175
2176        // Stage two responses on the same socket: the first is a
2177        // SessionMismatch err (pre-restart daemon view), the second is a
2178        // successful pong (post-restart daemon view).
2179        let responder_handle = spawn_canned_responder(
2180            sock_path.clone(),
2181            vec![
2182                serde_json::json!({
2183                    "v": 2,
2184                    "id": uuid::Uuid::new_v4(),
2185                    "status": "err",
2186                    "code": "session_mismatch",
2187                    "message": "session mismatch: re-read daemon metadata and retry",
2188                }),
2189                serde_json::json!({
2190                    "v": 2,
2191                    "id": uuid::Uuid::new_v4(),
2192                    "status": "ok",
2193                    "data": "pong",
2194                }),
2195            ],
2196        )
2197        .await;
2198
2199        // Concurrent metadata rotation — fires during the retry delay.
2200        // Mirrors what a real daemon restart does: writes fresh metadata.
2201        let root_for_rotate = root.clone();
2202        let rotate_handle = tokio::spawn(async move {
2203            // Sleep just less than the proxy's 100ms retry settle so the
2204            // metadata rewrite is committed before the second attempt.
2205            tokio::time::sleep(Duration::from_millis(20)).await;
2206            let meta_after = super::super::metadata::DaemonMetadata {
2207                pid: std::process::id(),
2208                session: session_after,
2209                owner: super::super::metadata::DaemonOwner::Daemon,
2210            };
2211            super::super::metadata::publish_metadata(&root_for_rotate, &meta_after).unwrap();
2212        });
2213
2214        // Wrap in a tokio timeout: if the retry path is missing, the
2215        // proxy returns the first attempt's envelope without ever
2216        // dialing the second responder, which would leave the test
2217        // hanging on the spare canned response. The timeout converts
2218        // that latent hang into a deterministic failure with a clear
2219        // error message.
2220        let result = tokio::time::timeout(
2221            Duration::from_secs(5),
2222            super::proxy_daemon_result(&root, "ping", serde_json::json!({})),
2223        )
2224        .await
2225        .expect("proxy_daemon_result should resolve within 5s — retry path appears wedged");
2226
2227        rotate_handle.await.unwrap();
2228        // Drop the responder task — the second canned response may go
2229        // unconsumed in failure modes. Aborting prevents the test from
2230        // hanging on `responder_handle.await` in failure mode.
2231        responder_handle.abort();
2232
2233        // The proxy must transparently recover: caller sees Ok, not the
2234        // session_mismatch error envelope from the first attempt.
2235        match result {
2236            super::ProxyDaemonResult::Ok(v) => {
2237                let ok = v.get("ok") == Some(&serde_json::Value::Bool(true));
2238                let code = v.get("code").and_then(|c| c.as_str()).unwrap_or("");
2239                assert!(
2240                    ok,
2241                    "second attempt should succeed after metadata rotation, \
2242                     but caller saw the first attempt's session_mismatch envelope: \
2243                     ok={ok} code={code:?} v={v}"
2244                );
2245            }
2246            other => panic!(
2247                "expected Ok(true) after auto-reconnect, got {other:?}; \
2248                 the daemon-restart retry path is not engaging"
2249            ),
2250        }
2251    }
2252
2253    #[tokio::test]
2254    async fn mcp_call_session_mismatch_no_retry_target_returns_envelope() {
2255        // Negative-side guard: if the second attempt also fails with the
2256        // same error (e.g. the daemon was not actually restarted), the
2257        // proxy still returns the structured error envelope to the
2258        // caller — it does NOT panic, hang, or close the rmcp transport.
2259        // This preserves the per-call structured-error discipline that
2260        // keeps Claude's MCP session alive.
2261
2262        let dir = tempfile::TempDir::new().unwrap();
2263        let root = dir.path().to_path_buf();
2264        let sock_path = root.join("mati.sock");
2265
2266        let session = uuid::Uuid::new_v4();
2267        let meta = super::super::metadata::DaemonMetadata {
2268            pid: std::process::id(),
2269            session,
2270            owner: super::super::metadata::DaemonOwner::Daemon,
2271        };
2272        super::super::metadata::publish_metadata(&root, &meta).unwrap();
2273
2274        // Both attempts get a session_mismatch — emulates a daemon that
2275        // truly cannot be reconciled (wedged in a state the proxy can't
2276        // recover from).
2277        let responder_handle = spawn_canned_responder(
2278            sock_path.clone(),
2279            vec![
2280                serde_json::json!({
2281                    "v": 2,
2282                    "id": uuid::Uuid::new_v4(),
2283                    "status": "err",
2284                    "code": "session_mismatch",
2285                    "message": "session mismatch (1)",
2286                }),
2287                serde_json::json!({
2288                    "v": 2,
2289                    "id": uuid::Uuid::new_v4(),
2290                    "status": "err",
2291                    "code": "session_mismatch",
2292                    "message": "session mismatch (2)",
2293                }),
2294            ],
2295        )
2296        .await;
2297
2298        let result = tokio::time::timeout(
2299            Duration::from_secs(5),
2300            super::proxy_daemon_result(&root, "ping", serde_json::json!({})),
2301        )
2302        .await
2303        .expect("proxy_daemon_result must resolve within 5s");
2304        responder_handle.abort();
2305
2306        // The caller MUST get a structured Ok envelope with ok:false +
2307        // the session_mismatch code, never a panic or transport-killing
2308        // surprise. socket_call (in tools.rs) renders this to a JSON
2309        // error string — which is exactly the contract the rmcp loop
2310        // expects: a String response, not a Result::Err.
2311        match result {
2312            super::ProxyDaemonResult::Ok(v) => {
2313                assert_eq!(v.get("ok"), Some(&serde_json::Value::Bool(false)));
2314                assert_eq!(
2315                    v.get("code").and_then(|c| c.as_str()),
2316                    Some("session_mismatch")
2317                );
2318            }
2319            other => panic!("expected structured Ok envelope, got {other:?}"),
2320        }
2321    }
2322
2323    // ── Pass-29 regression: proxy_daemon_result handles side-effecting reads ──
2324    //
2325    // Pre-fix: every Socket-backed `mem_get` and `mem_bootstrap` MCP call
2326    // panicked the rmcp task at `v1_to_v2_command` (no match arm), which
2327    // surfaced to the client as `Transport closed` and wedged Phases 6–17
2328    // of the smoke. The translation layer is the load-bearing artifact
2329    // — pass 27's mock-UnixListener test bypassed it entirely, so the
2330    // bug shipped.
2331    //
2332    // These tests drive `proxy_daemon_result` with the exact strings
2333    // tools.rs sends today. Without the new arms in v1_to_v2_command,
2334    // both panic. With the fix, both return a clean `NotRunning` because
2335    // the socket doesn't exist — proving the translation succeeded
2336    // before the connect attempt.
2337
2338    #[tokio::test]
2339    async fn proxy_daemon_result_handles_mem_get_translation_no_panic() {
2340        let dir = tempfile::TempDir::new().unwrap();
2341        // No socket file present — the call must reach the
2342        // sock_path.exists() guard, which it cannot do if v1_to_v2_command
2343        // panics first.
2344        let result = super::proxy_daemon_result(
2345            dir.path(),
2346            "mem_get",
2347            serde_json::json!({ "key": "file:src/main.rs" }),
2348        )
2349        .await;
2350        assert!(
2351            matches!(result, super::ProxyDaemonResult::NotRunning),
2352            "mem_get without daemon must return NotRunning, got {result:?}"
2353        );
2354    }
2355
2356    #[tokio::test]
2357    async fn proxy_daemon_result_handles_mem_bootstrap_translation_no_panic() {
2358        let dir = tempfile::TempDir::new().unwrap();
2359        let result = super::proxy_daemon_result(
2360            dir.path(),
2361            "mem_bootstrap",
2362            serde_json::json!({ "context_files": ["src/lib.rs"] }),
2363        )
2364        .await;
2365        assert!(
2366            matches!(result, super::ProxyDaemonResult::NotRunning),
2367            "mem_bootstrap without daemon must return NotRunning, got {result:?}"
2368        );
2369    }
2370
2371    #[tokio::test]
2372    async fn proxy_daemon_v2_typed_path_handles_mem_set_mutations_no_panic() {
2373        // The Socket-backend mem_set now takes the typed path. With no
2374        // daemon present, the typed-Command serialize→connect path must
2375        // surface as a clean NotRunning, never a panic. This is the
2376        // load-bearing fence: any future caller that accidentally routes
2377        // gotcha_upsert through the v1 mapper would fail
2378        // v1_to_v2_command_no_mutations_silently_accepted in protocol.rs;
2379        // here we make sure the typed path itself is wired end-to-end.
2380        let dir = tempfile::TempDir::new().unwrap();
2381        let cmd = super::super::protocol::Command::GotchaConfirm(
2382            super::super::protocol::GotchaConfirmInput {
2383                key: "gotcha:test".into(),
2384            },
2385        );
2386        let result = super::proxy_daemon_v2(dir.path(), cmd).await;
2387        assert!(
2388            matches!(result, super::ProxyDaemonResult::NotRunning),
2389            "typed proxy_daemon_v2 must return NotRunning when daemon is absent, got {result:?}"
2390        );
2391    }
2392}