#![allow(unsafe_code)]
use super::client::CoordinatorHandle;
use super::protocol::{MessageType, WireMessage};
use super::{lock_path, pid_path, socket_path};
use std::io;
use std::path::Path;
use std::process::Stdio;
use std::time::Duration;
use tokio::net::UnixStream;
use tokio::process::Command;
/// Result of probing for a coordinator, as produced by `detect_coordinator`.
#[derive(Debug, Clone)]
pub enum CoordinatorStatus {
/// A coordinator completed the health-check handshake on `socket`.
Running {
/// PID parsed from the coordinator's PID file during the probe.
pid: u32,
/// Socket path the probe connected to.
socket: std::path::PathBuf,
},
/// No socket file exists at the expected socket path.
NotRunning,
/// A socket file exists, but the health-check probe failed or timed out.
Stale {
/// Path of the socket file that did not answer the probe.
socket: std::path::PathBuf,
},
}
/// Probes the well-known socket path and classifies the coordinator's state.
///
/// * No socket file on disk: [`CoordinatorStatus::NotRunning`].
/// * Socket file present and the health-check handshake finishes within
///   500 ms: [`CoordinatorStatus::Running`].
/// * Socket file present but the handshake errors or times out:
///   [`CoordinatorStatus::Stale`].
pub async fn detect_coordinator() -> CoordinatorStatus {
    let socket = socket_path();
    if socket.exists() {
        // Bound the whole handshake so a wedged peer cannot hang the caller.
        let probe_budget = Duration::from_millis(500);
        let outcome = tokio::time::timeout(probe_budget, try_ping(&socket)).await;
        if let Ok(Ok(pid)) = outcome {
            CoordinatorStatus::Running { pid, socket }
        } else {
            // Either the probe itself failed or the timeout elapsed.
            CoordinatorStatus::Stale { socket }
        }
    } else {
        CoordinatorStatus::NotRunning
    }
}
/// Performs a health-check handshake against the coordinator at `socket`.
///
/// Connects, registers as a throwaway producer named `_health_check`, and
/// expects a `RegisterAck` in reply. On success the coordinator's PID is read
/// from the file at `pid_path()` and returned.
///
/// # Errors
///
/// Returns the underlying I/O error if connecting, writing, reading, or
/// loading the PID file fails, and `InvalidData` if the reply is not a
/// registration ack or the PID file does not contain a valid `u32`.
async fn try_ping(socket: &Path) -> io::Result<u32> {
    let mut conn = UnixStream::connect(socket).await?;

    let hello = WireMessage::register(
        uuid::Uuid::new_v4(),
        super::protocol::ClientType::Producer {
            command: String::from("_health_check"),
        },
        std::process::id(),
    );
    hello.write_to(&mut conn).await?;

    let reply = WireMessage::read_from(&mut conn).await?;
    if reply.msg_type != MessageType::RegisterAck {
        let unexpected = io::Error::new(io::ErrorKind::InvalidData, "expected registration ack");
        return Err(unexpected);
    }

    // The handshake proves liveness; the PID itself comes from the PID file.
    let recorded = tokio::fs::read_to_string(pid_path()).await?;
    recorded
        .trim()
        .parse::<u32>()
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
}
/// Returns a handle to a live coordinator, starting one if necessary.
///
/// A coordinator that already answers the health check is reused. A stale
/// socket is cleaned up first; in both the stale and the not-running case a
/// new coordinator is then started.
///
/// # Errors
///
/// Propagates any error from cleaning up the stale coordinator or from
/// starting a new one.
pub async fn ensure_coordinator_running() -> io::Result<CoordinatorHandle> {
    let status = detect_coordinator().await;

    // Fast path: somebody is already serving.
    if let CoordinatorStatus::Running { pid, socket } = status {
        return Ok(CoordinatorHandle::existing(pid, socket));
    }

    // Leftovers from a dead coordinator must go before a new one can bind.
    if let CoordinatorStatus::Stale { socket } = &status {
        cleanup_stale_coordinator(socket).await?;
    }

    start_coordinator().await
}
/// Reports whether `pid` looks like a running cuenv coordinator process.
///
/// The check is a heuristic on the process command line: it must contain both
/// `cuenv` and `__coordinator`.
///
/// * Linux: reads `/proc/<pid>/cmdline`.
/// * macOS: asks `ps -p <pid> -o command=`.
/// * Other Unix targets: no inspection is available, so any positive PID is
///   assumed to match.
///
/// Non-positive PIDs always yield `false`. They never identify a single
/// process, and when handed to `kill(2)` they address process groups (or every
/// process), so they must never be reported as "the coordinator".
#[cfg(unix)]
fn is_cuenv_process(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    #[cfg(target_os = "linux")]
    {
        // The cmdline file holds NUL-separated raw argv bytes. Decode lossily so
        // one non-UTF-8 argument cannot hide an otherwise matching coordinator.
        std::fs::read(format!("/proc/{pid}/cmdline")).is_ok_and(|raw| {
            let cmdline = String::from_utf8_lossy(&raw);
            cmdline.contains("cuenv") && cmdline.contains("__coordinator")
        })
    }
    #[cfg(target_os = "macos")]
    {
        // `command=` prints the full command line with no header row.
        std::process::Command::new("ps")
            .args(["-p", &pid.to_string(), "-o", "command="])
            .output()
            .ok()
            .is_some_and(|o| {
                let cmd = String::from_utf8_lossy(&o.stdout);
                cmd.contains("cuenv") && cmd.contains("__coordinator")
            })
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        // No portable way to inspect the command line here.
        true
    }
}
/// Best-effort teardown of an unresponsive coordinator and its on-disk files.
///
/// Reads the PID file next to `socket` (the socket path with its extension
/// replaced by `pid`). If it names a positive PID whose command line looks
/// like a cuenv coordinator, that process is sent `SIGTERM` and given 100 ms
/// to exit. The socket file and the PID file are then removed.
///
/// NOTE(review): `try_ping` reads the PID from `pid_path()`, while this
/// function derives the path from `socket`; confirm both resolve to the same
/// file.
///
/// # Errors
///
/// Every step is best-effort and its failure is ignored, so this currently
/// always returns `Ok(())`. The `io::Result` return is kept for callers that
/// use `?`.
async fn cleanup_stale_coordinator(socket: &Path) -> io::Result<()> {
    let pid_file = socket.with_extension("pid");
    if let Ok(pid_str) = tokio::fs::read_to_string(&pid_file).await
        && let Ok(pid) = pid_str.trim().parse::<i32>()
        // kill(2) treats 0 and negative values as "a process group" or "every
        // process"; a corrupt PID file must never be able to trigger that.
        && pid > 0
    {
        #[cfg(unix)]
        if is_cuenv_process(pid) {
            // SAFETY: `kill` takes two plain integers and accesses no memory.
            // `pid` is strictly positive, so it addresses a single process.
            unsafe {
                let _ = libc::kill(pid, libc::SIGTERM);
            }
            // Give the old coordinator a moment to exit before its files go.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
    // Missing files are fine: the goal is only that they are gone afterwards.
    let _ = tokio::fs::remove_file(socket).await;
    let _ = tokio::fs::remove_file(&pid_file).await;
    Ok(())
}
/// Spawns a new coordinator process and waits for it to accept connections.
///
/// Steps:
/// 1. Ensure the socket's parent directory exists.
/// 2. Take the startup lock so that concurrent callers do not each spawn a
///    coordinator, then re-probe in case another caller finished meanwhile.
/// 3. Re-execute the current binary with the `__coordinator` argument.
/// 4. Poll every 100 ms, up to 50 times, until the health check succeeds.
///
/// The lock is released when this function returns.
///
/// # Errors
///
/// Fails if the directory cannot be created, the lock cannot be acquired, the
/// current executable cannot be resolved or spawned, or (with `TimedOut`) the
/// coordinator does not answer within the polling window.
async fn start_coordinator() -> io::Result<CoordinatorHandle> {
    let socket = socket_path();
    let lock = lock_path();
    if let Some(parent) = socket.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    let _lock_guard = acquire_lock(&lock).await?;
    // Someone else may have started a coordinator while we waited for the lock.
    if let CoordinatorStatus::Running { pid, socket } = detect_coordinator().await {
        return Ok(CoordinatorHandle::existing(pid, socket));
    }
    let exe = std::env::current_exe()?;
    let child = Command::new(&exe)
        .arg("__coordinator")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        // stderr is never read here. A pipe would lose its read end as soon as
        // `child` is dropped, leaving the long-lived coordinator writing into a
        // broken pipe (or blocking if it fills the pipe during startup).
        .stderr(Stdio::null())
        .spawn()?;
    // `id()` is `None` only once the child has already been reaped.
    let pid = child.id().unwrap_or(0);
    for _ in 0..50 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        if socket.exists()
            && let CoordinatorStatus::Running { .. } = detect_coordinator().await
        {
            return Ok(CoordinatorHandle::new(pid, socket));
        }
    }
    Err(io::Error::new(
        io::ErrorKind::TimedOut,
        "coordinator failed to start",
    ))
}
/// Acquires the startup lock by exclusively creating the file at `lock_path`.
///
/// Makes up to ten attempts. When the file already exists and its
/// modification time is more than 30 seconds old, the lock is treated as
/// abandoned: it is removed and the next attempt starts immediately.
/// Otherwise the function sleeps 100 ms before trying again.
///
/// # Errors
///
/// Returns any creation error other than `AlreadyExists` as-is, and
/// `WouldBlock` once all attempts are used up.
async fn acquire_lock(lock_path: &Path) -> io::Result<LockGuard> {
    let stale_after = Duration::from_secs(30);
    let retry_delay = Duration::from_millis(100);

    for _attempt in 0..10 {
        let created = tokio::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(lock_path)
            .await;

        let failure = match created {
            Ok(handle) => {
                // Only the file's existence matters; the handle is not needed.
                drop(handle);
                let path = lock_path.to_path_buf();
                return Ok(LockGuard { path });
            }
            Err(err) => err,
        };
        if failure.kind() != io::ErrorKind::AlreadyExists {
            return Err(failure);
        }

        // Someone holds the lock: decide whether the holder abandoned it.
        let abandoned = match tokio::fs::metadata(lock_path).await {
            Ok(meta) => meta
                .modified()
                .is_ok_and(|stamp| stamp.elapsed().unwrap_or(Duration::ZERO) > stale_after),
            Err(_) => false,
        };

        if abandoned {
            let _ = tokio::fs::remove_file(lock_path).await;
        } else {
            tokio::time::sleep(retry_delay).await;
        }
    }

    Err(io::Error::new(
        io::ErrorKind::WouldBlock,
        "could not acquire lock",
    ))
}
/// Owner of the on-disk startup lock; the lock file is deleted when the guard
/// is dropped.
struct LockGuard {
    /// Location of the lock file this guard is responsible for.
    path: std::path::PathBuf,
}

impl Drop for LockGuard {
    /// Removes the lock file, ignoring any failure (for example when the file
    /// is already gone).
    fn drop(&mut self) {
        let lock_file = self.path.as_path();
        // A destructor has no way to report errors, so removal is best-effort.
        let _ = std::fs::remove_file(lock_file);
    }
}
// Unit tests for coordinator detection, the status enum, the lock guard, and
// the process-identity heuristic.
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
// Probing must report either "no socket" or "stale socket".
// NOTE(review): this would fail if a live coordinator is answering on the
// real socket path while the tests run — confirm that is acceptable.
#[tokio::test]
async fn test_detect_coordinator_not_running() {
let status = detect_coordinator().await;
assert!(matches!(
status,
CoordinatorStatus::NotRunning | CoordinatorStatus::Stale { .. }
));
}
// Each variant can be constructed and pattern-matched.
#[test]
fn test_coordinator_status_variants() {
let running = CoordinatorStatus::Running {
pid: 1234,
socket: PathBuf::from("/tmp/test.sock"),
};
assert!(matches!(
running,
CoordinatorStatus::Running { pid: 1234, .. }
));
let not_running = CoordinatorStatus::NotRunning;
assert!(matches!(not_running, CoordinatorStatus::NotRunning));
let stale = CoordinatorStatus::Stale {
socket: PathBuf::from("/tmp/stale.sock"),
};
assert!(matches!(stale, CoordinatorStatus::Stale { .. }));
}
// The derived `Debug` output names the variant and includes the PID.
#[test]
fn test_coordinator_status_debug() {
let running = CoordinatorStatus::Running {
pid: 5678,
socket: PathBuf::from("/var/run/cuenv.sock"),
};
let debug_str = format!("{running:?}");
assert!(debug_str.contains("Running"));
assert!(debug_str.contains("5678"));
let stale = CoordinatorStatus::Stale {
socket: PathBuf::from("/tmp/stale.sock"),
};
let debug_str = format!("{stale:?}");
assert!(debug_str.contains("Stale"));
}
// The derived `Clone` preserves the variant and its PID.
#[test]
fn test_coordinator_status_clone() {
let running = CoordinatorStatus::Running {
pid: 1000,
socket: PathBuf::from("/tmp/clone.sock"),
};
let cloned = running.clone();
assert!(matches!(
cloned,
CoordinatorStatus::Running { pid: 1000, .. }
));
}
// Dropping a `LockGuard` deletes the file it points at.
#[test]
fn test_lock_guard_drops_file() {
let temp_dir = std::env::temp_dir();
// The process id keeps the file name unique across concurrent test runs.
let lock_path = temp_dir.join(format!("test_lock_{}.lock", std::process::id()));
std::fs::write(&lock_path, "locked").expect("could not create lock file");
assert!(lock_path.exists());
{
// The guard goes out of scope at the end of this block.
let _guard = LockGuard {
path: lock_path.clone(),
};
}
assert!(!lock_path.exists());
}
// A PID that should not correspond to any process is not a coordinator.
#[cfg(unix)]
#[test]
fn test_is_cuenv_process_for_nonexistent_pid() {
let result = is_cuenv_process(99_999_999);
assert!(!result);
}
// The test process itself must not be classified as a coordinator
// (presumably because its command line lacks `__coordinator`).
#[cfg(unix)]
#[test]
fn test_is_cuenv_process_for_current_process() {
#[allow(clippy::cast_possible_wrap)]
let pid = std::process::id() as i32;
let result = is_cuenv_process(pid);
assert!(!result);
}
}