Skip to main content

runtimo_core/
wal.rs

//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
//!
//! Events are written as JSONL (one JSON object per line) with `fsync` after
//! each write to guarantee durability. The WAL enables crash recovery by
//! replaying events to reconstruct system state.
//!
//! Sequence numbers and rotation indices use explicit arithmetic — these
//! are intentional, increment-by-one operations with known-safe ranges.

// The plain `+`/`-` arithmetic described above is why the
// `arithmetic_side_effects` lint is allowed for this whole module.
#![allow(clippy::arithmetic_side_effects)]
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
//! use std::path::Path;
//!
//! // `WalEvent` has no `Default`, so every field is spelled out. `append`
//! // writes `seq` exactly as given; `wal.seq()` is the suggested next value.
//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: wal.seq(),
//!     ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None,
//!     error: None,
//!     telemetry_before: None,
//!     telemetry_after: None,
//!     process_before: None,
//!     process_after: None,
//!     cmd: None,
//!     cmd_stdout: None,
//!     cmd_stderr: None,
//!     cmd_exit_code: None,
//!     cmd_corrected: None,
//! }).unwrap();
//!
//! // Assumes /tmp/test.wal did not exist before this example ran.
//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
//! assert_eq!(reader.events().len(), 1);
//! ```
30
31use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34use serde::{Deserialize, Serialize};
35use std::path::Path;
36
37/// Reads the last event's sequence number from a WAL file without loading
38/// the entire file into memory. Reads only the trailing tail_bytes of the
39/// file to find the last valid JSON line.
40///
41/// Falls back gracefully to `None` on any parse error, partial line, or
42/// I/O failure — the caller falls back to full scan.
43fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44    use std::io::{Read, Seek, SeekFrom};
45
46    let mut file = std::fs::File::open(path).ok()?;
47    let file_len = file.metadata().ok()?.len();
48    if file_len == 0 {
49        return None;
50    }
51
52    let start = file_len.saturating_sub(tail_bytes as u64);
53    file.seek(SeekFrom::Start(start)).ok()?;
54    let mut buf = vec![
55        0u8;
56        usize::try_from(file_len - start)
57            .unwrap_or(0)
58            .saturating_add(1)
59    ];
60    let n = file.read(&mut buf).ok()?;
61    buf.truncate(n);
62
63    // Split into lines, find the last non-empty line, trailing newline stripped
64    let lines: Vec<&[u8]> = buf
65        .split(|&b| b == b'\n')
66        .filter(|l| !l.is_empty())
67        .collect();
68
69    for line in lines.iter().rev() {
70        if let Ok(line_str) = std::str::from_utf8(line) {
71            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
72                return Some(event.seq);
73            }
74        }
75    }
76    None
77}
78
/// A single WAL event record, serialized as one JSON line.
///
/// Events are appended sequentially and identified by `seq`. The `ts` field
/// is a Unix timestamp in seconds. Optional fields (`capability`, `output`,
/// `error`, `telemetry_*`, `process_*`, `cmd*`) are omitted from the JSON when
/// `None`; because they are `Option`s, serde's derive also reads them back as
/// `None` when absent, so lines without them still parse.
///
/// # Command Execution Events
///
/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
/// capture the shell command, its output, and any auto-correction applied.
/// These events are intended to be written only in debug builds
/// (`#[cfg(debug_assertions)]`) — NOTE(review): that gating is at the call
/// sites, not in this file; confirm. The variant itself always exists so
/// release builds can read WALs produced by debug builds.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number. Supplied by the caller: [`WalWriter::append`] writes
    /// it unchanged and does not check ordering; [`WalWriter::seq`] tracks the
    /// suggested next value for a writer session.
    pub seq: u64,
    /// Unix timestamp (seconds) when the event occurred. Compared against the
    /// age cutoff by [`WalWriter::cleanup`].
    pub ts: u64,
    /// Type of the event (job lifecycle stage or command execution).
    /// Serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// The job ID this event relates to.
    pub job_id: String,
    /// Capability name, if applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Output data from the capability, if applicable (arbitrary JSON).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error message, if the event represents a failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Hardware telemetry snapshot before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Hardware telemetry snapshot after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Process summary before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Process summary after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Shell command string (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Captured stdout (CommandExecuted events only). Expected to be
    /// pre-truncated to about 1KB by the caller (e.g. with [`truncate_to`]);
    /// no limit is enforced by this type.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Captured stderr (CommandExecuted events only). Expected to be
    /// pre-truncated to about 1KB by the caller (e.g. with [`truncate_to`]);
    /// no limit is enforced by this type.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Exit code of the command (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Auto-corrected command, if correction was applied (future Phase 2).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
}
140
/// Types of WAL events, corresponding to job lifecycle stages
/// and command executions.
///
/// Serialized in `snake_case` (e.g. `JobSubmitted` ↔ `"job_submitted"`) under
/// the `type` key of [`WalEvent`]. A line whose `type` is not one of these
/// variants fails to deserialize and is therefore skipped by [`WalReader`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
// The enum is deliberately NOT `#[non_exhaustive]`: callers may match on it
// exhaustively, and the clippy lint suggesting `#[non_exhaustive]` is silenced
// below. Adding a variant is therefore a breaking change for downstream
// matches, and readers built before the addition skip lines that use it.
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Job has been submitted to the system.
    JobSubmitted,
    /// Job arguments passed validation.
    JobValidated,
    /// Job execution has started.
    JobStarted,
    /// Job completed successfully.
    JobCompleted,
    /// Job failed during validation or execution.
    JobFailed,
    /// A completed job was rolled back.
    JobRolledBack,
    /// Shell command executed (dev-only logging for error absorption).
    CommandExecuted,
}
164
/// Append-only WAL writer.
///
/// Each [`append`](WalWriter::append) opens the file in append mode, takes an
/// exclusive file lock (`flock` on Unix; no locking on other platforms),
/// writes one JSONL line, and `fsync`s before releasing the lock. The writer
/// stores only the path and a sequence counter, so no file handle stays open
/// between writes.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// // `WalEvent` has no `Default`, so every field is spelled out.
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: wal.seq(),
///     ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None,
///     output: None,
///     error: None,
///     telemetry_before: None,
///     telemetry_after: None,
///     process_before: None,
///     process_after: None,
///     cmd: None,
///     cmd_stdout: None,
///     cmd_stderr: None,
///     cmd_exit_code: None,
///     cmd_corrected: None,
/// }).unwrap();
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    // Location of the WAL file; reopened for every append.
    path: std::path::PathBuf,
    // Next sequence number suggested to callers; incremented per successful append.
    seq: u64,
}
190
191impl WalWriter {
192    /// Creates or opens a WAL file at the given path.
193    ///
194    /// The file is opened in append mode. Existing content is preserved.
195    ///
196    /// # Errors
197    ///
198    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
199    /// be created or opened.
200    pub fn create(path: &Path) -> Result<Self> {
201        // Ensure parent directory exists
202        if let Some(parent) = path.parent() {
203            if !parent.exists() {
204                std::fs::create_dir_all(parent).map_err(|e| {
205                    crate::Error::WalError(format!(
206                        "Failed to create WAL directory {}: {}",
207                        parent.display(),
208                        e
209                    ))
210                })?;
211            }
212        }
213
214        // Create the file if it doesn't exist
215        if !path.exists() {
216            std::fs::File::create(path).map_err(|e| {
217                crate::Error::WalError(format!(
218                    "Failed to create WAL file {}: {}",
219                    path.display(),
220                    e
221                ))
222            })?;
223        }
224
225        // Recover sequence from existing WAL content to ensure monotonic
226        // ordering across process restarts. Acquire lock to prevent reading
227        // during a concurrent write (P2 FIX).
228        //
229        // Optimized: reads only the last 8KB of the WAL file to find the
230        // max seq. Falls back to full read only if tail-read fails.
231        let seq = if path.exists() {
232            let lock_file = std::fs::File::open(path)
233                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
234            Self::lock_file(&lock_file)?;
235            let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
236                last_seq + 1
237            } else {
238                // Fall back to full scan if tail-read failed
239                let content = std::fs::read_to_string(path).map_err(|e| {
240                    crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
241                })?;
242                content
243                    .lines()
244                    .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
245                    .map(|e| e.seq)
246                    .max()
247                    .map_or(0, |max| max + 1)
248            };
249            Self::unlock_file(&lock_file);
250            recovered
251        } else {
252            0
253        };
254
255        Ok(Self {
256            path: path.to_path_buf(),
257            seq,
258        })
259    }
260
261    /// Acquires an exclusive file lock for writing (FINDING #14).
262    #[cfg(unix)]
263    fn lock_file(file: &std::fs::File) -> Result<()> {
264        use std::os::unix::io::AsRawFd;
265        let fd = file.as_raw_fd();
266        // SAFETY: fd is a valid open file descriptor; LOCK_EX is a well-defined operation
267        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
268        if result != 0 {
269            return Err(crate::Error::WalError(format!(
270                "Failed to acquire WAL lock: {}",
271                std::io::Error::last_os_error()
272            )));
273        }
274        Ok(())
275    }
276
277    /// Acquires an exclusive file lock (no-op on non-unix).
278    #[cfg(not(unix))]
279    fn lock_file(_file: &std::fs::File) -> Result<()> {
280        Ok(())
281    }
282
283    /// Releases an exclusive file lock (FINDING #14).
284    #[cfg(unix)]
285    fn unlock_file(file: &std::fs::File) {
286        use std::os::unix::io::AsRawFd;
287        let fd = file.as_raw_fd();
288        // SAFETY: fd is a valid open file descriptor; LOCK_UN is a well-defined operation
289        unsafe { libc::flock(fd, libc::LOCK_UN) };
290    }
291
292    /// Releases an exclusive file lock (no-op on non-unix).
293    #[cfg(not(unix))]
294    fn unlock_file(_file: &std::fs::File) {}
295
296    /// Appends an event to the WAL using true append mode (P0 FIX).
297    ///
298    /// Opens the file in append mode, acquires an exclusive lock, writes the
299    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
300    /// instead of O(N) read-rewrite, and the lock is held during the entire
301    /// write operation preventing concurrent write loss.
302    ///
303    /// Increments the internal sequence counter after a successful write.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
308    /// or I/O failure.
309    pub fn append(&mut self, event: WalEvent) -> Result<()> {
310        use std::io::Write;
311        let line =
312            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
313
314        // Open in append mode — no read-rewrite, O(1) per write
315        let file = std::fs::OpenOptions::new()
316            .create(true)
317            .append(true)
318            .open(&self.path)
319            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
320
321        // Hold exclusive lock during entire write
322        Self::lock_file(&file)?;
323        {
324            let mut buf = std::io::BufWriter::new(&file);
325            writeln!(buf, "{}", line)
326                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
327            buf.flush()
328                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
329            file.sync_all()
330                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
331        }
332        Self::unlock_file(&file);
333
334        self.seq += 1;
335        Ok(())
336    }
337
338    /// Returns the current sequence number (next event will use this value).
339    #[must_use]
340    pub fn seq(&self) -> u64 {
341        self.seq
342    }
343}
344
/// Reads and parses a WAL file into a list of events.
///
/// Malformed lines are silently skipped. This is intentional — partial writes
/// from crashes may leave incomplete JSON at the end of the file.
/// [`WalReader::load`] parses the whole file; [`WalReader::tail`] keeps only
/// the last `n` parseable events.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::WalReader;
/// use std::path::Path;
///
/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
/// for event in reader.events() {
///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
/// }
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    // Parsed events, in the order they appear in the file.
    events: Vec<WalEvent>,
}
365
366impl WalReader {
367    /// Loads and parses all events from a WAL file.
368    ///
369    /// # Errors
370    ///
371    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
372    /// be read. Individual malformed lines are skipped, not treated as errors.
373    pub fn load(path: &Path) -> Result<Self> {
374        let content =
375            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
376
377        let events: Vec<WalEvent> = content
378            .lines()
379            .filter_map(|line| serde_json::from_str(line).ok())
380            .collect();
381
382        Ok(Self { events })
383    }
384
385    /// Returns a slice of all parsed events.
386    #[must_use]
387    pub fn events(&self) -> &[WalEvent] {
388        &self.events
389    }
390
391    /// Reads only the last `n` lines from the WAL file.
392    ///
393    /// More efficient than [`WalReader::load`] when only recent events are needed.
394    /// Malformed lines are silently skipped.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
399    /// be read.
400    pub fn tail(path: &Path, n: usize) -> Result<Self> {
401        use std::collections::VecDeque;
402        use std::io::{BufRead, BufReader};
403        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
404        let reader = BufReader::new(file);
405
406        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
407        for line in reader.lines() {
408            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
409            if let Ok(event) = serde_json::from_str(&line) {
410                window.push_back(event);
411                if window.len() > n {
412                    window.pop_front();
413                }
414            }
415        }
416
417        Ok(Self {
418            events: window.into(),
419        })
420    }
421}
422
423/// WAL cleanup and rotation utilities.
424impl WalWriter {
425    /// Returns the rotation path for a given WAL file and rotation index.
426    ///
427    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
428    /// original extension instead of replacing it).
429    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
430        let mut s = path.to_string_lossy().into_owned();
431        s.push('.');
432        s.push_str(&index.to_string());
433        std::path::PathBuf::from(s)
434    }
435
436    /// Rotates the WAL when it exceeds max_size_bytes.
437    ///
438    /// Moves the current WAL to `{path}.1` (shifting older rotations),
439    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
440    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
441    ///
442    /// # Errors
443    /// Returns `IoError` or `BackupError` if WAL file operations fail.
444    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
445        let Ok(metadata) = std::fs::metadata(path) else {
446            return Ok(()); // No WAL to rotate
447        };
448
449        if metadata.len() < max_size_bytes {
450            return Ok(());
451        }
452
453        // Shift existing rotations (P1 FIX: proper naming preserves extension)
454        for i in (1..max_rotations).rev() {
455            let old = Self::rotation_path(path, i);
456            let new = Self::rotation_path(path, i + 1);
457            if old.exists() {
458                let _ = std::fs::rename(&old, &new);
459            }
460        }
461
462        // Move current to .1
463        let rotated = Self::rotation_path(path, 1);
464        std::fs::rename(path, &rotated)
465            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
466
467        // Create fresh empty WAL
468        std::fs::write(path, "")
469            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
470
471        // Remove oldest rotation if exceeding max
472        let oldest = Self::rotation_path(path, max_rotations + 1);
473        if oldest.exists() {
474            let _ = std::fs::remove_file(&oldest);
475        }
476
477        Ok(())
478    }
479
480    /// Cleans up WAL entries older than max_age_secs.
481    ///
482    /// Writes retained events to a temporary file, then atomically renames
483    /// it over the original WAL. This prevents event loss if another writer
484    /// appends during cleanup.
485    ///
486    /// # Arguments
487    /// * `path` - Path to WAL file
488    /// * `max_age_secs` - Maximum age in seconds
489    ///
490    /// # Returns
491    /// * `Ok(usize)` - Number of entries removed
492    ///
493    /// # Errors
494    /// Returns `IoError` if the WAL file cannot be read, the temp file cannot
495    /// be written, or the atomic rename fails.
496    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
497        use std::time::{SystemTime, UNIX_EPOCH};
498
499        let cutoff = SystemTime::now()
500            .duration_since(UNIX_EPOCH)
501            .map_or(0, |d| d.as_secs())
502            .saturating_sub(max_age_secs);
503
504        // Read all events under lock to prevent concurrent append loss (P1 FIX)
505        let lock_file = std::fs::File::open(path)
506            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
507        Self::lock_file(&lock_file)?;
508        let content = std::fs::read_to_string(path)
509            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
510
511        let all_events: Vec<WalEvent> = content
512            .lines()
513            .filter_map(|line| serde_json::from_str(line).ok())
514            .collect();
515
516        let total = all_events.len();
517
518        let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
519        let removed = total - retained.len();
520
521        if removed > 0 {
522            // Write retained events to temp file, then merge any events appended
523            // by concurrent writers during this cleanup window, then atomic rename.
524            let temp_path = path.with_extension("wal.tmp");
525            {
526                let mut new_wal = Self::create(&temp_path)?;
527                for event in &retained {
528                    new_wal.append(event.clone())?;
529                }
530
531                // Re-read the original WAL to catch any events appended during cleanup.
532                // Lock is still held from above, so no concurrent writer can interleave.
533                let last_seq = retained.last().map_or(0, |e| e.seq);
534                let current_content = std::fs::read_to_string(path).map_err(|e| {
535                    crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
536                })?;
537                for line in current_content.lines() {
538                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
539                        if event.seq > last_seq {
540                            new_wal.append(event)?;
541                        }
542                    }
543                }
544            }
545            // Release lock before rename
546            Self::unlock_file(&lock_file);
547            // Atomic rename replaces original — no window for lost events
548            std::fs::rename(&temp_path, path).map_err(|e| {
549                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
550            })?;
551        } else {
552            Self::unlock_file(&lock_file);
553        }
554
555        Ok(removed)
556    }
557}
558
/// Shortens `s` to at most its first `max_bytes` bytes and appends the marker
/// `...[truncated]`.
///
/// If `max_bytes` falls inside a multi-byte character, the cut moves back to
/// the nearest UTF-8 character boundary. Strings no longer than `max_bytes`
/// are returned unchanged. A truncated result is longer than `max_bytes` by the
/// 14-byte marker, so `max_bytes` bounds the kept content, not the output.
///
/// Used to bound command output stored in WAL events. 1KB is sufficient for
/// error messages and pattern analysis while preventing WAL bloat.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if max_bytes >= s.len() {
        return s.to_owned();
    }
    // Byte 0 is always a char boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    [&s[..cut], "...[truncated]"].concat()
}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test WAL path inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Event with the given identity fields and every optional field `None`.
    /// Tests override individual fields with struct-update syntax.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    /// Current Unix time in whole seconds.
    fn now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let started = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        wal.append(started).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A second writer on the same file must pick up where the first left off.
        let reopened = WalWriter::create(&path).unwrap();
        assert_eq!(reopened.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        // Fill the WAL so there is something to rotate.
        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100_u64 {
            let job = format!("job-{}", i);
            wal.append(bare_event(i, 1715800000 + i, WalEventType::JobStarted, &job))
                .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        // A threshold just below the current size forces a rotation.
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let first_rotation = WalWriter::rotation_path(&path, 1);
        assert!(first_rotation.exists());
        // The live WAL starts over empty.
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(first_rotation);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = now_secs();

        // One event well past the age limit, one fresh event.
        wal.append(bare_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        // FINDING #15: optional fields must be omitted from the JSON when None.
        let json =
            serde_json::to_string(&bare_event(0, 1715800000, WalEventType::JobStarted, "test"))
                .unwrap();
        for absent in ["capability", "telemetry_before", "process_before", "cmd"] {
            assert!(!json.contains(absent));
        }
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let executed = WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..bare_event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        };
        wal.append(executed).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        let only = &events[0];
        assert_eq!(only.event_type, WalEventType::CommandExecuted);
        assert_eq!(only.cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(only.cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}