1#![allow(dead_code)]
31
32use std::fs::{File, OpenOptions};
33use std::io::{BufRead, BufReader, Write};
34use std::path::{Path, PathBuf};
35use std::sync::Mutex;
36use std::thread;
37use std::time::{Duration, Instant};
38
39use fs2::FileExt;
40use thiserror::Error;
41
42use crate::events::Event;
43use crate::volume::{events_log_path, run_dir, runs_dir};
44
45pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5);
49
50const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);
51
52#[derive(Debug, Error)]
53pub enum Error {
54 #[error("I/O error on {path}: {source}")]
57 Io {
58 path: PathBuf,
59 #[source]
60 source: std::io::Error,
61 },
62
63 #[error("failed to serialize event: {source}")]
69 Serialize {
70 #[source]
71 source: serde_json::Error,
72 },
73
74 #[error("malformed event at {path}:{line_number}: {source}")]
78 Deserialize {
79 path: PathBuf,
80 line_number: usize,
81 #[source]
82 source: serde_json::Error,
83 },
84
85 #[error("event log lock {path} was not released within the acquire budget")]
88 LockTimeout { path: PathBuf },
89}
90
91pub struct EventLog {
98 path: PathBuf,
99 file: Mutex<File>,
100 lock_timeout: Duration,
101}
102
103impl EventLog {
104 pub fn for_run(volume_root: &Path, run_id: &str) -> Result<Self, Error> {
108 let path = events_log_path(volume_root, run_id);
109 if let Some(parent) = path.parent() {
110 std::fs::create_dir_all(parent).map_err(|source| Error::Io {
111 path: parent.to_path_buf(),
112 source,
113 })?;
114 }
115 let file = OpenOptions::new()
116 .create(true)
117 .append(true)
118 .read(true)
119 .open(&path)
120 .map_err(|source| Error::Io {
121 path: path.clone(),
122 source,
123 })?;
124 Ok(Self {
125 path,
126 file: Mutex::new(file),
127 lock_timeout: DEFAULT_LOCK_TIMEOUT,
128 })
129 }
130
131 pub fn path(&self) -> &Path {
134 &self.path
135 }
136
137 pub fn append(&self, event: &Event) -> Result<(), Error> {
144 let line = serde_json::to_string(event).map_err(|source| Error::Serialize { source })?;
145
146 let mut guard = self
147 .file
148 .lock()
149 .expect("event-log mutex poisoned by a prior panic in append");
150
151 acquire_lock(&guard, &self.path, self.lock_timeout)?;
152
153 let write_result = (|| -> Result<(), Error> {
154 guard
155 .write_all(line.as_bytes())
156 .map_err(|source| Error::Io {
157 path: self.path.clone(),
158 source,
159 })?;
160 guard.write_all(b"\n").map_err(|source| Error::Io {
161 path: self.path.clone(),
162 source,
163 })?;
164 guard.flush().map_err(|source| Error::Io {
165 path: self.path.clone(),
166 source,
167 })
168 })();
169
170 let _ = FileExt::unlock(&*guard);
174
175 write_result
176 }
177}
178
179pub fn read_run(volume_root: &Path, run_id: &str) -> Result<Vec<Event>, Error> {
187 let path = events_log_path(volume_root, run_id);
188 let file = File::open(&path).map_err(|source| Error::Io {
189 path: path.clone(),
190 source,
191 })?;
192 let mut events = Vec::new();
193 for (i, line) in BufReader::new(file).lines().enumerate() {
194 let line = line.map_err(|source| Error::Io {
195 path: path.clone(),
196 source,
197 })?;
198 if line.trim().is_empty() {
199 continue;
200 }
201 let event = serde_json::from_str(&line).map_err(|source| Error::Deserialize {
202 path: path.clone(),
203 line_number: i + 1,
204 source,
205 })?;
206 events.push(event);
207 }
208 Ok(events)
209}
210
211pub fn enumerate_runs(volume_root: &Path) -> Result<Vec<String>, Error> {
218 let dir = runs_dir(volume_root);
219 if !dir.is_dir() {
220 return Ok(Vec::new());
221 }
222 let entries = std::fs::read_dir(&dir).map_err(|source| Error::Io {
223 path: dir.clone(),
224 source,
225 })?;
226 let mut ids = Vec::new();
227 for entry in entries {
228 let entry = entry.map_err(|source| Error::Io {
229 path: dir.clone(),
230 source,
231 })?;
232 let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
233 if !is_dir {
234 continue;
235 }
236 if let Some(name) = entry.file_name().to_str() {
237 ids.push(name.to_string());
238 }
239 }
240 ids.sort();
241 Ok(ids)
242}
243
244pub fn run_exists(volume_root: &Path, run_id: &str) -> bool {
248 run_dir(volume_root, run_id).is_dir()
249}
250
251fn acquire_lock(file: &File, path: &Path, timeout: Duration) -> Result<(), Error> {
252 let contended_os = fs2::lock_contended_error().raw_os_error();
256 let deadline = Instant::now() + timeout;
257 loop {
258 if Instant::now() >= deadline {
259 return Err(Error::LockTimeout {
260 path: path.to_path_buf(),
261 });
262 }
263 match FileExt::try_lock_exclusive(file) {
264 Ok(()) => return Ok(()),
265 Err(e)
266 if e.kind() == std::io::ErrorKind::WouldBlock
267 || e.raw_os_error() == contended_os =>
268 {
269 let remaining = deadline.saturating_duration_since(Instant::now());
270 thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
271 }
272 Err(source) => {
273 return Err(Error::Io {
274 path: path.to_path_buf(),
275 source,
276 });
277 }
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use crate::events::{NodeKind, NodeStarted};
286 use tempfile::TempDir;
287
288 fn node_started(run_id: &str, node_id: &str, seq: usize) -> Event {
289 Event::NodeStarted(NodeStarted {
290 id: format!("ev-{seq:04}"),
291 ts: "2026-04-15T00:00:00Z".to_string(),
292 run_id: run_id.to_string(),
293 node_id: node_id.to_string(),
294 kind: NodeKind::Bash,
295 name: None,
296 model: None,
297 })
298 }
299
300 #[test]
301 fn append_then_read_roundtrip_one_event() {
302 let tmp = TempDir::new().unwrap();
303 let log = EventLog::for_run(tmp.path(), "pipe-01").expect("open log");
304 log.append(&node_started("pipe-01", "n1", 1))
305 .expect("append");
306 drop(log);
307
308 let events = read_run(tmp.path(), "pipe-01").expect("read");
309 assert_eq!(events.len(), 1);
310 assert_eq!(events[0].run_id(), "pipe-01");
311 assert_eq!(events[0].event_type(), "node.started");
312 }
313
314 #[test]
315 fn hundred_sequential_appends_produce_hundred_parseable_lines() {
316 let tmp = TempDir::new().unwrap();
317 let log = EventLog::for_run(tmp.path(), "pipe-seq").unwrap();
318 for i in 0..100 {
319 log.append(&node_started("pipe-seq", &format!("n{i}"), i))
320 .unwrap();
321 }
322 drop(log);
323
324 let events = read_run(tmp.path(), "pipe-seq").unwrap();
325 assert_eq!(events.len(), 100);
326 for (i, ev) in events.iter().enumerate() {
328 if let Event::NodeStarted(ns) = ev {
329 assert_eq!(ns.node_id, format!("n{i}"));
330 } else {
331 panic!("unexpected event variant at index {i}");
332 }
333 }
334 }
335
336 #[test]
337 fn for_run_creates_parent_directories() {
338 let tmp = TempDir::new().unwrap();
339 let log = EventLog::for_run(tmp.path(), "pipe-fresh").expect("open");
341 assert!(log.path().parent().unwrap().is_dir());
342 log.append(&node_started("pipe-fresh", "n1", 1)).unwrap();
343 }
344
345 #[test]
346 fn read_run_on_malformed_line_reports_line_number() {
347 let tmp = TempDir::new().unwrap();
348 let log = EventLog::for_run(tmp.path(), "pipe-bad").unwrap();
349 log.append(&node_started("pipe-bad", "n1", 1)).unwrap();
350 let mut raw = std::fs::OpenOptions::new()
352 .append(true)
353 .open(log.path())
354 .unwrap();
355 writeln!(raw, "not-json").unwrap();
356 drop(raw);
357 drop(log);
358
359 let err = read_run(tmp.path(), "pipe-bad").unwrap_err();
360 match err {
361 Error::Deserialize { line_number, .. } => assert_eq!(line_number, 2),
362 other => panic!("expected Deserialize at line 2, got {other:?}"),
363 }
364 }
365
366 #[test]
367 fn enumerate_runs_lists_sorted_directory_names() {
368 let tmp = TempDir::new().unwrap();
369 EventLog::for_run(tmp.path(), "pipe-b").unwrap();
370 EventLog::for_run(tmp.path(), "pipe-a").unwrap();
371 EventLog::for_run(tmp.path(), "pipe-c").unwrap();
372 std::fs::create_dir_all(runs_dir(tmp.path()).join("pipe-pending")).unwrap();
376
377 let ids = enumerate_runs(tmp.path()).unwrap();
378 assert_eq!(ids, vec!["pipe-a", "pipe-b", "pipe-c", "pipe-pending"]);
379 }
380
381 #[test]
382 fn enumerate_runs_on_missing_var_runs_returns_empty() {
383 let tmp = TempDir::new().unwrap();
384 let ids = enumerate_runs(tmp.path()).expect("empty volume");
385 assert!(ids.is_empty());
386 }
387
388 #[test]
389 fn run_exists_discriminates_created_from_absent() {
390 let tmp = TempDir::new().unwrap();
391 assert!(!run_exists(tmp.path(), "pipe-no"));
392 EventLog::for_run(tmp.path(), "pipe-yes").unwrap();
393 assert!(run_exists(tmp.path(), "pipe-yes"));
394 }
395}