#![allow(dead_code)]
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};
use fs2::FileExt;
use thiserror::Error;
use crate::events::Event;
use crate::volume::{events_log_path, run_dir, runs_dir};
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5);
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);
#[derive(Debug, Error)]
pub enum Error {
#[error("I/O error on {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to serialize event: {source}")]
Serialize {
#[source]
source: serde_json::Error,
},
#[error("malformed event at {path}:{line_number}: {source}")]
Deserialize {
path: PathBuf,
line_number: usize,
#[source]
source: serde_json::Error,
},
#[error("event log lock {path} was not released within the acquire budget")]
LockTimeout { path: PathBuf },
}
pub struct EventLog {
path: PathBuf,
file: Mutex<File>,
lock_timeout: Duration,
}
impl EventLog {
pub fn for_run(volume_root: &Path, run_id: &str) -> Result<Self, Error> {
let path = events_log_path(volume_root, run_id);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|source| Error::Io {
path: parent.to_path_buf(),
source,
})?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.open(&path)
.map_err(|source| Error::Io {
path: path.clone(),
source,
})?;
Ok(Self {
path,
file: Mutex::new(file),
lock_timeout: DEFAULT_LOCK_TIMEOUT,
})
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn append(&self, event: &Event) -> Result<(), Error> {
let line = serde_json::to_string(event).map_err(|source| Error::Serialize { source })?;
let mut guard = self
.file
.lock()
.expect("event-log mutex poisoned by a prior panic in append");
acquire_lock(&guard, &self.path, self.lock_timeout)?;
let write_result = (|| -> Result<(), Error> {
guard
.write_all(line.as_bytes())
.map_err(|source| Error::Io {
path: self.path.clone(),
source,
})?;
guard.write_all(b"\n").map_err(|source| Error::Io {
path: self.path.clone(),
source,
})?;
guard.flush().map_err(|source| Error::Io {
path: self.path.clone(),
source,
})
})();
let _ = FileExt::unlock(&*guard);
write_result
}
}
pub fn read_run(volume_root: &Path, run_id: &str) -> Result<Vec<Event>, Error> {
let path = events_log_path(volume_root, run_id);
let file = File::open(&path).map_err(|source| Error::Io {
path: path.clone(),
source,
})?;
let mut events = Vec::new();
for (i, line) in BufReader::new(file).lines().enumerate() {
let line = line.map_err(|source| Error::Io {
path: path.clone(),
source,
})?;
if line.trim().is_empty() {
continue;
}
let event = serde_json::from_str(&line).map_err(|source| Error::Deserialize {
path: path.clone(),
line_number: i + 1,
source,
})?;
events.push(event);
}
Ok(events)
}
pub fn enumerate_runs(volume_root: &Path) -> Result<Vec<String>, Error> {
let dir = runs_dir(volume_root);
if !dir.is_dir() {
return Ok(Vec::new());
}
let entries = std::fs::read_dir(&dir).map_err(|source| Error::Io {
path: dir.clone(),
source,
})?;
let mut ids = Vec::new();
for entry in entries {
let entry = entry.map_err(|source| Error::Io {
path: dir.clone(),
source,
})?;
let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
if !is_dir {
continue;
}
if let Some(name) = entry.file_name().to_str() {
ids.push(name.to_string());
}
}
ids.sort();
Ok(ids)
}
pub fn run_exists(volume_root: &Path, run_id: &str) -> bool {
run_dir(volume_root, run_id).is_dir()
}
fn acquire_lock(file: &File, path: &Path, timeout: Duration) -> Result<(), Error> {
let contended_os = fs2::lock_contended_error().raw_os_error();
let deadline = Instant::now() + timeout;
loop {
if Instant::now() >= deadline {
return Err(Error::LockTimeout {
path: path.to_path_buf(),
});
}
match FileExt::try_lock_exclusive(file) {
Ok(()) => return Ok(()),
Err(e)
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.raw_os_error() == contended_os =>
{
let remaining = deadline.saturating_duration_since(Instant::now());
thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
}
Err(source) => {
return Err(Error::Io {
path: path.to_path_buf(),
source,
});
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events::{NodeKind, NodeStarted};
use tempfile::TempDir;
fn node_started(run_id: &str, node_id: &str, seq: usize) -> Event {
Event::NodeStarted(NodeStarted {
id: format!("ev-{seq:04}"),
ts: "2026-04-15T00:00:00Z".to_string(),
run_id: run_id.to_string(),
node_id: node_id.to_string(),
kind: NodeKind::Bash,
name: None,
model: None,
})
}
#[test]
fn append_then_read_roundtrip_one_event() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "pipe-01").expect("open log");
log.append(&node_started("pipe-01", "n1", 1))
.expect("append");
drop(log);
let events = read_run(tmp.path(), "pipe-01").expect("read");
assert_eq!(events.len(), 1);
assert_eq!(events[0].run_id(), "pipe-01");
assert_eq!(events[0].event_type(), "node.started");
}
#[test]
fn hundred_sequential_appends_produce_hundred_parseable_lines() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "pipe-seq").unwrap();
for i in 0..100 {
log.append(&node_started("pipe-seq", &format!("n{i}"), i))
.unwrap();
}
drop(log);
let events = read_run(tmp.path(), "pipe-seq").unwrap();
assert_eq!(events.len(), 100);
for (i, ev) in events.iter().enumerate() {
if let Event::NodeStarted(ns) = ev {
assert_eq!(ns.node_id, format!("n{i}"));
} else {
panic!("unexpected event variant at index {i}");
}
}
}
#[test]
fn for_run_creates_parent_directories() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "pipe-fresh").expect("open");
assert!(log.path().parent().unwrap().is_dir());
log.append(&node_started("pipe-fresh", "n1", 1)).unwrap();
}
#[test]
fn read_run_on_malformed_line_reports_line_number() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "pipe-bad").unwrap();
log.append(&node_started("pipe-bad", "n1", 1)).unwrap();
let mut raw = std::fs::OpenOptions::new()
.append(true)
.open(log.path())
.unwrap();
writeln!(raw, "not-json").unwrap();
drop(raw);
drop(log);
let err = read_run(tmp.path(), "pipe-bad").unwrap_err();
match err {
Error::Deserialize { line_number, .. } => assert_eq!(line_number, 2),
other => panic!("expected Deserialize at line 2, got {other:?}"),
}
}
#[test]
fn enumerate_runs_lists_sorted_directory_names() {
let tmp = TempDir::new().unwrap();
EventLog::for_run(tmp.path(), "pipe-b").unwrap();
EventLog::for_run(tmp.path(), "pipe-a").unwrap();
EventLog::for_run(tmp.path(), "pipe-c").unwrap();
std::fs::create_dir_all(runs_dir(tmp.path()).join("pipe-pending")).unwrap();
let ids = enumerate_runs(tmp.path()).unwrap();
assert_eq!(ids, vec!["pipe-a", "pipe-b", "pipe-c", "pipe-pending"]);
}
#[test]
fn enumerate_runs_on_missing_var_runs_returns_empty() {
let tmp = TempDir::new().unwrap();
let ids = enumerate_runs(tmp.path()).expect("empty volume");
assert!(ids.is_empty());
}
#[test]
fn run_exists_discriminates_created_from_absent() {
let tmp = TempDir::new().unwrap();
assert!(!run_exists(tmp.path(), "pipe-no"));
EventLog::for_run(tmp.path(), "pipe-yes").unwrap();
assert!(run_exists(tmp.path(), "pipe-yes"));
}
}