use std::path::Path;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::pool::PoolError;
use crate::service::lock::LockError;
use crate::service::manifest::now_iso8601;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PoolSessionEntry {
pub sid: String,
pub pid: u32,
pub sock: PathBuf,
pub version: String,
pub created_at: String,
}
impl PoolSessionEntry {
pub fn new(
sid: impl Into<String>,
pid: u32,
sock: PathBuf,
version: impl Into<String>,
) -> Self {
Self {
sid: sid.into(),
pid,
sock,
version: version.into(),
created_at: now_iso8601(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct PoolRegistry {
pub sessions: Vec<PoolSessionEntry>,
}
impl PoolRegistry {
pub fn load_or_default(path: &Path) -> Result<Self, PoolError> {
if !path.exists() {
return Ok(Self::default());
}
let content = std::fs::read_to_string(path).map_err(|e| {
PoolError::RegistryCorrupted(format!("failed to read {}: {e}", path.display()))
})?;
serde_json::from_str(&content).map_err(|e| {
PoolError::RegistryCorrupted(format!("failed to parse {}: {e}", path.display()))
})
}
pub fn save(&self, path: &Path) -> Result<(), PoolError> {
let parent = path.parent().ok_or_else(|| {
PoolError::RegistryCorrupted(format!(
"registry path has no parent directory: {}",
path.display()
))
})?;
std::fs::create_dir_all(parent).map_err(|e| {
PoolError::RegistryCorrupted(format!(
"failed to create registry directory {}: {e}",
parent.display()
))
})?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700)).map_err(
|e| {
PoolError::RegistryCorrupted(format!(
"failed to set permissions on {}: {e}",
parent.display()
))
},
)?;
}
let content = serde_json::to_string_pretty(self).map_err(|e| {
PoolError::RegistryCorrupted(format!("failed to serialize registry: {e}"))
})?;
let mut tmp = tempfile::NamedTempFile::new_in(parent).map_err(|e| {
PoolError::RegistryCorrupted(format!(
"failed to create temp file in {}: {e}",
parent.display()
))
})?;
{
use std::io::Write;
tmp.write_all(content.as_bytes()).map_err(|e| {
PoolError::RegistryCorrupted(format!("failed to write registry temp file: {e}"))
})?;
tmp.as_file().sync_all().map_err(|e| {
PoolError::RegistryCorrupted(format!("failed to fsync registry temp file: {e}"))
})?;
}
tmp.persist(path).map_err(|e| {
PoolError::RegistryCorrupted(format!(
"failed to atomically replace {} with temp file: {e}",
path.display()
))
})?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).map_err(
|e| {
PoolError::RegistryCorrupted(format!(
"failed to set permissions on {}: {e}",
path.display()
))
},
)?;
}
Ok(())
}
pub fn scan_and_gc(&mut self) -> Result<Vec<PoolSessionEntry>, PoolError> {
let before_len = self.sessions.len();
#[cfg(unix)]
self.sessions.retain(|entry| {
let pid_t = match i32::try_from(entry.pid) {
Ok(p) => p,
Err(_) => {
tracing::warn!(
pid = entry.pid,
sid = %entry.sid,
"pid exceeds i32::MAX, treating as dead (K-52)"
);
return false;
}
};
let result = unsafe { libc::kill(pid_t, 0) };
result == 0
});
#[cfg(not(unix))]
let _ = before_len;
let _ = before_len; Ok(self.sessions.clone())
}
pub fn add(&mut self, entry: PoolSessionEntry) {
self.sessions.push(entry);
}
pub fn remove(&mut self, sid: &str) -> bool {
let before = self.sessions.len();
self.sessions.retain(|e| e.sid != sid);
self.sessions.len() < before
}
pub fn find(&self, sid: &str) -> Option<&PoolSessionEntry> {
self.sessions.iter().find(|e| e.sid == sid)
}
}
pub fn with_registry_lock<F, R>(lock_path: &Path, f: F) -> Result<R, PoolError>
where
F: FnOnce() -> Result<R, PoolError>,
{
crate::service::lock::with_exclusive_lock(lock_path, f)
}
impl From<LockError> for PoolError {
fn from(e: LockError) -> Self {
PoolError::RegistryCorrupted(e.to_string())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
fn make_entry(sid: &str, pid: u32) -> PoolSessionEntry {
PoolSessionEntry::new(
sid,
pid,
PathBuf::from(format!("/tmp/alc-pool/{sid}.sock")),
"0.30.0",
)
}
#[test]
fn load_default_when_absent() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("registry.json");
let reg = PoolRegistry::load_or_default(&path).expect("load_or_default");
assert!(reg.sessions.is_empty(), "expected empty registry");
}
#[test]
fn scan_and_gc_removes_dead_pid() {
let live_pid = std::process::id();
let mut reg = PoolRegistry::default();
reg.add(make_entry("live-session", live_pid));
reg.add(make_entry("dead-session", 999_999));
let survivors = reg.scan_and_gc().expect("scan_and_gc");
assert_eq!(survivors.len(), 1, "expected 1 survivor");
assert_eq!(survivors[0].sid, "live-session");
assert_eq!(
reg.sessions.len(),
1,
"in-place mutation must prune dead entry"
);
assert!(
reg.find("dead-session").is_none(),
"dead entry must be gone"
);
assert!(reg.find("live-session").is_some(), "live entry must remain");
}
#[test]
fn load_corrupted_returns_pool_error() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("registry.json");
std::fs::write(&path, b"{ not valid json !!!").expect("write");
let result = PoolRegistry::load_or_default(&path);
match result {
Err(PoolError::RegistryCorrupted(msg)) => {
assert!(!msg.is_empty(), "error message must not be empty");
}
other => panic!("expected RegistryCorrupted, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_writers_no_entry_loss() {
let dir = Arc::new(tempfile::tempdir().expect("tempdir"));
let reg_path = Arc::new(dir.path().join("registry.json"));
let lock_path = Arc::new(dir.path().join("registry.lock"));
let n_per_task: u32 = 50;
let n_tasks: u32 = 2;
let mut handles = Vec::new();
for task_id in 0..n_tasks {
let reg_path = Arc::clone(®_path);
let lock_path = Arc::clone(&lock_path);
let handle = tokio::task::spawn_blocking(move || {
for i in 0..n_per_task {
let sid = format!("t{task_id}-s{i}");
let entry = make_entry(&sid, std::process::id());
with_registry_lock(&lock_path, || {
let mut reg = PoolRegistry::load_or_default(®_path)?;
reg.add(entry);
reg.save(®_path)
})
.expect("lock + save must not fail");
}
});
handles.push(handle);
}
for h in handles {
h.await.expect("task did not panic");
}
let final_reg = PoolRegistry::load_or_default(®_path).expect("final load_or_default");
let expected = (n_per_task * n_tasks) as usize;
assert_eq!(
final_reg.sessions.len(),
expected,
"all {expected} entries must be present (no last-writer-wins loss)"
);
}
#[test]
#[cfg(unix)]
fn save_sets_secure_permissions() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("pool/registry.json");
let reg = PoolRegistry::default();
reg.save(&path).expect("save");
let parent_meta = std::fs::metadata(path.parent().expect("parent")).expect("dir metadata");
assert_eq!(
parent_meta.permissions().mode() & 0o777,
0o700,
"pool dir must be 0700 (drwx------)"
);
let file_meta = std::fs::metadata(&path).expect("file metadata");
assert_eq!(
file_meta.permissions().mode() & 0o777,
0o600,
"registry.json must be 0600 (-rw-------)"
);
}
#[test]
#[cfg(unix)]
fn lock_file_sets_secure_permissions() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().expect("tempdir");
let lock_path = dir.path().join("registry.lock");
let reg_path = dir.path().join("registry.json");
with_registry_lock(&lock_path, || {
let reg = PoolRegistry::default();
reg.save(®_path)
})
.expect("with_registry_lock");
let lock_meta = std::fs::metadata(&lock_path).expect("lock metadata");
assert_eq!(
lock_meta.permissions().mode() & 0o777,
0o600,
"registry.lock must be 0600 (-rw-------)"
);
}
}