#![cfg(unix)]
use std::path::{Path, PathBuf};
use rustix::fd::OwnedFd;
use rustix::fs::{FlockOperation, Mode, OFlags};
use crate::error::SchedulerError;
/// Guard representing ownership of a pid file.
///
/// Created by [`PidFile::acquire`], which opens the file, takes an exclusive
/// `flock` on it and writes the current process id into it. Dropping the guard
/// removes the file at `path` (best effort).
#[derive(Debug)]
pub struct PidFile {
/// Descriptor the exclusive lock was taken on. It is not read anywhere in
/// this module after construction (hence the `dead_code` allowance); it is
/// stored so the descriptor stays open for as long as the guard exists.
#[allow(dead_code)] fd: OwnedFd,
/// Location of the pid file; used to remove the file when the guard drops.
path: PathBuf,
}
impl PidFile {
pub fn acquire(path: &Path) -> Result<Self, SchedulerError> {
if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
std::fs::create_dir_all(parent).map_err(|e| {
SchedulerError::Io(format!(
"failed to create pid file directory {}: {e}",
parent.display()
))
})?;
}
let fd = rustix::fs::open(
path,
OFlags::RDWR | OFlags::CREATE | OFlags::CLOEXEC,
Mode::from_raw_mode(0o644),
)
.map_err(|e| {
SchedulerError::Io(format!("failed to open pid file {}: {e}", path.display()))
})?;
rustix::fs::flock(&fd, FlockOperation::NonBlockingLockExclusive).map_err(|e| {
let pid = Self::read_pid_from_path(path).unwrap_or(0);
if e == rustix::io::Errno::WOULDBLOCK {
SchedulerError::AlreadyRunning { pid }
} else {
SchedulerError::Io(format!("flock on pid file failed: {e}"))
}
})?;
rustix::fs::ftruncate(&fd, 0)
.map_err(|e| SchedulerError::Io(format!("truncate pid file failed: {e}")))?;
let pid_str = format!("{}", std::process::id());
rustix::io::write(&fd, pid_str.as_bytes())
.map_err(|e| SchedulerError::Io(format!("write pid file failed: {e}")))?;
Ok(Self {
fd,
path: path.to_owned(),
})
}
#[must_use]
pub fn read_alive(path: &Path) -> Option<u32> {
let pid = Self::read_pid_from_path(path)?;
if is_process_alive(pid) {
Some(pid)
} else {
None
}
}
fn read_pid_from_path(path: &Path) -> Option<u32> {
let content = std::fs::read_to_string(path).ok()?;
content.trim().parse::<u32>().ok()
}
}
impl Drop for PidFile {
    /// Removes the pid file from disk when the guard goes away.
    ///
    /// Removal is best effort: a failure (for example because the file is
    /// already gone) is deliberately ignored, since `drop` cannot report it.
    fn drop(&mut self) {
        let target: &Path = self.path.as_path();
        std::fs::remove_file(target).ok();
    }
}
/// Reports whether a process with the given `pid` currently exists.
///
/// The check sends the null signal to the process. A permission error is
/// treated as "alive": the process exists, this process is merely not allowed
/// to signal it. `0` and values that do not fit in a signed 32-bit pid are
/// reported as not alive.
#[must_use]
pub fn is_process_alive(pid: u32) -> bool {
    // A wrapping cast would turn values above `i32::MAX` into negative pids,
    // which `kill(2)` interprets as a process-group (or broadcast) target
    // rather than a single process. Reject them instead of probing.
    let Some(target) = i32::try_from(pid)
        .ok()
        .and_then(rustix::process::Pid::from_raw)
    else {
        return false;
    };

    match rustix::process::test_kill_process(target) {
        Ok(()) => true,
        Err(errno) => errno == rustix::io::Errno::PERM,
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering};

    use tempfile::TempDir;

    use super::*;

    /// Monotonic counter used to give every test its own pid file name.
    static COUNTER: AtomicU32 = AtomicU32::new(0);

    /// Builds a pid file path inside `dir` that no other test in this process uses.
    fn unique_pid_path(dir: &TempDir) -> PathBuf {
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        dir.path().join(format!("zeph-{n}.pid"))
    }

    /// Acquiring writes our pid into the file; dropping the guard removes it.
    #[test]
    fn acquire_creates_file_with_pid() {
        let dir = TempDir::new().unwrap();
        let path = unique_pid_path(&dir);
        let pf = PidFile::acquire(&path).expect("acquire should succeed");
        let content = std::fs::read_to_string(&path).expect("pid file must exist");
        assert_eq!(
            content.trim().parse::<u32>().unwrap(),
            std::process::id(),
            "pid file must contain current process pid"
        );
        drop(pf);
        assert!(!path.exists(), "pid file must be removed on drop");
    }

    /// While one guard is alive, a second acquire on the same path is refused.
    #[test]
    fn second_acquire_fails_with_already_running() {
        let dir = TempDir::new().unwrap();
        let path = unique_pid_path(&dir);
        let _guard = PidFile::acquire(&path).expect("first acquire must succeed");
        let err = PidFile::acquire(&path).expect_err("second acquire must fail");
        assert!(
            matches!(err, SchedulerError::AlreadyRunning { .. }),
            "expected AlreadyRunning, got {err:?}"
        );
    }

    /// A missing pid file means there is no live owner.
    #[test]
    fn read_alive_returns_none_for_nonexistent_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("nonexistent.pid");
        assert!(PidFile::read_alive(&path).is_none());
    }

    /// A pid file naming a process that does not exist yields `None`.
    #[test]
    fn read_alive_returns_none_for_dead_pid() {
        let dir = TempDir::new().unwrap();
        let path = unique_pid_path(&dir);
        // 999999999 is a valid positive pid value but lies above the pid range
        // of supported Unix systems, so no such process can exist.
        std::fs::write(&path, "999999999").unwrap();
        let alive = PidFile::read_alive(&path);
        assert!(
            alive.is_none(),
            "pid outside the system pid range must not be reported alive, got {alive:?}"
        );
    }
}