use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use clap::{Args, Subcommand};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use mati_core::graph::Graph;
use mati_core::store::{derive_slug, Store};
// Arguments for the `daemon` command group; only carries the chosen subcommand.
// (Plain `//` comments are used on clap-derived items so the generated `--help` text is
// left unchanged.)
#[derive(Args, Debug)]
pub struct DaemonArgs {
    #[command(subcommand)]
    pub command: DaemonCommand,
}
// The `daemon` subcommands. The dispatcher is not in this chunk; presumably
// Start -> `run_daemon_start`, Stop -> `run_daemon_stop`, Status -> `run_daemon_status`
// (TODO confirm against the caller).
#[derive(Subcommand, Debug)]
pub enum DaemonCommand {
    Start,
    Stop(DaemonStopArgs),
    Status,
}
// Options for `mati daemon stop`. The `///` comments below double as clap `--help` text.
#[derive(Args, Debug, Default, Clone)]
pub struct DaemonStopArgs {
    /// Kill with SIGKILL instead of a graceful SIGTERM, and allow stopping a process owned by
    /// the MCP server, by an unknown owner, or still starting up.
    #[arg(long)]
    pub force: bool,
    /// After stopping, also SIGKILL every running `mati serve` process (found via `pgrep`).
    #[arg(long)]
    pub include_mcp: bool,
    /// Seconds to wait for a graceful exit before escalating to SIGKILL (clamped to 1..=60;
    /// ignored with --force).
    #[arg(long, default_value_t = 20)]
    pub timeout: u64,
    /// Send the signal and return immediately instead of waiting for the process to exit.
    #[arg(long)]
    pub no_wait: bool,
}
impl DaemonStopArgs {
    /// The `--timeout` value as a `Duration`, forced into the 1–60 second range so a
    /// zero or absurdly large value cannot disable or stall the graceful-stop wait.
    fn timeout_clamped(&self) -> Duration {
        const FLOOR_SECS: u64 = 1;
        const CEILING_SECS: u64 = 60;
        let bounded = self.timeout.clamp(FLOOR_SECS, CEILING_SECS);
        Duration::from_secs(bounded)
    }
}
use mati_core::mcp::server::{
IDLE_CHECK_INTERVAL_SECS, IDLE_SHUTDOWN_SECS,
MAX_CONCURRENT_CONNECTIONS as MAX_DAEMON_CONNECTIONS, UNIX_SOCK_PATH_MAX,
};
/// Outcome of one client request to the daemon socket (see `send_v2_raw`).
#[derive(Debug)]
pub enum DaemonResult {
    /// The daemon replied. The payload is a normalized envelope: `{"ok": true, "v": 2,
    /// "data": …}` on success or `{"ok": false, "v": 2, "error": …, "code": …}` on an
    /// error reply.
    Ok(serde_json::Value),
    /// No usable socket: the path is too long or missing, or connect failed for another reason.
    NotRunning,
    /// Connect was refused and the stale/orphan check reported nothing live.
    StaleSocket,
    /// A socket or process exists but no valid reply was obtained.
    Unresponsive,
}
/// Run the mati daemon in the foreground for the project rooted at the current directory.
///
/// Startup sequence. Every fatal failure from step 3 on records a `serve_failed` lifecycle
/// event and removes the `mati.starting` sentinel before returning:
/// 1. refuse to start if a live daemon, or a live starting peer, already owns this root;
/// 2. write the `mati.starting` sentinel (`"<unix-secs> <pid>\n"`);
/// 3. open the store, clear `session:consulted:` keys, load the graph and, if the gotcha
///    index is flagged dirty, run a time-boxed fast repair (failure only logged);
/// 4. bind `mati.sock`, call `harden_socket` (failure only logged), publish daemon metadata
///    (falling back to a JSON `mati.pid`);
/// 5. serve connections until SIGINT/SIGTERM/SIGHUP, idle timeout, or serve-loop exit;
/// 6. remove runtime files, record `serve_shutdown <reason>`, and close (or flush) the store.
///
/// # Errors
/// Fails if the cwd or home directory cannot be resolved, another daemon is running or
/// starting, or any of the fatal startup steps above fails.
pub async fn run_daemon_start() -> Result<()> {
    let startup_t0 = std::time::Instant::now();
    let cwd = std::env::current_dir()?;
    let mati_root = mati_root_for(&cwd)?;
    mati_core::mcp::metadata::ensure_runtime_dir(&mati_root)?;
    // Refuse to start over a live daemon; clear leftovers of a dead one.
    {
        use mati_core::mcp::metadata::{self as meta, StaleCheckResult};
        match meta::check_and_cleanup_stale(&mati_root) {
            StaleCheckResult::Clean | StaleCheckResult::StaleRemoved => {}
            StaleCheckResult::LiveDaemon { pid, owner, .. } => {
                anyhow::bail!(
                    "another mati {owner} (pid {pid}) is already running.\n\
                     Stop it with: mati daemon stop"
                );
            }
            StaleCheckResult::OrphanSocket => {
                let _ = std::fs::remove_file(meta::socket_path(&mati_root));
            }
        }
    }
    // A concurrent `daemon start` that is still initialising owns the sentinel; back off.
    if check_starting_peer_active(&mati_root) {
        anyhow::bail!(
            "another mati daemon is starting up. Wait a few seconds, then retry:\n\
             \n mati daemon status\n\
             \nIf the previous start crashed, the sentinel will expire after {STARTING_STALE_SECS}s."
        );
    }
    mati_core::mcp::metadata::install_panic_hook(mati_root.clone());
    mati_core::mcp::metadata::record_lifecycle_event(
        &mati_root,
        "serve_start",
        &format!("pid={} owner=daemon", std::process::id()),
    );
    // Write "<unix-secs> <pid>" so peers can tell a live start from a crashed one.
    // Best-effort: a failed write only weakens that coordination.
    let starting_path = mati_root.join("mati.starting");
    let _ = std::fs::write(
        &starting_path,
        format_sentinel(wall_secs(), std::process::id()),
    );
    // Called by every fatal-startup path below before it returns its error.
    let cleanup_sentinel = || {
        let _ = std::fs::remove_file(&starting_path);
    };
    let repo_root = Arc::new(std::fs::canonicalize(&cwd).inspect_err(|e| {
        mati_core::mcp::metadata::record_lifecycle_event(
            &mati_root,
            "serve_failed",
            &format!("canonicalize cwd: {e}"),
        );
        cleanup_sentinel();
    })?);
    mati_core::mcp::metadata::record_lifecycle_event(&mati_root, "startup", "phase=opening_store");
    let store_t0 = std::time::Instant::now();
    let store = Store::open(&cwd).await.inspect_err(|e| {
        mati_core::mcp::metadata::record_lifecycle_event(
            &mati_root,
            "serve_failed",
            &format!("store open: {e:#}"),
        );
        cleanup_sentinel();
    })?;
    mati_core::mcp::metadata::record_lifecycle_event(
        &mati_root,
        "startup",
        &format!(
            "phase=store_opened elapsed_ms={}",
            store_t0.elapsed().as_millis()
        ),
    );
    // Delete all `session:consulted:` keys (presumably per-session state left by a previous
    // daemon run — TODO confirm). Best-effort: scan/delete errors are ignored.
    if let Ok(keys) = store.scan_keys("session:consulted:").await {
        for k in &keys {
            let _ = store.delete(k).await;
        }
        if !keys.is_empty() {
            tracing::debug!(
                "daemon: cleared {} stale session:consulted markers",
                keys.len()
            );
        }
    }
    let graph = Graph::load(store)
        .await
        .context("failed to load knowledge graph")
        .inspect_err(|e| {
            mati_core::mcp::metadata::record_lifecycle_event(
                &mati_root,
                "serve_failed",
                &format!("graph load: {e:#}"),
            );
            cleanup_sentinel();
        })?;
    // A dirty gotcha index gets a time-boxed fast repair. On failure or timeout the daemon
    // still starts, serving possibly stale derived state.
    if mati_core::store::repair::is_dirty(graph.store()).await {
        let drain_fut = mati_core::store::repair::repair_gotcha_indexes(
            graph.store(),
            mati_core::store::repair::RepairMode::Fast,
        );
        match tokio::time::timeout(mati_core::mcp::server::AUTO_DRAIN_TIMEOUT, drain_fut).await {
            Ok(Ok(report)) => {
                tracing::info!(
                    "daemon: auto-drained dirty gotcha index (drift_remaining={})",
                    report.total_drift()
                );
                mati_core::mcp::metadata::record_lifecycle_event(
                    &mati_root,
                    "auto_repair",
                    &format!("drift_remaining={}", report.total_drift()),
                );
            }
            Ok(Err(e)) => {
                tracing::warn!("daemon: auto-drain failed: {e}");
                mati_core::mcp::metadata::record_lifecycle_event(
                    &mati_root,
                    "auto_repair_failed",
                    &format!("{e}"),
                );
            }
            Err(_) => {
                tracing::warn!("daemon: auto-drain timed out — serving with stale derived state");
                mati_core::mcp::metadata::record_lifecycle_event(
                    &mati_root,
                    "auto_repair_timeout",
                    &format!("timeout={:?}", mati_core::mcp::server::AUTO_DRAIN_TIMEOUT),
                );
            }
        }
    }
    let graph = Arc::new(tokio::sync::RwLock::new(graph));
    let (sock_path, pid_path) = {
        let g = graph.read().await;
        let root = &g.store().root;
        (root.join("mati.sock"), root.join("mati.pid"))
    };
    // bind() would fail with an opaque error past the sun_path limit; explain it instead.
    let sock_path_bytes = sock_path.as_os_str().len();
    if sock_path_bytes > UNIX_SOCK_PATH_MAX {
        mati_core::mcp::metadata::record_lifecycle_event(
            &mati_root,
            "serve_failed",
            &format!("sock_path_too_long: {sock_path_bytes}>{UNIX_SOCK_PATH_MAX}"),
        );
        cleanup_sentinel();
        anyhow::bail!(
            "socket path too long ({sock_path_bytes} > {UNIX_SOCK_PATH_MAX} bytes): {}\n\
             Shorten your home directory path or symlink ~/.mati to a shorter location.",
            sock_path.display()
        );
    }
    let listener = UnixListener::bind(&sock_path)
        .with_context(|| format!("failed to bind Unix socket at {}", sock_path.display()))
        .inspect_err(|e| {
            mati_core::mcp::metadata::record_lifecycle_event(
                &mati_root,
                "serve_failed",
                &format!("bind: {e:#}"),
            );
            cleanup_sentinel();
        })?;
    if let Err(e) = mati_core::mcp::metadata::harden_socket(&sock_path) {
        tracing::warn!("failed to harden socket permissions: {e}");
    }
    let daemon_meta = mati_core::mcp::metadata::DaemonMetadata::new(
        mati_core::mcp::metadata::DaemonOwner::Daemon,
    );
    let daemon_session = daemon_meta.session;
    if let Err(e) = mati_core::mcp::metadata::publish_metadata(
        sock_path.parent().unwrap_or(std::path::Path::new(".")),
        &daemon_meta,
    ) {
        tracing::warn!("failed to publish v2 daemon metadata: {e}");
        // Fallback: a legacy JSON PID file, so clients can still identify the owner.
        std::fs::write(
            &pid_path,
            format!(r#"{{"pid":{},"owner":"daemon"}}"#, std::process::id()),
        )
        .with_context(|| format!("failed to write PID file at {}", pid_path.display()))
        .inspect_err(|e2| {
            mati_core::mcp::metadata::record_lifecycle_event(
                &mati_root,
                "serve_failed",
                &format!("publish+pid fallback both failed: publish={e:#} legacy={e2:#}"),
            );
            // The socket is already bound. Unlink it so this failed start does not leave an
            // orphaned `mati.sock` for the next client or start to trip over.
            let _ = std::fs::remove_file(&sock_path);
            cleanup_sentinel();
        })?;
    }
    let _ = std::fs::remove_file(&starting_path);
    mati_core::mcp::metadata::record_lifecycle_event(
        &mati_root,
        "startup",
        &format!(
            "phase=ready elapsed_ms={}",
            startup_t0.elapsed().as_millis()
        ),
    );
    tracing::info!(
        path = %sock_path.display(),
        pid = std::process::id(),
        "mati daemon listening"
    );
    eprintln!(
        "mati daemon listening on {} (idle shutdown: {}min)",
        sock_path.display(),
        IDLE_SHUTDOWN_SECS / 60
    );
    // Idle watchdog. `last_wall` is bumped by the accept loop on every connection; once it is
    // older than IDLE_SHUTDOWN_SECS and no connection is active, wake the shutdown select below.
    let last_wall = Arc::new(AtomicU64::new(wall_secs()));
    let active_connections = Arc::new(AtomicU64::new(0));
    let idle_notify = Arc::new(tokio::sync::Notify::new());
    {
        let last_wall = last_wall.clone();
        let active_connections = active_connections.clone();
        let notify = idle_notify.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(IDLE_CHECK_INTERVAL_SECS));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                interval.tick().await;
                let now = wall_secs();
                let last = last_wall.load(Ordering::Relaxed);
                let active = active_connections.load(Ordering::Relaxed);
                if now.saturating_sub(last) >= IDLE_SHUTDOWN_SECS && active == 0 {
                    tracing::info!(
                        idle_secs = now.saturating_sub(last),
                        "mati daemon: idle shutdown"
                    );
                    eprintln!(
                        "mati daemon: idle {}min — shutting down",
                        IDLE_SHUTDOWN_SECS / 60
                    );
                    notify.notify_one();
                    break;
                }
            }
        });
    }
    let shutdown = mati_core::mcp::server::Shutdown::new();
    use std::sync::atomic::AtomicUsize;
    // Indexed by the value each shutdown-select arm below yields.
    const REASONS: &[&str] = &[
        "unknown",
        "signal_sigint",
        "signal_sigterm",
        "idle_timeout",
        "serve_loop_exit",
        "signal_sighup",
    ];
    let reason_idx = Arc::new(AtomicUsize::new(0));
    let daemon_euid = mati_core::mcp::metadata::current_euid();
    let reason_idx_clone = Arc::clone(&reason_idx);
    // Serve and wait for a shutdown trigger concurrently. Whichever side finishes first
    // calls `shutdown.signal()`, which makes the other side wind down too.
    tokio::join!(
        serve_loop_graceful(
            Arc::clone(&graph),
            &repo_root,
            &listener,
            &last_wall,
            &active_connections,
            &shutdown,
            daemon_euid,
            daemon_session,
        ),
        async {
            let ctrl_c = tokio::signal::ctrl_c();
            #[cfg(unix)]
            let idx = {
                let mut sigterm =
                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                        .expect("failed to register SIGTERM handler");
                let sighup_result =
                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup());
                if let Err(ref e) = sighup_result {
                    tracing::warn!(
                        error = %e,
                        "daemon: failed to install SIGHUP handler — \
                         SIGHUP will use OS default (terminate without cleanup)"
                    );
                }
                let mut sighup_opt = sighup_result.ok();
                tokio::select! {
                    _ = ctrl_c => {
                        tracing::info!("mati daemon: signal shutdown (SIGINT)");
                        eprintln!("mati daemon shutting down");
                        1
                    }
                    _ = sigterm.recv() => {
                        tracing::info!("mati daemon: signal shutdown (SIGTERM)");
                        eprintln!("mati daemon shutting down");
                        2
                    }
                    _ = idle_notify.notified() => {
                        3
                    }
                    _ = shutdown.wait() => {
                        tracing::warn!("mati daemon: serve_loop exited — initiating shutdown");
                        4
                    }
                    // Without a SIGHUP handler this future yields `None`, the pattern fails,
                    // and select! disables the branch.
                    Some(_) = async {
                        if let Some(ref mut s) = sighup_opt { s.recv().await } else { None }
                    } => {
                        tracing::info!("mati daemon: signal shutdown (SIGHUP)");
                        eprintln!("mati daemon shutting down");
                        5
                    }
                }
            };
            #[cfg(not(unix))]
            let idx = tokio::select! {
                _ = ctrl_c => {
                    tracing::info!("mati daemon: signal shutdown");
                    eprintln!("mati daemon shutting down");
                    1
                }
                _ = idle_notify.notified() => 3,
                _ = shutdown.wait() => 4,
            };
            reason_idx_clone.store(idx, std::sync::atomic::Ordering::SeqCst);
            shutdown.signal();
        }
    );
    let shutdown_reason: &'static str = {
        let i = reason_idx.load(std::sync::atomic::Ordering::SeqCst);
        REASONS.get(i).copied().unwrap_or("unknown")
    };
    let _ = std::fs::remove_file(&starting_path);
    let _ = std::fs::remove_file(&sock_path);
    let _ = std::fs::remove_file(&pid_path);
    mati_core::mcp::metadata::record_lifecycle_event(&mati_root, "serve_shutdown", shutdown_reason);
    // Prefer a full close. If a handler task still holds a graph reference (e.g. one that
    // was aborted but not yet dropped), fall back to flushing through the shared lock.
    match Arc::try_unwrap(graph) {
        Ok(rwlock) => {
            if let Err(e) = rwlock.into_inner().close().await {
                tracing::warn!("daemon: store close warning on shutdown: {e}");
            }
        }
        Err(graph) => {
            tracing::warn!(
                "daemon: graph Arc still referenced on shutdown — flushing without close"
            );
            let g = graph.read().await;
            g.store().flush_for_shutdown().await;
        }
    }
    Ok(())
}
/// Accept loop for the daemon socket. Returns once shutdown is requested and handlers drained.
///
/// Each connection first takes a permit from a semaphore sized `MAX_DAEMON_CONNECTIONS`, so at
/// most that many handlers run at once; further clients wait in the listener backlog. Peers
/// failing `check_peer_cred` against `daemon_euid` are dropped. `last_wall` is bumped on every
/// accepted connection (read by the idle watchdog). `active_connections` counts live handlers
/// through a drop guard.
///
/// If a handler panics, the loop stops accepting. On exit, in-flight handlers get
/// `DRAIN_TIMEOUT` to finish before being aborted. `shutdown` is then signalled so the caller's
/// signal-waiting future completes as well.
// Every argument is distinct state owned by `run_daemon_start`; a wrapper struct would only
// add indirection.
#[allow(clippy::too_many_arguments)]
async fn serve_loop_graceful(
    graph: Arc<tokio::sync::RwLock<Graph>>,
    repo_root: &Path,
    listener: &UnixListener,
    last_wall: &AtomicU64,
    active_connections: &Arc<AtomicU64>,
    shutdown: &mati_core::mcp::server::Shutdown,
    daemon_euid: u32,
    daemon_session: uuid::Uuid,
) {
    // Pause after a failed accept() before retrying (see the accept-error arm below).
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);
    // How long in-flight handlers may keep running after shutdown starts.
    const DRAIN_TIMEOUT: Duration = Duration::from_secs(5);

    let semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_DAEMON_CONNECTIONS));
    let mut in_flight: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
    let repo_root_arc: Arc<PathBuf> = Arc::new(repo_root.to_path_buf());

    // Decrements the live-connection counter when a handler task finishes or is aborted.
    struct ConnGuard(Arc<AtomicU64>);
    impl Drop for ConnGuard {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::Relaxed);
        }
    }

    'accept: loop {
        // Reap finished handlers; a panicking handler brings the daemon down.
        while let Some(res) = in_flight.try_join_next() {
            if let Err(e) = res {
                if e.is_panic() {
                    tracing::error!(error = ?e, "daemon: handler panicked");
                    break 'accept;
                }
            }
        }
        // Take a connection slot *before* accepting, so excess clients wait in the backlog.
        let permit = tokio::select! {
            biased;
            _ = shutdown.wait() => break 'accept,
            res = Arc::clone(&semaphore).acquire_owned() => match res {
                Ok(p) => p,
                Err(_) => break 'accept,
            },
        };
        let stream = tokio::select! {
            biased;
            _ = shutdown.wait() => break 'accept,
            res = listener.accept() => match res {
                Ok((s, _)) => s,
                Err(e) => {
                    tracing::warn!(error = %e, "daemon: accept error");
                    drop(permit);
                    // Accept failures such as EMFILE/ENFILE usually persist for a while.
                    // Retrying immediately would spin this loop at full CPU and flood the
                    // log, so back off briefly while staying responsive to shutdown.
                    tokio::select! {
                        biased;
                        _ = shutdown.wait() => break 'accept,
                        _ = tokio::time::sleep(ACCEPT_ERROR_BACKOFF) => {}
                    }
                    continue 'accept;
                }
            },
        };
        // Any accepted connection, even one rejected below, counts as activity.
        last_wall.store(wall_secs(), Ordering::Relaxed);
        let peer = match mati_core::mcp::metadata::check_peer_cred(&stream, daemon_euid) {
            Some(p) => p,
            None => {
                drop(permit);
                continue;
            }
        };
        let graph_clone = Arc::clone(&graph);
        let repo_root_clone = Arc::clone(&repo_root_arc);
        active_connections.fetch_add(1, Ordering::Relaxed);
        let conn_guard = ConnGuard(Arc::clone(active_connections));
        in_flight.spawn(async move {
            // Both guards live exactly as long as the handler task.
            let _permit = permit;
            let _conn_guard = conn_guard;
            if let Err(e) = mati_core::mcp::server::socket_handle_connection(
                graph_clone,
                &repo_root_clone,
                stream,
                peer,
                daemon_session,
            )
            .await
            {
                tracing::warn!(error = %e, "daemon: connection error");
            }
        });
    }
    let drained = in_flight.len();
    if drained > 0 {
        tracing::debug!("daemon: draining {drained} in-flight handler(s)");
    }
    let drain = tokio::time::timeout(DRAIN_TIMEOUT, async {
        while in_flight.join_next().await.is_some() {}
    })
    .await;
    if drain.is_err() {
        tracing::warn!(
            remaining = in_flight.len(),
            "daemon: drain timed out after {DRAIN_TIMEOUT:?} — aborting handlers"
        );
        in_flight.abort_all();
        // Give aborted tasks a moment to unwind so they release their graph references.
        let _ = tokio::time::timeout(Duration::from_secs(1), async {
            while in_flight.join_next().await.is_some() {}
        })
        .await;
    }
    shutdown.signal();
}
/// Reply deadline for a `ping` request (the liveness probe used by `classify_daemon` and
/// `run_daemon_status`), kept short so a wedged daemon is detected quickly.
const PING_RESPONSE_TIMEOUT: Duration = Duration::from_secs(2);
/// Reply deadline for every other request.
const REQUEST_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Send an already-typed protocol `Command` to the daemon for `root`.
///
/// A command that cannot be serialized to JSON is reported as `Unresponsive`, since no
/// request can be made.
pub async fn daemon_v2(root: &Path, cmd: mati_core::mcp::protocol::Command) -> DaemonResult {
    let Ok(payload) = serde_json::to_value(&cmd) else {
        return DaemonResult::Unresponsive;
    };
    send_v2_raw(root, payload, REQUEST_RESPONSE_TIMEOUT).await
}
/// Send a legacy (v1-style) named command with JSON `args`, translated to the v2 protocol.
///
/// `ping` uses the short `PING_RESPONSE_TIMEOUT`; every other command uses
/// `REQUEST_RESPONSE_TIMEOUT`.
pub async fn daemon_result(root: &Path, cmd: &str, args: serde_json::Value) -> DaemonResult {
    let translated = mati_core::mcp::protocol::v1_to_v2_command(cmd, &args);
    let reply_deadline = match cmd {
        "ping" => PING_RESPONSE_TIMEOUT,
        _ => REQUEST_RESPONSE_TIMEOUT,
    };
    send_v2_raw(root, translated, reply_deadline).await
}
/// Send one v2 request to the daemon at `<root>/mati.sock` and wait for one reply line.
///
/// Wire format: a single newline-terminated JSON line `{"v", "id", "session", "cmd"}`, after
/// which the write half is shut down. The daemon answers with one JSON line whose `"status"`
/// is `"ok"` or `"err"`. That reply is re-wrapped as `{"ok": true, "v": 2, "data": …}` or
/// `{"ok": false, "v": 2, "error": …, "code": …}`.
///
/// Outcomes:
/// - `NotRunning`: socket path too long or missing, or connect failed for a reason not
///   listed below.
/// - `StaleSocket`: connect refused and the stale/orphan check found no live owner.
/// - `Unresponsive`: connect refused while the recorded owner is alive; listen backlog full
///   (`WouldBlock`); request could not be written; no reply within `response_timeout`; or
///   the reply was malformed.
/// - `Ok(..)`: a well-formed `ok` or `err` reply.
async fn send_v2_raw(
    root: &Path,
    v2_cmd: serde_json::Value,
    response_timeout: Duration,
) -> DaemonResult {
    let sock_path = root.join("mati.sock");
    if sock_path.as_os_str().len() > UNIX_SOCK_PATH_MAX {
        tracing::warn!(
            path = %sock_path.display(),
            "daemon: socket path exceeds Unix limit — daemon unavailable"
        );
        return DaemonResult::NotRunning;
    }
    if !sock_path.exists() {
        return DaemonResult::NotRunning;
    }
    let stream = match UnixStream::connect(&sock_path).await {
        Ok(s) => s,
        Err(e) => {
            match e.kind() {
                std::io::ErrorKind::ConnectionRefused => {
                    use mati_core::mcp::metadata::{self as meta, StaleCheckResult};
                    match meta::check_and_cleanup_stale(root) {
                        StaleCheckResult::StaleRemoved | StaleCheckResult::Clean => {
                            tracing::debug!("daemon: removed stale socket");
                            return DaemonResult::StaleSocket;
                        }
                        StaleCheckResult::OrphanSocket => {
                            let _ = std::fs::remove_file(&sock_path);
                            tracing::debug!("daemon: removed orphan socket");
                            return DaemonResult::StaleSocket;
                        }
                        StaleCheckResult::LiveDaemon { .. } => {
                            tracing::warn!("daemon: socket refused but PID alive — unresponsive");
                            return DaemonResult::Unresponsive;
                        }
                    }
                }
                // A nonblocking connect to a Unix socket fails with EAGAIN when the listener's
                // backlog is full: the daemon is alive but saturated. Reporting NotRunning
                // here would let `daemon stop` classify a live daemon's files as stale and
                // delete them.
                std::io::ErrorKind::WouldBlock => {
                    tracing::warn!(
                        error = %e,
                        "daemon: listen backlog full — treating as unresponsive"
                    );
                    return DaemonResult::Unresponsive;
                }
                _ => {}
            }
            tracing::debug!(error = %e, "daemon: connect failed, treating as not running");
            return DaemonResult::NotRunning;
        }
    };
    // Address the request to the session in the published metadata; the nil UUID is sent
    // when no metadata can be read.
    let daemon_session = mati_core::mcp::metadata::read_metadata(root)
        .map(|m| m.session)
        .unwrap_or_else(uuid::Uuid::nil);
    let v2_request = serde_json::json!({
        "v": mati_core::mcp::protocol::PROTOCOL_VERSION,
        "id": uuid::Uuid::new_v4(),
        "session": daemon_session,
        "cmd": v2_cmd,
    });
    let (reader, mut writer) = stream.into_split();
    let mut bytes = match serde_json::to_vec(&v2_request) {
        Ok(b) => b,
        Err(_) => return DaemonResult::Unresponsive,
    };
    bytes.push(b'\n');
    if writer.write_all(&bytes).await.is_err() {
        return DaemonResult::Unresponsive;
    }
    // Half-close the write side to signal end-of-request.
    if writer.shutdown().await.is_err() {
        return DaemonResult::Unresponsive;
    }
    let mut buf_reader = BufReader::new(reader);
    let mut line = String::new();
    match tokio::time::timeout(response_timeout, buf_reader.read_line(&mut line)).await {
        Ok(Ok(n)) if n > 0 => {}
        _ => return DaemonResult::Unresponsive,
    }
    let resp: serde_json::Value = match serde_json::from_str(line.trim()) {
        Ok(v) => v,
        Err(_) => return DaemonResult::Unresponsive,
    };
    match resp.get("status").and_then(|s| s.as_str()) {
        Some("ok") => {
            let data = resp.get("data").cloned().unwrap_or(serde_json::Value::Null);
            DaemonResult::Ok(serde_json::json!({"ok": true, "v": 2, "data": data}))
        }
        Some("err") => {
            let code = resp
                .get("code")
                .and_then(|c| c.as_str())
                .unwrap_or("internal");
            let message = resp
                .get("message")
                .and_then(|m| m.as_str())
                .unwrap_or("unknown error");
            if code == "session_mismatch" {
                tracing::debug!("daemon: session mismatch — daemon may have restarted");
            }
            DaemonResult::Ok(
                serde_json::json!({"ok": false, "v": 2, "error": message, "code": code}),
            )
        }
        _ => DaemonResult::Unresponsive,
    }
}
/// Fetch `key` from the running daemon, returning the JSON-rendered `data` field.
///
/// Returns `None` when the daemon is absent, stale, or unresponsive, when it answers with an
/// error, or when the reply has no `data`. A JSON `null` payload is returned as `"null"`.
// Allowed dead: within this file it is only referenced by the unit tests (other callers,
// if any, are outside this view).
#[allow(dead_code)]
pub async fn daemon_get(root: &Path, key: &str) -> Option<String> {
    let reply = match daemon_result(root, "get", serde_json::json!({ "key": key })).await {
        DaemonResult::Ok(reply) => reply,
        DaemonResult::NotRunning | DaemonResult::StaleSocket | DaemonResult::Unresponsive => {
            return None;
        }
    };
    let succeeded = reply.get("ok") == Some(&serde_json::Value::Bool(true));
    if !succeeded {
        return None;
    }
    // `Value`'s Display already renders `Null` as "null", so no special case is needed.
    reply.get("data").map(serde_json::Value::to_string)
}
/// Age in seconds after which a legacy, timestamp-only `mati.starting` sentinel counts as
/// stale. Sentinels in the current `"<ts> <pid>"` format are judged by PID liveness instead
/// (see `check_starting_peer_active`).
pub const STARTING_STALE_SECS: u64 = 30;
/// Render the `mati.starting` sentinel line: `"<unix-secs> <pid>\n"`.
fn format_sentinel(ts: u64, pid: u32) -> String {
    let mut line = String::with_capacity(32);
    line.push_str(&ts.to_string());
    line.push(' ');
    line.push_str(&pid.to_string());
    line.push('\n');
    line
}
/// Parse a `"<unix-secs> <pid>"` sentinel produced by `format_sentinel`.
///
/// Tokens are whitespace-separated and anything after the PID is ignored. Returns `None`
/// when either of the first two tokens is missing or not a valid number. That includes the
/// legacy timestamp-only format.
pub fn parse_sentinel(content: &str) -> Option<(u64, u32)> {
    let mut tokens = content.split_whitespace();
    let (Some(ts_tok), Some(pid_tok)) = (tokens.next(), tokens.next()) else {
        return None;
    };
    let ts: u64 = ts_tok.parse().ok()?;
    let pid: u32 = pid_tok.parse().ok()?;
    Some((ts, pid))
}
/// Whether another process is currently starting a daemon for `mati_root`.
///
/// A `"<ts> <pid>"` sentinel counts as active when its PID is alive and is not this process.
/// A legacy timestamp-only sentinel counts as active while younger than
/// `STARTING_STALE_SECS`. Unreadable or unparsable content is inactive. Inactive sentinels
/// are deleted so they do not block later starts.
fn check_starting_peer_active(mati_root: &Path) -> bool {
    let sentinel = mati_root.join("mati.starting");
    let Ok(raw) = std::fs::read_to_string(&sentinel) else {
        return false;
    };
    let now = wall_secs();
    let active = match parse_sentinel(&raw) {
        Some((_, peer_pid)) => {
            peer_pid != std::process::id() && mati_core::mcp::metadata::is_pid_alive(peer_pid)
        }
        None => raw
            .trim()
            .parse::<u64>()
            .map(|started_at| now.saturating_sub(started_at) < STARTING_STALE_SECS)
            .unwrap_or(false),
    };
    if !active {
        let _ = std::fs::remove_file(&sentinel);
    }
    active
}
/// Per-project runtime directory: `~/.mati/<slug>`, where the slug comes from `derive_slug(cwd)`.
///
/// # Errors
/// Fails if the home directory cannot be determined.
pub fn mati_root_for(cwd: &Path) -> Result<PathBuf> {
    let slug = derive_slug(cwd);
    let mut runtime_dir = dirs::home_dir().context("cannot determine home directory")?;
    runtime_dir.push(".mati");
    runtime_dir.push(slug);
    Ok(runtime_dir)
}
pub fn read_pid_file(root: &Path) -> Option<(u32, String)> {
let content = std::fs::read_to_string(root.join("mati.pid")).ok()?;
let trimmed = content.trim();
if let Ok(val) = serde_json::from_str::<serde_json::Value>(trimmed) {
let pid = val.get("pid").and_then(|v| v.as_u64())? as u32;
let owner = val
.get("owner")
.and_then(|v| v.as_str())
.unwrap_or("daemon")
.to_string();
return Some((pid, owner));
}
if let Ok(pid) = trimmed.parse::<u32>() {
return Some((pid, "daemon".to_string()));
}
None
}
fn project_root() -> Result<PathBuf> {
let cwd = std::env::current_dir()?;
mati_root_for(&cwd)
}
/// Whole seconds since the Unix epoch by the wall clock; `0` if the clock reads pre-1970.
fn wall_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
pub(crate) use mati_core::mcp::metadata::KillOutcome as ExitOutcome;
/// What `mati daemon stop` found for this project, as decided by `classify_daemon`.
#[derive(Debug)]
enum DaemonState {
    /// No `mati.pid`, no `mati.sock`, and no `mati.starting` sentinel naming a live PID.
    Empty,
    /// Runtime files exist but nothing live answers for them.
    StaleFiles,
    /// The PID file names a live non-`"mcp"` owner that answered ping (or there is no socket).
    LiveOwnerDaemon { pid: u32 },
    /// The PID file names a live process whose owner is `"mcp"`.
    LiveOwnerMcp { pid: u32 },
    /// A socket answered ping but there is no readable PID file.
    LiveOwnerUnknown {
        /// From daemon metadata, else (only with `--force`) from `lsof`; `None` if neither.
        pid: Option<u32>,
        /// `true` when `pid` came from daemon metadata.
        from_metadata: bool,
    },
    /// Only a `mati.starting` sentinel whose PID is alive.
    StartingSentinelOnly { pid: u32 },
    /// The socket gave no usable reply. `pid` comes from the PID file (checked alive) or from
    /// daemon metadata.
    Unresponsive { pid: u32 },
}
/// PID recorded in `<root>/mati.starting`, if the file parses and that process is alive.
/// Unlike `check_starting_peer_active`, this neither deletes the file nor excludes this process.
fn live_starting_pid(root: &Path) -> Option<u32> {
    let raw = std::fs::read_to_string(root.join("mati.starting")).ok()?;
    parse_sentinel(&raw)
        .map(|(_, pid)| pid)
        .filter(|&pid| mati_core::mcp::metadata::is_pid_alive(pid))
}
/// Best-effort lookup of a process holding the Unix socket `sock_path` open, via `lsof`.
///
/// Runs `lsof -a -t -U <sock_path>`. `-U` selects Unix-domain sockets, `-t` prints bare PIDs,
/// and `-a` ANDs the selections. lsof ORs selections by default, so without `-a` the query
/// would list *every* process holding any Unix socket, and the result feeds a SIGKILL under
/// `stop --force`. The calling process is never returned.
///
/// Returns `None` when lsof is missing, exits non-zero (e.g. nothing matched), or prints no
/// other PID.
#[cfg(unix)]
fn lsof_owning_pid(sock_path: &Path) -> Option<u32> {
    let out = std::process::Command::new("lsof")
        .args(["-a", "-t", "-U"])
        .arg(sock_path)
        .stdin(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .output()
        .ok()?;
    if !out.status.success() {
        return None;
    }
    let self_pid = std::process::id();
    String::from_utf8_lossy(&out.stdout)
        .split_whitespace()
        .filter_map(|tok| tok.parse::<u32>().ok())
        .find(|&pid| pid != self_pid)
}
/// Non-Unix stub: there is no `lsof`, so the owner can never be discovered this way.
#[cfg(not(unix))]
fn lsof_owning_pid(_sock_path: &Path) -> Option<u32> {
    None
}
/// Classify what currently occupies this project's runtime directory, for `daemon stop`.
///
/// Decision order:
/// 1. No PID file and no socket: a live starting sentinel gives `StartingSentinelOnly`,
///    anything else gives `Empty`.
/// 2. A readable PID file: a dead PID gives `StaleFiles`, owner `"mcp"` gives `LiveOwnerMcp`.
///    Otherwise ping the socket if it exists: success → `LiveOwnerDaemon`, no reply →
///    `Unresponsive`, gone → `StaleFiles`. With no socket the result is `LiveOwnerDaemon`.
/// 3. No readable PID file: with no socket → `StaleFiles`. Otherwise ping: a live socket is
///    `LiveOwnerUnknown` (PID from metadata, or from `lsof` when `force`), an unresponsive
///    one is `Unresponsive` if metadata names a PID, anything else is `StaleFiles`.
async fn classify_daemon(root: &Path, force: bool) -> DaemonState {
    use mati_core::mcp::metadata::{is_pid_alive, read_metadata};

    let sock_path = root.join("mati.sock");
    let has_pid = root.join("mati.pid").exists();
    let has_sock = sock_path.exists();
    let has_starting = root.join("mati.starting").exists();

    if !(has_pid || has_sock) {
        let live_starter = if has_starting { live_starting_pid(root) } else { None };
        return match live_starter {
            Some(pid) => DaemonState::StartingSentinelOnly { pid },
            None => DaemonState::Empty,
        };
    }

    if let Some((pid, owner)) = read_pid_file(root) {
        if !is_pid_alive(pid) {
            return DaemonState::StaleFiles;
        }
        if owner == "mcp" {
            return DaemonState::LiveOwnerMcp { pid };
        }
        if !has_sock {
            return DaemonState::LiveOwnerDaemon { pid };
        }
        return match daemon_result(root, "ping", serde_json::json!({})).await {
            DaemonResult::Ok(_) => DaemonState::LiveOwnerDaemon { pid },
            DaemonResult::Unresponsive => DaemonState::Unresponsive { pid },
            DaemonResult::NotRunning | DaemonResult::StaleSocket => DaemonState::StaleFiles,
        };
    }

    if !has_sock {
        return DaemonState::StaleFiles;
    }
    match daemon_result(root, "ping", serde_json::json!({})).await {
        DaemonResult::Ok(_) => match read_metadata(root).map(|m| m.pid) {
            Some(meta_pid) => DaemonState::LiveOwnerUnknown {
                pid: Some(meta_pid),
                from_metadata: true,
            },
            None => {
                // lsof is only consulted when the caller is prepared to act on it.
                let discovered = if force { lsof_owning_pid(&sock_path) } else { None };
                DaemonState::LiveOwnerUnknown {
                    pid: discovered,
                    from_metadata: false,
                }
            }
        },
        DaemonResult::StaleSocket | DaemonResult::NotRunning => DaemonState::StaleFiles,
        DaemonResult::Unresponsive => read_metadata(root)
            .map(|m| m.pid)
            .map_or(DaemonState::StaleFiles, |pid| DaemonState::Unresponsive { pid }),
    }
}
pub(crate) use mati_core::mcp::metadata::kill_and_wait;
#[cfg(unix)]
fn send_sigterm_only(pid: u32) -> bool {
let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
if ret == 0 {
return true;
}
let errno = std::io::Error::last_os_error().raw_os_error();
matches!(errno, Some(libc::ESRCH))
}
#[cfg(not(unix))]
fn send_sigterm_only(_pid: u32) -> bool {
false
}
#[cfg(unix)]
fn send_sigkill_only(pid: u32) -> bool {
let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
if ret == 0 {
return true;
}
let errno = std::io::Error::last_os_error().raw_os_error();
matches!(errno, Some(libc::ESRCH))
}
#[cfg(not(unix))]
fn send_sigkill_only(_pid: u32) -> bool {
false
}
/// `--include-mcp` cleanup: SIGKILL every process whose command line matches `mati serve`.
///
/// Candidates come from `pgrep -f "mati serve"`; this process is skipped. If `pgrep` cannot
/// be run, a warning is logged and printed and nothing is killed. Successful kills are
/// reported on stdout and recorded as a `stop_include_mcp` lifecycle event under `root`.
/// Already-exited processes (`ESRCH`) are ignored silently. On non-Unix targets nothing is
/// signalled.
// NOTE(review): the pgrep pattern is not scoped to `root`; it matches `mati serve` processes
// for every project (and any other command line containing that text) — confirm intended.
async fn kill_mati_serve_processes(root: &Path) {
    let pgrep = std::process::Command::new("pgrep")
        .arg("-f")
        .arg("mati serve")
        .output();
    let output = match pgrep {
        Ok(output) => output,
        Err(e) => {
            tracing::warn!(
                "kill_mati_serve_processes: pgrep failed: {e} \
                 (is pgrep installed? skipping --include-mcp cleanup)"
            );
            eprintln!(
                "[mati] warning: pgrep not available; could not locate `mati serve` processes"
            );
            return;
        }
    };
    // pgrep exits non-zero when nothing matched.
    if !output.status.success() {
        return;
    }
    let own_pid = std::process::id();
    let targets: Vec<u32> = String::from_utf8_lossy(&output.stdout)
        .lines()
        .filter_map(|line| line.trim().parse::<u32>().ok())
        .filter(|&pid| pid != own_pid)
        .collect();
    let mut killed: Vec<u32> = Vec::new();
    #[cfg(unix)]
    {
        for pid in targets {
            // SAFETY: kill(2) takes plain integers and touches no memory of this process.
            let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
            if ret == 0 {
                killed.push(pid);
                continue;
            }
            let errno = std::io::Error::last_os_error().raw_os_error();
            if !matches!(errno, Some(libc::ESRCH)) {
                tracing::warn!(pid, ?errno, "kill_mati_serve_processes: SIGKILL failed");
            }
        }
    }
    #[cfg(not(unix))]
    let _ = targets;
    if killed.is_empty() {
        return;
    }
    let pid_list = killed
        .iter()
        .map(u32::to_string)
        .collect::<Vec<_>>()
        .join(",");
    println!(
        "mati daemon: --include-mcp killed {} serve proxy/proxies (pid={pid_list})",
        killed.len()
    );
    mati_core::mcp::metadata::record_lifecycle_event(
        root,
        "stop_include_mcp",
        &format!("killed={} pids={pid_list}", killed.len()),
    );
}
/// Give an exiting daemon up to 500 ms to unlink its own `mati.sock` and `mati.pid`.
///
/// Polls every 20 ms. Once both files are gone, it deletes `mati.starting` and returns
/// `true`. If the budget runs out, it deletes the socket, PID and starting files itself and
/// returns `false`.
async fn wait_for_files_removed(root: &Path) -> bool {
    const FILE_POLL_BUDGET: Duration = Duration::from_millis(500);
    const FILE_POLL_INTERVAL: Duration = Duration::from_millis(20);
    let [sock, pid, starting] =
        ["mati.sock", "mati.pid", "mati.starting"].map(|name| root.join(name));
    let deadline = std::time::Instant::now() + FILE_POLL_BUDGET;
    let mut released = false;
    while std::time::Instant::now() < deadline {
        if !(sock.exists() || pid.exists()) {
            released = true;
            break;
        }
        tokio::time::sleep(FILE_POLL_INTERVAL).await;
    }
    if released {
        let _ = std::fs::remove_file(&starting);
        return true;
    }
    for leftover in [&sock, &pid, &starting] {
        let _ = std::fs::remove_file(leftover);
    }
    false
}
/// Entry point for `mati daemon stop`: classify what is running for this project and stop it.
///
/// Always records a `stop_start` lifecycle event. Paths that do not reach `kill_flow` record
/// their own `stop_end`; `kill_flow` records it otherwise. Processes owned by the MCP server,
/// by an unknown owner, or still starting are only stopped with `--force`.
///
/// # Errors
/// Fails if the project root cannot be resolved, if stopping is refused without `--force`,
/// if the owning PID cannot be identified, or if `kill_flow` fails.
pub async fn run_daemon_stop(args: DaemonStopArgs) -> Result<()> {
    let root = project_root()?;
    let timeout = args.timeout_clamped();
    let log_stop_end = |detail: &str| {
        mati_core::mcp::metadata::record_lifecycle_event(&root, "stop_end", detail);
    };

    // What the PID file claimed before classification; used only for the audit log.
    let (start_pid, start_owner) = read_pid_file(&root).unzip();
    let pid_target = start_pid.map_or_else(|| "unknown".to_string(), |p| p.to_string());
    let owner_str = start_owner.unwrap_or_else(|| "unknown".to_string());
    mati_core::mcp::metadata::record_lifecycle_event(
        &root,
        "stop_start",
        &format!(
            "pid_target={pid_target} owner={owner_str} force={}",
            args.force
        ),
    );

    match classify_daemon(&root, args.force).await {
        DaemonState::Empty => {
            println!("mati daemon: not running");
            log_stop_end("pid=none reason=noop elapsed_ms=0 signal=none");
            Ok(())
        }
        DaemonState::StaleFiles => {
            for name in ["mati.sock", "mati.pid", "mati.starting"] {
                let _ = std::fs::remove_file(root.join(name));
            }
            println!("mati daemon: cleaned up stale files (no live process)");
            log_stop_end("pid=none reason=stale elapsed_ms=0 signal=none");
            Ok(())
        }
        DaemonState::LiveOwnerDaemon { pid } => {
            kill_flow(&root, pid, "daemon", &args, timeout).await
        }
        DaemonState::LiveOwnerMcp { pid } if args.force => {
            kill_flow(&root, pid, "mcp", &args, timeout).await
        }
        DaemonState::LiveOwnerMcp { pid } => {
            println!(
                "mati daemon: refused — owner=mcp, rerun with --force to stop the MCP server"
            );
            log_stop_end(&format!("pid={pid} reason=refused elapsed_ms=0 signal=none"));
            anyhow::bail!(
                "refused to stop the active MCP server (pid {pid}); rerun with --force"
            );
        }
        DaemonState::LiveOwnerUnknown { .. } if !args.force => {
            println!(
                "mati daemon: refused — owner=unknown, rerun with --force to stop the active socket"
            );
            log_stop_end("pid=unknown reason=refused elapsed_ms=0 signal=none");
            anyhow::bail!("refused to stop a socket with unknown owner; rerun with --force");
        }
        DaemonState::LiveOwnerUnknown {
            pid: Some(pid),
            from_metadata,
        } => {
            let owner_label = if from_metadata { "unknown" } else { "lsof" };
            kill_flow(&root, pid, owner_label, &args, timeout).await
        }
        DaemonState::LiveOwnerUnknown { pid: None, .. } => {
            log_stop_end("pid=unknown reason=refused elapsed_ms=0 signal=none");
            anyhow::bail!(
                "could not identify the owning process (no metadata, lsof returned nothing); manual intervention required"
            );
        }
        DaemonState::StartingSentinelOnly { pid } if args.force => {
            kill_flow(&root, pid, "starting", &args, timeout).await
        }
        DaemonState::StartingSentinelOnly { pid } => {
            println!(
                "mati daemon: refused — a daemon is starting (pid {pid}), rerun with --force to abort"
            );
            log_stop_end(&format!("pid={pid} reason=refused elapsed_ms=0 signal=none"));
            anyhow::bail!("refused to abort starting daemon (pid {pid}); rerun with --force");
        }
        DaemonState::Unresponsive { pid } => {
            kill_flow(&root, pid, "unresponsive", &args, timeout).await
        }
    }
}
/// Stop `pid` and report the outcome. Used by every `run_daemon_stop` path that kills.
///
/// - `--no-wait`: send one SIGKILL (`--force`) or SIGTERM and return immediately.
/// - otherwise: `kill_directly(pid)` with `--force`, else `kill_and_wait(pid, timeout)`.
///
/// When the process is gone, leftover runtime files are reaped via `wait_for_files_removed`.
/// This is skipped if the daemon metadata now names a different session (another daemon
/// already took over). `--include-mcp` additionally kills `mati serve` processes. Every
/// path records a `stop_end` lifecycle event.
///
/// # Errors
/// Fails if the `--no-wait` signal cannot be delivered, or if the process is still present
/// after SIGKILL (`ExitOutcome::Stuck`).
async fn kill_flow(
    root: &Path,
    pid: u32,
    owner_label: &str,
    args: &DaemonStopArgs,
    timeout: Duration,
) -> Result<()> {
    let pre_session = mati_core::mcp::metadata::read_metadata(root).map(|m| m.session);
    let log_stop_end = |detail: &str| {
        mati_core::mcp::metadata::record_lifecycle_event(root, "stop_end", detail);
    };
    let signal_label = if args.force { "KILL" } else { "TERM" };

    if args.no_wait {
        let sent = if args.force {
            send_sigkill_only(pid)
        } else {
            send_sigterm_only(pid)
        };
        if !sent {
            log_stop_end(&format!(
                "pid={pid} reason=signal_failed elapsed_ms=0 signal={signal_label}"
            ));
            anyhow::bail!("failed to send SIG{signal_label} to pid {pid}");
        }
        println!("mati daemon: SIG{signal_label} sent (pid {pid}); not waiting");
        log_stop_end(&format!(
            "pid={pid} reason=no_wait elapsed_ms=0 signal={signal_label}"
        ));
        if args.include_mcp {
            kill_mati_serve_processes(root).await;
        }
        return Ok(());
    }

    let outer_start = std::time::Instant::now();
    let outcome = if args.force {
        mati_core::mcp::metadata::kill_directly(pid).await
    } else {
        kill_and_wait(pid, timeout).await
    };
    let (elapsed, hard_killed) = match outcome {
        ExitOutcome::ExitedClean(elapsed) => (elapsed, false),
        ExitOutcome::KilledHard(elapsed) => (elapsed, true),
        ExitOutcome::Stuck(diag) => {
            let elapsed_ms = outer_start.elapsed().as_millis();
            let initial = diag.initial_snapshot.render();
            let final_snap = diag.final_snapshot.render();
            let sigterm_part = diag
                .sigterm_elapsed_ms
                .map(|ms| format!(" sigterm_ms={ms}"))
                .unwrap_or_default();
            let detail = format!(
                "pid={pid} reason=stuck elapsed_ms={elapsed_ms} signal=KILL{sigterm_part} \
                 sigkill_ms={} initial={{{initial}}} final={{{final_snap}}}",
                diag.sigkill_elapsed_ms,
            );
            log_stop_end(&detail);
            eprintln!(
                "[mati] daemon-stop Stuck diagnostic — \
                 pid={pid} total_ms={} sigterm_ms={:?} sigkill_ms={}",
                diag.total_elapsed_ms, diag.sigterm_elapsed_ms, diag.sigkill_elapsed_ms
            );
            eprintln!("[mati] initial snapshot: {initial}");
            eprintln!("[mati] final snapshot: {final_snap}");
            anyhow::bail!(
                "mati daemon: failed (pid {pid}) — process did not exit even after SIGKILL; \
                 manual intervention required (see lifecycle.log and stderr for diagnostic)"
            );
        }
    };

    let elapsed_ms = elapsed.as_millis();
    // A changed session means a new daemon already owns the runtime files; leave them alone.
    let post_session = mati_core::mcp::metadata::read_metadata(root).map(|m| m.session);
    let recycled = matches!((pre_session, post_session), (Some(a), Some(b)) if a != b);
    if !recycled {
        wait_for_files_removed(root).await;
    }

    if hard_killed {
        eprintln!(
            "[mati] WARNING: daemon (pid {pid}) did not respond to SIGTERM; killed with SIGKILL"
        );
        println!(
            "mati daemon: force-killed (pid {pid}, owner={owner_label}, took {elapsed_ms}ms, signal=KILL)"
        );
        log_stop_end(&format!(
            "pid={pid} reason=hard_kill elapsed_ms={elapsed_ms} signal=KILL"
        ));
    } else {
        println!(
            "mati daemon: stopped (pid {pid}, owner={owner_label}, took {elapsed_ms}ms, signal={signal_label})"
        );
        log_stop_end(&format!(
            "pid={pid} reason=clean_exit elapsed_ms={elapsed_ms} signal={signal_label}"
        ));
    }
    if args.include_mcp {
        kill_mati_serve_processes(root).await;
    }
    Ok(())
}
pub async fn run_daemon_status() -> Result<()> {
let root = project_root()?;
let sock_path = root.join("mati.sock");
if !sock_path.exists() {
println!("mati daemon is not running (no socket)");
return Ok(());
}
let pid_info = read_pid_file(&root);
match daemon_result(&root, "ping", serde_json::json!({})).await {
DaemonResult::Ok(resp) if resp.get("ok") == Some(&serde_json::Value::Bool(true)) => {
if let Some((pid, owner)) = &pid_info {
println!("mati daemon is running (pid {pid})");
println!(" owner: {owner}");
} else {
println!("mati daemon is running (pid unknown — PID file absent)");
println!(" owner: likely mcp (socket created by older binary without PID file)");
println!(
" note: mati daemon stop will refuse to close a live unknown-owner socket"
);
println!(" to stop: close the Claude Code session that uses mati");
}
println!(" socket: {}", sock_path.display());
println!(
" protocol version: {}",
resp.get("v")
.and_then(|v| v.as_u64())
.map(|v| v.to_string())
.unwrap_or_else(|| "unknown".to_string())
);
}
DaemonResult::Unresponsive => {
println!("mati daemon socket exists but is not responding");
println!(" socket: {}", sock_path.display());
if let Some((pid, owner)) = &pid_info {
println!(" pid: {pid} (alive)");
println!(" owner: {owner}");
}
println!(" run `mati daemon stop` to clean up");
}
DaemonResult::StaleSocket => {
println!("mati daemon: stale socket cleaned up");
}
_ => {
println!("mati daemon is not running");
if let Some((pid, _)) = &pid_info {
println!(" stale pid: {pid}");
}
println!(" run `mati daemon stop` to clean up");
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A PID that no live process can hold.
    ///
    /// It is far above Linux's `PID_MAX_LIMIT` (4_194_304 on 64-bit, which
    /// systemd uses as the default `pid_max`) and above macOS's PID ceiling.
    /// It also stays positive if an implementation casts it to `pid_t`, so it
    /// can never turn into `kill(-1, …)`'s broadcast form. A smaller value
    /// such as 4_000_000 can be a real, live PID on long-running Linux hosts.
    const DEAD_PID: u32 = i32::MAX as u32;

    #[tokio::test]
    async fn daemon_result_not_running_without_socket() {
        let tmp = tempfile::tempdir().unwrap();
        let result = daemon_result(tmp.path(), "ping", serde_json::json!({})).await;
        assert!(matches!(result, DaemonResult::NotRunning));
    }

    #[tokio::test]
    async fn daemon_get_returns_none_without_socket() {
        let tmp = tempfile::tempdir().unwrap();
        let result = daemon_get(tmp.path(), "file:src/main.rs").await;
        assert!(result.is_none());
    }

    #[test]
    fn parse_sentinel_roundtrip() {
        let s = format_sentinel(1234567890, 42);
        let (ts, pid) = parse_sentinel(&s).unwrap();
        assert_eq!(ts, 1234567890);
        assert_eq!(pid, 42);
    }

    #[test]
    fn parse_sentinel_legacy_format_returns_none() {
        assert!(parse_sentinel("1234567890").is_none());
    }

    #[test]
    fn check_starting_peer_active_absent_sentinel_returns_false() {
        let dir = tempfile::tempdir().unwrap();
        assert!(!check_starting_peer_active(dir.path()));
    }

    #[test]
    fn check_starting_peer_active_dead_pid_returns_false_and_cleans_up() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        std::fs::write(&path, format_sentinel(wall_secs(), DEAD_PID)).unwrap();
        assert!(!check_starting_peer_active(dir.path()));
        assert!(
            !path.exists(),
            "stale sentinel must be cleaned up so concurrent stale-cleanup paths don't race"
        );
    }

    #[test]
    fn check_starting_peer_active_alive_pid_returns_true() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        // PID 1 (init) always exists; skip if we *are* PID 1 (e.g. in a
        // minimal container), since our own PID is treated as non-blocking.
        let peer_pid = 1u32;
        if std::process::id() == peer_pid {
            return;
        }
        std::fs::write(&path, format_sentinel(wall_secs(), peer_pid)).unwrap();
        assert!(
            check_starting_peer_active(dir.path()),
            "alive peer PID must be classified as active starting peer"
        );
        assert!(path.exists(), "active sentinel must NOT be removed");
    }

    #[test]
    fn check_starting_peer_active_self_pid_returns_false() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        std::fs::write(&path, format_sentinel(wall_secs(), std::process::id())).unwrap();
        assert!(
            !check_starting_peer_active(dir.path()),
            "sentinel for our own PID must not block our own restart"
        );
        assert!(!path.exists(), "self-pid sentinel must be removed");
    }

    #[test]
    fn check_starting_peer_active_legacy_recent_timestamp_returns_true() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        std::fs::write(&path, format!("{}\n", wall_secs())).unwrap();
        assert!(check_starting_peer_active(dir.path()));
    }

    #[test]
    fn check_starting_peer_active_legacy_old_timestamp_returns_false() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        let stale_ts = wall_secs().saturating_sub(STARTING_STALE_SECS + 60);
        std::fs::write(&path, format!("{stale_ts}\n")).unwrap();
        assert!(!check_starting_peer_active(dir.path()));
        assert!(!path.exists(), "stale legacy sentinel must be removed");
    }

    #[test]
    fn check_starting_peer_active_garbage_content_returns_false() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mati.starting");
        std::fs::write(&path, "not a real sentinel ~~").unwrap();
        assert!(!check_starting_peer_active(dir.path()));
        assert!(!path.exists());
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn kill_and_wait_returns_exited_clean_on_sigterm_responsive_process() {
        let mut child = tokio::process::Command::new("sleep")
            .arg("60")
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .kill_on_drop(true)
            .spawn()
            .expect("spawn sleep");
        let pid = child.id().expect("child pid available pre-wait");
        assert!(
            mati_core::mcp::metadata::is_pid_alive(pid),
            "spawned sleep should be alive"
        );
        // Reap concurrently so the exited child does not linger as a zombie
        // that a liveness probe could still report as present.
        let reaper = tokio::spawn(async move { child.wait().await });
        let start = std::time::Instant::now();
        let outcome = kill_and_wait(pid, Duration::from_secs(7)).await;
        let elapsed = start.elapsed();
        let _ = reaper.await;
        assert!(
            matches!(outcome, ExitOutcome::ExitedClean(_)),
            "expected ExitedClean, got {outcome:?}"
        );
        assert!(
            !mati_core::mcp::metadata::is_pid_alive(pid),
            "after kill_and_wait returns ExitedClean, the PID must be gone — the SurrealKV flock guarantee depends on this"
        );
        assert!(
            elapsed < Duration::from_secs(2),
            "sleep exits cleanly on SIGTERM in well under 1s — kill_and_wait took {elapsed:?}, suggesting the poll loop is broken"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn kill_and_wait_escalates_to_sigkill_on_uncooperative_process() {
        // `exec` replaces the shell with `sleep`, so the PID we SIGKILL is the
        // sleeper itself. Without it, killing `sh` would orphan a `sleep 60`.
        // An ignored (SIG_IGN) disposition survives exec, so TERM stays ignored.
        let mut child = tokio::process::Command::new("sh")
            .arg("-c")
            .arg("trap '' TERM; exec sleep 60")
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .kill_on_drop(true)
            .spawn()
            .expect("spawn trap sh");
        let pid = child.id().expect("child pid available pre-wait");
        // Give the shell time to install the trap before SIGTERM arrives.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            mati_core::mcp::metadata::is_pid_alive(pid),
            "spawned trap sh should be alive"
        );
        let reaper = tokio::spawn(async move { child.wait().await });
        let budget = Duration::from_secs(2);
        let start = std::time::Instant::now();
        let outcome = kill_and_wait(pid, budget).await;
        let elapsed = start.elapsed();
        let _ = reaper.await;
        assert!(
            matches!(outcome, ExitOutcome::KilledHard(_)),
            "expected KilledHard, got {outcome:?}"
        );
        assert!(
            !mati_core::mcp::metadata::is_pid_alive(pid),
            "after SIGKILL, the PID must be gone — process is still alive"
        );
        assert!(
            elapsed >= budget,
            "SIGKILL escalation must wait the full SIGTERM budget ({budget:?}); took only {elapsed:?}"
        );
        assert!(
            elapsed < budget + Duration::from_secs(3),
            "SIGKILL should have reaped within ~500ms of escalation; took {elapsed:?}"
        );
    }
}