1use std::path::Path;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::time::Duration;
29
30use anyhow::Result;
31use rmcp::model::{ServerCapabilities, ServerInfo};
32use rmcp::{tool_handler, ServerHandler, ServiceExt};
33use serde::{Deserialize, Serialize};
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
35use tokio::net::UnixStream;
36
37use crate::graph::edges::EdgeKind;
38use crate::graph::Graph;
39
40use super::tools::MatiServer;
41use super::types::{MemBootstrapParams, MemGetParams, MemQueryParams, MemSetParams};
42
/// Outcome of forwarding one command from the MCP proxy to the daemon socket.
#[derive(Debug)]
pub(crate) enum ProxyDaemonResult {
    /// The daemon returned a well-formed reply. The payload is the proxy
    /// envelope (`{"ok": ..., "v": 2, ...}`); daemon-reported errors also
    /// arrive in this variant, with `"ok": false` plus `error` and `code`.
    Ok(serde_json::Value),
    /// No usable socket: the socket path is too long, the socket file does
    /// not exist, or connecting failed for a reason other than
    /// "connection refused".
    NotRunning,
    /// Connecting was refused and the stale-socket check did not find a live
    /// daemon behind the socket.
    StaleSocket,
    /// A daemon appears to be alive but the exchange failed: the connection
    /// was refused by a live daemon, the write failed, no reply arrived in
    /// time, or the reply could not be interpreted.
    Unresponsive,
}
50
51#[tool_handler(router = self.tool_router)]
52impl ServerHandler for MatiServer {
53 fn get_info(&self) -> ServerInfo {
54 ServerInfo::new(
55 ServerCapabilities::builder()
56 .enable_tools()
57 .enable_tool_list_changed()
58 .build(),
59 )
60 .with_instructions(
61 "mati is a persistent engineering knowledge store for the current \
62 codebase. Use mem_get for direct record lookup, mem_query for \
63 search and graph traversal, mem_bootstrap for session context, \
64 and mem_set for writing knowledge records.",
65 )
66 }
67}
68
69pub async fn serve(repo_root: &Path) -> Result<()> {
91 let startup_t0 = std::time::Instant::now();
92
93 let mati_root: PathBuf = dirs::home_dir()
96 .map(|h| h.join(".mati").join(crate::store::derive_slug(repo_root)))
97 .ok_or_else(|| anyhow::anyhow!("cannot resolve home directory for mati_root"))?;
98
99 super::metadata::record_lifecycle_event(&mati_root, "startup", "phase=ensure_daemon");
100
101 if !super::daemon_lifecycle::ensure_daemon(&mati_root).await {
105 super::metadata::record_lifecycle_event(
106 &mati_root,
107 "serve_failed",
108 "daemon unreachable after auto-spawn",
109 );
110 anyhow::bail!(
111 "mati serve: daemon unreachable. \
112 Run `mati daemon start` manually and check the lifecycle.log."
113 );
114 }
115
116 super::metadata::record_lifecycle_event(
117 &mati_root,
118 "serve_start",
119 &format!("pid={} owner=proxy", std::process::id()),
120 );
121
122 super::metrics::init();
125
126 super::metadata::record_lifecycle_event(
127 &mati_root,
128 "startup",
129 &format!(
130 "phase=ready elapsed_ms={}",
131 startup_t0.elapsed().as_millis()
132 ),
133 );
134
135 let transport = rmcp::transport::io::stdio();
137 let service = MatiServer::with_socket_root(mati_root.clone())
138 .serve(transport)
139 .await
140 .map_err(|e| anyhow::anyhow!("MCP proxy initialization failed: {e}"))
141 .inspect_err(|e| {
142 super::metadata::record_lifecycle_event(
143 &mati_root,
144 "serve_failed",
145 &format!("proxy init: {e:#}"),
146 )
147 })?;
148
149 let shutdown_reason: &'static str = match service.waiting().await {
150 Ok(_) => "client_disconnect",
151 Err(e) => {
152 super::metadata::record_lifecycle_event(
153 &mati_root,
154 "serve_failed",
155 &format!("proxy waiting: {e}"),
156 );
157 "mcp_waiting_error"
158 }
159 };
160 super::metadata::record_lifecycle_event(
161 &mati_root,
162 "serve_shutdown",
163 &format!("reason={shutdown_reason}"),
164 );
165 Ok(())
166}
167
168pub(crate) async fn proxy_daemon_result(
169 root: &Path,
170 cmd: &str,
171 args: serde_json::Value,
172) -> ProxyDaemonResult {
173 let result = proxy_daemon_result_no_spawn(root, cmd, &args).await;
192
193 if matches!(
204 &result,
205 ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket
206 ) && super::daemon_lifecycle::ensure_daemon(root).await
207 {
208 match proxy_daemon_result_once(root, cmd, &args).await {
209 AttemptOutcome::Final(r) | AttemptOutcome::Retryable(r) => return r,
210 }
211 }
212
213 result
214}
215
216pub(crate) async fn proxy_daemon_result_no_spawn(
220 root: &Path,
221 cmd: &str,
222 args: &serde_json::Value,
223) -> ProxyDaemonResult {
224 match proxy_daemon_result_once(root, cmd, args).await {
225 AttemptOutcome::Final(result) => result,
226 AttemptOutcome::Retryable(_) => {
227 tokio::time::sleep(Duration::from_millis(100)).await;
230 match proxy_daemon_result_once(root, cmd, args).await {
231 AttemptOutcome::Final(result) | AttemptOutcome::Retryable(result) => result,
232 }
233 }
234 }
235}
236
/// Result of a single send attempt, tagged with whether the caller should
/// try again.
enum AttemptOutcome {
    /// The result is definitive; callers return it without retrying.
    Final(ProxyDaemonResult),
    /// The attempt failed in a way that may be transient; callers in this
    /// file retry once and then accept whatever the retry produced.
    Retryable(ProxyDaemonResult),
}
247
248async fn proxy_daemon_result_once(
249 root: &Path,
250 cmd: &str,
251 args: &serde_json::Value,
252) -> AttemptOutcome {
253 let v2_cmd = super::protocol::v1_to_v2_command(cmd, args);
257 proxy_daemon_send_v2(root, v2_cmd).await
258}
259
260pub(crate) async fn proxy_daemon_v2(
270 root: &Path,
271 cmd: super::protocol::Command,
272) -> ProxyDaemonResult {
273 let v2_cmd = match serde_json::to_value(&cmd) {
275 Ok(v) => v,
276 Err(_) => return ProxyDaemonResult::Unresponsive,
277 };
278
279 let result = match proxy_daemon_send_v2(root, v2_cmd.clone()).await {
280 AttemptOutcome::Final(result) => result,
281 AttemptOutcome::Retryable(_) => {
282 tokio::time::sleep(Duration::from_millis(100)).await;
283 match proxy_daemon_send_v2(root, v2_cmd.clone()).await {
284 AttemptOutcome::Final(result) | AttemptOutcome::Retryable(result) => result,
285 }
286 }
287 };
288
289 if matches!(
293 &result,
294 ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket
295 ) && super::daemon_lifecycle::ensure_daemon(root).await
296 {
297 match proxy_daemon_send_v2(root, v2_cmd).await {
298 AttemptOutcome::Final(r) | AttemptOutcome::Retryable(r) => return r,
299 }
300 }
301
302 result
303}
304
305async fn proxy_daemon_send_v2(root: &Path, v2_cmd: serde_json::Value) -> AttemptOutcome {
309 let sock_path = root.join("mati.sock");
310
311 if sock_path.as_os_str().len() > UNIX_SOCK_PATH_MAX {
312 tracing::warn!(
313 path = %sock_path.display(),
314 "mcp proxy: socket path exceeds Unix limit"
315 );
316 return AttemptOutcome::Final(ProxyDaemonResult::NotRunning);
318 }
319
320 if !sock_path.exists() {
321 return AttemptOutcome::Retryable(ProxyDaemonResult::NotRunning);
323 }
324
325 let stream = match UnixStream::connect(&sock_path).await {
326 Ok(s) => s,
327 Err(e) => {
328 let is_refused = e.kind() == std::io::ErrorKind::ConnectionRefused;
329 if is_refused {
330 use super::metadata::{self as meta, StaleCheckResult};
333 match meta::check_and_cleanup_stale(root) {
334 StaleCheckResult::StaleRemoved | StaleCheckResult::Clean => {
335 return AttemptOutcome::Retryable(ProxyDaemonResult::StaleSocket);
336 }
337 StaleCheckResult::OrphanSocket => {
338 let _ = std::fs::remove_file(&sock_path);
340 return AttemptOutcome::Retryable(ProxyDaemonResult::StaleSocket);
341 }
342 StaleCheckResult::LiveDaemon { .. } => {
343 return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
345 }
346 }
347 }
348 return AttemptOutcome::Retryable(ProxyDaemonResult::NotRunning);
349 }
350 };
351
352 let daemon_session = super::metadata::read_metadata(root)
355 .map(|m| m.session)
356 .unwrap_or_else(uuid::Uuid::nil);
357 let request = serde_json::json!({
358 "v": super::protocol::PROTOCOL_VERSION,
359 "id": uuid::Uuid::new_v4(),
360 "session": daemon_session,
361 "cmd": v2_cmd,
362 });
363
364 let (reader, mut writer) = stream.into_split();
365 let mut bytes = match serde_json::to_vec(&request) {
366 Ok(b) => b,
367 Err(_) => return AttemptOutcome::Final(ProxyDaemonResult::Unresponsive),
368 };
369 bytes.push(b'\n');
370
371 if writer.write_all(&bytes).await.is_err() {
372 return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
373 }
374 if writer.shutdown().await.is_err() {
375 return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive);
376 }
377
378 let mut buf_reader = BufReader::new(reader);
379 let mut line = String::new();
380 match tokio::time::timeout(Duration::from_secs(2), buf_reader.read_line(&mut line)).await {
381 Ok(Ok(n)) if n > 0 => {}
382 _ => return AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive),
383 }
384
385 let resp: serde_json::Value = match serde_json::from_str(line.trim()) {
387 Ok(v) => v,
388 Err(_) => return AttemptOutcome::Final(ProxyDaemonResult::Unresponsive),
389 };
390
391 match resp.get("status").and_then(|s| s.as_str()) {
392 Some("ok") => {
393 let data = resp.get("data").cloned().unwrap_or(serde_json::Value::Null);
394 AttemptOutcome::Final(ProxyDaemonResult::Ok(
395 serde_json::json!({"ok": true, "v": 2, "data": data}),
396 ))
397 }
398 Some("err") => {
399 let code = resp
400 .get("code")
401 .and_then(|c| c.as_str())
402 .unwrap_or("internal");
403 let message = resp
404 .get("message")
405 .and_then(|m| m.as_str())
406 .unwrap_or("unknown error");
407 let envelope = serde_json::json!({
408 "ok": false, "v": 2, "error": message, "code": code
409 });
410 if code == "session_mismatch" {
415 tracing::debug!(
416 "mcp proxy: session mismatch — daemon may have restarted, will retry"
417 );
418 AttemptOutcome::Retryable(ProxyDaemonResult::Ok(envelope))
419 } else {
420 AttemptOutcome::Final(ProxyDaemonResult::Ok(envelope))
421 }
422 }
423 _ => AttemptOutcome::Retryable(ProxyDaemonResult::Unresponsive),
424 }
425}
426
/// Upper bound on the byte length of the daemon socket path. The proxy
/// refuses to connect when `<root>/mati.sock` is longer than this.
// NOTE(review): 104 presumably mirrors the `sun_path` size on macOS/BSD
// (Linux allows 108) — confirm the intended platform floor.
pub const UNIX_SOCK_PATH_MAX: usize = 104;

/// How long `socket_handle_connection` waits for a client's request line
/// before giving up with a "read timeout" error.
const READ_TIMEOUT: Duration = Duration::from_secs(3);

// NOTE(review): not referenced in the visible part of this file; presumably
// the cap on simultaneously served socket connections — confirm at the
// accept loop.
pub const MAX_CONCURRENT_CONNECTIONS: usize = 64;

// NOTE(review): not referenced in the visible part of this file; presumably
// the grace period for draining in-flight connections on shutdown — confirm
// against the caller.
pub const AUTO_DRAIN_TIMEOUT: Duration = Duration::from_secs(10);
456
/// One-way shutdown signal: a flag recording that shutdown was requested,
/// paired with a notifier used to wake tasks blocked in `wait`.
#[derive(Default)]
pub struct Shutdown {
    // Set to `true` by `signal`; nothing in the visible code resets it.
    flag: std::sync::atomic::AtomicBool,
    // Wakes tasks currently waiting in `wait` when `signal` is called.
    notify: tokio::sync::Notify,
}
472
473impl Shutdown {
474 pub fn new() -> Self {
475 Self::default()
476 }
477
478 pub fn signal(&self) {
480 self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
481 self.notify.notify_waiters();
482 }
483
484 pub fn is_set(&self) -> bool {
485 self.flag.load(std::sync::atomic::Ordering::SeqCst)
493 }
494
495 pub async fn wait(&self) {
498 let notified = self.notify.notified();
499 tokio::pin!(notified);
500 notified.as_mut().enable();
503 if self.is_set() {
504 return;
505 }
506 notified.await;
507 }
508}
509
/// Version number written into the `"v"` field of every `SocketResponse`.
/// This is a separate constant from `super::protocol::PROTOCOL_VERSION`,
/// which the proxy puts into outgoing v2 requests.
const PROTOCOL_VERSION: u32 = 1;
512
/// A v1 socket request as decoded from JSON: a command name, an optional
/// protocol version and free-form arguments.
#[derive(Debug, Deserialize)]
pub(crate) struct SocketRequest {
    /// Command name; `socket_dispatch` matches on this string.
    pub cmd: String,
    /// Optional protocol version, read from the JSON key `"v"`. Accepted on
    /// input but flagged `dead_code`, i.e. currently unused.
    #[allow(dead_code)] #[serde(default, rename = "v")]
    pub version: Option<u32>,
    /// Command-specific arguments; defaults to JSON `null` when absent.
    #[serde(default)]
    pub args: serde_json::Value,
}
522
/// A v1 socket response. Serializes to
/// `{"ok": bool, "v": <version>, "data"?: ..., "error"?: "..."}`; the
/// optional keys are omitted entirely when unset.
#[derive(Debug, Serialize)]
pub(crate) struct SocketResponse {
    /// `true` for success, `false` for failure.
    pub(crate) ok: bool,
    /// Protocol version, emitted under the JSON key `"v"`.
    #[serde(rename = "v")]
    version: u32,
    /// Result payload; set by `SocketResponse::ok`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) data: Option<serde_json::Value>,
    /// Human-readable error message; set by `SocketResponse::err`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) error: Option<String>,
}
533
534impl SocketResponse {
535 pub(crate) fn ok(data: serde_json::Value) -> Self {
536 Self {
537 ok: true,
538 version: PROTOCOL_VERSION,
539 data: Some(data),
540 error: None,
541 }
542 }
543 pub(crate) fn err(msg: impl Into<String>) -> Self {
544 Self {
545 ok: false,
546 version: PROTOCOL_VERSION,
547 data: None,
548 error: Some(msg.into()),
549 }
550 }
551}
552
553pub async fn socket_handle_connection(
554 graph: Arc<tokio::sync::RwLock<Graph>>,
555 repo_root: &Path,
556 stream: UnixStream,
557 peer: super::metadata::PeerContext,
558 daemon_session: uuid::Uuid,
559) -> Result<()> {
560 use super::protocol::MAX_FRAME_SIZE;
561 use tokio::io::AsyncReadExt;
562
563 let (reader, mut writer) = stream.into_split();
564 let mut buf = String::new();
565
566 let limited = reader.take(MAX_FRAME_SIZE as u64 + 1);
571 let mut buf_reader = BufReader::new(limited);
572 match tokio::time::timeout(READ_TIMEOUT, buf_reader.read_line(&mut buf)).await {
573 Ok(Ok(0)) => return Ok(()),
574 Ok(Ok(_)) => {}
575 Ok(Err(e)) => anyhow::bail!("read error: {e}"),
576 Err(_) => anyhow::bail!("read timeout"),
577 }
578
579 if buf.len() > MAX_FRAME_SIZE {
580 let resp = super::protocol::Response::err(
581 uuid::Uuid::nil(),
582 super::protocol::ErrorCode::FrameTooLarge,
583 format!("request exceeds {MAX_FRAME_SIZE} byte limit"),
584 );
585 let json = serde_json::to_string(&resp)?;
586 writer.write_all(json.as_bytes()).await?;
587 writer.write_all(b"\n").await?;
588 writer.flush().await?;
589 return Ok(());
590 }
591
592 let trimmed = buf.trim();
593
594 let v2_req = match serde_json::from_str::<super::protocol::Request>(trimmed) {
599 Ok(r) => r,
600 Err(e) => {
601 let resp = super::protocol::Response::err(
604 uuid::Uuid::nil(),
605 super::protocol::ErrorCode::MalformedRequest,
606 format!("invalid v2 request: {e}"),
607 );
608 let json = serde_json::to_string(&resp)?;
609 writer.write_all(json.as_bytes()).await?;
610 writer.write_all(b"\n").await?;
611 writer.flush().await?;
612 return Ok(());
613 }
614 };
615
616 let ctx = super::dispatch_v2::RequestContext {
617 peer,
618 daemon_session,
619 repo_root: repo_root.to_path_buf(),
620 };
621 let resp = super::dispatch_v2::dispatch_v2(&graph, &ctx, v2_req).await;
622 let json = serde_json::to_string(&resp)?;
623 writer.write_all(json.as_bytes()).await?;
624 writer.write_all(b"\n").await?;
625 writer.flush().await?;
626 Ok(())
627}
628
629fn build_v1_dispatch_ctx(repo_root: &Path) -> super::dispatch_v2::RequestContext {
636 super::dispatch_v2::RequestContext {
637 peer: super::metadata::PeerContext {
638 uid: super::metadata::current_euid(),
639 pid: Some(std::process::id()),
640 },
641 daemon_session: uuid::Uuid::nil(),
642 repo_root: repo_root.to_path_buf(),
643 }
644}
645
646pub(crate) async fn socket_dispatch(
647 graph: &Arc<tokio::sync::RwLock<Graph>>,
648 repo_root: &Path,
649 req: &SocketRequest,
650) -> SocketResponse {
651 use crate::store::session as sess;
652
653 match req.cmd.as_str() {
654 "ping" => SocketResponse::ok(serde_json::Value::String("pong".into())),
655
656 "metrics" => match super::metrics::snapshot() {
661 Some(snap) => match serde_json::to_value(&snap) {
662 Ok(v) => SocketResponse::ok(v),
663 Err(e) => SocketResponse::err(format!("metrics serialize: {e}")),
664 },
665 None => SocketResponse::ok(serde_json::Value::Null),
666 },
667
668 "mem_get" => {
677 let params = match serde_json::from_value::<MemGetParams>(req.args.clone()) {
678 Ok(p) => p,
679 Err(e) => return SocketResponse::err(format!("invalid mem_get args: {e}")),
680 };
681 let input = super::protocol::MemGetInput { key: params.key };
682 let ctx = build_v1_dispatch_ctx(repo_root);
683 let g = graph.read().await;
684 match super::handlers::handle_mem_get(
685 g.store(),
686 graph,
687 &ctx,
688 uuid::Uuid::new_v4(),
689 &input,
690 )
691 .await
692 {
693 Ok(v) => SocketResponse::ok(serde_json::Value::String(
694 serde_json::to_string_pretty(&v).unwrap_or_else(|_| "{}".into()),
695 )),
696 Err((_code, msg)) => SocketResponse::err(msg),
697 }
698 }
699
700 "mem_query" => {
701 let params = match serde_json::from_value::<MemQueryParams>(req.args.clone()) {
702 Ok(p) => p,
703 Err(e) => return SocketResponse::err(format!("invalid mem_query args: {e}")),
704 };
705 let mode = match params.mode.as_str() {
706 "text" => super::protocol::QueryMode::Text,
707 "tag" => super::protocol::QueryMode::Tag,
708 "graph" => super::protocol::QueryMode::Graph,
709 "semantic" => super::protocol::QueryMode::Semantic,
710 other => {
711 return SocketResponse::err(format!(
712 "unknown mode: {other}. Valid modes: text, tag, graph, semantic"
713 ));
714 }
715 };
716 let input = super::protocol::MemQueryInput {
717 query: params.query,
718 mode,
719 limit: params.limit as u32,
720 };
721 let g = graph.read().await;
722 match super::handlers::handle_mem_query(g.store(), &g, &input).await {
723 Ok(v) => SocketResponse::ok(serde_json::Value::String(
724 serde_json::to_string_pretty(&v).unwrap_or_else(|_| "{}".into()),
725 )),
726 Err((_code, msg)) => SocketResponse::err(msg),
727 }
728 }
729
730 "mem_bootstrap" => {
731 let params = match serde_json::from_value::<MemBootstrapParams>(req.args.clone()) {
732 Ok(p) => p,
733 Err(e) => return SocketResponse::err(format!("invalid mem_bootstrap args: {e}")),
734 };
735 let input = super::protocol::MemBootstrapInput {
736 context_files: params.context_files,
737 };
738 let ctx = build_v1_dispatch_ctx(repo_root);
739 let g = graph.read().await;
740 match super::handlers::handle_mem_bootstrap(
741 g.store(),
742 &g,
743 graph,
744 &ctx,
745 uuid::Uuid::new_v4(),
746 &input,
747 )
748 .await
749 {
750 Ok(s) => SocketResponse::ok(serde_json::Value::String(s)),
751 Err((_code, msg)) => SocketResponse::err(msg),
752 }
753 }
754
755 "mem_set" => {
756 let params = match serde_json::from_value::<MemSetParams>(req.args.clone()) {
757 Ok(p) => p,
758 Err(e) => return SocketResponse::err(format!("invalid mem_set args: {e}")),
759 };
760 let ctx = build_v1_dispatch_ctx(repo_root);
761 let response =
762 super::handlers::handle_mem_set(graph, &ctx, uuid::Uuid::new_v4(), ¶ms).await;
763 SocketResponse::ok(serde_json::Value::String(response))
764 }
765
766 "get" => {
770 let key = match req.args.get("key").and_then(|v| v.as_str()) {
771 Some(k) => k,
772 None => return SocketResponse::err("missing args.key"),
773 };
774 let g = graph.read().await;
775 let store = g.store();
776 match store.get(key).await {
777 Ok(Some(record)) => {
778 let confirmed = record
779 .payload_as::<crate::store::GotchaRecord>()
780 .map(|g| g.confirmed)
781 .unwrap_or(false);
782 match serde_json::to_value(&record) {
783 Ok(mut val) => {
784 if let Some(obj) = val.as_object_mut() {
785 obj.insert(
786 "confirmed".to_string(),
787 serde_json::Value::Bool(confirmed),
788 );
789 }
790 SocketResponse::ok(val)
791 }
792 Err(e) => SocketResponse::err(format!("serialize: {e}")),
793 }
794 }
795 Ok(None) => SocketResponse::ok(serde_json::Value::Null),
796 Err(e) => SocketResponse::err(format!("store: {e}")),
797 }
798 }
799
800 "hook_evaluate" => {
804 let file_key = match req.args.get("file_key").and_then(|v| v.as_str()) {
805 Some(k) => k,
806 None => return SocketResponse::err("missing args.file_key"),
807 };
808 let include_recent = req
809 .args
810 .get("include_recent")
811 .and_then(|v| v.as_bool())
812 .unwrap_or(false);
813 let actor = req.args.get("actor").and_then(|v| v.as_str());
814
815 let g = graph.read().await;
816 let store = g.store();
817
818 let (file_record, store_error) = match store.get(file_key).await {
820 Ok(Some(r)) => (serde_json::to_value(&r).ok(), false),
821 Ok(None) => (None, false),
822 Err(e) => {
823 tracing::warn!("hook_evaluate: store.get({file_key}) failed: {e}");
824 (None, true)
825 }
826 };
827
828 let mut gotcha_records = serde_json::Map::new();
845 let mut gotcha_error = false;
846 let mut linked_keys: std::collections::BTreeSet<String> =
847 std::collections::BTreeSet::new();
848
849 if let Some(ref fr) = file_record {
850 if let Some(keys) = fr
851 .pointer("/payload/gotcha_keys")
852 .and_then(|v| v.as_array())
853 {
854 for gk in keys {
855 if let Some(key_str) = gk.as_str() {
856 linked_keys.insert(key_str.to_string());
857 }
858 }
859 }
860 }
861
862 for nkey in g.neighbors(file_key, &crate::graph::EdgeKind::HasGotcha) {
865 linked_keys.insert(nkey);
866 }
867
868 if linked_keys.is_empty() && file_record.is_some() {
874 let rel_path = file_key.strip_prefix("file:").unwrap_or(file_key);
875 if let Ok(all_gotchas) = store.scan_prefix("gotcha:").await {
876 for r in all_gotchas {
877 if !matches!(r.lifecycle, crate::store::RecordLifecycle::Active) {
878 continue;
879 }
880 if let Some(g) = r.payload_as::<crate::store::GotchaRecord>() {
881 if g.affected_files.iter().any(|af| af == rel_path) {
882 linked_keys.insert(r.key.clone());
883 }
884 }
885 }
886 }
887 }
888
889 for key_str in &linked_keys {
890 match store.get(key_str).await {
891 Ok(Some(grec)) => {
892 if !matches!(grec.lifecycle, crate::store::RecordLifecycle::Active) {
894 continue;
895 }
896 let confirmed = grec
898 .payload_as::<crate::store::GotchaRecord>()
899 .map(|g| g.confirmed)
900 .unwrap_or(false);
901 if let Ok(mut val) = serde_json::to_value(&grec) {
902 if let Some(obj) = val.as_object_mut() {
903 obj.insert(
904 "confirmed".to_string(),
905 serde_json::Value::Bool(confirmed),
906 );
907 }
908 gotcha_records.insert(key_str.clone(), val);
909 }
910 }
911 Ok(None) => {}
912 Err(e) => {
913 tracing::warn!("hook_evaluate: store.get({key_str}) failed: {e}");
914 gotcha_error = true;
915 }
916 }
917 }
918
919 let file_record = if let Some(mut fr) = file_record {
924 if !gotcha_records.is_empty() {
925 if let Some(payload) = fr.pointer_mut("/payload") {
926 if let Some(obj) = payload.as_object_mut() {
927 let keys: Vec<serde_json::Value> = gotcha_records
928 .keys()
929 .map(|k| serde_json::Value::String(k.clone()))
930 .collect();
931 obj.insert("gotcha_keys".to_string(), serde_json::Value::Array(keys));
932 }
933 }
934 }
935 Some(fr)
936 } else {
937 None
938 };
939
940 let consulted = sess::check_consulted(store, file_key, actor)
942 .await
943 .unwrap_or(false);
944 let consulted_recent = if include_recent {
945 sess::check_consulted_recent(store, file_key, 900, actor)
946 .await
947 .unwrap_or(false)
948 } else {
949 false
950 };
951
952 SocketResponse::ok(serde_json::json!({
953 "file_key": file_key,
954 "file_record": file_record,
955 "gotcha_records": gotcha_records,
956 "consulted": consulted,
957 "consulted_recent": consulted_recent,
958 "store_error": store_error,
959 "gotcha_error": gotcha_error,
960 }))
961 }
962
963 "log_hit" => {
964 let key = match req.args.get("key").and_then(|v| v.as_str()) {
965 Some(k) => k,
966 None => return SocketResponse::err("missing args.key"),
967 };
968 let g = graph.read().await;
969 if let Err(e) = sess::log_hit(g.store(), key).await {
970 tracing::warn!("daemon socket log_hit: {e}");
971 }
972 SocketResponse::ok(serde_json::Value::Null)
973 }
974
975 "log_miss" => {
976 let key = match req.args.get("key").and_then(|v| v.as_str()) {
977 Some(k) => k,
978 None => return SocketResponse::err("missing args.key"),
979 };
980 let g = graph.read().await;
981 if let Err(e) = sess::log_miss(g.store(), key).await {
982 tracing::warn!("daemon socket log_miss: {e}");
983 }
984 SocketResponse::ok(serde_json::Value::Null)
985 }
986
987 "log_compliance_miss" => {
988 let key = match req.args.get("key").and_then(|v| v.as_str()) {
989 Some(k) => k,
990 None => return SocketResponse::err("missing args.key"),
991 };
992 let g = graph.read().await;
993 let store = g.store();
994 if let Err(e) = sess::log_compliance_miss(store, key).await {
995 tracing::warn!("daemon socket log_compliance_miss: {e}");
996 }
997 let _ = crate::store::enforcement::record_event(
999 store,
1000 crate::store::enforcement::EnforcementEventType::Deny,
1001 crate::store::enforcement::SubjectKind::File,
1002 key.to_string(),
1003 "claude".to_string(),
1004 None,
1005 "gotcha_above_threshold".to_string(),
1006 None,
1007 )
1008 .await;
1009 SocketResponse::ok(serde_json::Value::Null)
1010 }
1011
1012 "log_compliance_hit" => {
1013 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1014 Some(k) => k,
1015 None => return SocketResponse::err("missing args.key"),
1016 };
1017 let g = graph.read().await;
1018 let store = g.store();
1019 if let Err(e) = sess::log_compliance_hit(store, key).await {
1020 tracing::warn!("daemon socket log_compliance_hit: {e}");
1021 }
1022 let _ = crate::store::enforcement::record_event(
1024 store,
1025 crate::store::enforcement::EnforcementEventType::AllowAfterReceipt,
1026 crate::store::enforcement::SubjectKind::File,
1027 key.to_string(),
1028 "claude".to_string(),
1029 None,
1030 "receipt_valid".to_string(),
1031 None,
1032 )
1033 .await;
1034 SocketResponse::ok(serde_json::Value::Null)
1035 }
1036
1037 "log_codex_shell_miss" => {
1038 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1039 Some(k) => k,
1040 None => return SocketResponse::err("missing args.key"),
1041 };
1042 let g = graph.read().await;
1043 if let Err(e) = sess::log_codex_shell_miss(g.store(), key).await {
1044 tracing::warn!("daemon socket log_codex_shell_miss: {e}");
1045 }
1046 SocketResponse::ok(serde_json::Value::Null)
1047 }
1048
1049 "log_bootstrap" => {
1050 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1051 Some(k) => k,
1052 None => return SocketResponse::err("missing args.key"),
1053 };
1054 let g = graph.read().await;
1055 if let Err(e) = sess::log_bootstrap(g.store(), key).await {
1056 tracing::warn!("daemon socket log_bootstrap: {e}");
1057 }
1058 SocketResponse::ok(serde_json::Value::Null)
1059 }
1060
1061 "log_prompt_nudge" => {
1062 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1063 Some(k) => k,
1064 None => return SocketResponse::err("missing args.key"),
1065 };
1066 let g = graph.read().await;
1067 if let Err(e) = sess::log_prompt_nudge(g.store(), key).await {
1068 tracing::warn!("daemon socket log_prompt_nudge: {e}");
1069 }
1070 SocketResponse::ok(serde_json::Value::Null)
1071 }
1072
1073 "session_check_consulted" => {
1074 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1075 Some(k) => k,
1076 None => return SocketResponse::err("missing args.key"),
1077 };
1078 let g = graph.read().await;
1079 match sess::check_consulted(g.store(), key, None).await {
1080 Ok(found) => SocketResponse::ok(serde_json::Value::Bool(found)),
1081 Err(e) => SocketResponse::err(format!("store: {e}")),
1082 }
1083 }
1084
1085 "session_check_consulted_recent" => {
1086 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1087 Some(k) => k,
1088 None => return SocketResponse::err("missing args.key"),
1089 };
1090 let ttl_secs = req
1091 .args
1092 .get("ttl_secs")
1093 .and_then(|v| v.as_u64())
1094 .unwrap_or(900);
1095 let g = graph.read().await;
1096 match sess::check_consulted_recent(g.store(), key, ttl_secs, None).await {
1097 Ok(found) => SocketResponse::ok(serde_json::Value::Bool(found)),
1098 Err(e) => SocketResponse::err(format!("store: {e}")),
1099 }
1100 }
1101
1102 "session_flush" => {
1103 let g = graph.read().await;
1104 if let Err(e) = sess::session_flush(g.store()).await {
1105 tracing::warn!("daemon socket session_flush: {e}");
1106 }
1107 SocketResponse::ok(serde_json::Value::Null)
1108 }
1109
1110 "session_harvest" => {
1111 let g = graph.read().await;
1114 if let Err(e) = sess::session_harvest_no_staleness(g.store()).await {
1115 tracing::warn!("daemon socket session_harvest: {e}");
1116 }
1117 SocketResponse::ok(serde_json::Value::Null)
1118 }
1119
1120 "reparse" => {
1121 let path = match req.args.get("path").and_then(|v| v.as_str()) {
1122 Some(p) => p,
1123 None => return SocketResponse::err("missing args.path"),
1124 };
1125 let g = graph.read().await;
1126 if let Err(e) = crate::analysis::reparse::reparse_impl(g.store(), repo_root, path).await
1127 {
1128 tracing::warn!("daemon socket reparse: {e}");
1129 }
1130 SocketResponse::ok(serde_json::Value::Null)
1131 }
1132
1133 "edit_hook" => {
1134 let path = match req.args.get("path").and_then(|v| v.as_str()) {
1135 Some(p) => p,
1136 None => return SocketResponse::err("missing args.path"),
1137 };
1138 let file_key = format!("file:{path}");
1139 let g = graph.read().await;
1140 let store = g.store();
1141 if let Err(e) = sess::log_hit(store, &file_key).await {
1142 tracing::warn!("daemon socket edit_hook: log_hit failed: {e}");
1143 }
1144 if let Err(e) = crate::analysis::reparse::reparse_impl(store, repo_root, path).await {
1145 tracing::warn!("daemon socket edit_hook: reparse failed (non-fatal): {e}");
1146 }
1147
1148 {
1151 use crate::analysis::blast_radius::BlastRadius;
1152 use crate::graph::edges::EdgeKind;
1153
1154 let mut keys_to_update = vec![file_key.clone()];
1155 keys_to_update.extend(g.neighbors_incoming(&file_key, &EdgeKind::Imports));
1158 keys_to_update.extend(g.neighbors(&file_key, &EdgeKind::Imports));
1160
1161 for key in keys_to_update {
1162 let br = BlastRadius::compute(&key, &g);
1163 if let Ok(Some(mut rec)) = store.get(&key).await {
1164 if let Some(mut fr) = rec.payload_as::<crate::store::record::FileRecord>() {
1165 fr.blast_radius = Some(br);
1166 rec.payload = serde_json::to_value(&fr).ok();
1167 let _ = store.put(&key, &rec).await;
1168 }
1169 }
1170 }
1171 }
1172
1173 {
1177 let mut affected_keys = vec![file_key.clone()];
1178 let d1 = g.neighbors_incoming(&file_key, &EdgeKind::Imports);
1179 for d1k in &d1 {
1180 affected_keys.push(d1k.clone());
1181 affected_keys.extend(g.neighbors_incoming(d1k, &EdgeKind::Imports));
1182 }
1183 let mut neighborhood_recs = Vec::new();
1185 for key in &affected_keys {
1186 if let Ok(Some(rec)) = store.get(key).await {
1187 neighborhood_recs.push(rec);
1188 }
1189 }
1190 if let Ok(Some(rec)) = store.get(&file_key).await {
1192 if !neighborhood_recs.iter().any(|r| r.key == file_key) {
1193 neighborhood_recs.push(rec);
1194 }
1195 }
1196 let propagation =
1197 crate::analysis::propagation::compute_propagation(&neighborhood_recs, &g);
1198 for (key, prop) in &propagation {
1199 if let Ok(Some(mut rec)) = store.get(key).await {
1200 if let Some(mut fr) = rec.payload_as::<crate::store::record::FileRecord>() {
1201 fr.propagated_staleness = Some(prop.clone());
1202 rec.payload = serde_json::to_value(&fr).ok();
1203 let _ = store.put(key, &rec).await;
1204 }
1205 }
1206 }
1207 }
1208
1209 SocketResponse::ok(serde_json::Value::Null)
1210 }
1211
1212 "doc_capture" => {
1213 let path = match req.args.get("path").and_then(|v| v.as_str()) {
1214 Some(p) => p,
1215 None => return SocketResponse::err("missing args.path"),
1216 };
1217 let content = req
1218 .args
1219 .get("content")
1220 .and_then(|v| v.as_str())
1221 .unwrap_or("");
1222 let g = graph.read().await;
1223 if let Err(e) = sess::doc_capture(g.store(), path, content).await {
1224 tracing::warn!("daemon socket doc_capture: {e}");
1225 }
1226 SocketResponse::ok(serde_json::Value::Null)
1227 }
1228
1229 "scan_prefix" => {
1230 let prefix = match req.args.get("prefix").and_then(|v| v.as_str()) {
1231 Some(p) => p,
1232 None => return SocketResponse::err("missing args.prefix"),
1233 };
1234 let g = graph.read().await;
1235 match g.store().scan_prefix(prefix).await {
1236 Ok(records) => match serde_json::to_value(&records) {
1237 Ok(val) => SocketResponse::ok(val),
1238 Err(e) => SocketResponse::err(format!("serialize: {e}")),
1239 },
1240 Err(e) => SocketResponse::err(format!("store: {e}")),
1241 }
1242 }
1243
1244 "scan_enforcement_events" => {
1245 let since_seq = req
1246 .args
1247 .get("since_seq")
1248 .and_then(|v| v.as_u64())
1249 .unwrap_or(0);
1250 let until_seq = req
1251 .args
1252 .get("until_seq")
1253 .and_then(|v| v.as_u64())
1254 .unwrap_or(u64::MAX);
1255 let g = graph.read().await;
1256 match crate::store::enforcement::scan_enforcement_events(
1257 g.store(),
1258 since_seq,
1259 until_seq,
1260 )
1261 .await
1262 {
1263 Ok(events) => match serde_json::to_value(&events) {
1264 Ok(val) => SocketResponse::ok(val),
1265 Err(e) => SocketResponse::err(format!("serialize: {e}")),
1266 },
1267 Err(e) => SocketResponse::err(format!("store: {e}")),
1268 }
1269 }
1270
1271 "put" => {
1272 use crate::store::Record;
1273 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1274 Some(k) => k,
1275 None => return SocketResponse::err("missing args.key"),
1276 };
1277 let record: Record = match req
1278 .args
1279 .get("record")
1280 .and_then(|v| serde_json::from_value(v.clone()).ok())
1281 {
1282 Some(r) => r,
1283 None => return SocketResponse::err("put: invalid record"),
1284 };
1285 let g = graph.read().await;
1286 match g.store().put(key, &record).await {
1287 Ok(()) => SocketResponse::ok(serde_json::Value::Null),
1288 Err(e) => SocketResponse::err(format!("store put: {e}")),
1289 }
1290 }
1291
1292 "delete" => {
1293 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1294 Some(k) => k,
1295 None => return SocketResponse::err("missing args.key"),
1296 };
1297 let g = graph.read().await;
1298 match g.store().delete(key).await {
1299 Ok(()) => SocketResponse::ok(serde_json::Value::Null),
1300 Err(e) => SocketResponse::err(format!("delete: {e}")),
1301 }
1302 }
1303
1304 "history" => {
1305 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1306 Some(k) => k,
1307 None => return SocketResponse::err("missing args.key"),
1308 };
1309 let limit = req.args.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize;
1310 let g = graph.read().await;
1311 match g.store().history(key, limit) {
1312 Ok(entries) => match serde_json::to_value(&entries) {
1313 Ok(val) => SocketResponse::ok(val),
1314 Err(e) => SocketResponse::err(format!("serialize: {e}")),
1315 },
1316 Err(e) => SocketResponse::err(format!("history: {e}")),
1317 }
1318 }
1319
1320 "history_since" => {
1321 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1322 Some(k) => k,
1323 None => return SocketResponse::err("missing args.key"),
1324 };
1325 let since_ts = req
1326 .args
1327 .get("since_ts")
1328 .and_then(|v| v.as_u64())
1329 .unwrap_or(0);
1330 let limit = req.args.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize;
1331 let g = graph.read().await;
1332 match g.store().history_since(key, since_ts, limit) {
1333 Ok(entries) => match serde_json::to_value(&entries) {
1334 Ok(val) => SocketResponse::ok(val),
1335 Err(e) => SocketResponse::err(format!("serialize: {e}")),
1336 },
1337 Err(e) => SocketResponse::err(format!("history_since: {e}")),
1338 }
1339 }
1340
1341 "gotcha_write" => {
1342 use crate::store::gotcha_ops::apply_gotcha_write;
1343 use crate::store::Record;
1344
1345 let record: Record = match req
1346 .args
1347 .get("record")
1348 .and_then(|v| serde_json::from_value(v.clone()).ok())
1349 {
1350 Some(r) => r,
1351 None => return SocketResponse::err("missing or invalid args.record"),
1352 };
1353 let new_files: Vec<String> = req
1354 .args
1355 .get("new_files")
1356 .and_then(|v| serde_json::from_value(v.clone()).ok())
1357 .unwrap_or_default();
1358 let old_files: Vec<String> = req
1359 .args
1360 .get("old_files")
1361 .and_then(|v| serde_json::from_value(v.clone()).ok())
1362 .unwrap_or_default();
1363 let is_new = req
1364 .args
1365 .get("is_new")
1366 .and_then(|v| v.as_bool())
1367 .unwrap_or(false);
1368
1369 {
1370 let g = graph.read().await;
1371 match apply_gotcha_write(g.store(), &record, &old_files, &new_files, is_new).await {
1372 Ok(()) => {}
1373 Err(e) => return SocketResponse::err(format!("{e}")),
1374 }
1375 }
1376
1377 let record_key = record.key.clone();
1383 let old_set: std::collections::HashSet<&str> =
1384 old_files.iter().map(String::as_str).collect();
1385 let new_set: std::collections::HashSet<&str> =
1386 new_files.iter().map(String::as_str).collect();
1387 {
1388 let mut g = graph.write().await;
1389 for file_path in new_set.difference(&old_set) {
1390 let file_key = format!("file:{file_path}");
1391 let _ = g
1392 .add_edge(&file_key, EdgeKind::HasGotcha, &record_key)
1393 .await;
1394 }
1395 for file_path in old_set.difference(&new_set) {
1396 let file_key = format!("file:{file_path}");
1397 let _ = g
1398 .remove_edge(&file_key, &EdgeKind::HasGotcha, &record_key)
1399 .await;
1400 }
1401 }
1402
1403 SocketResponse::ok(serde_json::Value::String("written".into()))
1404 }
1405
1406 "gotcha_tombstone" => {
1407 use crate::store::gotcha_ops::apply_gotcha_tombstone;
1408
1409 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1410 Some(k) => k,
1411 None => return SocketResponse::err("missing args.key"),
1412 };
1413 if !key.starts_with("gotcha:") {
1414 return SocketResponse::err("delete action only applies to gotcha: keys");
1415 }
1416 let mut affected_files: Vec<String> = req
1419 .args
1420 .get("affected_files")
1421 .and_then(|v| serde_json::from_value(v.clone()).ok())
1422 .unwrap_or_default();
1423
1424 let g = graph.read().await;
1425 if affected_files.is_empty() {
1426 if let Ok(Some(record)) = g.store().get(key).await {
1427 if let Some(gotcha) = record.payload_as::<crate::store::GotchaRecord>() {
1428 affected_files = gotcha.affected_files;
1429 }
1430 }
1431 }
1432 match apply_gotcha_tombstone(g.store(), key, &affected_files).await {
1433 Ok(()) => SocketResponse::ok(serde_json::Value::String("tombstoned".into())),
1434 Err(e) => SocketResponse::err(format!("{e}")),
1435 }
1436 }
1437
1438 "gotcha_confirm" => {
1439 let key = match req.args.get("key").and_then(|v| v.as_str()) {
1440 Some(k) => k,
1441 None => return SocketResponse::err("missing args.key"),
1442 };
1443
1444 let g = graph.read().await;
1446 let store = g.store();
1447 let mut record = match store.get(key).await {
1448 Ok(Some(r)) => r,
1449 Ok(None) => return SocketResponse::err(format!("record not found: {key}")),
1450 Err(e) => return SocketResponse::err(format!("store get: {e}")),
1451 };
1452
1453 if record.category != crate::store::record::Category::Gotcha {
1454 return SocketResponse::err(format!("{key} is not a gotcha record"));
1455 }
1456
1457 if !matches!(
1458 record.lifecycle,
1459 crate::store::record::RecordLifecycle::Active
1460 ) {
1461 return SocketResponse::err(format!(
1462 "{key} is tombstoned — cannot confirm a deleted record"
1463 ));
1464 }
1465
1466 if let Some(ref mut payload) = record.payload {
1468 if let Some(obj) = payload.as_object_mut() {
1469 if let Some(sev) = obj
1470 .get("severity")
1471 .and_then(|v| v.as_str())
1472 .map(|s| s.to_lowercase())
1473 {
1474 obj.insert("severity".to_string(), serde_json::Value::String(sev));
1475 }
1476 obj.insert("confirmed".to_string(), serde_json::Value::Bool(true));
1477 }
1478 }
1479
1480 record.source = crate::store::record::RecordSource::DeveloperManual;
1481 record.confidence.value = crate::store::record::ConfidenceScore::base_for_source(
1482 &crate::store::record::RecordSource::DeveloperManual,
1483 );
1484 record.confidence.confirmation_count += 1;
1485 record.quality = crate::health::quality::analyze(&record);
1486
1487 let now = std::time::SystemTime::now()
1488 .duration_since(std::time::UNIX_EPOCH)
1489 .unwrap_or_default()
1490 .as_secs();
1491 record.updated_at = now;
1492 record.version.logical_clock += 1;
1493 record.version.wall_clock = now;
1494
1495 let affected_files: Vec<String> = record
1497 .payload_as::<crate::store::record::GotchaRecord>()
1498 .map(|g| g.affected_files)
1499 .unwrap_or_default();
1500
1501 if let Err(e) = store.put(key, &record).await {
1502 return SocketResponse::err(format!("store put: {e}"));
1503 }
1504
1505 for file_path in &affected_files {
1507 let file_key = format!("file:{file_path}");
1508 if let Ok(Some(mut file_record)) = store.get(&file_key).await {
1509 let needs_link = file_record
1510 .payload
1511 .as_ref()
1512 .and_then(|p| p.get("gotcha_keys"))
1513 .and_then(|v| v.as_array())
1514 .map(|arr| !arr.iter().any(|v| v.as_str() == Some(key)))
1515 .unwrap_or(true);
1516 if needs_link {
1517 if let Some(ref mut payload) = file_record.payload {
1518 if let Some(obj) = payload.as_object_mut() {
1519 let arr = obj.entry("gotcha_keys").or_insert(serde_json::json!([]));
1520 if let Some(arr) = arr.as_array_mut() {
1521 arr.push(serde_json::Value::String(key.to_string()));
1522 }
1523 }
1524 }
1525 let _ = store.put(&file_key, &file_record).await;
1526 }
1527 }
1528 }
1529
1530 crate::store::gotcha_ops::propagate_confirmation_to_files(store, &affected_files).await;
1532
1533 let _ = crate::store::enforcement::record_event(
1535 store,
1536 crate::store::enforcement::EnforcementEventType::ControlChanged {
1537 change_kind: crate::store::enforcement::ControlChangeKind::Confirmed,
1538 },
1539 crate::store::enforcement::SubjectKind::Control,
1540 key.to_string(),
1541 "developer".to_string(),
1542 None,
1543 "control_confirmed".to_string(),
1544 None,
1545 )
1546 .await;
1547
1548 SocketResponse::ok(serde_json::json!({"confirmed": true, "key": key}))
1549 }
1550
1551 other => SocketResponse::err(format!("unknown command: {other}")),
1552 }
1553}
1554
1555pub const IDLE_SHUTDOWN_SECS: u64 = 30 * 60; pub const IDLE_CHECK_INTERVAL_SECS: u64 = 5 * 60; #[cfg(test)]
1568mod shutdown_tests {
1569 use super::*;
1570 use std::sync::Arc;
1571 use std::time::Duration;
1572
1573 #[tokio::test]
1574 async fn shutdown_signal_before_wait_returns_immediately() {
1575 let s = Shutdown::new();
1578 s.signal();
1579 tokio::time::timeout(Duration::from_millis(100), s.wait())
1581 .await
1582 .expect("wait must return immediately when already signaled");
1583 assert!(s.is_set());
1584 }
1585
1586 #[tokio::test]
1587 async fn shutdown_wait_then_signal_wakes_waiter() {
1588 let s = Arc::new(Shutdown::new());
1589 let s_clone = Arc::clone(&s);
1590 let waiter = tokio::spawn(async move { s_clone.wait().await });
1591
1592 tokio::time::sleep(Duration::from_millis(20)).await;
1594 assert!(!s.is_set());
1595
1596 s.signal();
1597
1598 tokio::time::timeout(Duration::from_millis(200), waiter)
1599 .await
1600 .expect("waiter must wake within timeout")
1601 .expect("waiter task should not panic");
1602 assert!(s.is_set());
1603 }
1604
1605 #[tokio::test]
1606 async fn shutdown_multiple_concurrent_waiters_all_wake() {
1607 let s = Arc::new(Shutdown::new());
1609 let mut handles = Vec::new();
1610 for _ in 0..16 {
1611 let s = Arc::clone(&s);
1612 handles.push(tokio::spawn(async move { s.wait().await }));
1613 }
1614 tokio::time::sleep(Duration::from_millis(20)).await;
1616
1617 s.signal();
1618
1619 for h in handles {
1620 tokio::time::timeout(Duration::from_millis(200), h)
1621 .await
1622 .expect("each waiter must wake within timeout")
1623 .expect("waiter task should not panic");
1624 }
1625 }
1626
1627 #[tokio::test]
1628 async fn shutdown_signal_is_idempotent() {
1629 let s = Shutdown::new();
1631 s.signal();
1632 s.signal();
1633 s.signal();
1634 tokio::time::timeout(Duration::from_millis(100), s.wait())
1635 .await
1636 .expect("wait must still return on idempotent re-signal");
1637 }
1638
1639 #[tokio::test]
1647 async fn joinset_abort_all_makes_drain_finite() {
1648 let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1649 set.spawn(async {
1651 tokio::time::sleep(Duration::from_secs(60)).await;
1652 });
1653
1654 let primary = tokio::time::timeout(Duration::from_millis(100), async {
1656 while set.join_next().await.is_some() {}
1657 })
1658 .await;
1659 assert!(
1660 primary.is_err(),
1661 "primary drain should time out while task is still sleeping"
1662 );
1663
1664 set.abort_all();
1666 let secondary = tokio::time::timeout(Duration::from_millis(500), async {
1667 while set.join_next().await.is_some() {}
1668 })
1669 .await;
1670 assert!(
1671 secondary.is_ok(),
1672 "drain after abort_all must complete quickly"
1673 );
1674 assert!(set.is_empty(), "JoinSet should be empty after drain");
1675 }
1676
1677 #[tokio::test]
1683 async fn joinset_panics_are_observable_via_try_join_next() {
1684 let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1685 set.spawn(async {
1686 panic!("simulated handler panic");
1687 });
1688
1689 let deadline = std::time::Instant::now() + Duration::from_millis(500);
1693 loop {
1694 if let Some(res) = set.try_join_next() {
1695 let err = res.expect_err("panicked task should yield Err");
1696 assert!(
1697 err.is_panic(),
1698 "JoinError must report is_panic for panicking task; got: {err:?}"
1699 );
1700 return;
1701 }
1702 if std::time::Instant::now() >= deadline {
1703 panic!("try_join_next never reported the panic within 500ms");
1704 }
1705 tokio::time::sleep(Duration::from_millis(10)).await;
1706 }
1707 }
1708
1709 #[tokio::test]
1715 async fn shutdown_no_lost_signal_under_race() {
1716 for trial in 0..50 {
1717 let s = Arc::new(Shutdown::new());
1718 let s_waiter = Arc::clone(&s);
1719 let s_signaler = Arc::clone(&s);
1720
1721 let waiter = tokio::spawn(async move { s_waiter.wait().await });
1722
1723 tokio::task::yield_now().await;
1725
1726 s_signaler.signal();
1728
1729 tokio::time::timeout(Duration::from_millis(500), waiter)
1730 .await
1731 .unwrap_or_else(|_| panic!("trial {trial}: waiter stranded by lost signal"))
1732 .expect("waiter task should not panic");
1733 }
1734 }
1735}
1736
1737#[cfg(test)]
1738mod tests {
1739 use super::*;
1740 use crate::store::record::{
1741 Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
1742 RecordLifecycle, RecordSource, RecordVersion, StalenessScore,
1743 };
1744 use crate::store::Store;
1745
1746 fn make_gotcha_record(key: &str, files: &[&str]) -> Record {
1747 let gotcha = GotchaRecord {
1748 rule: "test rule".into(),
1749 reason: "test reason".into(),
1750 severity: Priority::High,
1751 affected_files: files.iter().map(|s| s.to_string()).collect(),
1752 ref_url: None,
1753 discovered_session: 1_000_000,
1754 confirmed: true,
1755 };
1756 Record {
1757 key: key.to_string(),
1758 value: "test rule because test reason".into(),
1759 payload: serde_json::to_value(&gotcha).ok(),
1760 category: Category::Gotcha,
1761 priority: Priority::High,
1762 tags: vec![],
1763 created_at: 1_000_000,
1764 updated_at: 1_000_000,
1765 ref_url: None,
1766 staleness: StalenessScore::fresh(),
1767 lifecycle: RecordLifecycle::Active,
1768 version: RecordVersion {
1769 device_id: uuid::Uuid::new_v4(),
1770 logical_clock: 1,
1771 wall_clock: 1_000_000,
1772 },
1773 quality: QualityScore::layer0_default(),
1774 access_count: 0,
1775 last_accessed: 0,
1776 source: RecordSource::DeveloperManual,
1777 confidence: ConfidenceScore::for_new_record(&RecordSource::DeveloperManual),
1778 gap_analysis_score: 0.0,
1779 }
1780 }
1781
1782 fn make_file_record(path: &str) -> Record {
1783 let file = FileRecord {
1784 path: path.to_string(),
1785 purpose: String::new(),
1786 entry_points: vec![],
1787 imports: vec![],
1788 gotcha_keys: vec![],
1789 decision_keys: vec![],
1790 todos: vec![],
1791 unsafe_count: 0,
1792 unwrap_count: 0,
1793 change_frequency: 0,
1794 last_author: None,
1795 is_hotspot: false,
1796 token_cost_estimate: 0,
1797 last_modified_session: 0,
1798 content_hash: None,
1799 line_count: 0,
1800 blast_radius: None,
1801 propagated_staleness: None,
1802 };
1803 Record {
1804 key: format!("file:{path}"),
1805 value: String::new(),
1806 payload: serde_json::to_value(&file).ok(),
1807 category: Category::File,
1808 priority: Priority::Normal,
1809 tags: vec![],
1810 created_at: 1_000_000,
1811 updated_at: 1_000_000,
1812 ref_url: None,
1813 staleness: StalenessScore::fresh(),
1814 lifecycle: RecordLifecycle::Active,
1815 version: RecordVersion {
1816 device_id: uuid::Uuid::new_v4(),
1817 logical_clock: 1,
1818 wall_clock: 1_000_000,
1819 },
1820 quality: QualityScore::layer0_default(),
1821 access_count: 0,
1822 last_accessed: 0,
1823 source: RecordSource::StaticAnalysis,
1824 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1825 gap_analysis_score: 0.0,
1826 }
1827 }
1828
1829 fn file_gotcha_keys(record: &Record) -> Vec<String> {
1830 record
1831 .payload
1832 .as_ref()
1833 .and_then(|p| p.get("gotcha_keys"))
1834 .and_then(|v| v.as_array())
1835 .map(|arr| {
1836 arr.iter()
1837 .filter_map(|v| v.as_str().map(String::from))
1838 .collect()
1839 })
1840 .unwrap_or_default()
1841 }
1842
1843 async fn make_test_graph(store: Store) -> Arc<tokio::sync::RwLock<Graph>> {
1848 let graph = Graph::load(store).await.expect("failed to load test graph");
1849 Arc::new(tokio::sync::RwLock::new(graph))
1850 }
1851
1852 async fn dispatch_with_graph(
1853 graph: &Arc<tokio::sync::RwLock<Graph>>,
1854 cmd: &str,
1855 args: serde_json::Value,
1856 ) -> SocketResponse {
1857 let req = SocketRequest {
1858 cmd: cmd.to_string(),
1859 version: Some(PROTOCOL_VERSION),
1860 args,
1861 };
1862 socket_dispatch(graph, Path::new("/tmp/mati-test"), &req).await
1863 }
1864
1865 #[tokio::test]
1868 async fn socket_gotcha_write_adds_keys_to_file_records() {
1869 let dir = tempfile::TempDir::new().unwrap();
1870 let store = Store::open(dir.path()).await.unwrap();
1871 store
1872 .put("file:src/a.rs", &make_file_record("src/a.rs"))
1873 .await
1874 .unwrap();
1875 store
1876 .put("file:src/b.rs", &make_file_record("src/b.rs"))
1877 .await
1878 .unwrap();
1879 let graph = make_test_graph(store).await;
1880
1881 let record = make_gotcha_record("gotcha:socket-test", &["src/a.rs", "src/b.rs"]);
1882 let resp = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1883 "record": record, "new_files": ["src/a.rs", "src/b.rs"], "old_files": [], "is_new": true,
1884 })).await;
1885 assert!(resp.ok, "gotcha_write failed: {:?}", resp.error);
1886
1887 let g = graph.read().await;
1888 let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1889 let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1890 assert!(file_gotcha_keys(&a).contains(&"gotcha:socket-test".into()));
1891 assert!(file_gotcha_keys(&b).contains(&"gotcha:socket-test".into()));
1892 }
1893
1894 #[tokio::test]
1895 async fn socket_gotcha_write_edit_removes_key_from_old_file() {
1896 let dir = tempfile::TempDir::new().unwrap();
1897 let store = Store::open(dir.path()).await.unwrap();
1898 store
1899 .put("file:src/a.rs", &make_file_record("src/a.rs"))
1900 .await
1901 .unwrap();
1902 store
1903 .put("file:src/b.rs", &make_file_record("src/b.rs"))
1904 .await
1905 .unwrap();
1906 let graph = make_test_graph(store).await;
1907
1908 let record = make_gotcha_record("gotcha:edit-socket", &["src/a.rs"]);
1909 let resp = dispatch_with_graph(
1910 &graph,
1911 "gotcha_write",
1912 serde_json::json!({
1913 "record": record, "new_files": ["src/a.rs"], "old_files": [], "is_new": true,
1914 }),
1915 )
1916 .await;
1917 assert!(resp.ok);
1918
1919 let record2 = make_gotcha_record("gotcha:edit-socket", &["src/b.rs"]);
1920 let resp2 = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1921 "record": record2, "new_files": ["src/b.rs"], "old_files": ["src/a.rs"], "is_new": false,
1922 })).await;
1923 assert!(resp2.ok);
1924
1925 let g = graph.read().await;
1926 let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1927 let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1928 assert!(!file_gotcha_keys(&a).contains(&"gotcha:edit-socket".into()));
1929 assert!(file_gotcha_keys(&b).contains(&"gotcha:edit-socket".into()));
1930 }
1931
1932 #[tokio::test]
1933 async fn socket_gotcha_tombstone_removes_keys_from_file_records() {
1934 let dir = tempfile::TempDir::new().unwrap();
1935 let store = Store::open(dir.path()).await.unwrap();
1936 store
1937 .put("file:src/a.rs", &make_file_record("src/a.rs"))
1938 .await
1939 .unwrap();
1940 store
1941 .put("file:src/b.rs", &make_file_record("src/b.rs"))
1942 .await
1943 .unwrap();
1944 let graph = make_test_graph(store).await;
1945
1946 let record = make_gotcha_record("gotcha:tomb-socket", &["src/a.rs", "src/b.rs"]);
1947 let resp = dispatch_with_graph(&graph, "gotcha_write", serde_json::json!({
1948 "record": record, "new_files": ["src/a.rs", "src/b.rs"], "old_files": [], "is_new": true,
1949 })).await;
1950 assert!(resp.ok);
1951
1952 let resp2 = dispatch_with_graph(
1953 &graph,
1954 "gotcha_tombstone",
1955 serde_json::json!({
1956 "key": "gotcha:tomb-socket", "affected_files": ["src/a.rs", "src/b.rs"],
1957 }),
1958 )
1959 .await;
1960 assert!(resp2.ok, "gotcha_tombstone failed: {:?}", resp2.error);
1961
1962 let g = graph.read().await;
1963 let rec = g.store().get("gotcha:tomb-socket").await.unwrap().unwrap();
1964 assert!(matches!(rec.lifecycle, RecordLifecycle::Tombstoned { .. }));
1965 let a = g.store().get("file:src/a.rs").await.unwrap().unwrap();
1966 let b = g.store().get("file:src/b.rs").await.unwrap().unwrap();
1967 assert!(file_gotcha_keys(&a).is_empty());
1968 assert!(file_gotcha_keys(&b).is_empty());
1969 }
1970
1971 #[tokio::test]
1972 async fn socket_gotcha_write_rejects_duplicate_key() {
1973 let dir = tempfile::TempDir::new().unwrap();
1974 let store = Store::open(dir.path()).await.unwrap();
1975 let record1 = make_gotcha_record("gotcha:dup-socket", &["src/a.rs"]);
1976 store.put("gotcha:dup-socket", &record1).await.unwrap();
1977 let graph = make_test_graph(store).await;
1978
1979 let record2 = make_gotcha_record("gotcha:dup-socket", &["src/b.rs"]);
1980 let resp = dispatch_with_graph(
1981 &graph,
1982 "gotcha_write",
1983 serde_json::json!({
1984 "record": record2, "new_files": ["src/b.rs"], "old_files": [], "is_new": true,
1985 }),
1986 )
1987 .await;
1988 assert!(!resp.ok, "duplicate key should be rejected");
1989 assert!(resp
1990 .error
1991 .as_deref()
1992 .unwrap_or("")
1993 .contains("already exists"));
1994
1995 let g = graph.read().await;
1996 let original = g.store().get("gotcha:dup-socket").await.unwrap().unwrap();
1997 let payload = original.payload_as::<GotchaRecord>().unwrap();
1998 assert_eq!(payload.affected_files, vec!["src/a.rs"]);
1999 }
2000
2001 #[tokio::test]
2004 async fn oversized_request_returns_frame_too_large_with_response() {
2005 use super::super::protocol::MAX_FRAME_SIZE;
2006 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
2007
2008 let dir = tempfile::TempDir::new().unwrap();
2009 let store = Store::open(dir.path()).await.unwrap();
2010 let graph = make_test_graph(store).await;
2011
2012 let (client, server) = UnixStream::pair().unwrap();
2013 let peer = super::super::metadata::PeerContext {
2014 uid: 501,
2015 pid: None,
2016 };
2017
2018 let oversized = "x".repeat(MAX_FRAME_SIZE + 100);
2020 let payload = format!("{oversized}\n");
2021
2022 let (client_read, client_write) = client.into_split();
2024
2025 let write_handle = tokio::spawn(async move {
2026 let mut w = client_write;
2027 w.write_all(payload.as_bytes()).await.unwrap();
2028 w.shutdown().await.unwrap();
2029 });
2030
2031 let handle_result =
2032 socket_handle_connection(graph, dir.path(), server, peer, uuid::Uuid::nil()).await;
2033 assert!(handle_result.is_ok());
2034
2035 write_handle.await.unwrap();
2036
2037 let mut reader = tokio::io::BufReader::new(client_read);
2039 let mut line = String::new();
2040 reader.read_line(&mut line).await.unwrap();
2041 let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
2042
2043 assert_eq!(resp["status"], "err");
2044 assert_eq!(resp["code"], "frame_too_large");
2045 assert!(
2046 resp["message"]
2047 .as_str()
2048 .unwrap()
2049 .contains(&MAX_FRAME_SIZE.to_string()),
2050 "error message should mention the size limit"
2051 );
2052 }
2053
2054 #[tokio::test]
2055 async fn normal_sized_request_is_not_rejected_by_size_check() {
2056 use super::super::protocol::MAX_FRAME_SIZE;
2057 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
2058
2059 let dir = tempfile::TempDir::new().unwrap();
2060 let store = Store::open(dir.path()).await.unwrap();
2061 let graph = make_test_graph(store).await;
2062
2063 let (client, server) = UnixStream::pair().unwrap();
2064 let peer = super::super::metadata::PeerContext {
2065 uid: 501,
2066 pid: None,
2067 };
2068
2069 let request = serde_json::json!({
2071 "v": 2,
2072 "id": uuid::Uuid::new_v4(),
2073 "session": uuid::Uuid::nil(),
2074 "cmd": { "type": "ping" }
2075 });
2076 let payload = format!("{}\n", serde_json::to_string(&request).unwrap());
2077 assert!(
2078 payload.len() < MAX_FRAME_SIZE,
2079 "test payload should be small"
2080 );
2081
2082 let (client_read, client_write) = client.into_split();
2083
2084 let write_handle = tokio::spawn(async move {
2085 let mut w = client_write;
2086 w.write_all(payload.as_bytes()).await.unwrap();
2087 w.shutdown().await.unwrap();
2088 });
2089
2090 let handle_result =
2091 socket_handle_connection(graph, dir.path(), server, peer, uuid::Uuid::nil()).await;
2092 assert!(handle_result.is_ok());
2093
2094 write_handle.await.unwrap();
2095
2096 let mut reader = tokio::io::BufReader::new(client_read);
2098 let mut line = String::new();
2099 reader.read_line(&mut line).await.unwrap();
2100 let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
2101
2102 assert_eq!(resp["status"], "ok", "ping should succeed, got: {resp}");
2103 }
2104
2105 async fn spawn_canned_responder(
2125 sock_path: std::path::PathBuf,
2126 responses: Vec<serde_json::Value>,
2127 ) -> tokio::task::JoinHandle<()> {
2128 let listener = tokio::net::UnixListener::bind(&sock_path).expect("bind responder socket");
2131 tokio::spawn(async move {
2132 for resp in responses {
2133 let (stream, _) = match listener.accept().await {
2134 Ok(s) => s,
2135 Err(_) => return,
2136 };
2137 let (reader, mut writer) = stream.into_split();
2138 let mut buf_reader = tokio::io::BufReader::new(reader);
2140 let mut line = String::new();
2141 let _ = tokio::io::AsyncBufReadExt::read_line(&mut buf_reader, &mut line).await;
2142 let mut bytes = serde_json::to_vec(&resp).unwrap();
2143 bytes.push(b'\n');
2144 let _ = tokio::io::AsyncWriteExt::write_all(&mut writer, &bytes).await;
2145 let _ = tokio::io::AsyncWriteExt::shutdown(&mut writer).await;
2146 }
2147 })
2148 }
2149
2150 #[tokio::test]
2151 async fn mcp_call_after_daemon_restart_does_not_kill_transport() {
2152 let dir = tempfile::TempDir::new().unwrap();
2158 let root = dir.path().to_path_buf();
2159 let sock_path = root.join("mati.sock");
2160
2161 let session_before = uuid::Uuid::new_v4();
2167 let session_after = uuid::Uuid::new_v4();
2168
2169 let meta_before = super::super::metadata::DaemonMetadata {
2170 pid: std::process::id(),
2171 session: session_before,
2172 owner: super::super::metadata::DaemonOwner::Daemon,
2173 };
2174 super::super::metadata::publish_metadata(&root, &meta_before).unwrap();
2175
2176 let responder_handle = spawn_canned_responder(
2180 sock_path.clone(),
2181 vec![
2182 serde_json::json!({
2183 "v": 2,
2184 "id": uuid::Uuid::new_v4(),
2185 "status": "err",
2186 "code": "session_mismatch",
2187 "message": "session mismatch: re-read daemon metadata and retry",
2188 }),
2189 serde_json::json!({
2190 "v": 2,
2191 "id": uuid::Uuid::new_v4(),
2192 "status": "ok",
2193 "data": "pong",
2194 }),
2195 ],
2196 )
2197 .await;
2198
2199 let root_for_rotate = root.clone();
2202 let rotate_handle = tokio::spawn(async move {
2203 tokio::time::sleep(Duration::from_millis(20)).await;
2206 let meta_after = super::super::metadata::DaemonMetadata {
2207 pid: std::process::id(),
2208 session: session_after,
2209 owner: super::super::metadata::DaemonOwner::Daemon,
2210 };
2211 super::super::metadata::publish_metadata(&root_for_rotate, &meta_after).unwrap();
2212 });
2213
2214 let result = tokio::time::timeout(
2221 Duration::from_secs(5),
2222 super::proxy_daemon_result(&root, "ping", serde_json::json!({})),
2223 )
2224 .await
2225 .expect("proxy_daemon_result should resolve within 5s — retry path appears wedged");
2226
2227 rotate_handle.await.unwrap();
2228 responder_handle.abort();
2232
2233 match result {
2236 super::ProxyDaemonResult::Ok(v) => {
2237 let ok = v.get("ok") == Some(&serde_json::Value::Bool(true));
2238 let code = v.get("code").and_then(|c| c.as_str()).unwrap_or("");
2239 assert!(
2240 ok,
2241 "second attempt should succeed after metadata rotation, \
2242 but caller saw the first attempt's session_mismatch envelope: \
2243 ok={ok} code={code:?} v={v}"
2244 );
2245 }
2246 other => panic!(
2247 "expected Ok(true) after auto-reconnect, got {other:?}; \
2248 the daemon-restart retry path is not engaging"
2249 ),
2250 }
2251 }
2252
2253 #[tokio::test]
2254 async fn mcp_call_session_mismatch_no_retry_target_returns_envelope() {
2255 let dir = tempfile::TempDir::new().unwrap();
2263 let root = dir.path().to_path_buf();
2264 let sock_path = root.join("mati.sock");
2265
2266 let session = uuid::Uuid::new_v4();
2267 let meta = super::super::metadata::DaemonMetadata {
2268 pid: std::process::id(),
2269 session,
2270 owner: super::super::metadata::DaemonOwner::Daemon,
2271 };
2272 super::super::metadata::publish_metadata(&root, &meta).unwrap();
2273
2274 let responder_handle = spawn_canned_responder(
2278 sock_path.clone(),
2279 vec![
2280 serde_json::json!({
2281 "v": 2,
2282 "id": uuid::Uuid::new_v4(),
2283 "status": "err",
2284 "code": "session_mismatch",
2285 "message": "session mismatch (1)",
2286 }),
2287 serde_json::json!({
2288 "v": 2,
2289 "id": uuid::Uuid::new_v4(),
2290 "status": "err",
2291 "code": "session_mismatch",
2292 "message": "session mismatch (2)",
2293 }),
2294 ],
2295 )
2296 .await;
2297
2298 let result = tokio::time::timeout(
2299 Duration::from_secs(5),
2300 super::proxy_daemon_result(&root, "ping", serde_json::json!({})),
2301 )
2302 .await
2303 .expect("proxy_daemon_result must resolve within 5s");
2304 responder_handle.abort();
2305
2306 match result {
2312 super::ProxyDaemonResult::Ok(v) => {
2313 assert_eq!(v.get("ok"), Some(&serde_json::Value::Bool(false)));
2314 assert_eq!(
2315 v.get("code").and_then(|c| c.as_str()),
2316 Some("session_mismatch")
2317 );
2318 }
2319 other => panic!("expected structured Ok envelope, got {other:?}"),
2320 }
2321 }
2322
2323 #[tokio::test]
2339 async fn proxy_daemon_result_handles_mem_get_translation_no_panic() {
2340 let dir = tempfile::TempDir::new().unwrap();
2341 let result = super::proxy_daemon_result(
2345 dir.path(),
2346 "mem_get",
2347 serde_json::json!({ "key": "file:src/main.rs" }),
2348 )
2349 .await;
2350 assert!(
2351 matches!(result, super::ProxyDaemonResult::NotRunning),
2352 "mem_get without daemon must return NotRunning, got {result:?}"
2353 );
2354 }
2355
2356 #[tokio::test]
2357 async fn proxy_daemon_result_handles_mem_bootstrap_translation_no_panic() {
2358 let dir = tempfile::TempDir::new().unwrap();
2359 let result = super::proxy_daemon_result(
2360 dir.path(),
2361 "mem_bootstrap",
2362 serde_json::json!({ "context_files": ["src/lib.rs"] }),
2363 )
2364 .await;
2365 assert!(
2366 matches!(result, super::ProxyDaemonResult::NotRunning),
2367 "mem_bootstrap without daemon must return NotRunning, got {result:?}"
2368 );
2369 }
2370
2371 #[tokio::test]
2372 async fn proxy_daemon_v2_typed_path_handles_mem_set_mutations_no_panic() {
2373 let dir = tempfile::TempDir::new().unwrap();
2381 let cmd = super::super::protocol::Command::GotchaConfirm(
2382 super::super::protocol::GotchaConfirmInput {
2383 key: "gotcha:test".into(),
2384 },
2385 );
2386 let result = super::proxy_daemon_v2(dir.path(), cmd).await;
2387 assert!(
2388 matches!(result, super::ProxyDaemonResult::NotRunning),
2389 "typed proxy_daemon_v2 must return NotRunning when daemon is absent, got {result:?}"
2390 );
2391 }
2392}