use std::path::Path;
use std::time::Duration;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use sysinfo::{ProcessRefreshKind, RefreshKind};
/// Longest a `Claiming` record is honoured before it is treated as abandoned,
/// even if the claiming process is still alive (two minutes).
const CLAIM_MAX_AGE: Duration = Duration::from_secs(2 * 60);
/// What [`ensure_daemon`] ended up doing to obtain a running daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DaemonAction {
    /// This process claimed the slot and started a new daemon listening on `port`.
    Spawned { port: u16 },
    /// A daemon (already running, or being started by another process) was reused.
    Joined { port: u16 },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct DaemonPid(u32);
impl DaemonPid {
pub fn new(pid: u32) -> Option<Self> {
if pid == 0 { None } else { Some(Self(pid)) }
}
pub fn get(self) -> u32 {
self.0
}
}
impl<'de> Deserialize<'de> for DaemonPid {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let pid = u32::deserialize(deserializer)?;
DaemonPid::new(pid).ok_or_else(|| serde::de::Error::custom("daemon_pid must be non-zero"))
}
}
#[derive(Clone, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct AuthToken(String);
impl AuthToken {
pub fn new(token: impl Into<String>) -> Option<Self> {
let token = token.into();
if token.is_empty() {
None
} else {
Some(Self(token))
}
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_string(self) -> String {
self.0
}
}
impl std::fmt::Debug for AuthToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("AuthToken(<redacted>)")
}
}
impl<'de> Deserialize<'de> for AuthToken {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let token = String::deserialize(deserializer)?;
AuthToken::new(token)
.ok_or_else(|| serde::de::Error::custom("auth_token must be non-empty"))
}
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_unix_secs() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Contents of the `web.state` file, stored as JSON tagged by `"status"`
/// (`"claiming"` or `"ready"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum WebState {
    /// A process has reserved the slot and is starting a daemon.
    Claiming {
        /// PID of the process holding the claim.
        claimer_pid: u32,
        /// Port requested by the claimer.
        port: u16,
        /// Unix time (seconds) at which the claim was taken.
        since: u64,
    },
    /// A daemon has published its connection details.
    Ready {
        /// PID of the daemon process.
        daemon_pid: DaemonPid,
        /// Port the daemon listens on.
        port: u16,
        /// Token clients must present.
        auth_token: AuthToken,
        /// PIDs of registered client processes.
        clients: Vec<u32>,
    },
}
/// Connection details of a daemon that was verified to be alive.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyDaemon {
    /// PID of the daemon process.
    pub daemon_pid: DaemonPid,
    /// Port the daemon listens on.
    pub port: u16,
    /// Token clients must present.
    pub auth_token: AuthToken,
}

impl ReadyDaemon {
    /// HTTP base URL on the loopback interface, e.g. `http://127.0.0.1:9090`.
    pub fn base_url(&self) -> String {
        let port = self.port;
        format!("http://127.0.0.1:{port}")
    }
}
/// Location of the shared daemon state file inside `data_dir`.
fn state_path(data_dir: &Path) -> std::path::PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push("web.state");
    path
}
/// Loads the daemon state file from `data_dir`.
///
/// Returns `Ok(None)` when the file does not exist, and also when it exists but
/// cannot be parsed as a [`WebState`]; the latter is logged and treated as a
/// vacant slot. Any other I/O error is propagated.
pub fn read_state(data_dir: &Path) -> Result<Option<WebState>> {
    let path = state_path(data_dir);
    let contents = match std::fs::read_to_string(&path) {
        Ok(text) => text,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err.into()),
    };
    match serde_json::from_str::<WebState>(&contents) {
        Ok(state) => Ok(Some(state)),
        Err(e) => {
            tracing::warn!(
                "unparseable daemon state file {} (treating as vacant): {e}",
                path.display()
            );
            Ok(None)
        }
    }
}
fn write_state(data_dir: &Path, state: &WebState) -> Result<()> {
use anyhow::Context;
let path = state_path(data_dir);
let tmp_path = data_dir.join(format!(".web.state.{}.tmp", std::process::id()));
let json = serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
std::fs::write(&tmp_path, json.as_bytes())
.with_context(|| format!("failed to write temp state: {}", tmp_path.display()))?;
let result = std::fs::rename(&tmp_path, &path);
if result.is_err() {
let _ = std::fs::remove_file(&tmp_path);
}
result.with_context(|| {
format!(
"failed to rename {} to {}",
tmp_path.display(),
path.display()
)
})?;
Ok(())
}
/// Deletes the state file; a file that is already absent is not an error.
fn remove_state(data_dir: &Path) -> Result<()> {
    if let Err(err) = std::fs::remove_file(state_path(data_dir)) {
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err.into());
        }
    }
    Ok(())
}
/// Location of the lock file that serializes state-file updates.
fn lock_path(data_dir: &Path) -> std::path::PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push(".web.state.lock");
    path
}
/// Atomically reads, transforms and persists the daemon state.
///
/// Takes an exclusive lock on `.web.state.lock` (via `fs2`). While the lock is held it:
/// 1. reads the current state,
/// 2. passes it to `f`,
/// 3. writes the result, or deletes the state file when `f` returns `None`.
///
/// The lock is tied to the open lock-file handle, which is dropped when the
/// function returns. Returns the state that was written (or `None` if the file was removed).
pub fn update_state<F>(data_dir: &Path, f: F) -> Result<Option<WebState>>
where
    F: FnOnce(Option<WebState>) -> Option<WebState>,
{
    use anyhow::Context;
    use fs2::FileExt;
    let lock_file = lock_path(data_dir);
    // `guard` must stay bound (not `_`) so the lock is held for the whole update.
    let guard = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false)
        .open(&lock_file)
        .with_context(|| format!("failed to open lock file: {}", lock_file.display()))?;
    guard
        .lock_exclusive()
        .context("failed to acquire exclusive lock")?;
    let next = f(read_state(data_dir)?);
    if let Some(state) = &next {
        write_state(data_dir, state)?;
    } else {
        remove_state(data_dir)?;
    }
    Ok(next)
}
/// Process-liveness check, abstracted so tests can substitute a fake.
pub trait PidProbe: Send + Sync {
    /// Returns `true` if a process with `pid` currently exists.
    fn is_alive(&self, pid: u32) -> bool;
}

/// [`PidProbe`] backed by the `sysinfo` process table.
pub struct SysinfoProbe;

impl PidProbe for SysinfoProbe {
    fn is_alive(&self, pid: u32) -> bool {
        is_pid_alive(pid)
    }
}

/// Returns `true` if `pid` appears in a freshly loaded `sysinfo` process list.
///
/// NOTE(review): the process table may include exited-but-unreaped (zombie)
/// processes on some platforms; confirm whether those should count as alive.
pub fn is_pid_alive(pid: u32) -> bool {
    let refresh = RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing());
    let system = sysinfo::System::new_with_specifics(refresh);
    let target = sysinfo::Pid::from_u32(pid);
    system.process(target).is_some()
}
/// Result of interpreting the state file against live process information.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Discovery {
    /// A live daemon is available.
    Ready(ReadyDaemon),
    /// A live process holds a claim younger than [`CLAIM_MAX_AGE`].
    Claiming {
        /// PID of the claiming process.
        claimer_pid: u32,
        /// Port recorded in the claim.
        port: u16,
        /// How long ago the claim was taken.
        age: Duration,
    },
    /// No usable record exists; the slot may be claimed.
    Vacant,
}
/// Interprets a persisted [`WebState`] using `probe` and the current time `now`
/// (Unix seconds).
///
/// Returns the [`Discovery`] to act on, plus `true` when the record is stale and
/// should be removed:
/// * `Ready` with a dead daemon gives `(Vacant, true)`.
/// * `Ready` with a live daemon gives `(Ready(..), false)`.
/// * `Claiming` with a dead claimer, or whose age (`now - since`, saturating) is at
///   least [`CLAIM_MAX_AGE`], gives `(Vacant, true)`.
/// * Any other `Claiming` gives `(Claiming { .. }, false)`.
fn classify(state: &WebState, probe: &dyn PidProbe, now: u64) -> (Discovery, bool) {
    match *state {
        WebState::Ready {
            daemon_pid,
            port,
            ref auth_token,
            ..
        } => {
            let daemon_alive = probe.is_alive(daemon_pid.get());
            if !daemon_alive {
                return (Discovery::Vacant, true);
            }
            let ready = ReadyDaemon {
                daemon_pid,
                port,
                auth_token: auth_token.clone(),
            };
            (Discovery::Ready(ready), false)
        }
        WebState::Claiming {
            claimer_pid,
            port,
            since,
        } => {
            let age = Duration::from_secs(now.saturating_sub(since));
            // The liveness probe always runs, even when the claim is already too old.
            let claimer_alive = probe.is_alive(claimer_pid);
            let stale = !claimer_alive || age >= CLAIM_MAX_AGE;
            if stale {
                (Discovery::Vacant, true)
            } else {
                (
                    Discovery::Claiming {
                        claimer_pid,
                        port,
                        age,
                    },
                    false,
                )
            }
        }
    }
}
/// Inspects the state file under the update lock and reports what it holds.
///
/// Stale records (dead daemon, or dead or aged-out claimer) are removed as a side
/// effect and reported as [`Discovery::Vacant`].
pub fn discover(data_dir: &Path, probe: &dyn PidProbe) -> Result<Discovery> {
    let now = now_unix_secs();
    let mut found = Discovery::Vacant;
    update_state(data_dir, |current| {
        let state = current?;
        let (discovery, reap) = classify(&state, probe, now);
        found = discovery;
        (!reap).then_some(state)
    })?;
    Ok(found)
}
/// Result of [`try_claim`].
pub enum ClaimOutcome<'a> {
    /// The slot was free (or stale) and is now claimed by the caller.
    Claimed(DaemonClaim<'a>),
    /// A live daemon already published its details.
    AlreadyReady(ReadyDaemon),
    /// Another live process holds a fresh claim.
    AlreadyClaiming { claimer_pid: u32, age: Duration },
}
/// Handle for a `Claiming` record written by [`try_claim`].
///
/// Consume it with [`DaemonClaim::publish`] or [`DaemonClaim::abandon`]. There is
/// no `Drop` impl, so simply dropping it leaves the claim on disk until the
/// claimer dies or the claim reaches [`CLAIM_MAX_AGE`].
pub struct DaemonClaim<'a> {
    /// Directory holding the state file.
    data_dir: &'a Path,
    /// PID recorded as the claimer.
    claimer_pid: u32,
    /// Port recorded in the claim.
    port: u16,
}
impl<'a> DaemonClaim<'a> {
pub fn publish(self, daemon_pid: DaemonPid, auth_token: AuthToken) -> Result<ReadyDaemon> {
let claimer_pid = self.claimer_pid;
let port = self.port;
let ready = ReadyDaemon {
daemon_pid,
port,
auth_token: auth_token.clone(),
};
let mut ok = false;
update_state(self.data_dir, |state| {
match &state {
Some(WebState::Claiming {
claimer_pid: cur, ..
}) if *cur == claimer_pid => {
ok = true;
Some(WebState::Ready {
daemon_pid,
port,
auth_token: auth_token.clone(),
clients: vec![],
})
}
other => other.clone(),
}
})?;
if !ok {
anyhow::bail!("cannot publish: claim no longer held by this process");
}
Ok(ready)
}
pub fn abandon(self) -> Result<()> {
let claimer_pid = self.claimer_pid;
update_state(self.data_dir, |state| match &state {
Some(WebState::Claiming {
claimer_pid: cur, ..
}) if *cur == claimer_pid => None,
other => other.clone(),
})?;
Ok(())
}
}
/// Unconditionally writes a `Ready` record for `daemon_pid`/`port`/`auth_token`.
///
/// Any previous state is replaced. If the previous record was also `Ready`, its
/// client list is carried over; otherwise the list starts empty.
pub fn publish_ready(
    data_dir: &Path,
    daemon_pid: DaemonPid,
    port: u16,
    auth_token: AuthToken,
) -> Result<ReadyDaemon> {
    update_state(data_dir, |previous| {
        let clients = if let Some(WebState::Ready { clients, .. }) = previous {
            clients
        } else {
            Vec::new()
        };
        Some(WebState::Ready {
            daemon_pid,
            port,
            auth_token: auth_token.clone(),
            clients,
        })
    })?;
    let ready = ReadyDaemon {
        daemon_pid,
        port,
        auth_token,
    };
    Ok(ready)
}
/// Attempts to reserve the daemon slot for `claimer_pid` under the update lock.
///
/// Behaviour by current state:
/// * Empty or stale state: writes a `Claiming` record stamped with the current time
///   and returns [`ClaimOutcome::Claimed`].
/// * A live `Ready` record: returns [`ClaimOutcome::AlreadyReady`]; the file is unchanged.
/// * A live, fresh claim by someone else: returns [`ClaimOutcome::AlreadyClaiming`];
///   the file is unchanged.
pub fn try_claim<'a>(
    data_dir: &'a Path,
    claimer_pid: u32,
    port: u16,
    probe: &dyn PidProbe,
) -> Result<ClaimOutcome<'a>> {
    let now = now_unix_secs();
    let fresh_claim = WebState::Claiming {
        claimer_pid,
        port,
        since: now,
    };
    let mut decision = ClaimDecision::Claimed;
    update_state(data_dir, |current| {
        let Some(existing) = current else {
            decision = ClaimDecision::Claimed;
            return Some(fresh_claim);
        };
        let (discovery, reaped) = classify(&existing, probe, now);
        match discovery {
            Discovery::Ready(ready) => {
                decision = ClaimDecision::AlreadyReady(ready);
                Some(existing)
            }
            Discovery::Claiming {
                claimer_pid: holder,
                age,
                ..
            } => {
                decision = ClaimDecision::AlreadyClaiming {
                    claimer_pid: holder,
                    age,
                };
                Some(existing)
            }
            Discovery::Vacant => {
                // `classify` only yields Vacant for a record it wants reaped.
                debug_assert!(reaped);
                decision = ClaimDecision::Claimed;
                Some(fresh_claim)
            }
        }
    })?;
    let outcome = match decision {
        ClaimDecision::Claimed => ClaimOutcome::Claimed(DaemonClaim {
            data_dir,
            claimer_pid,
            port,
        }),
        ClaimDecision::AlreadyReady(ready) => ClaimOutcome::AlreadyReady(ready),
        ClaimDecision::AlreadyClaiming { claimer_pid, age } => {
            ClaimOutcome::AlreadyClaiming { claimer_pid, age }
        }
    };
    Ok(outcome)
}
/// Lifetime-free record of the decision made inside `try_claim`'s update closure.
/// It is converted into a [`ClaimOutcome`] once the lock has been released.
enum ClaimDecision {
    /// The caller now holds the claim.
    Claimed,
    /// A live daemon was found.
    AlreadyReady(ReadyDaemon),
    /// Another live process holds a fresh claim.
    AlreadyClaiming { claimer_pid: u32, age: Duration },
}
/// Adds `client_pid` to the `Ready` record's client list (no duplicates).
///
/// # Errors
/// Fails if the state cannot be updated, or if the current state is not `Ready`
/// (missing file or an in-progress claim). Non-`Ready` state is left untouched.
pub fn register_client(data_dir: &Path, client_pid: u32) -> Result<()> {
    let mut saw_ready = false;
    update_state(data_dir, |current| match current {
        Some(WebState::Ready {
            daemon_pid,
            port,
            auth_token,
            clients,
        }) => {
            saw_ready = true;
            let mut clients = clients;
            if clients.iter().all(|&pid| pid != client_pid) {
                clients.push(client_pid);
            }
            Some(WebState::Ready {
                daemon_pid,
                port,
                auth_token,
                clients,
            })
        }
        untouched => untouched,
    })?;
    if saw_ready {
        Ok(())
    } else {
        anyhow::bail!("cannot register client: no ready daemon state file exists")
    }
}
/// Removes `client_pid` from the `Ready` record's client list.
///
/// If a `Ready` record remains and its client list is now empty, the daemon is
/// asked to shut down via [`signal_daemon_shutdown`]. Non-`Ready` state is left untouched.
pub fn deregister_client(data_dir: &Path, client_pid: u32) -> Result<()> {
    let after = update_state(data_dir, |current| match current {
        Some(WebState::Ready {
            daemon_pid,
            port,
            auth_token,
            clients,
        }) => Some(WebState::Ready {
            daemon_pid,
            port,
            auth_token,
            clients: clients
                .into_iter()
                .filter(|&pid| pid != client_pid)
                .collect(),
        }),
        untouched => untouched,
    })?;
    let idle_daemon = match after {
        Some(WebState::Ready {
            daemon_pid,
            clients,
            ..
        }) if clients.is_empty() => Some(daemon_pid),
        _ => None,
    };
    if let Some(pid) = idle_daemon {
        signal_daemon_shutdown(pid.get());
    }
    Ok(())
}
fn signal_daemon_shutdown(daemon_pid: u32) {
#[cfg(unix)]
{
unsafe {
libc::kill(daemon_pid as i32, libc::SIGTERM);
}
}
#[cfg(windows)]
{
let _ = daemon_pid;
}
}
/// Makes sure a web daemon is serving `data_dir`, starting one if necessary, then
/// registers the calling process as one of its clients.
///
/// * A live `Ready` record: joins that daemon.
/// * A fresh claim held by another live process: waits (via [`wait_for_ready`]) for
///   the state to become `Ready`, then joins.
/// * Otherwise: claims the slot, spawns the `_daemon` subcommand with `port`/`bind`,
///   and waits for the state to become `Ready`.
///
/// The returned port comes from the `Ready` record and may differ from `port`.
///
/// # Errors
/// Fails if the state file cannot be read or updated, the daemon cannot be spawned,
/// the state does not become `Ready` in time, or client registration fails. When
/// this process held the claim, the claim is released (best effort) before
/// returning, so another process can retry without waiting for [`CLAIM_MAX_AGE`].
pub fn ensure_daemon(data_dir: &Path, port: u16, bind: &str) -> Result<DaemonAction> {
    let probe = SysinfoProbe;
    let our_pid = std::process::id();
    let action = match try_claim(data_dir, our_pid, port, &probe)? {
        ClaimOutcome::AlreadyReady(ready) => DaemonAction::Joined { port: ready.port },
        ClaimOutcome::AlreadyClaiming { .. } => {
            let ready = wait_for_ready(data_dir, &probe)?;
            DaemonAction::Joined { port: ready.port }
        }
        ClaimOutcome::Claimed(claim) => {
            if let Err(e) = spawn_daemon(data_dir, port, bind) {
                let _ = claim.abandon();
                return Err(e);
            }
            match wait_for_ready(data_dir, &probe) {
                Ok(ready) => DaemonAction::Spawned { port: ready.port },
                Err(e) => {
                    // `abandon` only removes the record if it is still our `Claiming`
                    // entry, so a late `Ready` published by the daemon is preserved.
                    let _ = claim.abandon();
                    // On timeout `wait_for_ready` already reports the daemon.log hint;
                    // wrapping it again would duplicate that message in the error chain.
                    return Err(e);
                }
            }
        }
    };
    register_client(data_dir, our_pid)?;
    Ok(action)
}
/// Builds the "daemon failed to start" error. It points at `daemon.log` and, when
/// that log is readable, appends its last non-blank line as a hint.
fn start_failure_error(data_dir: &Path) -> anyhow::Error {
    let log_path = data_dir.join("daemon.log");
    let mut hint = String::new();
    if log_path.exists() {
        let contents = std::fs::read_to_string(&log_path).unwrap_or_default();
        let last_line = contents
            .lines()
            .rev()
            .find(|line| !line.trim().is_empty());
        if let Some(l) = last_line {
            hint = format!("\n Last log line: {l}");
        }
    }
    anyhow::anyhow!("daemon failed to start. Check {}{hint}", log_path.display())
}
fn wait_for_ready(data_dir: &Path, probe: &dyn PidProbe) -> Result<ReadyDaemon> {
for _ in 0..40 {
std::thread::sleep(std::time::Duration::from_millis(500));
match discover(data_dir, probe)? {
Discovery::Ready(ready) => return Ok(ready),
Discovery::Claiming { .. } => continue,
Discovery::Vacant => break,
}
}
Err(start_failure_error(data_dir))
}
/// Launches this executable's `_daemon` subcommand detached from the caller.
///
/// Stdin and stdout go to the null device and stderr to `data_dir/daemon.log`,
/// which is truncated. On Unix the child gets its own process group; on Windows it
/// is started with `DETACHED_PROCESS`.
///
/// The child is waited on from a background thread. On Unix, an exited child that
/// is never waited on stays in the process table as a zombie until this process
/// exits, and [`is_pid_alive`] could then keep reporting a crashed daemon as alive.
///
/// Returns the child's PID.
fn spawn_daemon(data_dir: &Path, port: u16, bind: &str) -> Result<u32> {
    use anyhow::Context;
    let exe = std::env::current_exe().context("failed to get current executable path")?;
    let log_file = std::fs::File::create(data_dir.join("daemon.log"))
        .context("failed to create daemon.log")?;
    let mut cmd = std::process::Command::new(exe);
    cmd.arg("--data-dir")
        .arg(data_dir.as_os_str())
        .arg("_daemon")
        .arg("--port")
        .arg(port.to_string())
        .arg("--bind")
        .arg(bind)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::from(log_file));
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        cmd.process_group(0);
    }
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;
        /// Win32 process-creation flag: the child has no console of its own.
        const DETACHED_PROCESS: u32 = 0x0000_0008;
        cmd.creation_flags(DETACHED_PROCESS);
    }
    let mut child = cmd.spawn().context("failed to spawn daemon process")?;
    let pid = child.id();
    // Best effort: if the reaper thread cannot be started, behave as before (no reaping).
    let _ = std::thread::Builder::new()
        .name("daemon-reaper".into())
        .spawn(move || {
            let _ = child.wait();
        });
    Ok(pid)
}
/// Decides whether an idle daemon should exit.
///
/// While `clients` is non-empty, any pending grace period is cancelled and the
/// answer is `false`. Once it is empty, the grace period starts on the first call,
/// and the answer becomes `true` after `grace_period` has elapsed since then.
pub fn should_shutdown(
    clients: &[u32],
    grace_start: &mut Option<std::time::Instant>,
    grace_period: std::time::Duration,
) -> bool {
    if !clients.is_empty() {
        *grace_start = None;
        return false;
    }
    let idle_since = *grace_start.get_or_insert_with(std::time::Instant::now);
    idle_since.elapsed() >= grace_period
}
/// Sweeps the state file on behalf of a running daemon.
///
/// * `Ready` with a dead daemon: the file is removed and `None` is returned.
/// * `Ready` with a live daemon: dead client PIDs are dropped, and the surviving
///   clients are written back and returned.
/// * `Claiming` or no file: left unchanged and `None` is returned.
pub fn cleanup_stale_state(data_dir: &Path, probe: &dyn PidProbe) -> Result<Option<Vec<u32>>> {
    let mut survivors: Option<Vec<u32>> = None;
    update_state(data_dir, |current| match current {
        Some(WebState::Ready {
            daemon_pid,
            port,
            auth_token,
            clients,
        }) => {
            if probe.is_alive(daemon_pid.get()) {
                let clients: Vec<u32> = clients
                    .into_iter()
                    .filter(|&pid| probe.is_alive(pid))
                    .collect();
                survivors = Some(clients.clone());
                Some(WebState::Ready {
                    daemon_pid,
                    port,
                    auth_token,
                    clients,
                })
            } else {
                None
            }
        }
        untouched => untouched,
    })?;
    Ok(survivors)
}
/// How a starting process should run, as chosen by [`resolve_startup_mode`].
#[derive(Debug, PartialEq, Eq)]
pub enum StartupMode {
    /// Relay to the given live daemon.
    Relay(ReadyDaemon),
    /// Run without a daemon. `spawn_web` carries the caller's web request flag.
    Standalone { spawn_web: bool },
}
/// Maps a [`Discovery`] to a [`StartupMode`].
///
/// Only a ready daemon leads to relaying, regardless of `web_requested`. An
/// in-progress claim is not connectable yet, so it is treated like a vacant slot.
pub fn resolve_startup_mode(disc: Discovery, web_requested: bool) -> StartupMode {
    if let Discovery::Ready(ready) = disc {
        return StartupMode::Relay(ready);
    }
    StartupMode::Standalone {
        spawn_web: web_requested,
    }
}
/// How a relay session ended, as reported by [`run_relay_mode_with`].
#[derive(Debug)]
pub enum RelayOutcome {
    /// The relay finished without error.
    Completed,
    /// The relay failed before any byte was written to the output.
    FailedBeforeFirstByte(anyhow::Error),
    /// The relay failed after at least one byte was written to the output.
    FailedMidSession(anyhow::Error),
}
struct FirstByteTracker<W> {
inner: W,
wrote: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for FirstByteTracker<W> {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let res = std::pin::Pin::new(&mut self.inner).poll_write(cx, buf);
if let std::task::Poll::Ready(Ok(n)) = &res
&& *n > 0
{
self.wrote.store(true, std::sync::atomic::Ordering::SeqCst);
}
res
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
/// Relays this process's stdin/stdout to `ready`; see [`run_relay_mode_with`].
pub async fn run_relay_mode(ready: &ReadyDaemon) -> RelayOutcome {
    run_relay_mode_with(
        ready,
        tokio::io::BufReader::new(tokio::io::stdin()),
        tokio::io::stdout(),
    )
    .await
}
/// Runs `crate::relay::run_relay` between `reader`/`writer` and `ready`.
///
/// A failure is classified by whether any byte had already been written to
/// `writer`, so callers can tell a failure before any output from a failure
/// mid-session.
pub async fn run_relay_mode_with<R, W>(ready: &ReadyDaemon, reader: R, writer: W) -> RelayOutcome
where
    R: tokio::io::AsyncBufRead + Unpin,
    W: tokio::io::AsyncWrite + Unpin,
{
    let first_byte_seen = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let tracked_writer = FirstByteTracker {
        inner: writer,
        wrote: std::sync::Arc::clone(&first_byte_seen),
    };
    let result = crate::relay::run_relay(reader, tracked_writer, ready).await;
    match result {
        Ok(()) => RelayOutcome::Completed,
        Err(e) if first_byte_seen.load(std::sync::atomic::Ordering::SeqCst) => {
            RelayOutcome::FailedMidSession(e)
        }
        Err(e) => RelayOutcome::FailedBeforeFirstByte(e),
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for state-file handling, claiming, client registration, and relay
    //! outcome classification.

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::db::{Database, DbConfig};
    use crate::embedding::MockEmbedder;
    use crate::service::{MemoryService, ServiceConfig};
    use crate::web::AppState;

    /// A PID that fits in a positive `pid_t` but exceeds common kernel PID limits
    /// (Linux caps `pid_max` at 2^22), so signalling it can only fail with "no such
    /// process". A value like `u32::MAX - 1` would wrap to a negative `pid_t` under an
    /// `as` cast, and `kill(2)` treats that as a process-group target.
    const NONEXISTENT_PID: u32 = i32::MAX as u32;

    /// [`PidProbe`] that reports exactly the configured PIDs as alive.
    struct FakeProbe {
        alive: HashSet<u32>,
    }

    impl FakeProbe {
        fn with(pids: &[u32]) -> Self {
            Self {
                alive: pids.iter().copied().collect(),
            }
        }

        fn none() -> Self {
            Self {
                alive: HashSet::new(),
            }
        }
    }

    impl PidProbe for FakeProbe {
        fn is_alive(&self, pid: u32) -> bool {
            self.alive.contains(&pid)
        }
    }

    fn ready_state(daemon_pid: u32, port: u16, token: &str, clients: Vec<u32>) -> WebState {
        WebState::Ready {
            daemon_pid: DaemonPid::new(daemon_pid).unwrap(),
            port,
            auth_token: AuthToken::new(token).unwrap(),
            clients,
        }
    }

    fn ready_daemon(daemon_pid: u32, port: u16, token: &str) -> ReadyDaemon {
        ReadyDaemon {
            daemon_pid: DaemonPid::new(daemon_pid).unwrap(),
            port,
            auth_token: AuthToken::new(token).unwrap(),
        }
    }

    #[test]
    fn daemon_pid_rejects_zero() {
        assert_eq!(DaemonPid::new(0), None);
        assert_eq!(DaemonPid::new(1).map(|p| p.get()), Some(1));
    }

    #[test]
    fn auth_token_rejects_empty() {
        assert!(AuthToken::new("").is_none());
        assert_eq!(
            AuthToken::new("tok").map(|t| t.as_str().to_string()),
            Some("tok".to_string())
        );
    }

    #[test]
    fn auth_token_debug_redacts_value() {
        let tok = AuthToken::new("super-secret-token").unwrap();
        let dbg = format!("{tok:?}");
        assert!(
            !dbg.contains("super-secret-token"),
            "Debug must not leak token: {dbg}"
        );
        assert!(
            dbg.contains("redacted"),
            "Debug should indicate redaction: {dbg}"
        );
    }

    #[test]
    fn ready_with_zero_pid_does_not_deserialize() {
        let json =
            r#"{"status":"ready","daemon_pid":0,"port":9090,"auth_token":"tok","clients":[]}"#;
        let parsed: std::result::Result<WebState, _> = serde_json::from_str(json);
        assert!(
            parsed.is_err(),
            "daemon_pid:0 must not deserialize as Ready"
        );
    }

    #[test]
    fn ready_with_empty_token_does_not_deserialize() {
        let json =
            r#"{"status":"ready","daemon_pid":1234,"port":9090,"auth_token":"","clients":[]}"#;
        let parsed: std::result::Result<WebState, _> = serde_json::from_str(json);
        assert!(
            parsed.is_err(),
            "auth_token:\"\" must not deserialize as Ready"
        );
    }

    #[test]
    fn discover_treats_zero_pid_record_as_vacant() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(
            dir.path().join("web.state"),
            r#"{"status":"ready","daemon_pid":0,"port":9090,"auth_token":"tok","clients":[]}"#,
        )
        .unwrap();
        let disc = discover(dir.path(), &FakeProbe::none()).unwrap();
        assert_eq!(
            disc,
            Discovery::Vacant,
            "tampered 0-pid record must be Vacant, not fake-ready"
        );
    }

    #[test]
    fn discover_treats_empty_token_record_as_vacant() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(
            dir.path().join("web.state"),
            r#"{"status":"ready","daemon_pid":1234,"port":9090,"auth_token":"","clients":[]}"#,
        )
        .unwrap();
        let disc = discover(dir.path(), &FakeProbe::with(&[1234])).unwrap();
        assert_eq!(
            disc,
            Discovery::Vacant,
            "empty-token record must be Vacant, not fake-ready"
        );
    }

    #[test]
    fn read_state_treats_legacy_flat_record_as_none() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(
            dir.path().join("web.state"),
            r#"{"daemon_pid":1234,"port":9090,"clients":[],"auth_token":"tok"}"#,
        )
        .unwrap();
        assert_eq!(
            read_state(dir.path()).unwrap(),
            None,
            "legacy flat record is unparseable -> None"
        );
    }

    #[test]
    fn write_and_read_round_trip_ready() {
        let dir = tempfile::tempdir().unwrap();
        let state = ready_state(1234, 9090, "tok", vec![5678, 9012]);
        write_state(dir.path(), &state).unwrap();
        assert_eq!(read_state(dir.path()).unwrap(), Some(state));
    }

    #[test]
    fn write_and_read_round_trip_claiming() {
        let dir = tempfile::tempdir().unwrap();
        let state = WebState::Claiming {
            claimer_pid: 4321,
            port: 9090,
            since: 100,
        };
        write_state(dir.path(), &state).unwrap();
        assert_eq!(read_state(dir.path()).unwrap(), Some(state));
    }

    #[test]
    fn write_is_atomic_no_temp_file_remains() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(1234, 9090, "tok", vec![])).unwrap();
        let entries: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect();
        assert_eq!(entries, vec!["web.state"]);
    }

    #[test]
    fn read_returns_none_when_no_state_file() {
        let dir = tempfile::tempdir().unwrap();
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn read_returns_none_for_corrupt_state_file() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("web.state"), b"not json at all").unwrap();
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn update_can_create_and_delete() {
        let dir = tempfile::tempdir().unwrap();
        let created = update_state(dir.path(), |prev| {
            assert_eq!(prev, None);
            Some(ready_state(1, 8080, "tok", vec![]))
        })
        .unwrap();
        assert!(created.is_some());
        let deleted = update_state(dir.path(), |prev| {
            assert!(prev.is_some());
            None
        })
        .unwrap();
        assert_eq!(deleted, None);
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn update_serializes_concurrent_access() {
        let dir = tempfile::tempdir().unwrap();
        let data_dir = dir.path().to_path_buf();
        write_state(&data_dir, &ready_state(1, 8080, "tok", vec![])).unwrap();
        let iterations = 50;
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
        let handles: Vec<_> = (0..2)
            .map(|i| {
                let dir = data_dir.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for j in 0..iterations {
                        update_state(&dir, |prev| {
                            let mut s =
                                prev.unwrap_or_else(|| ready_state(1, 8080, "tok", vec![]));
                            if let WebState::Ready { clients, .. } = &mut s {
                                clients.push(i * 1000 + j);
                            }
                            Some(s)
                        })
                        .unwrap();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let final_state = read_state(&data_dir).unwrap().unwrap();
        if let WebState::Ready { clients, .. } = final_state {
            assert_eq!(
                clients.len(),
                2 * iterations as usize,
                "concurrent updates must be serialized"
            );
        } else {
            panic!("expected Ready");
        }
    }

    #[test]
    fn resolver_ready_resolves_to_relay_regardless_of_web_flag() {
        let ready = ready_daemon(1234, 9090, "tok");
        assert_eq!(
            resolve_startup_mode(Discovery::Ready(ready.clone()), false),
            StartupMode::Relay(ready.clone())
        );
        assert_eq!(
            resolve_startup_mode(Discovery::Ready(ready.clone()), true),
            StartupMode::Relay(ready)
        );
    }

    #[test]
    fn resolver_vacant_resolves_to_standalone_carrying_web_flag() {
        assert_eq!(
            resolve_startup_mode(Discovery::Vacant, false),
            StartupMode::Standalone { spawn_web: false }
        );
        assert_eq!(
            resolve_startup_mode(Discovery::Vacant, true),
            StartupMode::Standalone { spawn_web: true }
        );
    }

    #[test]
    fn resolver_claiming_never_resolves_to_relay() {
        for web in [false, true] {
            let mode = resolve_startup_mode(
                Discovery::Claiming {
                    claimer_pid: 4321,
                    port: 9090,
                    age: Duration::from_secs(1),
                },
                web,
            );
            assert_eq!(
                mode,
                StartupMode::Standalone { spawn_web: web },
                "Claiming must never become Relay (web={web})"
            );
        }
    }

    #[test]
    fn discover_vacant_when_no_file() {
        let dir = tempfile::tempdir().unwrap();
        assert_eq!(
            discover(dir.path(), &FakeProbe::none()).unwrap(),
            Discovery::Vacant
        );
    }

    #[test]
    fn discover_ready_when_daemon_alive() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(1234, 9090, "tok", vec![])).unwrap();
        let disc = discover(dir.path(), &FakeProbe::with(&[1234])).unwrap();
        assert_eq!(disc, Discovery::Ready(ready_daemon(1234, 9090, "tok")));
        assert!(read_state(dir.path()).unwrap().is_some());
    }

    #[test]
    fn discover_reaps_ready_with_dead_daemon() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(1234, 9090, "tok", vec![])).unwrap();
        let disc = discover(dir.path(), &FakeProbe::none()).unwrap();
        assert_eq!(disc, Discovery::Vacant);
        assert_eq!(
            read_state(dir.path()).unwrap(),
            None,
            "dead Ready record must be reaped"
        );
    }

    #[test]
    fn discover_claiming_when_claimer_alive_and_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let since = now_unix_secs();
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 4321,
                port: 9090,
                since,
            },
        )
        .unwrap();
        let disc = discover(dir.path(), &FakeProbe::with(&[4321])).unwrap();
        match disc {
            Discovery::Claiming {
                claimer_pid, port, ..
            } => {
                assert_eq!(claimer_pid, 4321);
                assert_eq!(port, 9090);
            }
            other => panic!("expected Claiming, got {other:?}"),
        }
        assert!(
            read_state(dir.path()).unwrap().is_some(),
            "fresh claim must be preserved"
        );
    }

    #[test]
    fn discover_reaps_claiming_with_dead_claimer() {
        let dir = tempfile::tempdir().unwrap();
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 4321,
                port: 9090,
                since: now_unix_secs(),
            },
        )
        .unwrap();
        let disc = discover(dir.path(), &FakeProbe::none()).unwrap();
        assert_eq!(disc, Discovery::Vacant, "dead-claimer claim is reclaimable");
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn discover_reaps_aged_out_claiming_even_if_claimer_alive() {
        let dir = tempfile::tempdir().unwrap();
        let stale_since = now_unix_secs().saturating_sub(CLAIM_MAX_AGE.as_secs() + 10);
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 4321,
                port: 9090,
                since: stale_since,
            },
        )
        .unwrap();
        let disc = discover(dir.path(), &FakeProbe::with(&[4321])).unwrap();
        assert_eq!(
            disc,
            Discovery::Vacant,
            "aged-out claim is reclaimable even with live claimer"
        );
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn try_claim_on_empty_returns_claimed() {
        let dir = tempfile::tempdir().unwrap();
        let probe = FakeProbe::with(&[100]);
        match try_claim(dir.path(), 100, 9090, &probe).unwrap() {
            ClaimOutcome::Claimed(_) => {}
            _ => panic!("empty slot should be Claimed"),
        }
        match read_state(dir.path()).unwrap() {
            Some(WebState::Claiming {
                claimer_pid, port, ..
            }) => {
                assert_eq!(claimer_pid, 100);
                assert_eq!(port, 9090);
            }
            other => panic!("expected Claiming on disk, got {other:?}"),
        }
    }

    #[test]
    fn try_claim_second_caller_sees_already_claiming() {
        let dir = tempfile::tempdir().unwrap();
        let probe = FakeProbe::with(&[100, 200]);
        let _first = match try_claim(dir.path(), 100, 9090, &probe).unwrap() {
            ClaimOutcome::Claimed(c) => c,
            _ => panic!("first caller should claim"),
        };
        match try_claim(dir.path(), 200, 9090, &probe).unwrap() {
            ClaimOutcome::AlreadyClaiming { claimer_pid, .. } => {
                assert_eq!(claimer_pid, 100, "second caller sees first claimer");
            }
            _ => panic!("second caller should see AlreadyClaiming"),
        }
    }

    #[test]
    fn publish_promotes_claim_and_both_see_ready() {
        let dir = tempfile::tempdir().unwrap();
        let probe = FakeProbe::with(&[100, 555]);
        let claim = match try_claim(dir.path(), 100, 9090, &probe).unwrap() {
            ClaimOutcome::Claimed(c) => c,
            _ => panic!("should claim"),
        };
        let ready = claim
            .publish(
                DaemonPid::new(555).unwrap(),
                AuthToken::new("daemon-tok").unwrap(),
            )
            .unwrap();
        assert_eq!(ready, ready_daemon(555, 9090, "daemon-tok"));
        assert_eq!(
            discover(dir.path(), &probe).unwrap(),
            Discovery::Ready(ready_daemon(555, 9090, "daemon-tok"))
        );
        match try_claim(dir.path(), 200, 9090, &FakeProbe::with(&[200, 555])).unwrap() {
            ClaimOutcome::AlreadyReady(r) => assert_eq!(r, ready_daemon(555, 9090, "daemon-tok")),
            _ => panic!("after publish a new caller should see AlreadyReady"),
        }
    }

    #[test]
    fn abandon_removes_the_claim() {
        let dir = tempfile::tempdir().unwrap();
        let probe = FakeProbe::with(&[100]);
        let claim = match try_claim(dir.path(), 100, 9090, &probe).unwrap() {
            ClaimOutcome::Claimed(c) => c,
            _ => panic!("should claim"),
        };
        claim.abandon().unwrap();
        assert_eq!(
            read_state(dir.path()).unwrap(),
            None,
            "abandon removes the claim"
        );
        match try_claim(dir.path(), 200, 9090, &FakeProbe::with(&[200])).unwrap() {
            ClaimOutcome::Claimed(_) => {}
            _ => panic!("after abandon the slot should be claimable again"),
        }
    }

    #[test]
    fn try_claim_reclaims_dead_claimer_slot() {
        let dir = tempfile::tempdir().unwrap();
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 999,
                port: 9090,
                since: now_unix_secs(),
            },
        )
        .unwrap();
        match try_claim(dir.path(), 100, 9090, &FakeProbe::with(&[100])).unwrap() {
            ClaimOutcome::Claimed(_) => {}
            _ => panic!("dead-claimer slot should be reclaimable"),
        }
        match read_state(dir.path()).unwrap() {
            Some(WebState::Claiming { claimer_pid, .. }) => assert_eq!(claimer_pid, 100),
            other => panic!("expected our Claiming, got {other:?}"),
        }
    }

    #[test]
    fn try_claim_over_dead_ready_reclaims_slot() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(999, 9090, "old", vec![])).unwrap();
        match try_claim(dir.path(), 100, 9090, &FakeProbe::with(&[100])).unwrap() {
            ClaimOutcome::Claimed(_) => {}
            _ => panic!("dead Ready daemon slot should be reclaimable"),
        }
    }

    #[test]
    fn publish_fails_when_claim_no_longer_ours() {
        let dir = tempfile::tempdir().unwrap();
        let probe = FakeProbe::with(&[100, 200]);
        let claim = match try_claim(dir.path(), 100, 9090, &probe).unwrap() {
            ClaimOutcome::Claimed(c) => c,
            _ => panic!("should claim"),
        };
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 200,
                port: 9090,
                since: now_unix_secs(),
            },
        )
        .unwrap();
        let err = claim
            .publish(DaemonPid::new(555).unwrap(), AuthToken::new("tok").unwrap())
            .unwrap_err();
        assert!(err.to_string().contains("no longer held"), "got: {err}");
    }

    #[test]
    fn register_client_requires_ready_state() {
        let dir = tempfile::tempdir().unwrap();
        assert!(register_client(dir.path(), 1111).is_err());
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 100,
                port: 9090,
                since: now_unix_secs(),
            },
        )
        .unwrap();
        assert!(register_client(dir.path(), 1111).is_err());
    }

    #[test]
    fn register_and_deregister_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        // Deregistering the last client signals the daemon PID, so it must be one
        // that cannot name a real process or a process group.
        write_state(dir.path(), &ready_state(NONEXISTENT_PID, 9090, "tok", vec![])).unwrap();
        register_client(dir.path(), 1111).unwrap();
        // Registering twice must not duplicate the entry.
        register_client(dir.path(), 1111).unwrap();
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready { clients, .. } => assert_eq!(clients, vec![1111]),
            _ => panic!("expected Ready"),
        }
        deregister_client(dir.path(), 1111).unwrap();
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready { clients, .. } => assert!(clients.is_empty()),
            _ => panic!("expected Ready"),
        }
        deregister_client(dir.path(), 1111).unwrap();
    }

    #[test]
    fn cleanup_returns_none_when_no_file() {
        let dir = tempfile::tempdir().unwrap();
        assert_eq!(
            cleanup_stale_state(dir.path(), &FakeProbe::none()).unwrap(),
            None
        );
    }

    #[test]
    fn cleanup_removes_dead_daemon_record() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(1234, 9090, "tok", vec![100, 200])).unwrap();
        let result = cleanup_stale_state(dir.path(), &FakeProbe::none()).unwrap();
        assert_eq!(result, None);
        assert_eq!(read_state(dir.path()).unwrap(), None);
    }

    #[test]
    fn cleanup_sweeps_dead_clients_of_live_daemon() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(1234, 9090, "tok", vec![100, 200])).unwrap();
        let result = cleanup_stale_state(dir.path(), &FakeProbe::with(&[1234, 100])).unwrap();
        assert_eq!(result, Some(vec![100]));
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready { clients, .. } => assert_eq!(clients, vec![100]),
            _ => panic!("expected Ready"),
        }
    }

    #[test]
    fn cleanup_preserves_foreign_claiming_record() {
        let dir = tempfile::tempdir().unwrap();
        let claim = WebState::Claiming {
            claimer_pid: 4321,
            port: 9090,
            since: now_unix_secs(),
        };
        write_state(dir.path(), &claim).unwrap();
        let result = cleanup_stale_state(dir.path(), &FakeProbe::with(&[4321])).unwrap();
        assert_eq!(result, None, "Claiming yields no client list");
        assert_eq!(
            read_state(dir.path()).unwrap(),
            Some(claim),
            "claim preserved"
        );
    }

    #[test]
    fn should_shutdown_false_with_clients_resets_grace() {
        let mut grace = Some(std::time::Instant::now());
        assert!(!should_shutdown(
            &[100],
            &mut grace,
            Duration::from_secs(60)
        ));
        assert!(grace.is_none());
    }

    #[test]
    fn should_shutdown_starts_grace_when_empty() {
        let mut grace = None;
        assert!(!should_shutdown(&[], &mut grace, Duration::from_secs(60)));
        assert!(grace.is_some());
    }

    #[test]
    fn should_shutdown_true_after_grace_expires() {
        let mut grace = Some(std::time::Instant::now() - Duration::from_secs(120));
        assert!(should_shutdown(&[], &mut grace, Duration::from_secs(60)));
    }

    #[test]
    fn ensure_daemon_joins_existing_alive_daemon() {
        let dir = tempfile::tempdir().unwrap();
        let our_pid = std::process::id();
        write_state(dir.path(), &ready_state(our_pid, 9090, "tok", vec![])).unwrap();
        let action = ensure_daemon(dir.path(), 9090, "127.0.0.1").unwrap();
        assert_eq!(action, DaemonAction::Joined { port: 9090 });
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready { clients, .. } => assert!(clients.contains(&our_pid)),
            _ => panic!("expected Ready"),
        }
    }

    #[test]
    fn publish_ready_promotes_claiming_and_preserves_nothing() {
        let dir = tempfile::tempdir().unwrap();
        write_state(
            dir.path(),
            &WebState::Claiming {
                claimer_pid: 100,
                port: 9090,
                since: now_unix_secs(),
            },
        )
        .unwrap();
        let ready = publish_ready(
            dir.path(),
            DaemonPid::new(555).unwrap(),
            9090,
            AuthToken::new("tok").unwrap(),
        )
        .unwrap();
        assert_eq!(ready, ready_daemon(555, 9090, "tok"));
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready {
                daemon_pid,
                clients,
                ..
            } => {
                assert_eq!(daemon_pid.get(), 555);
                assert!(clients.is_empty());
            }
            _ => panic!("expected Ready"),
        }
    }

    #[test]
    fn publish_ready_preserves_clients_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        write_state(dir.path(), &ready_state(111, 9090, "old", vec![7, 8])).unwrap();
        publish_ready(
            dir.path(),
            DaemonPid::new(222).unwrap(),
            9090,
            AuthToken::new("new").unwrap(),
        )
        .unwrap();
        match read_state(dir.path()).unwrap().unwrap() {
            WebState::Ready {
                daemon_pid,
                auth_token,
                clients,
                ..
            } => {
                assert_eq!(daemon_pid.get(), 222);
                assert_eq!(auth_token.as_str(), "new");
                assert_eq!(clients, vec![7, 8], "client list preserved across restart");
            }
            _ => panic!("expected Ready"),
        }
    }

    /// Starts an in-process web server on an ephemeral loopback port. Returns its task
    /// handle, so tests can abort it, plus a `ReadyDaemon` pointing at it.
    async fn start_killable_server(token: &str) -> (tokio::task::JoinHandle<()>, ReadyDaemon) {
        let db = Database::open_in_memory(&DbConfig::default()).unwrap();
        let service = MemoryService::new(
            Arc::new(Mutex::new(db)),
            Arc::new(MockEmbedder::new(768)),
            None,
            ServiceConfig::default(),
        );
        let state = AppState {
            service,
            auth_token: token.to_string(),
        };
        let app = crate::web::app_router(state);
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        let handle = tokio::spawn(async move {
            let _ = axum::serve(listener, app).await;
        });
        (handle, ready_daemon(std::process::id(), port, token))
    }

    #[tokio::test]
    async fn run_relay_mode_returns_failed_before_first_byte_on_connection_refused() {
        let ready = ready_daemon(std::process::id(), 1, "tok");
        let (mut stdin_w, stdin_r) = tokio::io::duplex(8192);
        let (stdout_w, _stdout_r) = tokio::io::duplex(8192);
        use tokio::io::AsyncWriteExt;
        stdin_w
            .write_all(b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\"}\n")
            .await
            .unwrap();
        stdin_w.flush().await.unwrap();
        drop(stdin_w);
        let outcome =
            run_relay_mode_with(&ready, tokio::io::BufReader::new(stdin_r), stdout_w).await;
        match outcome {
            RelayOutcome::FailedBeforeFirstByte(_) => {}
            other => panic!("expected FailedBeforeFirstByte, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn run_relay_mode_returns_failed_mid_session_when_server_dies_after_first_response() {
        let (server, ready) = start_killable_server("mid-tok").await;
        let (mut stdin_w, stdin_r) = tokio::io::duplex(8192);
        let (stdout_w, stdout_r) = tokio::io::duplex(8192);
        let mut stdout_r = tokio::io::BufReader::new(stdout_r);
        let ready_c = ready.clone();
        let relay = tokio::spawn(async move {
            run_relay_mode_with(&ready_c, tokio::io::BufReader::new(stdin_r), stdout_w).await
        });
        use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
        stdin_w
            .write_all(b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2025-03-26\",\"capabilities\":{},\"clientInfo\":{\"name\":\"t\",\"version\":\"0.1\"}}}\n")
            .await
            .unwrap();
        stdin_w.flush().await.unwrap();
        let mut first = String::new();
        stdout_r.read_line(&mut first).await.unwrap();
        assert!(!first.is_empty(), "should have received first response");
        server.abort();
        let _ = server.await;
        tokio::task::yield_now().await;
        stdin_w
            .write_all(b"{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/list\"}\n")
            .await
            .unwrap();
        stdin_w.flush().await.unwrap();
        let outcome = tokio::time::timeout(std::time::Duration::from_secs(35), relay)
            .await
            .expect("relay should finish")
            .expect("relay task should not panic");
        match outcome {
            RelayOutcome::FailedMidSession(_) => {}
            other => panic!("expected FailedMidSession, got {other:?}"),
        }
    }
}