use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use serde_json::{json, Value};
use super::ipc::ErrPayload;
use super::ops::OpResult;
use super::state::DaemonState;
#[derive(Debug, Clone)]
pub struct LockEntry {
pub holder_pid: i32,
pub token: u128,
pub acquired_at_ns: i64,
}
pub type LockTable = Mutex<HashMap<String, LockEntry>>;
pub fn new_table() -> LockTable {
Mutex::new(HashMap::new())
}
fn name_arg(args: &Value) -> std::result::Result<String, ErrPayload> {
args.get("name")
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(str::to_string)
.ok_or_else(|| ErrPayload::new("bad_args", "missing `name`"))
}
fn caller_pid(args: &Value) -> i32 {
args.get("pid")
.and_then(Value::as_i64)
.map(|n| n as i32)
.unwrap_or(0)
}
fn pid_alive(pid: i32) -> bool {
if pid <= 0 {
return false;
}
let rc = unsafe { libc::kill(pid, 0) };
if rc == 0 {
return true;
}
matches!(
std::io::Error::last_os_error().raw_os_error(),
Some(libc::EPERM)
)
}
fn now_ns() -> i64 {
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
}
fn try_acquire_inner(table: &LockTable, name: &str, pid: i32) -> Option<u128> {
let mut g = table.lock();
if let Some(existing) = g.get(name) {
if !pid_alive(existing.holder_pid) {
tracing::info!(
lock = name,
stale_pid = existing.holder_pid,
"lock force-released (holder dead)"
);
g.remove(name);
}
}
if g.contains_key(name) {
return None;
}
let token = rand::random::<u128>();
g.insert(
name.to_string(),
LockEntry {
holder_pid: pid,
token,
acquired_at_ns: now_ns(),
},
);
Some(token)
}
pub async fn op_lock_try_acquire(state: &Arc<DaemonState>, args: Value) -> OpResult {
let name = name_arg(&args)?;
let pid = caller_pid(&args);
match try_acquire_inner(&state.locks, &name, pid) {
Some(token) => Ok(json!({
"name": name,
"token": token.to_string(),
"holder_pid": pid,
})),
None => Err(ErrPayload::new("busy", format!("lock `{name}` is held"))),
}
}
pub async fn op_lock_acquire(state: &Arc<DaemonState>, args: Value) -> OpResult {
let name = name_arg(&args)?;
let pid = caller_pid(&args);
let timeout_secs = args
.get("timeout_secs")
.and_then(Value::as_f64)
.unwrap_or(0.0);
let deadline = if timeout_secs > 0.0 {
Some(Instant::now() + Duration::from_secs_f64(timeout_secs))
} else {
None
};
const POLL: Duration = Duration::from_millis(50);
loop {
if let Some(token) = try_acquire_inner(&state.locks, &name, pid) {
return Ok(json!({
"name": name,
"token": token.to_string(),
"holder_pid": pid,
}));
}
match deadline {
Some(dl) if Instant::now() >= dl => {
return Err(ErrPayload::new(
"timeout",
format!("lock `{name}` not acquired within {timeout_secs}s"),
));
}
None => {
return Err(ErrPayload::new(
"busy",
format!("lock `{name}` is held (pass `timeout_secs` to wait)"),
));
}
_ => tokio::time::sleep(POLL).await,
}
}
}
pub async fn op_lock_release(state: &Arc<DaemonState>, args: Value) -> OpResult {
let name = name_arg(&args)?;
let token_str = args
.get("token")
.and_then(Value::as_str)
.ok_or_else(|| ErrPayload::new("bad_args", "missing `token`"))?;
let token: u128 = token_str
.parse()
.map_err(|_| ErrPayload::new("bad_args", "token must be a u128 string"))?;
let mut g = state.locks.lock();
let released = match g.get(&name) {
Some(entry) if entry.token == token => {
g.remove(&name);
true
}
Some(_) => {
return Err(ErrPayload::new(
"wrong_token",
format!("lock `{name}` token mismatch — refusing to release"),
));
}
None => false,
};
Ok(json!({ "name": name, "released": released }))
}
pub async fn op_lock_list(state: &Arc<DaemonState>, _args: Value) -> OpResult {
let now = now_ns();
let g = state.locks.lock();
let mut entries: Vec<Value> = g
.iter()
.map(|(name, e)| {
json!({
"name": name,
"holder_pid": e.holder_pid,
"alive": pid_alive(e.holder_pid),
"acquired_at_ns": e.acquired_at_ns,
"age_ns": now - e.acquired_at_ns,
})
})
.collect();
entries.sort_by(|a, b| {
a["name"]
.as_str()
.unwrap_or("")
.cmp(b["name"].as_str().unwrap_or(""))
});
let count = entries.len();
Ok(json!({ "locks": entries, "count": count }))
}