use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::error::IpcError;
use crate::DEFAULT_LEASE_MS;
/// Record describing the daemon process that holds the lock.
///
/// The record is stored as JSON in the `daemon.lock` file (see
/// `DaemonLock::read` / `DaemonLock::write`). Every `*_ts` field is a
/// wall-clock timestamp in milliseconds since the Unix epoch, as produced by
/// `current_time_ms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonLock {
/// Process id recorded for the lock holder; `acquire` stores the id of the
/// calling process here.
pub pid: u32,
/// Time at which this record was constructed.
pub started_ts: u64,
/// Repository root supplied by the caller; stored as-is and not interpreted
/// in this file.
pub repo_root: String,
/// Caller-supplied actor identifier; stored as-is.
pub actor_id: String,
/// Caller-supplied host identifier; stored as-is.
/// NOTE(review): ownership checks compare only `pid`, not this field.
pub host_id: String,
/// Caller-supplied IPC endpoint string (the tests use a socket-like path
/// such as `/tmp/test.sock` — presumably a socket path; confirm with callers).
pub ipc_endpoint: String,
/// Lease length in milliseconds; added to the current time to compute
/// `expires_ts`.
pub lease_ms: u64,
/// Time of the most recent `refresh` (or of construction if never refreshed).
pub last_heartbeat_ts: u64,
/// Time after which the lock is considered expired.
pub expires_ts: u64,
}
impl DaemonLock {
    /// Builds an in-memory lock record for `pid` using the default lease.
    ///
    /// `started_ts` and `last_heartbeat_ts` are set to the current time and
    /// `expires_ts` to that time plus `DEFAULT_LEASE_MS`. Nothing is written
    /// to disk; use [`DaemonLock::write`] or [`DaemonLock::acquire`] for that.
    pub fn new(
        pid: u32,
        repo_root: String,
        actor_id: String,
        host_id: String,
        ipc_endpoint: String,
    ) -> Self {
        let now = current_time_ms();
        Self {
            pid,
            started_ts: now,
            repo_root,
            actor_id,
            host_id,
            ipc_endpoint,
            lease_ms: DEFAULT_LEASE_MS,
            last_heartbeat_ts: now,
            // Saturate rather than overflow if the lease constant is huge.
            expires_ts: now.saturating_add(DEFAULT_LEASE_MS),
        }
    }

    /// Replaces the lease duration and recomputes the expiry from the current
    /// time.
    ///
    /// `last_heartbeat_ts` is left untouched. A lease large enough to overflow
    /// the timestamp saturates at `u64::MAX` (i.e. the lock never expires)
    /// instead of panicking or wrapping around to an already-expired value.
    pub fn with_lease(mut self, lease_ms: u64) -> Self {
        self.lease_ms = lease_ms;
        self.expires_ts = current_time_ms().saturating_add(lease_ms);
        self
    }

    /// Returns `true` once the current time is strictly past `expires_ts`.
    pub fn is_expired(&self) -> bool {
        current_time_ms() > self.expires_ts
    }

    /// Returns `true` when the recorded `pid` equals the id of the calling
    /// process. Only the pid is compared.
    pub fn is_owned_by_current_process(&self) -> bool {
        self.pid == std::process::id()
    }

    /// Milliseconds left until `expires_ts`, or 0 if it has already passed.
    pub fn time_remaining_ms(&self) -> u64 {
        self.expires_ts.saturating_sub(current_time_ms())
    }

    /// Records a heartbeat: sets `last_heartbeat_ts` to now and pushes
    /// `expires_ts` out by `lease_ms` from now (saturating at `u64::MAX`).
    ///
    /// Only the in-memory record changes; call [`DaemonLock::write`] to
    /// persist it.
    pub fn refresh(&mut self) {
        let now = current_time_ms();
        self.last_heartbeat_ts = now;
        self.expires_ts = now.saturating_add(self.lease_ms);
    }

    /// Path of the lock file inside `data_dir` (`<data_dir>/daemon.lock`).
    pub fn lock_path(data_dir: &Path) -> PathBuf {
        data_dir.join("daemon.lock")
    }

    /// Reads and parses the lock file in `data_dir`.
    ///
    /// Returns `Ok(None)` when the file does not exist.
    ///
    /// # Errors
    ///
    /// Returns an error for any other I/O failure, or when the file contents
    /// are not valid JSON for a `DaemonLock`.
    pub fn read(data_dir: &Path) -> Result<Option<Self>, IpcError> {
        let path = Self::lock_path(data_dir);
        match fs::read_to_string(&path) {
            Ok(contents) => {
                let lock: DaemonLock = serde_json::from_str(&contents)?;
                Ok(Some(lock))
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Serializes this record as pretty-printed JSON and writes it to the lock
    /// file in `data_dir`, replacing any existing contents.
    ///
    /// No ownership or expiry check is performed here.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or the file write fails.
    pub fn write(&self, data_dir: &Path) -> Result<(), IpcError> {
        let path = Self::lock_path(data_dir);
        let contents = serde_json::to_string_pretty(self)?;
        fs::write(&path, contents)?;
        Ok(())
    }

    /// Deletes the lock file in `data_dir`; a missing file counts as success.
    ///
    /// # Errors
    ///
    /// Returns an error for any I/O failure other than "not found".
    pub fn remove(data_dir: &Path) -> Result<(), IpcError> {
        let path = Self::lock_path(data_dir);
        match fs::remove_file(&path) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Tries to take the lock in `data_dir` for the calling process.
    ///
    /// An existing, unexpired lock file causes `IpcError::LockHeld`. An
    /// expired one is deleted and replaced. The new file is created with
    /// `create_new`, so if another process creates it first this returns
    /// `IpcError::LockRace`.
    ///
    /// NOTE(review): reading, deleting the stale file and creating the new one
    /// are separate steps, so two processes that both observe the same stale
    /// lock can still interleave (one may delete the file the other just
    /// created). Closing that window needs a different protocol.
    ///
    /// # Errors
    ///
    /// Besides `LockHeld` and `LockRace`, returns an error when the existing
    /// file cannot be read or parsed, when a stale file cannot be deleted, or
    /// when creating or writing the new file fails.
    pub fn acquire(
        data_dir: &Path,
        repo_root: String,
        actor_id: String,
        host_id: String,
        ipc_endpoint: String,
    ) -> Result<Self, IpcError> {
        let path = Self::lock_path(data_dir);

        if let Some(existing) = Self::read(data_dir)? {
            if !existing.is_expired() {
                return Err(IpcError::LockHeld {
                    pid: existing.pid,
                    expires_in_ms: existing.time_remaining_ms(),
                });
            }
            // Stale lock: delete it. "Not found" is tolerated (someone else got
            // there first); any other failure is reported as-is instead of
            // surfacing later as a misleading `LockRace`.
            Self::remove(data_dir)?;
        }

        let lock = DaemonLock::new(
            std::process::id(),
            repo_root,
            actor_id,
            host_id,
            ipc_endpoint,
        );
        let contents = serde_json::to_string_pretty(&lock)?;

        // `create_new` fails if the file already exists, which is what makes
        // the creation step exclusive.
        let mut file = match OpenOptions::new().write(true).create_new(true).open(&path) {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
                return Err(IpcError::LockRace);
            }
            Err(e) => return Err(e.into()),
        };

        if let Err(e) = file.write_all(contents.as_bytes()) {
            // Do not leave an empty or truncated lock file behind: it would
            // make every later `read`/`acquire` fail on JSON parsing. Close
            // the handle first so the delete also works on platforms that
            // refuse to remove open files; cleanup itself is best-effort.
            drop(file);
            let _ = fs::remove_file(&path);
            return Err(e.into());
        }

        Ok(lock)
    }

    /// Deletes the lock file in `data_dir` if it records the calling process's
    /// pid; otherwise (no file, or another pid) does nothing.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, parsed or deleted.
    pub fn release(data_dir: &Path) -> Result<(), IpcError> {
        if let Some(lock) = Self::read(data_dir)? {
            if lock.is_owned_by_current_process() {
                Self::remove(data_dir)?;
            }
        }
        Ok(())
    }
}
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 (the default `Duration`).
/// A millisecond count that does not fit in `u64` saturates at `u64::MAX`
/// instead of being silently truncated by the narrowing conversion.
fn current_time_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` returns u128; clamp first so the cast below cannot wrap.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a lock record rooted at "/repo" with the given identity fields.
    fn sample_lock(pid: u32, actor: &str, host: &str, endpoint: &str) -> DaemonLock {
        DaemonLock::new(
            pid,
            String::from("/repo"),
            String::from(actor),
            String::from(host),
            String::from(endpoint),
        )
    }

    /// Acquires the lock in `dir` as "actor"/"host", panicking on failure.
    fn acquire_in(dir: &std::path::Path, endpoint: &str) -> DaemonLock {
        DaemonLock::acquire(
            dir,
            String::from("/repo"),
            String::from("actor"),
            String::from("host"),
            String::from(endpoint),
        )
        .unwrap()
    }

    /// A freshly built lock carries the supplied fields and is not expired.
    #[test]
    fn test_lock_creation() {
        let fresh = sample_lock(1234, "actor123", "host456", "/tmp/test.sock");

        assert_eq!(fresh.pid, 1234);
        assert_eq!(fresh.repo_root, "/repo");
        assert_eq!(fresh.actor_id, "actor123");
        assert!(!fresh.is_expired());
    }

    /// Forcing the expiry into the past expires the lock; `refresh` revives it.
    #[test]
    fn test_lock_expiration() {
        let mut subject = sample_lock(1234, "actor", "host", "/tmp/test.sock");

        subject.expires_ts = 0;
        assert!(subject.is_expired());

        subject.refresh();
        assert!(!subject.is_expired());
    }

    /// A written lock can be read back with the same pid and actor id.
    #[test]
    fn test_lock_read_write() {
        let scratch = TempDir::new().unwrap();
        let dir = scratch.path();
        let written = sample_lock(
            std::process::id(),
            "actor123",
            "host456",
            "/tmp/test.sock",
        );

        written.write(dir).unwrap();

        let loaded = DaemonLock::read(dir).unwrap().unwrap();
        assert_eq!(loaded.pid, written.pid);
        assert_eq!(loaded.actor_id, written.actor_id);
    }

    /// Acquiring yields a lock owned by this process; releasing deletes the file.
    #[test]
    fn test_lock_acquire_release() {
        let scratch = TempDir::new().unwrap();
        let dir = scratch.path();

        let held = acquire_in(dir, "/tmp/test.sock");
        assert!(held.is_owned_by_current_process());

        DaemonLock::release(dir).unwrap();
        assert!(DaemonLock::read(dir).unwrap().is_none());
    }

    /// An expired lock left by another pid does not block acquisition.
    #[test]
    fn test_lock_acquire_expired() {
        let scratch = TempDir::new().unwrap();
        let dir = scratch.path();

        let mut stale = sample_lock(9999, "actor", "host", "/tmp/old.sock");
        stale.expires_ts = 0;
        stale.write(dir).unwrap();

        let replacement = acquire_in(dir, "/tmp/new.sock");
        assert!(replacement.is_owned_by_current_process());
    }

    /// `with_lease` stores the requested lease duration.
    #[test]
    fn test_custom_lease() {
        let customised =
            sample_lock(1234, "actor", "host", "/tmp/test.sock").with_lease(60_000);

        assert_eq!(customised.lease_ms, 60_000);
    }
}