Skip to main content

runtimo_core/
wal.rs

//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
//!
//! Events are written as JSONL (one JSON object per line) with `fsync` after
//! each write to guarantee durability. The WAL enables crash recovery by
//! replaying events to reconstruct system state.
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
//! use std::path::Path;
//!
//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None, error: None,
//!     telemetry_before: None, telemetry_after: None,
//!     process_before: None, process_after: None,
//! }).unwrap();
//!
//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
//! assert_eq!(reader.events().len(), 1);
//! ```
25
26use crate::processes::ProcessSummary;
27use crate::telemetry::Telemetry;
28use crate::Result;
29use serde::{Deserialize, Serialize};
30use std::path::Path;
31
32
33
34/// A single WAL event record.
35///
36/// Events are appended sequentially and identified by `seq`. The `ts` field
37/// is a Unix timestamp in seconds. Optional fields (`capability`, `output`,
38/// `error`) are skipped during serialization when `None`.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct WalEvent {
41    /// Sequence number (monotonically increasing within a writer session).
42    pub seq: u64,
43    /// Unix timestamp (seconds) when the event occurred.
44    pub ts: u64,
45    /// Type of the event (job lifecycle stage).
46    #[serde(rename = "type")]
47    pub event_type: WalEventType,
48    /// The job ID this event relates to.
49    pub job_id: String,
50    /// Capability name, if applicable.
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub capability: Option<String>,
53    /// Output data from the capability, if applicable.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub output: Option<serde_json::Value>,
56    /// Error message, if the event represents a failure.
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub error: Option<String>,
59    /// Hardware telemetry snapshot before execution.
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub telemetry_before: Option<Telemetry>,
62    /// Hardware telemetry snapshot after execution.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub telemetry_after: Option<Telemetry>,
65    /// Process summary before execution.
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub process_before: Option<ProcessSummary>,
68    /// Process summary after execution.
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub process_after: Option<ProcessSummary>,
71}
72
73/// Types of WAL events, corresponding to job lifecycle stages.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub enum WalEventType {
77    /// Job has been submitted to the system.
78    JobSubmitted,
79    /// Job arguments passed validation.
80    JobValidated,
81    /// Job execution has started.
82    JobStarted,
83    /// Job completed successfully.
84    JobCompleted,
85    /// Job failed during validation or execution.
86    JobFailed,
87    /// A completed job was rolled back.
88    JobRolledBack,
89}
90
/// Writer half of the WAL: appends events to a JSONL file.
///
/// The writer remembers only the file path and a sequence counter. Every
/// append re-opens the file in append mode, takes an exclusive file lock,
/// writes one line, and `fsync`s before releasing the lock.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: 0, ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None, output: None, error: None,
///     telemetry_before: None, telemetry_after: None,
///     process_before: None, process_after: None,
/// }).unwrap();
/// ```
pub struct WalWriter {
    /// Location of the WAL file on disk.
    path: std::path::PathBuf,
    /// Counter recovered at creation time and bumped after each append.
    seq: u64,
}
115
116impl WalWriter {
117    /// Creates or opens a WAL file at the given path.
118    ///
119    /// The file is opened in append mode. Existing content is preserved.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
124    /// be created or opened.
125    pub fn create(path: &Path) -> Result<Self> {
126        // Ensure parent directory exists
127        if let Some(parent) = path.parent() {
128            if !parent.exists() {
129                std::fs::create_dir_all(parent).map_err(|e| {
130                    crate::Error::WalError(format!(
131                        "Failed to create WAL directory {}: {}",
132                        parent.display(),
133                        e
134                    ))
135                })?;
136            }
137        }
138
139        // Create the file if it doesn't exist
140        if !path.exists() {
141            std::fs::File::create(path).map_err(|e| {
142                crate::Error::WalError(format!(
143                    "Failed to create WAL file {}: {}",
144                    path.display(),
145                    e
146                ))
147            })?;
148        }
149
150        // Recover sequence from existing WAL content to ensure monotonic
151        // ordering across process restarts. Acquire lock to prevent reading
152        // during a concurrent write (P2 FIX).
153        let seq = if path.exists() {
154            let lock_file = std::fs::File::open(path)
155                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
156            Self::lock_file(&lock_file)?;
157            let content = std::fs::read_to_string(path)
158                .map_err(|e| crate::Error::WalError(format!("read WAL for seq recovery: {}", e)))?;
159            let recovered = content
160                .lines()
161                .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
162                .map(|e| e.seq)
163                .max()
164                .map(|max| max + 1)
165                .unwrap_or(0);
166            Self::unlock_file(&lock_file);
167            recovered
168        } else {
169            0
170        };
171
172        Ok(Self {
173            path: path.to_path_buf(),
174            seq,
175        })
176    }
177
178    /// Acquires an exclusive file lock for writing (FINDING #14).
179    #[cfg(unix)]
180    fn lock_file(file: &std::fs::File) -> Result<()> {
181        use std::os::unix::io::AsRawFd;
182        let fd = file.as_raw_fd();
183        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
184        if result != 0 {
185            return Err(crate::Error::WalError(format!(
186                "Failed to acquire WAL lock: {}",
187                std::io::Error::last_os_error()
188            )));
189        }
190        Ok(())
191    }
192
193    /// Acquires an exclusive file lock (no-op on non-unix).
194    #[cfg(not(unix))]
195    fn lock_file(_file: &std::fs::File) -> Result<()> {
196        Ok(())
197    }
198
199    /// Releases an exclusive file lock (FINDING #14).
200    #[cfg(unix)]
201    fn unlock_file(file: &std::fs::File) {
202        use std::os::unix::io::AsRawFd;
203        let fd = file.as_raw_fd();
204        unsafe { libc::flock(fd, libc::LOCK_UN) };
205    }
206
207    /// Releases an exclusive file lock (no-op on non-unix).
208    #[cfg(not(unix))]
209    fn unlock_file(_file: &std::fs::File) {}
210
211    /// Appends an event to the WAL using true append mode (P0 FIX).
212    ///
213    /// Opens the file in append mode, acquires an exclusive lock, writes the
214    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
215    /// instead of O(N) read-rewrite, and the lock is held during the entire
216    /// write operation preventing concurrent write loss.
217    ///
218    /// Increments the internal sequence counter after a successful write.
219    ///
220    /// # Errors
221    ///
222    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
223    /// or I/O failure.
224    pub fn append(&mut self, event: WalEvent) -> Result<()> {
225        use std::io::Write;
226        let line =
227            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
228
229        // Open in append mode — no read-rewrite, O(1) per write
230        let file = std::fs::OpenOptions::new()
231            .create(true)
232            .append(true)
233            .open(&self.path)
234            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
235
236        // Hold exclusive lock during entire write
237        Self::lock_file(&file)?;
238        {
239            let mut buf = std::io::BufWriter::new(&file);
240            writeln!(buf, "{}", line)
241                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
242            buf.flush()
243                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
244            file.sync_all()
245                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
246        }
247        Self::unlock_file(&file);
248
249        self.seq += 1;
250        Ok(())
251    }
252
253    /// Returns the current sequence number (next event will use this value).
254    pub fn seq(&self) -> u64 {
255        self.seq
256    }
257}
258
259/// Reads and parses a WAL file into a list of events.
260///
261/// Malformed lines are silently skipped. This is intentional — partial writes
262/// from crashes may leave incomplete JSON at the end of the file.
263///
264/// # Example
265///
266/// ```rust,ignore
267/// use runtimo_core::WalReader;
268/// use std::path::Path;
269///
270/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
271/// for event in reader.events() {
272///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
273/// }
274/// ```
275pub struct WalReader {
276    events: Vec<WalEvent>,
277}
278
279impl WalReader {
280    /// Loads and parses all events from a WAL file.
281    ///
282    /// # Errors
283    ///
284    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
285    /// be read. Individual malformed lines are skipped, not treated as errors.
286    pub fn load(path: &Path) -> Result<Self> {
287        let content =
288            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
289
290        let events: Vec<WalEvent> = content
291            .lines()
292            .filter_map(|line| serde_json::from_str(line).ok())
293            .collect();
294
295        Ok(Self { events })
296    }
297
298    /// Returns a slice of all parsed events.
299    pub fn events(&self) -> &[WalEvent] {
300        &self.events
301    }
302
303    /// Reads only the last `n` lines from the WAL file.
304    ///
305    /// More efficient than [`WalReader::load`] when only recent events are needed.
306    /// Malformed lines are silently skipped.
307    ///
308    /// # Errors
309    ///
310    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
311    /// be read.
312    pub fn tail(path: &Path, n: usize) -> Result<Self> {
313        use std::collections::VecDeque;
314        use std::io::{BufRead, BufReader};
315        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
316        let reader = BufReader::new(file);
317
318        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
319        for line in reader.lines() {
320            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
321            if let Ok(event) = serde_json::from_str(&line) {
322                window.push_back(event);
323                if window.len() > n {
324                    window.pop_front();
325                }
326            }
327        }
328
329        Ok(Self {
330            events: window.into(),
331        })
332    }
333}
334
335/// WAL cleanup and rotation utilities.
336impl WalWriter {
337    /// Returns the rotation path for a given WAL file and rotation index.
338    ///
339    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
340    /// original extension instead of replacing it).
341    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
342        let mut s = path.to_string_lossy().into_owned();
343        s.push('.');
344        s.push_str(&index.to_string());
345        std::path::PathBuf::from(s)
346    }
347
348    /// Rotates the WAL when it exceeds max_size_bytes.
349    ///
350    /// Moves the current WAL to `{path}.1` (shifting older rotations),
351    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
352    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
353    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
354        let metadata = match std::fs::metadata(path) {
355            Ok(m) => m,
356            Err(_) => return Ok(()), // No WAL to rotate
357        };
358
359        if metadata.len() < max_size_bytes {
360            return Ok(());
361        }
362
363        // Shift existing rotations (P1 FIX: proper naming preserves extension)
364        for i in (1..max_rotations).rev() {
365            let old = Self::rotation_path(path, i);
366            let new = Self::rotation_path(path, i + 1);
367            if old.exists() {
368                let _ = std::fs::rename(&old, &new);
369            }
370        }
371
372        // Move current to .1
373        let rotated = Self::rotation_path(path, 1);
374        std::fs::rename(path, &rotated)
375            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
376
377        // Create fresh empty WAL
378        std::fs::write(path, "")
379            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
380
381        // Remove oldest rotation if exceeding max
382        let oldest = Self::rotation_path(path, max_rotations + 1);
383        if oldest.exists() {
384            let _ = std::fs::remove_file(&oldest);
385        }
386
387        Ok(())
388    }
389
390    /// Cleans up WAL entries older than max_age_secs.
391    ///
392    /// Writes retained events to a temporary file, then atomically renames
393    /// it over the original WAL. This prevents event loss if another writer
394    /// appends during cleanup.
395    ///
396    /// # Arguments
397    /// * `path` - Path to WAL file
398    /// * `max_age_secs` - Maximum age in seconds
399    ///
400    /// # Returns
401    /// * `Ok(usize)` - Number of entries removed
402    /// * `Err(Error)` - Cleanup failure
403    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
404        use std::time::{SystemTime, UNIX_EPOCH};
405
406        let cutoff = SystemTime::now()
407            .duration_since(UNIX_EPOCH)
408            .map(|d| d.as_secs())
409            .unwrap_or(0)
410            .saturating_sub(max_age_secs);
411
412        // Read all events under lock to prevent concurrent append loss (P1 FIX)
413        let lock_file = std::fs::File::open(path)
414            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
415        Self::lock_file(&lock_file)?;
416        let content = std::fs::read_to_string(path)
417            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
418
419        let events: Vec<WalEvent> = content
420            .lines()
421            .filter_map(|line| serde_json::from_str(line).ok())
422            .collect();
423
424        // Filter out old events
425        let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
426
427        let total = content
428            .lines()
429            .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
430            .count();
431        let removed = total - retained.len();
432
433        if removed > 0 {
434            // Write retained events to temp file, then merge any events appended
435            // by concurrent writers during this cleanup window, then atomic rename.
436            let temp_path = path.with_extension("wal.tmp");
437            {
438                let mut new_wal = WalWriter::create(&temp_path)?;
439                for event in &retained {
440                    new_wal.append(event.clone())?;
441                }
442
443                // Re-read the original WAL to catch any events appended during cleanup.
444                // Lock is still held from above, so no concurrent writer can interleave.
445                let last_seq = retained.last().map(|e| e.seq).unwrap_or(0);
446                let current_content = std::fs::read_to_string(path)
447                    .map_err(|e| crate::Error::WalError(format!("re-read WAL during cleanup: {}", e)))?;
448                for line in current_content.lines() {
449                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
450                        if event.seq > last_seq {
451                            new_wal.append(event)?;
452                        }
453                    }
454                }
455            }
456            // Release lock before rename
457            Self::unlock_file(&lock_file);
458            // Atomic rename replaces original — no window for lost events
459            std::fs::rename(&temp_path, path).map_err(|e| {
460                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
461            })?;
462        } else {
463            Self::unlock_file(&lock_file);
464        }
465
466        Ok(removed)
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a per-test WAL path inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with every optional field set to `None`.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let event = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        wal.append(event).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut first = WalWriter::create(&path).unwrap();
        assert_eq!(first.seq(), 0);
        first
            .append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(first.seq(), 1);

        // A second writer on the same file must pick up where the first left off.
        let second = WalWriter::create(&path).unwrap();
        assert_eq!(second.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        // Fill the WAL so it has a non-trivial size.
        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            let job_id = format!("job-{}", i);
            wal.append(bare_event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &job_id,
            ))
            .unwrap();
        }

        // A threshold just below the current size forces a rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let rotated = WalWriter::rotation_path(&path, 1);
        assert!(rotated.exists());
        // The fresh WAL must be empty.
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(rotated);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // One event well past the age limit, one right at "now".
        wal.append(bare_event(
            0,
            now - 1000,
            WalEventType::JobStarted,
            "old-job",
        ))
        .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1); // Old event removed

        let reader = WalReader::load(&path).unwrap();
        let survivors = reader.events();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        // FINDING #15: optional fields must be absent from the JSON when None.
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&event).unwrap();
        for absent in ["capability", "telemetry_before", "process_before"] {
            assert!(!json.contains(absent));
        }
    }
}