use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::{Mutex, OnceLock};
use agent_team_mail_core::home::get_home_dir;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::AsyncWriteExt;
/// Registry of `team/identity` keys whose lock was acquired by this process
/// and has not been released yet.
static IN_PROCESS_LOCKS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();

/// Returns the process-wide lock registry, creating it (empty) on first use.
fn in_process_locks() -> &'static Mutex<HashSet<String>> {
    IN_PROCESS_LOCKS.get_or_init(Mutex::default)
}
/// Builds the registry key for an identity: `team` and `identity` joined by
/// a single `/`.
fn lock_key(team: &str, identity: &str) -> String {
    [team, identity].join("/")
}
/// Contents of a lock file, serialized to and parsed from JSON.
#[derive(Debug, Serialize, Deserialize)]
struct LockPayload {
/// Process id of the lock holder; `acquire_lock` stores `std::process::id()`.
pid: u32,
/// Agent identifier supplied by the caller of `acquire_lock`.
agent_id: String,
}
/// Returns the directory that holds all agent session lock files:
/// `<home>/.config/atm/agent-sessions`.
///
/// `<home>` is whatever `get_home_dir` reports; when that call fails the
/// path is rooted at `/tmp` instead.
pub fn sessions_dir() -> PathBuf {
    let mut dir = match get_home_dir() {
        Ok(home) => home,
        Err(_) => PathBuf::from("/tmp"),
    };
    dir.extend([".config", "atm", "agent-sessions"]);
    dir
}
/// Returns the lock file path for `identity` in `team`:
/// `<sessions_dir>/<team>/<identity>.lock`.
fn lock_path(team: &str, identity: &str) -> PathBuf {
    let mut path = sessions_dir();
    path.push(team);
    path.push(format!("{identity}.lock"));
    path
}
pub async fn acquire_lock(team: &str, identity: &str, agent_id: &str) -> anyhow::Result<()> {
let path = lock_path(team, identity);
let key = lock_key(team, identity);
{
let guard = in_process_locks().lock().unwrap();
if guard.contains(&key) {
anyhow::bail!("identity '{}' is already locked by this process", identity);
}
}
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let payload = LockPayload {
pid: std::process::id(),
agent_id: agent_id.to_string(),
};
let json = serde_json::to_string(&payload)?;
for _ in 0..2 {
match fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&path)
.await
{
Ok(mut file) => {
if let Err(e) = file.write_all(json.as_bytes()).await {
let _ = fs::remove_file(&path).await;
return Err(e.into());
}
if let Err(e) = file.flush().await {
let _ = fs::remove_file(&path).await;
return Err(e.into());
}
in_process_locks().lock().unwrap().insert(key);
return Ok(());
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if let Some((pid, existing_id)) = check_lock(team, identity).await {
anyhow::bail!(
"identity '{}' already locked by PID {} (agent_id: {})",
identity,
pid,
existing_id
);
}
let _ = fs::remove_file(&path).await;
}
Err(e) => return Err(e.into()),
}
}
anyhow::bail!("failed to acquire lock for identity '{}'", identity)
}
/// Releases the session lock for `identity` within `team`.
///
/// Removes the lock file first and only then drops the identity from the
/// in-process registry. Unregistering first would let a concurrent
/// `acquire_lock` in this process re-acquire the identity while the unlink
/// is still pending, and that unlink would then delete the new lock file.
///
/// The registry entry is dropped on every exit path, including failure to
/// remove the file and cancellation of the returned future.
///
/// # Errors
///
/// Returns the I/O error from removing the lock file. A file that does not
/// exist is not an error, so releasing an unheld lock succeeds.
pub async fn release_lock(team: &str, identity: &str) -> anyhow::Result<()> {
    /// Removes its key from the in-process registry when dropped.
    struct Unregister(String);

    impl Drop for Unregister {
        fn drop(&mut self) {
            // Never panic inside `drop`; a poisoned registry is left untouched.
            if let Ok(mut held) = in_process_locks().lock() {
                held.remove(&self.0);
            }
        }
    }

    // Dropped after the match below has been evaluated, i.e. after the
    // unlink has completed (or the future was cancelled).
    let _unregister = Unregister(lock_key(team, identity));

    let path = lock_path(team, identity);
    match fs::remove_file(&path).await {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e.into()),
    }
}
pub async fn check_lock(team: &str, identity: &str) -> Option<(u32, String)> {
let path = lock_path(team, identity);
let contents = fs::read_to_string(&path).await.ok()?;
let payload: LockPayload = serde_json::from_str(&contents).ok()?;
let our_pid = std::process::id();
if payload.pid == our_pid {
let key = lock_key(team, identity);
let is_active = in_process_locks().lock().unwrap().contains(&key);
if is_active {
return Some((payload.pid, payload.agent_id));
}
let _ = fs::remove_file(&path).await;
return None;
}
if is_pid_alive(payload.pid) {
Some((payload.pid, payload.agent_id))
} else {
let _ = fs::remove_file(&path).await;
None
}
}
fn is_pid_alive(pid: u32) -> bool {
#[cfg(unix)]
{
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
#[cfg(not(unix))]
{
let _ = pid;
false
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// Clears the `ATM_HOME` override when dropped, so a panicking test body
    /// cannot leak it into later tests.
    struct AtmHomeGuard;

    impl Drop for AtmHomeGuard {
        fn drop(&mut self) {
            // SAFETY: tests that touch `ATM_HOME` are marked `#[serial]`, so
            // they do not modify the environment concurrently.
            unsafe { std::env::remove_var("ATM_HOME") };
        }
    }

    /// Runs `f` with `ATM_HOME` pointing at a fresh temporary directory.
    ///
    /// The helper owns the `TempDir` for the whole call. Handing it to a
    /// closure that ignores its argument would drop it, and delete the
    /// directory, before the returned future is polled; the test body would
    /// then recreate the directory and nothing would clean it up. The
    /// closure receives the directory path instead.
    async fn with_temp_atm_home<F, Fut>(f: F)
    where
        F: FnOnce(PathBuf) -> Fut,
        Fut: std::future::Future<Output = ()>,
    {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_string_lossy().to_string();
        // SAFETY: see `AtmHomeGuard::drop`.
        unsafe { std::env::set_var("ATM_HOME", &path) };
        // Declared after `dir`, so the override is cleared before the
        // directory is deleted.
        let _restore = AtmHomeGuard;
        f(dir.path().to_path_buf()).await;
    }

    /// A lock can be acquired, observed through `check_lock`, and released.
    #[tokio::test]
    #[serial]
    async fn acquire_and_release_lock() {
        with_temp_atm_home(|_home| async {
            acquire_lock("test-team", "agent-x", "codex:abc-123")
                .await
                .unwrap();
            let info = check_lock("test-team", "agent-x").await;
            assert!(info.is_some());
            let (_, agent_id) = info.unwrap();
            assert_eq!(agent_id, "codex:abc-123");
            release_lock("test-team", "agent-x").await.unwrap();
            assert!(check_lock("test-team", "agent-x").await.is_none());
        })
        .await;
    }

    /// `check_lock` reports nothing when no lock file exists.
    #[tokio::test]
    #[serial]
    async fn check_lock_returns_none_for_missing_lock() {
        with_temp_atm_home(|_home| async {
            let result = check_lock("team-none", "nobody").await;
            assert!(result.is_none());
        })
        .await;
    }

    /// A lock file naming a PID that is not running is removed by `check_lock`.
    #[tokio::test]
    #[serial]
    async fn check_lock_reclaims_dead_pid_lock() {
        with_temp_atm_home(|_home| async {
            let path = lock_path("dead-team", "ghost");
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent).await.unwrap();
            }
            // NOTE(review): assumes no live process has PID 4194304 on the test host.
            let payload = serde_json::json!({"pid": 4_194_304u32, "agent_id": "codex:dead"});
            fs::write(&path, payload.to_string()).await.unwrap();
            let result = check_lock("dead-team", "ghost").await;
            assert!(result.is_none());
            assert!(!path.exists());
        })
        .await;
    }

    /// Acquiring an identity that is already held fails.
    #[tokio::test]
    #[serial]
    async fn acquire_live_lock_fails() {
        with_temp_atm_home(|_home| async {
            acquire_lock("team-live", "live-agent", "codex:first")
                .await
                .unwrap();
            let result = acquire_lock("team-live", "live-agent", "codex:second").await;
            assert!(result.is_err());
            release_lock("team-live", "live-agent").await.unwrap();
        })
        .await;
    }

    /// Releasing a lock that was never taken succeeds.
    #[tokio::test]
    #[serial]
    async fn release_nonexistent_lock_is_ok() {
        with_temp_atm_home(|_home| async {
            release_lock("ghost-team", "ghost-agent").await.unwrap();
        })
        .await;
    }

    /// `sessions_dir` is rooted at the `ATM_HOME` override.
    #[test]
    #[serial]
    fn sessions_dir_uses_atm_home() {
        let dir = "/tmp/test-atm-home-lock";
        // SAFETY: this test is `#[serial]`; see `AtmHomeGuard::drop`.
        unsafe { std::env::set_var("ATM_HOME", dir) };
        let path = sessions_dir();
        unsafe { std::env::remove_var("ATM_HOME") };
        assert_eq!(
            path,
            PathBuf::from(dir)
                .join(".config")
                .join("atm")
                .join("agent-sessions")
        );
    }

    /// The current process is reported as alive on Unix.
    #[test]
    fn is_pid_alive_self() {
        let alive = is_pid_alive(std::process::id());
        #[cfg(unix)]
        assert!(alive, "current process should be alive");
        #[cfg(not(unix))]
        let _ = alive;
    }
}