// Source: runtimo_core/wal.rs

//! Write-Ahead Log (WAL) — append-only, crash-resistant event log.
//!
//! Events are written as JSONL (one JSON object per line) with `fsync` after
//! each write to guarantee durability. The WAL enables crash recovery by
//! replaying events to reconstruct system state.
//!
//! Sequence numbers and rotation indices use explicit arithmetic — these
//! are intentional, increment-by-one operations with known-safe ranges.

#![allow(clippy::arithmetic_side_effects)]
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
//! use std::path::Path;
//!
//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None, error: None,
//!     telemetry_before: None, telemetry_after: None,
//!     process_before: None, process_after: None,
//!     cmd: None, cmd_stdout: None, cmd_stderr: None,
//!     cmd_exit_code: None, cmd_corrected: None,
//! }).unwrap();
//!
//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
//! assert_eq!(reader.events().len(), 1);
//! ```

use crate::processes::ProcessSummary;
use crate::telemetry::Telemetry;
use crate::Result;
use serde::{Deserialize, Serialize};
use std::path::Path;

37/// Reads the last event's sequence number from a WAL file without loading
38/// the entire file into memory. Reads only the trailing tail_bytes of the
39/// file to find the last valid JSON line.
40///
41/// Falls back gracefully to `None` on any parse error, partial line, or
42/// I/O failure — the caller falls back to full scan.
43fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44    use std::io::{Read, Seek, SeekFrom};
45
46    let mut file = std::fs::File::open(path).ok()?;
47    let file_len = file.metadata().ok()?.len();
48    if file_len == 0 {
49        return None;
50    }
51
52    let start = file_len.saturating_sub(tail_bytes as u64);
53    file.seek(SeekFrom::Start(start)).ok()?;
54    let mut buf = vec![0u8; usize::try_from(file_len - start).unwrap_or(0).saturating_add(1)];
55    let n = file.read(&mut buf).ok()?;
56    buf.truncate(n);
57
58    // Split into lines, find the last non-empty line, trailing newline stripped
59    let lines: Vec<&[u8]> = buf
60        .split(|&b| b == b'\n')
61        .filter(|l| !l.is_empty())
62        .collect();
63
64    for line in lines.iter().rev() {
65        if let Ok(line_str) = std::str::from_utf8(line) {
66            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
67                return Some(event.seq);
68            }
69        }
70    }
71    None
72}
73
74/// A single WAL event record.
75///
76/// Events are appended sequentially and identified by `seq`. The `ts` field
77/// is a Unix timestamp in seconds. Optional fields (`capability`, `output`,
78/// `error`, `cmd_*`) are skipped during serialization when `None`.
79///
80/// # Command Execution Events
81///
82/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
83/// capture the shell command, its output, and any auto-correction applied.
84/// These events are only written in debug builds (`#[cfg(debug_assertions)]`),
85/// but the variant exists in release builds for reading old WALs.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[allow(clippy::exhaustive_structs)]
88pub struct WalEvent {
89    /// Sequence number (monotonically increasing within a writer session).
90    pub seq: u64,
91    /// Unix timestamp (seconds) when the event occurred.
92    pub ts: u64,
93    /// Type of the event (job lifecycle stage or command execution).
94    #[serde(rename = "type")]
95    pub event_type: WalEventType,
96    /// The job ID this event relates to.
97    pub job_id: String,
98    /// Capability name, if applicable.
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub capability: Option<String>,
101    /// Output data from the capability, if applicable.
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub output: Option<serde_json::Value>,
104    /// Error message, if the event represents a failure.
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub error: Option<String>,
107    /// Hardware telemetry snapshot before execution.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub telemetry_before: Option<Telemetry>,
110    /// Hardware telemetry snapshot after execution.
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub telemetry_after: Option<Telemetry>,
113    /// Process summary before execution.
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub process_before: Option<ProcessSummary>,
116    /// Process summary after execution.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub process_after: Option<ProcessSummary>,
119    /// Shell command string (CommandExecuted events only).
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub cmd: Option<String>,
122    /// Captured stdout, truncated to 1KB (CommandExecuted events only).
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub cmd_stdout: Option<String>,
125    /// Captured stderr, truncated to 1KB (CommandExecuted events only).
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub cmd_stderr: Option<String>,
128    /// Exit code of the command (CommandExecuted events only).
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub cmd_exit_code: Option<i32>,
131    /// Auto-corrected command, if correction was applied (future Phase 2).
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub cmd_corrected: Option<String>,
134}
135
136/// Types of WAL events, corresponding to job lifecycle stages
137/// and command executions.
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140// Additional variants may be added in future versions; the enum is
141// intentionally non-exhaustive for forward compatibility.
142#[allow(clippy::exhaustive_enums)]
143pub enum WalEventType {
144    /// Job has been submitted to the system.
145    JobSubmitted,
146    /// Job arguments passed validation.
147    JobValidated,
148    /// Job execution has started.
149    JobStarted,
150    /// Job completed successfully.
151    JobCompleted,
152    /// Job failed during validation or execution.
153    JobFailed,
154    /// A completed job was rolled back.
155    JobRolledBack,
156    /// Shell command executed (dev-only logging for error absorption).
157    CommandExecuted,
158}
159
/// Append-only WAL writer.
///
/// Stores the WAL path and the next sequence number. Every append reopens
/// the file in append mode, takes an exclusive file lock for concurrent
/// access safety, writes one JSONL line, and calls `fsync` for durability.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: 0, ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None, output: None, error: None,
///     telemetry_before: None, telemetry_after: None,
///     process_before: None, process_after: None,
///     cmd: None, cmd_stdout: None, cmd_stderr: None,
///     cmd_exit_code: None, cmd_corrected: None,
/// }).unwrap();
/// ```
#[derive(Debug)]
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the WAL file; reopened on every append.
    path: std::path::PathBuf,
    /// Sequence number the next appended event is expected to carry.
    seq: u64,
}

186impl WalWriter {
187    /// Creates or opens a WAL file at the given path.
188    ///
189    /// The file is opened in append mode. Existing content is preserved.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
194    /// be created or opened.
195    pub fn create(path: &Path) -> Result<Self> {
196        // Ensure parent directory exists
197        if let Some(parent) = path.parent() {
198            if !parent.exists() {
199                std::fs::create_dir_all(parent).map_err(|e| {
200                    crate::Error::WalError(format!(
201                        "Failed to create WAL directory {}: {}",
202                        parent.display(),
203                        e
204                    ))
205                })?;
206            }
207        }
208
209        // Create the file if it doesn't exist
210        if !path.exists() {
211            std::fs::File::create(path).map_err(|e| {
212                crate::Error::WalError(format!(
213                    "Failed to create WAL file {}: {}",
214                    path.display(),
215                    e
216                ))
217            })?;
218        }
219
220        // Recover sequence from existing WAL content to ensure monotonic
221        // ordering across process restarts. Acquire lock to prevent reading
222        // during a concurrent write (P2 FIX).
223        //
224        // Optimized: reads only the last 8KB of the WAL file to find the
225        // max seq. Falls back to full read only if tail-read fails.
226        let seq = if path.exists() {
227            let lock_file = std::fs::File::open(path)
228                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
229            Self::lock_file(&lock_file)?;
230            let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
231                last_seq + 1
232            } else {
233                // Fall back to full scan if tail-read failed
234                let content = std::fs::read_to_string(path).map_err(|e| {
235                    crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
236                })?;
237                content
238                    .lines()
239                    .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
240                    .map(|e| e.seq)
241                    .max()
242                    .map_or(0, |max| max + 1)
243            };
244            Self::unlock_file(&lock_file);
245            recovered
246        } else {
247            0
248        };
249
250        Ok(Self {
251            path: path.to_path_buf(),
252            seq,
253        })
254    }
255
256    /// Acquires an exclusive file lock for writing (FINDING #14).
257    #[cfg(unix)]
258    fn lock_file(file: &std::fs::File) -> Result<()> {
259        use std::os::unix::io::AsRawFd;
260        let fd = file.as_raw_fd();
261        // SAFETY: fd is a valid open file descriptor; LOCK_EX is a well-defined operation
262        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
263        if result != 0 {
264            return Err(crate::Error::WalError(format!(
265                "Failed to acquire WAL lock: {}",
266                std::io::Error::last_os_error()
267            )));
268        }
269        Ok(())
270    }
271
272    /// Acquires an exclusive file lock (no-op on non-unix).
273    #[cfg(not(unix))]
274    fn lock_file(_file: &std::fs::File) -> Result<()> {
275        Ok(())
276    }
277
278    /// Releases an exclusive file lock (FINDING #14).
279    #[cfg(unix)]
280    fn unlock_file(file: &std::fs::File) {
281        use std::os::unix::io::AsRawFd;
282        let fd = file.as_raw_fd();
283        // SAFETY: fd is a valid open file descriptor; LOCK_UN is a well-defined operation
284        unsafe { libc::flock(fd, libc::LOCK_UN) };
285    }
286
287    /// Releases an exclusive file lock (no-op on non-unix).
288    #[cfg(not(unix))]
289    fn unlock_file(_file: &std::fs::File) {}
290
291    /// Appends an event to the WAL using true append mode (P0 FIX).
292    ///
293    /// Opens the file in append mode, acquires an exclusive lock, writes the
294    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
295    /// instead of O(N) read-rewrite, and the lock is held during the entire
296    /// write operation preventing concurrent write loss.
297    ///
298    /// Increments the internal sequence counter after a successful write.
299    ///
300    /// # Errors
301    ///
302    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
303    /// or I/O failure.
304    pub fn append(&mut self, event: WalEvent) -> Result<()> {
305        use std::io::Write;
306        let line =
307            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
308
309        // Open in append mode — no read-rewrite, O(1) per write
310        let file = std::fs::OpenOptions::new()
311            .create(true)
312            .append(true)
313            .open(&self.path)
314            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
315
316        // Hold exclusive lock during entire write
317        Self::lock_file(&file)?;
318        {
319            let mut buf = std::io::BufWriter::new(&file);
320            writeln!(buf, "{}", line)
321                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
322            buf.flush()
323                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
324            file.sync_all()
325                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
326        }
327        Self::unlock_file(&file);
328
329        self.seq += 1;
330        Ok(())
331    }
332
333    /// Returns the current sequence number (next event will use this value).
334    #[must_use] 
335    pub fn seq(&self) -> u64 {
336        self.seq
337    }
338}
339
340/// Reads and parses a WAL file into a list of events.
341///
342/// Malformed lines are silently skipped. This is intentional — partial writes
343/// from crashes may leave incomplete JSON at the end of the file.
344///
345/// # Example
346///
347/// ```rust,ignore
348/// use runtimo_core::WalReader;
349/// use std::path::Path;
350///
351/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
352/// for event in reader.events() {
353///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
354/// }
355/// ```
356#[allow(clippy::exhaustive_structs)]
357pub struct WalReader {
358    events: Vec<WalEvent>,
359}
360
361impl WalReader {
362    /// Loads and parses all events from a WAL file.
363    ///
364    /// # Errors
365    ///
366    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
367    /// be read. Individual malformed lines are skipped, not treated as errors.
368    pub fn load(path: &Path) -> Result<Self> {
369        let content =
370            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
371
372        let events: Vec<WalEvent> = content
373            .lines()
374            .filter_map(|line| serde_json::from_str(line).ok())
375            .collect();
376
377        Ok(Self { events })
378    }
379
380    /// Returns a slice of all parsed events.
381    #[must_use] 
382    pub fn events(&self) -> &[WalEvent] {
383        &self.events
384    }
385
386    /// Reads only the last `n` lines from the WAL file.
387    ///
388    /// More efficient than [`WalReader::load`] when only recent events are needed.
389    /// Malformed lines are silently skipped.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
394    /// be read.
395    pub fn tail(path: &Path, n: usize) -> Result<Self> {
396        use std::collections::VecDeque;
397        use std::io::{BufRead, BufReader};
398        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
399        let reader = BufReader::new(file);
400
401        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
402        for line in reader.lines() {
403            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
404            if let Ok(event) = serde_json::from_str(&line) {
405                window.push_back(event);
406                if window.len() > n {
407                    window.pop_front();
408                }
409            }
410        }
411
412        Ok(Self {
413            events: window.into(),
414        })
415    }
416}
417
418/// WAL cleanup and rotation utilities.
419impl WalWriter {
420    /// Returns the rotation path for a given WAL file and rotation index.
421    ///
422    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
423    /// original extension instead of replacing it).
424    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
425        let mut s = path.to_string_lossy().into_owned();
426        s.push('.');
427        s.push_str(&index.to_string());
428        std::path::PathBuf::from(s)
429    }
430
431    /// Rotates the WAL when it exceeds max_size_bytes.
432    ///
433    /// Moves the current WAL to `{path}.1` (shifting older rotations),
434    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
435    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
436    ///
437    /// # Errors
438    /// Returns `IoError` or `BackupError` if WAL file operations fail.
439    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
440        let Ok(metadata) = std::fs::metadata(path) else {
441            return Ok(()); // No WAL to rotate
442        };
443
444        if metadata.len() < max_size_bytes {
445            return Ok(());
446        }
447
448        // Shift existing rotations (P1 FIX: proper naming preserves extension)
449        for i in (1..max_rotations).rev() {
450            let old = Self::rotation_path(path, i);
451            let new = Self::rotation_path(path, i + 1);
452            if old.exists() {
453                let _ = std::fs::rename(&old, &new);
454            }
455        }
456
457        // Move current to .1
458        let rotated = Self::rotation_path(path, 1);
459        std::fs::rename(path, &rotated)
460            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
461
462        // Create fresh empty WAL
463        std::fs::write(path, "")
464            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
465
466        // Remove oldest rotation if exceeding max
467        let oldest = Self::rotation_path(path, max_rotations + 1);
468        if oldest.exists() {
469            let _ = std::fs::remove_file(&oldest);
470        }
471
472        Ok(())
473    }
474
475    /// Cleans up WAL entries older than max_age_secs.
476    ///
477    /// Writes retained events to a temporary file, then atomically renames
478    /// it over the original WAL. This prevents event loss if another writer
479    /// appends during cleanup.
480    ///
481    /// # Arguments
482    /// * `path` - Path to WAL file
483    /// * `max_age_secs` - Maximum age in seconds
484    ///
485    /// # Returns
486    /// * `Ok(usize)` - Number of entries removed
487    ///
488    /// # Errors
489    /// Returns `IoError` if the WAL file cannot be read, the temp file cannot
490    /// be written, or the atomic rename fails.
491    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
492        use std::time::{SystemTime, UNIX_EPOCH};
493
494        let cutoff = SystemTime::now()
495            .duration_since(UNIX_EPOCH)
496            .map_or(0, |d| d.as_secs())
497            .saturating_sub(max_age_secs);
498
499        // Read all events under lock to prevent concurrent append loss (P1 FIX)
500        let lock_file = std::fs::File::open(path)
501            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
502        Self::lock_file(&lock_file)?;
503        let content = std::fs::read_to_string(path)
504            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
505
506        let all_events: Vec<WalEvent> = content
507            .lines()
508            .filter_map(|line| serde_json::from_str(line).ok())
509            .collect();
510
511        let total = all_events.len();
512
513        let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
514        let removed = total - retained.len();
515
516        if removed > 0 {
517            // Write retained events to temp file, then merge any events appended
518            // by concurrent writers during this cleanup window, then atomic rename.
519            let temp_path = path.with_extension("wal.tmp");
520            {
521                let mut new_wal = Self::create(&temp_path)?;
522                for event in &retained {
523                    new_wal.append(event.clone())?;
524                }
525
526                // Re-read the original WAL to catch any events appended during cleanup.
527                // Lock is still held from above, so no concurrent writer can interleave.
528                let last_seq = retained.last().map_or(0, |e| e.seq);
529                let current_content = std::fs::read_to_string(path).map_err(|e| {
530                    crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
531                })?;
532                for line in current_content.lines() {
533                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
534                        if event.seq > last_seq {
535                            new_wal.append(event)?;
536                        }
537                    }
538                }
539            }
540            // Release lock before rename
541            Self::unlock_file(&lock_file);
542            // Atomic rename replaces original — no window for lost events
543            std::fs::rename(&temp_path, path).map_err(|e| {
544                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
545            })?;
546        } else {
547            Self::unlock_file(&lock_file);
548        }
549
550        Ok(removed)
551    }
552}
553
/// Truncates a string to at most `max_bytes` bytes of its own content,
/// respecting UTF-8 boundaries, and marks the cut.
///
/// A string that already fits is returned unchanged. Otherwise the longest
/// prefix of at most `max_bytes` bytes that ends on a character boundary is
/// kept and the marker `...[truncated]` is appended — so the returned string
/// can be up to 14 bytes longer than `max_bytes`.
///
/// Used to bound command output stored in WAL events. 1KB is sufficient
/// for error messages and pattern analysis while preventing WAL bloat.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Index 0 is always a char boundary, so the search always succeeds; the
    // fallback only exists to avoid an unwrap.
    let end = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    let mut truncated = String::with_capacity(end + MARKER.len());
    truncated.push_str(&s[..end]);
    truncated.push_str(MARKER);
    truncated
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a per-test WAL path inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with every optional field unset; tests override the
    /// fields they care about with struct-update syntax.
    fn event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 1715800000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // Create new writer — should recover seq from file
        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        // Write enough data to trigger rotation
        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        // Rotate with a threshold smaller than current size
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        assert!(WalWriter::rotation_path(&path, 1).exists());
        // New WAL should be empty
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(WalWriter::rotation_path(&path, 1));
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Write old event
        wal.append(event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();

        // Write recent event
        wal.append(event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1); // Old event removed

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        // FINDING #15: verify optional fields are skipped when None
        let event = event(0, 1715800000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&event).unwrap();
        assert!(!json.contains("capability"));
        assert!(!json.contains("telemetry_before"));
        assert!(!json.contains("process_before"));
        assert!(!json.contains("cmd"));
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(reader.events()[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(reader.events()[0].cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}