use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use super::server::{proxy_daemon_result_no_spawn, ProxyDaemonResult};
/// Total time budget that `ensure_daemon` hands to `wait_for_ready` as its
/// `hard_cap`; once this much time has elapsed the wait ends with
/// `ReadinessOutcome::HardCap`.
pub(crate) const READINESS_HARD_CAP: Duration = Duration::from_secs(60);
/// Time budget that `ensure_daemon` hands to `wait_for_ready` as its
/// `wedge_threshold`; if no new lifecycle event is seen for this long the wait
/// ends with `ReadinessOutcome::Wedged`.
pub(crate) const READINESS_WEDGE_THRESHOLD: Duration = Duration::from_secs(15);
/// Sleep between two consecutive iterations of the `wait_for_ready` loop.
const READINESS_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Result of waiting for the daemon to become reachable (see `wait_for_ready`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ReadinessOutcome {
/// A `ping` request to the daemon returned `ProxyDaemonResult::Ok`.
Ready,
/// A `serve_failed` or `panic` lifecycle event was observed; the payload is
/// that event's detail text.
Failed(String),
/// No new lifecycle event arrived within the wedge threshold. `last_phase` is
/// the most recent `phase=` value seen (initially `"spawned"`), `since` is the
/// time elapsed since the last observed event (or since the wait started).
Wedged { last_phase: String, since: Duration },
/// The overall hard cap elapsed before the daemon answered a ping.
/// `last_phase` is the most recent `phase=` value seen.
HardCap { last_phase: String },
}
/// One parsed line of `lifecycle.log`.
///
/// Lines have up to four tab-separated columns (see `parse_lifecycle_line`):
/// `ts`, `pid`, `event`, `detail`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LifecycleEvent {
// First column, parsed as an unsigned integer. The test helper in this file
// writes seconds since the Unix epoch; presumably the daemon does the same —
// TODO confirm against the writer.
#[allow(dead_code)] ts: u64,
// Second column: process id recorded on the line. Kept for diagnostics
// (`Debug` output); not read by the readiness logic in this file.
#[allow(dead_code)]
pid: u32,
// Third column: event name, e.g. "startup", "serve_failed", "panic".
event: String,
// Fourth column (may be empty): free-form text, usually space-separated
// `key=value` tokens such as `phase=ready elapsed_ms=120`.
detail: String,
}
impl LifecycleEvent {
    /// Returns the value of the first `phase=<value>` token in `detail`.
    ///
    /// Tokens are separated by single ASCII spaces only; the value is whatever
    /// follows `phase=` up to the next space (possibly empty). Returns `None`
    /// when no token starts with `phase=`.
    fn phase(&self) -> Option<&str> {
        for token in self.detail.split(' ') {
            if let Some(value) = token.strip_prefix("phase=") {
                return Some(value);
            }
        }
        None
    }
}
/// Parses one `lifecycle.log` line (without its trailing newline).
///
/// The expected layout is `ts \t pid \t event [\t detail]`, where `ts` must
/// parse as `u64` and `pid` as `u32`. Everything after the third tab —
/// including further tabs — becomes `detail`; a missing fourth column yields an
/// empty `detail`. Returns `None` when fewer than three columns are present or
/// a numeric column does not parse.
fn parse_lifecycle_line(line: &str) -> Option<LifecycleEvent> {
    let mut columns = line.splitn(4, '\t');
    match (columns.next(), columns.next(), columns.next()) {
        (Some(raw_ts), Some(raw_pid), Some(name)) => {
            let ts = raw_ts.parse::<u64>().ok()?;
            let pid = raw_pid.parse::<u32>().ok()?;
            let detail = columns.next().unwrap_or_default();
            Some(LifecycleEvent {
                ts,
                pid,
                event: name.to_owned(),
                detail: detail.to_owned(),
            })
        }
        _ => None,
    }
}
/// Incremental reader over `<mati_root>/lifecycle.log` that remembers how far
/// it has already consumed the file.
struct LifecycleTail {
// Full path of the log file being followed.
path: PathBuf,
// Byte offset into the file up to which complete lines have been consumed.
offset: u64,
}
impl LifecycleTail {
    /// Creates a tail positioned at the current end of
    /// `<mati_root>/lifecycle.log`, so only lines appended afterwards are
    /// reported. A missing or unreadable file starts the tail at offset 0.
    fn opened_at_end(mati_root: &Path) -> Self {
        let path = mati_root.join("lifecycle.log");
        let offset = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
        Self { path, offset }
    }

    /// Returns the events for every *complete* line appended since the last
    /// call and advances the internal offset past them.
    ///
    /// * Any I/O error (file missing, seek/read failure) yields an empty
    ///   vector and leaves the offset untouched, so the next poll retries.
    /// * If the file shrank below the stored offset (truncation), the offset
    ///   is moved to the new end of file and nothing is reported for this poll.
    /// * A trailing line without `\n` is left unconsumed until it is finished.
    /// * Lines that do not parse are consumed and silently dropped.
    fn poll(&mut self) -> Vec<LifecycleEvent> {
        let Ok(mut file) = std::fs::File::open(&self.path) else {
            return Vec::new();
        };
        let len = file.metadata().map(|m| m.len()).unwrap_or(0);
        if len < self.offset {
            self.offset = len;
            return Vec::new();
        }
        if len == self.offset {
            return Vec::new();
        }
        if file.seek(SeekFrom::Start(self.offset)).is_err() {
            return Vec::new();
        }
        let to_read = len - self.offset;
        // The cast only sizes a capacity hint; `take(to_read)` bounds the read.
        let mut buf = Vec::with_capacity(to_read as usize);
        if (&mut file).take(to_read).read_to_end(&mut buf).is_err() {
            return Vec::new();
        }

        // Split on raw bytes and count raw bytes. Decoding the whole chunk
        // lossily first would turn every invalid byte into a 3-byte U+FFFD,
        // making the decoded line longer than what is on disk and pushing
        // `offset` past the real position (skipping or mis-framing later lines).
        let mut events = Vec::new();
        let mut consumed = 0usize;
        for raw_line in buf.split_inclusive(|byte| *byte == b'\n') {
            if raw_line.last() != Some(&b'\n') {
                // Partial line: the writer has not finished it yet.
                break;
            }
            consumed += raw_line.len();
            let body = &raw_line[..raw_line.len() - 1];
            let text = String::from_utf8_lossy(body);
            if let Some(ev) = parse_lifecycle_line(&text) {
                events.push(ev);
            }
        }
        self.offset += consumed as u64;
        events
    }
}
/// Waits until the daemon rooted at `mati_root` answers a `ping`, or until it
/// is clear that it will not.
///
/// The lifecycle log is tailed from its end as of the moment this function is
/// called, so only events appended afterwards are considered. Every
/// `READINESS_POLL_INTERVAL` the loop:
///
/// 1. reads newly appended lifecycle events;
/// 2. returns [`ReadinessOutcome::Failed`] with the event's detail on the first
///    `serve_failed` or `panic` event;
/// 3. tracks the latest `phase=` value (starting from `"spawned"`);
/// 4. pings right away when a `startup` event with `phase=ready` is seen, and
///    pings once more at the end of every iteration as a fallback — either
///    successful ping returns [`ReadinessOutcome::Ready`];
/// 5. returns [`ReadinessOutcome::Wedged`] when no event has been observed for
///    at least `wedge_threshold`.
///
/// `hard_cap` is checked at the top of each iteration; once it has elapsed the
/// function returns [`ReadinessOutcome::HardCap`].
pub(crate) async fn wait_for_ready(
    mati_root: &Path,
    hard_cap: Duration,
    wedge_threshold: Duration,
) -> ReadinessOutcome {
    let began = Instant::now();
    let mut tail = LifecycleTail::opened_at_end(mati_root);
    let mut last_phase = String::from("spawned");
    let mut last_progress_at = began;

    while began.elapsed() < hard_cap {
        tokio::time::sleep(READINESS_POLL_INTERVAL).await;

        let fresh = tail.poll();
        if !fresh.is_empty() {
            // Any parsed event counts as progress for wedge detection.
            last_progress_at = Instant::now();
        }
        for ev in &fresh {
            match ev.event.as_str() {
                "serve_failed" | "panic" => {
                    return ReadinessOutcome::Failed(ev.detail.clone());
                }
                _ => {}
            }
            let phase = ev.phase();
            if let Some(name) = phase {
                last_phase = name.to_string();
            }
            if ev.event == "startup" && phase == Some("ready") {
                // The daemon claims readiness; confirm over the socket.
                let ping_ok = matches!(
                    proxy_daemon_result_no_spawn(mati_root, "ping", &serde_json::json!({}))
                        .await,
                    ProxyDaemonResult::Ok(_)
                );
                if ping_ok {
                    return ReadinessOutcome::Ready;
                }
            }
        }

        // Fallback: a daemon that answers pings is ready even when no
        // lifecycle event was observed.
        let answered = matches!(
            proxy_daemon_result_no_spawn(mati_root, "ping", &serde_json::json!({})).await,
            ProxyDaemonResult::Ok(_)
        );
        if answered {
            return ReadinessOutcome::Ready;
        }

        let stalled_for = last_progress_at.elapsed();
        if stalled_for >= wedge_threshold {
            return ReadinessOutcome::Wedged {
                last_phase,
                since: stalled_for,
            };
        }
    }

    ReadinessOutcome::HardCap { last_phase }
}
/// Makes sure a daemon is serving `mati_root`, spawning one if necessary.
///
/// Returns `true` when a daemon answers `ping` by the time the function
/// returns, `false` otherwise.
///
/// Steps:
/// 1. Ping the daemon without spawning. `Ok` returns `true` immediately.
/// 2. On `Unresponsive`, wait 300 ms and ping again. If it is still
///    unresponsive, kill the process recorded in the daemon metadata (waiting
///    up to 2 s) and remove the socket file and `mati.pid` (all best-effort).
/// 3. If `mati.starting` was modified less than 5 s ago, wait for readiness
///    via `wait_for_ready`; a `Ready` outcome returns `true`, any other
///    outcome falls through to spawning.
/// 4. Under `cfg!(test)` or when `MATI_DISABLE_AUTO_SPAWN` is set, return
///    `false` without spawning.
/// 5. Spawn `<current_exe> daemon start` with stdin/stdout detached and stderr
///    appended to `~/.mati/daemon_start.log` (or discarded when that file
///    cannot be opened), then wait for readiness.
///
/// If the current executable cannot be determined or the spawn itself fails,
/// `false` is returned right away instead of waiting for a daemon that was
/// never started.
pub async fn ensure_daemon(mati_root: &Path) -> bool {
    match proxy_daemon_result_no_spawn(mati_root, "ping", &serde_json::json!({})).await {
        ProxyDaemonResult::Ok(_) => return true,
        ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket => {}
        ProxyDaemonResult::Unresponsive => {
            // Give a busy daemon one short grace period before declaring it dead.
            tokio::time::sleep(Duration::from_millis(300)).await;
            match proxy_daemon_result_no_spawn(mati_root, "ping", &serde_json::json!({})).await {
                ProxyDaemonResult::Ok(_) => return true,
                ProxyDaemonResult::NotRunning | ProxyDaemonResult::StaleSocket => {}
                ProxyDaemonResult::Unresponsive => {
                    // Still unresponsive: tear it down so a fresh daemon can bind.
                    // Every step is best-effort; failures are ignored on purpose.
                    let stale_pid = super::metadata::read_metadata(mati_root).map(|m| m.pid);
                    if let Some(pid) = stale_pid {
                        let _ = super::metadata::kill_and_wait(pid, Duration::from_secs(2)).await;
                    }
                    let _ = std::fs::remove_file(super::metadata::socket_path(mati_root));
                    let _ = std::fs::remove_file(mati_root.join("mati.pid"));
                }
            }
        }
    }

    // A recently touched `mati.starting` marker means some process is already
    // starting a daemon. A modification time in the future counts as fresh
    // (elapsed() fails and defaults to zero).
    let starting_is_fresh = mati_root
        .join("mati.starting")
        .metadata()
        .and_then(|meta| meta.modified())
        .map(|modified| modified.elapsed().unwrap_or_default() < Duration::from_secs(5))
        .unwrap_or(false);
    if starting_is_fresh {
        if let ReadinessOutcome::Ready =
            wait_for_ready(mati_root, READINESS_HARD_CAP, READINESS_WEDGE_THRESHOLD).await
        {
            return true;
        }
    }

    if cfg!(test) || std::env::var_os("MATI_DISABLE_AUTO_SPAWN").is_some() {
        return false;
    }

    let Ok(exe) = std::env::current_exe() else {
        return false;
    };

    let stderr_target = dirs::home_dir()
        .map(|h| h.join(".mati").join("daemon_start.log"))
        .and_then(|p| {
            std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(&p)
                .ok()
        })
        .map(std::process::Stdio::from)
        .unwrap_or_else(std::process::Stdio::null);

    // The child handle is dropped right away; dropping a `Child` neither kills
    // nor waits for the process.
    let spawn_failed = std::process::Command::new(&exe)
        .args(["daemon", "start"])
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(stderr_target)
        .spawn()
        .is_err();
    if spawn_failed {
        // Nothing was started, so polling for readiness could only time out.
        return false;
    }

    match wait_for_ready(mati_root, READINESS_HARD_CAP, READINESS_WEDGE_THRESHOLD).await {
        ReadinessOutcome::Ready => true,
        ReadinessOutcome::Failed(_)
        | ReadinessOutcome::Wedged { .. }
        | ReadinessOutcome::HardCap { .. } => false,
    }
}
// Unit tests for lifecycle-line parsing, the log tail, `wait_for_ready` and
// `ensure_daemon`. Daemon behaviour is simulated with a Unix-socket responder
// and by appending lines to `lifecycle.log` in a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::mcp::metadata::{publish_metadata, DaemonMetadata, DaemonOwner};
use crate::mcp::server::proxy_daemon_result;
/// With published metadata and a socket that answers one request with an
/// `ok` reply, `ensure_daemon` must report `true`.
#[tokio::test]
async fn ensure_daemon_returns_true_when_daemon_already_running() {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
// Publish metadata that points at this test process.
let mut meta = DaemonMetadata::new(DaemonOwner::Daemon);
meta.pid = std::process::id();
publish_metadata(&root, &meta).unwrap();
let session = meta.session;
let sock_path = root.join("mati.sock");
let listener = UnixListener::bind(&sock_path).unwrap();
// One-shot responder: read a single request line, reply with an `ok` envelope.
let server_handle = tokio::spawn(async move {
if let Ok((stream, _)) = listener.accept().await {
let (reader, mut writer) = stream.into_split();
let mut br = BufReader::new(reader);
let mut line = String::new();
let _ = br.read_line(&mut line).await;
let resp = serde_json::json!({
"v": 2,
"id": uuid::Uuid::new_v4(),
"session": session,
"status": "ok",
"data": { "pong": true }
});
let mut bytes = serde_json::to_vec(&resp).unwrap();
bytes.push(b'\n');
let _ = writer.write_all(&bytes).await;
let _ = writer.shutdown().await;
}
});
// NOTE(review): the environment is process-global and several tests in this
// module set/remove the same variable; `ensure_daemon` also refuses to spawn
// under `cfg!(test)`.
std::env::set_var("MATI_DISABLE_AUTO_SPAWN", "1");
let result = ensure_daemon(&root).await;
std::env::remove_var("MATI_DISABLE_AUTO_SPAWN");
let _ = server_handle.await;
assert!(result, "ensure_daemon must return true when ping succeeds");
}
/// With an empty root directory and spawning disabled, `ensure_daemon` must
/// report `false`.
#[tokio::test]
async fn ensure_daemon_returns_false_when_spawn_disabled_and_no_daemon() {
let dir = tempfile::TempDir::new().unwrap();
std::env::set_var("MATI_DISABLE_AUTO_SPAWN", "1");
let result = ensure_daemon(dir.path()).await;
std::env::remove_var("MATI_DISABLE_AUTO_SPAWN");
assert!(
!result,
"ensure_daemon must return false when no daemon is running and spawn is disabled"
);
}
/// `proxy_daemon_result` against an empty root with spawning disabled must
/// end in `NotRunning`.
#[tokio::test]
async fn proxy_daemon_result_invokes_ensure_daemon_on_persistent_notrunning() {
let dir = tempfile::TempDir::new().unwrap();
std::env::set_var("MATI_DISABLE_AUTO_SPAWN", "1");
let result = proxy_daemon_result(dir.path(), "ping", serde_json::json!({})).await;
std::env::remove_var("MATI_DISABLE_AUTO_SPAWN");
assert!(
matches!(result, ProxyDaemonResult::NotRunning),
"proxy_daemon_result must return NotRunning when daemon absent and spawn disabled, got {result:?}"
);
}
/// A well-formed four-column line yields all fields and a `phase`.
#[test]
fn parse_lifecycle_line_extracts_all_fields() {
let line = "1234567890\t42\tstartup\tphase=ready elapsed_ms=120";
let ev = parse_lifecycle_line(line).expect("must parse");
assert_eq!(ev.ts, 1234567890);
assert_eq!(ev.pid, 42);
assert_eq!(ev.event, "startup");
assert_eq!(ev.detail, "phase=ready elapsed_ms=120");
assert_eq!(ev.phase(), Some("ready"));
}
/// An empty fourth column parses to an empty `detail` and no phase.
#[test]
fn parse_lifecycle_line_tolerates_empty_detail() {
let line = "1\t2\tserve_start\t";
let ev = parse_lifecycle_line(line).expect("must parse with empty detail");
assert_eq!(ev.event, "serve_start");
assert_eq!(ev.detail, "");
assert_eq!(ev.phase(), None);
}
/// Lines without tabs, with non-numeric `ts`/`pid`, or empty lines are rejected.
#[test]
fn parse_lifecycle_line_returns_none_on_malformed() {
assert!(parse_lifecycle_line("garbage with no tabs").is_none());
assert!(parse_lifecycle_line("not-a-number\t42\tevent\tdetail").is_none());
assert!(parse_lifecycle_line("123\tnot-a-pid\tevent\tdetail").is_none());
assert!(parse_lifecycle_line("").is_none());
}
/// `phase()` picks the `phase=` token out of a detail with several tokens.
#[test]
fn phase_extracts_value_from_complex_detail() {
let ev = LifecycleEvent {
ts: 0,
pid: 0,
event: "migration".into(),
detail: "phase=apply_complete version=2 records_migrated=14 elapsed_ms=820".into(),
};
assert_eq!(ev.phase(), Some("apply_complete"));
}
/// `phase()` is `None` when no token starts with `phase=`.
#[test]
fn phase_returns_none_when_no_phase_token() {
let ev = LifecycleEvent {
ts: 0,
pid: 0,
event: "serve_start".into(),
detail: "pid=123 owner=daemon".into(),
};
assert_eq!(ev.phase(), None);
}
/// Test helper: appends one lifecycle line (`ts`, `pid`, `event`, `detail`,
/// tab-separated, newline-terminated) to `<root>/lifecycle.log`, creating the
/// file if needed. `ts` is the current time in seconds since the Unix epoch
/// and `pid` is this process's id.
fn write_event(root: &Path, event: &str, detail: &str) {
use std::io::Write;
let path = root.join("lifecycle.log");
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.expect("open lifecycle.log");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let pid = std::process::id();
writeln!(f, "{ts}\t{pid}\t{event}\t{detail}").expect("write event");
}
/// Events already in the file when the tail is created are not reported.
#[test]
fn tail_opened_at_end_skips_pre_existing_events() {
let dir = tempfile::TempDir::new().unwrap();
write_event(dir.path(), "serve_start", "old=event");
write_event(dir.path(), "serve_shutdown", "old=event");
let mut tail = LifecycleTail::opened_at_end(dir.path());
let evs = tail.poll();
assert!(
evs.is_empty(),
"pre-existing events must not be replayed, got {evs:?}"
);
}
/// Events appended after the tail is created are reported in order.
#[test]
fn tail_picks_up_events_appended_after_open() {
let dir = tempfile::TempDir::new().unwrap();
write_event(dir.path(), "serve_start", "older=event");
let mut tail = LifecycleTail::opened_at_end(dir.path());
write_event(dir.path(), "startup", "phase=opening_store");
write_event(dir.path(), "startup", "phase=ready elapsed_ms=210");
let evs = tail.poll();
assert_eq!(evs.len(), 2, "must see both new events, got {evs:?}");
assert_eq!(evs[0].event, "startup");
assert_eq!(evs[0].phase(), Some("opening_store"));
assert_eq!(evs[1].phase(), Some("ready"));
}
/// A tail created before the log exists reports nothing, then picks up the
/// first event once the file appears.
#[test]
fn tail_handles_missing_file_gracefully() {
let dir = tempfile::TempDir::new().unwrap();
let mut tail = LifecycleTail::opened_at_end(dir.path());
assert!(tail.poll().is_empty());
write_event(dir.path(), "startup", "phase=ready elapsed_ms=42");
let evs = tail.poll();
assert_eq!(evs.len(), 1);
assert_eq!(evs[0].phase(), Some("ready"));
}
/// After the log is truncated to empty, the next poll reports nothing and
/// later appends are picked up again.
#[test]
fn tail_resets_offset_on_truncation() {
let dir = tempfile::TempDir::new().unwrap();
write_event(dir.path(), "startup", "phase=opening_store");
write_event(dir.path(), "startup", "phase=ready elapsed_ms=100");
let mut tail = LifecycleTail::opened_at_end(dir.path());
// Truncate the file to zero bytes, below the tail's stored offset.
std::fs::write(dir.path().join("lifecycle.log"), b"").unwrap();
let evs = tail.poll();
assert!(
evs.is_empty(),
"first poll after truncation must yield no events"
);
write_event(dir.path(), "startup", "phase=ready elapsed_ms=55");
let evs = tail.poll();
assert_eq!(evs.len(), 1, "must see new events after truncation reset");
}
/// Test helper: publishes daemon metadata for `root` (pid = this process),
/// binds `<root>/mati.sock`, and spawns a task that answers every incoming
/// connection with a single `ok` reply. The returned handle can be aborted
/// to stop the accept loop.
async fn spawn_ping_responder(root: &Path) -> tokio::task::JoinHandle<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
let mut meta = DaemonMetadata::new(DaemonOwner::Daemon);
meta.pid = std::process::id();
publish_metadata(root, &meta).unwrap();
let session = meta.session;
let sock_path = root.join("mati.sock");
// Remove a leftover socket file so the bind below does not fail.
let _ = std::fs::remove_file(&sock_path);
let listener = UnixListener::bind(&sock_path).expect("bind responder");
tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
return;
};
let session = session;
// Serve each connection on its own task so the accept loop keeps running.
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut br = BufReader::new(reader);
let mut line = String::new();
let _ = br.read_line(&mut line).await;
let resp = serde_json::json!({
"v": 2,
"id": uuid::Uuid::new_v4(),
"session": session,
"status": "ok",
"data": { "pong": true }
});
let mut bytes = serde_json::to_vec(&resp).unwrap();
bytes.push(b'\n');
let _ = writer.write_all(&bytes).await;
let _ = writer.shutdown().await;
});
}
})
}
/// With a ping responder running and startup events being emitted,
/// `wait_for_ready` returns `Ready` in under a second.
#[tokio::test]
async fn wait_for_ready_returns_ready_when_startup_phase_ready_lands_and_ping_ok() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let responder = spawn_ping_responder(&root).await;
let emitter_root = root.clone();
// Emit three startup phases 30 ms apart, ending with `phase=ready`.
let emitter = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "startup", "phase=opening_store");
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "startup", "phase=store_opened elapsed_ms=20");
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "startup", "phase=ready elapsed_ms=90");
});
let start = Instant::now();
let outcome = wait_for_ready(&root, Duration::from_secs(5), Duration::from_secs(2)).await;
let elapsed = start.elapsed();
let _ = emitter.await;
responder.abort();
assert_eq!(outcome, ReadinessOutcome::Ready);
assert!(
elapsed < Duration::from_secs(1),
"happy-path readiness must complete in <1s, took {elapsed:?}"
);
}
/// A `serve_failed` event ends the wait with `Failed` carrying the event
/// detail, well before the wedge threshold or hard cap.
#[tokio::test]
async fn wait_for_ready_returns_failed_immediately_on_serve_failed_event() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let emitter_root = root.clone();
let emitter = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "startup", "phase=opening_store");
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "serve_failed", "store open: lock contention");
});
let start = Instant::now();
let outcome = wait_for_ready(&root, Duration::from_secs(10), Duration::from_secs(5)).await;
let elapsed = start.elapsed();
let _ = emitter.await;
match outcome {
ReadinessOutcome::Failed(detail) => {
assert!(
detail.contains("lock contention"),
"failure detail must surface the daemon's reason, got {detail:?}"
);
}
other => panic!("expected Failed, got {other:?}"),
}
assert!(
elapsed < Duration::from_secs(2),
"Failed must surface quickly, took {elapsed:?}"
);
}
/// A `panic` event is treated like `serve_failed`: the wait ends with
/// `Failed` carrying the event detail.
#[tokio::test]
async fn wait_for_ready_returns_failed_immediately_on_panic_event() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let emitter_root = root.clone();
let emitter = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "startup", "phase=opening_store");
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(
&emitter_root,
"panic",
"src/store/db.rs:1119 called `Option::unwrap()` on a `None` value",
);
});
let start = Instant::now();
let outcome = wait_for_ready(&root, Duration::from_secs(10), Duration::from_secs(5)).await;
let elapsed = start.elapsed();
let _ = emitter.await;
match outcome {
ReadinessOutcome::Failed(detail) => {
assert!(
detail.contains("db.rs:1119"),
"failure detail must surface the panic location, got {detail:?}"
);
}
other => panic!("expected Failed on panic event, got {other:?}"),
}
assert!(
elapsed < Duration::from_secs(2),
"a startup panic must surface as Failed quickly, took {elapsed:?}"
);
}
/// After a single event and then silence, the wait ends with `Wedged`,
/// reporting the last seen phase and a `since` of at least the threshold.
#[tokio::test]
async fn wait_for_ready_returns_wedged_when_no_events_change_within_threshold() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let emitter_root = root.clone();
let emitter = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(&emitter_root, "migration", "phase=apply_begin version=2");
});
let start = Instant::now();
// Arguments below: hard cap 10 s, wedge threshold 200 ms.
let outcome = wait_for_ready(
&root,
Duration::from_secs(10), Duration::from_millis(200), )
.await;
let elapsed = start.elapsed();
let _ = emitter.await;
match outcome {
ReadinessOutcome::Wedged { last_phase, since } => {
assert_eq!(last_phase, "apply_begin");
assert!(
since >= Duration::from_millis(200),
"wedge `since` must be at least the threshold, got {since:?}"
);
}
other => panic!("expected Wedged, got {other:?}"),
}
assert!(
elapsed < Duration::from_secs(2),
"wedge detection must fire near the wedge threshold, took {elapsed:?}"
);
}
/// With events arriving continuously but no ping responder, the wait ends
/// with `HardCap` once the cap elapses, reporting the most recent phase.
#[tokio::test]
async fn wait_for_ready_returns_hard_cap_when_progress_never_signals_ready() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let emitter_root = root.clone();
// Keep emitting progress every 30 ms so the wedge detector never fires.
let emitter = tokio::spawn(async move {
for i in 0..50 {
tokio::time::sleep(Duration::from_millis(30)).await;
write_event(
&emitter_root,
"migration",
&format!("phase=apply_progress version=2 records_seen={i}"),
);
}
});
let start = Instant::now();
// Arguments below: hard cap 250 ms, wedge threshold 10 s.
let outcome = wait_for_ready(
&root,
Duration::from_millis(250), Duration::from_secs(10), )
.await;
let elapsed = start.elapsed();
emitter.abort();
let _ = emitter.await;
match outcome {
ReadinessOutcome::HardCap { last_phase } => {
assert_eq!(
last_phase, "apply_progress",
"hard-cap outcome must report the most recent phase"
);
}
other => panic!("expected HardCap, got {other:?}"),
}
assert!(
elapsed >= Duration::from_millis(250),
"hard-cap must elapse before firing, got {elapsed:?}"
);
assert!(
elapsed < Duration::from_secs(1),
"hard-cap must fire near the cap, took {elapsed:?}"
);
}
/// With a ping responder but no lifecycle events at all, the fallback ping
/// alone yields `Ready`.
#[tokio::test]
async fn wait_for_ready_accepts_ping_ok_without_any_lifecycle_events() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
let responder = spawn_ping_responder(&root).await;
let start = Instant::now();
let outcome = wait_for_ready(&root, Duration::from_secs(5), Duration::from_secs(2)).await;
let elapsed = start.elapsed();
responder.abort();
assert_eq!(outcome, ReadinessOutcome::Ready);
assert!(
elapsed < Duration::from_millis(500),
"ping-fallback readiness must complete near-instantly, took {elapsed:?}"
);
}
/// A log containing only unparseable lines and no responder ends in `Wedged`.
///
/// NOTE(review): the garbage lines are written before `wait_for_ready` is
/// called, and `wait_for_ready` opens its tail at the end of the file, so
/// these lines appear to be skipped rather than parsed. As written the test
/// seems to exercise "no events at all" more than "corrupted events" —
/// confirm whether the lines should be written after the wait starts.
#[tokio::test]
async fn wait_for_ready_tolerates_corrupted_lifecycle_lines() {
let dir = tempfile::TempDir::new().unwrap();
let root = dir.path().to_path_buf();
use std::io::Write;
let path = root.join("lifecycle.log");
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.unwrap();
writeln!(f, "definitely not a real lifecycle line").unwrap();
writeln!(f, "\t\t\t").unwrap();
writeln!(f, "abc\tdef\tghi\tjkl").unwrap();
let start = Instant::now();
let outcome =
wait_for_ready(&root, Duration::from_secs(5), Duration::from_millis(200)).await;
let elapsed = start.elapsed();
assert!(
matches!(outcome, ReadinessOutcome::Wedged { .. }),
"corrupted events must surface as Wedged, got {outcome:?}"
);
assert!(
elapsed < Duration::from_secs(2),
"wedge under garbage must fire near threshold, took {elapsed:?}"
);
}
}