#![expect(clippy::unwrap_used, reason = "test scaffolding")]
#![expect(clippy::expect_used, reason = "test scaffolding")]
#![expect(clippy::panic, reason = "test scaffolding")]
use std::net::TcpListener;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, OnceLock, PoisonError};
use llmenv::mcp::proxy::{EnsureOutcome, ensure_running, is_alive, probe_tcp};
use tempfile::tempdir;
/// Acquires the process-wide lock that the port-using tests in this file take
/// before touching TCP ports.
///
/// A poisoned mutex (a previous holder panicked) is recovered rather than
/// propagated, so one failing test does not cascade into every later test.
fn port_guard() -> MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = LOCK.get_or_init(|| Mutex::new(()));
    match mutex.lock() {
        Ok(guard) => guard,
        // The protected value is `()`, so there is no state that could have
        // been left inconsistent by the panicking holder.
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
/// Picks an ephemeral loopback port and returns it together with its
/// `127.0.0.1:<port>` bind string.
///
/// Each attempt binds port 0, records the assigned port, releases the listener
/// and then checks with `probe_tcp` that nothing is accepting on it any more.
///
/// # Panics
///
/// Panics if binding fails, or if no attempt yields a port that probes as
/// closed.
fn free_port() -> (u16, String) {
    const ATTEMPTS: usize = 50;
    let retry_pause = std::time::Duration::from_millis(10);

    for _attempt in 0..ATTEMPTS {
        // The listener lives only inside this scope; it is released before the probe.
        let (port, bind) = {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral");
            let port = listener.local_addr().expect("addr").port();
            (port, format!("127.0.0.1:{port}"))
        };

        // NOTE(review): the second argument is presumably a timeout in
        // milliseconds — confirm against `probe_tcp`.
        if probe_tcp(&bind, 50) {
            // Still reachable: pause briefly and try a fresh port.
            std::thread::sleep(retry_pause);
            continue;
        }
        return (port, bind);
    }

    panic!("could not obtain a confirmed-closed ephemeral port after retries");
}
/// Records what the fake spawner built by `spawner` was asked to do.
#[derive(Default)]
struct SpawnLog {
    /// Every bind string handed to the spawner, in call order.
    bind_args: Mutex<Vec<String>>,
    /// Counter the spawner advances on each call to derive the pid it returns.
    next_pid: AtomicU32,
    /// Optional shared slot; when set, the spawner binds a real listener and
    /// parks it there.
    bind_listener: Mutex<Option<Arc<Mutex<Option<TcpListener>>>>>,
}

impl SpawnLog {
    /// Number of spawner invocations recorded so far.
    fn calls(&self) -> usize {
        let recorded = self.bind_args.lock().expect("lock");
        recorded.len()
    }

    /// Builder-style setter: stores `holder` as the slot the spawner fills
    /// with a bound listener, then hands the log back.
    fn with_listener_holder(self, holder: Arc<Mutex<Option<TcpListener>>>) -> Self {
        {
            let mut slot = self.bind_listener.lock().expect("lock");
            *slot = Some(holder);
        } // guard released here so `self` can be moved out
        self
    }
}
/// Builds a fake spawn callback that never starts a process.
///
/// Each call of the returned closure:
/// 1. appends the requested bind string to `log.bind_args`;
/// 2. if the log carries a listener holder, binds a real `TcpListener` on
///    that address and stores it in the holder;
/// 3. returns a pid of `4_000_000` plus the number of earlier calls.
///
/// # Panics
///
/// The closure panics if a lock is poisoned or the listener cannot be bound.
fn spawner(log: Arc<SpawnLog>) -> impl Fn(&str) -> anyhow::Result<u32> {
    const FAKE_PID_BASE: u32 = 4_000_000;

    move |bind: &str| {
        let requested = bind.to_owned();
        log.bind_args.lock().expect("lock").push(requested);

        if let Some(holder) = log.bind_listener.lock().expect("lock").as_ref() {
            let listener = TcpListener::bind(bind).expect("bind for spawn");
            let mut slot = holder.lock().expect("lock");
            *slot = Some(listener);
        }

        let sequence = log.next_pid.fetch_add(1, Ordering::SeqCst);
        Ok(sequence + FAKE_PID_BASE)
    }
}
/// With no pidfile on disk and a port that probes as closed, the test expects
/// `ensure_running` to report `Spawned`, call the spawner exactly once, and
/// leave a pidfile behind.
#[test]
fn ensure_running_spawns_when_no_pidfile() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    let (_, bind) = free_port();

    // The fake spawner parks a real listener here.
    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    let outcome = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)))
        .expect("ensure_running");

    assert_eq!(outcome, EnsureOutcome::Spawned);
    assert_eq!(log.calls(), 1, "spawn must be called exactly once");
    assert!(pid_path.exists(), "pidfile must be written after spawn");
}
/// Checks that the spawner is called with exactly the bind string given to
/// `ensure_running`, and only once.
#[test]
fn ensure_running_passes_bind_to_spawner() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    let (_, bind) = free_port();

    // The fake spawner parks a real listener here.
    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    ensure_running(&bind, &pid_path, spawner(Arc::clone(&log))).expect("ensure_running");

    let recorded = log.bind_args.lock().expect("lock");
    assert_eq!(*recorded, vec![bind]);
}
/// With a pidfile present and a live listener on the bind address, the test
/// expects `AlreadyRunning` and zero spawner calls.
#[test]
fn ensure_running_no_op_when_proxy_is_listening() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");

    // Stand-in for an already-running proxy: a real listener on an ephemeral port.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let port = listener.local_addr().expect("addr").port();
    let bind = format!("127.0.0.1:{port}");
    std::fs::write(&pid_path, "12345").expect("write pidfile");

    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    let outcome = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)))
        .expect("ensure_running");

    assert_eq!(outcome, EnsureOutcome::AlreadyRunning);
    assert_eq!(
        log.calls(),
        0,
        "spawn must not be called when proxy is listening"
    );

    // Keep the listener alive until every assertion has run.
    drop(listener);
}
/// A pidfile exists but nothing listens on the bind address: the test expects
/// a spawn and a pidfile whose content no longer equals the stale pid.
#[test]
fn ensure_running_respawns_when_pidfile_exists_but_port_closed() {
    const STALE_PID: u32 = 4_000_001;

    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    let (_, bind) = free_port();
    std::fs::write(&pid_path, STALE_PID.to_string()).expect("write stale pidfile");

    let port_open = probe_tcp(&bind, 50);
    assert!(!port_open, "port must be closed before test");

    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    let outcome = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)))
        .expect("ensure_running");

    assert_eq!(outcome, EnsureOutcome::Spawned);
    assert_eq!(
        log.calls(),
        1,
        "spawn must be called when port is not bound"
    );

    let contents = std::fs::read_to_string(&pid_path).expect("read pid");
    let parsed = contents.trim().parse::<u32>().expect("parse pid");
    assert_ne!(
        parsed, STALE_PID,
        "pidfile must be overwritten with new pid"
    );

    // Releases this handle only; `log` still owns a clone of the holder until
    // it goes out of scope.
    drop(held);
}
/// The spawner returns a pid but nothing ever listens (no listener holder is
/// configured): the test expects an error whose text contains "did not bind".
#[test]
fn ensure_running_errors_when_spawn_succeeds_but_port_never_binds() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    let (_, bind) = free_port();

    // Without a listener holder the fake spawner only records the call.
    let log = Arc::new(SpawnLog::default());
    let result = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)));

    let failed = result.is_err();
    assert!(
        failed,
        "must error when spawn succeeds but port never binds"
    );

    let msg = result.unwrap_err().to_string();
    let mentions_bind = msg.contains("did not bind");
    assert!(
        mentions_bind,
        "error must mention bind failure, got: {msg}"
    );
}
/// A lockfile already exists next to the pidfile and the port is closed: the
/// test expects an error and no spawner call.
#[test]
fn ensure_running_errors_when_lock_is_held_and_port_closed() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    // NOTE(review): presumably the lock path `ensure_running` derives from the
    // pidfile path — confirm against the implementation.
    let lock_path: PathBuf = tmp.path().join("mcp-proxy.pid.lock");
    let (_, bind) = free_port();

    // An empty pre-existing lockfile stands in for a peer holding the lock.
    std::fs::write(&lock_path, "").expect("write lockfile");

    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    let result = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)));

    let failed = result.is_err();
    assert!(
        failed,
        "should error when peer holds lock and port is closed"
    );
    assert_eq!(log.calls(), 0, "must not spawn while lock is held");
}
/// A lockfile and a pidfile both exist and a listener is live on the bind
/// address: the test expects `AlreadyRunning` with no spawner call.
#[test]
fn ensure_running_accepts_peer_published_pid_when_listening() {
    let _guard = port_guard();
    let tmp = tempdir().expect("tempdir");
    let pid_path: PathBuf = tmp.path().join("mcp-proxy.pid");
    let lock_path: PathBuf = tmp.path().join("mcp-proxy.pid.lock");

    // Stand-in for the peer's proxy: a real listener on an ephemeral port.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let port = listener.local_addr().expect("addr").port();
    let bind = format!("127.0.0.1:{port}");

    // Files a peer would have left behind: its lock and its published pid.
    std::fs::write(&lock_path, "").expect("write lockfile");
    std::fs::write(&pid_path, "12345").expect("write pid");

    let held = Arc::new(Mutex::new(None::<TcpListener>));
    let log = Arc::new(SpawnLog::default().with_listener_holder(Arc::clone(&held)));

    let outcome = ensure_running(&bind, &pid_path, spawner(Arc::clone(&log)))
        .expect("ensure_running");

    assert_eq!(outcome, EnsureOutcome::AlreadyRunning);
    assert_eq!(log.calls(), 0);

    // Keep the listener alive until every assertion has run.
    drop(listener);
}
/// `is_alive` should report `false` for a pid that is very unlikely to exist.
#[test]
fn is_alive_returns_false_for_almost_certainly_dead_pid() {
    // NOTE(review): assumes no live process has this pid on the test host.
    let unlikely_pid: u32 = 4_000_002;
    let alive = is_alive(unlikely_pid);
    assert!(!alive);
}
/// `is_alive` should report `true` for the pid of the running test process.
#[test]
fn is_alive_returns_true_for_self() {
    assert!(is_alive(std::process::id()));
}
/// `probe_tcp` should report `false` both for a string that is not a socket
/// address and for an address with port 0.
#[test]
fn probe_tcp_returns_false_for_invalid_address() {
    let unparseable_reachable = probe_tcp("not-a-valid-address", 100);
    assert!(
        !unparseable_reachable,
        "probe_tcp must return false for unparseable address"
    );

    let port_zero_reachable = probe_tcp("127.0.0.1:0", 100);
    assert!(
        !port_zero_reachable,
        "probe_tcp must return false for port 0"
    );
}
/// `probe_tcp` should report `true` while a listener is bound on the probed
/// address.
#[test]
fn probe_tcp_returns_true_for_open_port() {
    let _guard = port_guard();

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let port = listener.local_addr().expect("addr").port();
    let bind = format!("127.0.0.1:{port}");

    let reachable = probe_tcp(&bind, 200);
    assert!(
        reachable,
        "probe_tcp must return true when listener is bound"
    );

    // The listener must outlive the probe above.
    drop(listener);
}