Skip to main content

runtimo_core/
wal.rs

//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
//!
//! Events are written as JSONL (one JSON object per line) with `fsync` after
//! each write to guarantee durability. The WAL enables crash recovery by
//! replaying events to reconstruct system state.
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
//! use std::path::Path;
//!
//! // Assumes /tmp/test.wal does not exist yet.
//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None, error: None,
//!     telemetry_before: None, telemetry_after: None,
//!     process_before: None, process_after: None,
//!     cmd: None, cmd_stdout: None, cmd_stderr: None,
//!     cmd_exit_code: None, cmd_corrected: None,
//! }).unwrap();
//!
//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
//! assert_eq!(reader.events().len(), 1);
//! ```
25
26use crate::processes::ProcessSummary;
27use crate::telemetry::Telemetry;
28use crate::Result;
29use serde::{Deserialize, Serialize};
30use std::path::Path;
31
32
33
34/// Reads the last event's sequence number from a WAL file without loading
35/// the entire file into memory. Reads only the trailing tail_bytes of the
36/// file to find the last valid JSON line.
37///
38/// Falls back gracefully to `None` on any parse error, partial line, or
39/// I/O failure — the caller falls back to full scan.
40fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
41    use std::io::{Read, Seek, SeekFrom};
42
43    let mut file = std::fs::File::open(path).ok()?;
44    let file_len = file.metadata().ok()?.len();
45    if file_len == 0 {
46        return None;
47    }
48
49    let start = file_len.saturating_sub(tail_bytes as u64);
50    file.seek(SeekFrom::Start(start)).ok()?;
51    let mut buf = vec![0u8; (file_len - start) as usize + 1];
52    let n = file.read(&mut buf).ok()?;
53    buf.truncate(n);
54
55    // Split into lines, find the last non-empty line, trailing newline stripped
56    let lines: Vec<&[u8]> = buf
57        .split(|&b| b == b'\n')
58        .filter(|l| !l.is_empty())
59        .collect();
60
61    for line in lines.iter().rev() {
62        if let Ok(line_str) = std::str::from_utf8(line) {
63            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
64                return Some(event.seq);
65            }
66        }
67    }
68    None
69}
70
71/// A single WAL event record.
72///
73/// Events are appended sequentially and identified by `seq`. The `ts` field
74/// is a Unix timestamp in seconds. Optional fields (`capability`, `output`,
75/// `error`, `cmd_*`) are skipped during serialization when `None`.
76///
77/// # Command Execution Events
78///
79/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
80/// capture the shell command, its output, and any auto-correction applied.
81/// These events are only written in debug builds (`#[cfg(debug_assertions)]`),
82/// but the variant exists in release builds for reading old WALs.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct WalEvent {
85    /// Sequence number (monotonically increasing within a writer session).
86    pub seq: u64,
87    /// Unix timestamp (seconds) when the event occurred.
88    pub ts: u64,
89    /// Type of the event (job lifecycle stage or command execution).
90    #[serde(rename = "type")]
91    pub event_type: WalEventType,
92    /// The job ID this event relates to.
93    pub job_id: String,
94    /// Capability name, if applicable.
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub capability: Option<String>,
97    /// Output data from the capability, if applicable.
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub output: Option<serde_json::Value>,
100    /// Error message, if the event represents a failure.
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub error: Option<String>,
103    /// Hardware telemetry snapshot before execution.
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub telemetry_before: Option<Telemetry>,
106    /// Hardware telemetry snapshot after execution.
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub telemetry_after: Option<Telemetry>,
109    /// Process summary before execution.
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub process_before: Option<ProcessSummary>,
112    /// Process summary after execution.
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub process_after: Option<ProcessSummary>,
115    /// Shell command string (CommandExecuted events only).
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub cmd: Option<String>,
118    /// Captured stdout, truncated to 1KB (CommandExecuted events only).
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub cmd_stdout: Option<String>,
121    /// Captured stderr, truncated to 1KB (CommandExecuted events only).
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub cmd_stderr: Option<String>,
124    /// Exit code of the command (CommandExecuted events only).
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub cmd_exit_code: Option<i32>,
127    /// Auto-corrected command, if correction was applied (future Phase 2).
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub cmd_corrected: Option<String>,
130}
131
132/// Types of WAL events, corresponding to job lifecycle stages
133/// and command executions.
134#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
135#[serde(rename_all = "snake_case")]
136pub enum WalEventType {
137    /// Job has been submitted to the system.
138    JobSubmitted,
139    /// Job arguments passed validation.
140    JobValidated,
141    /// Job execution has started.
142    JobStarted,
143    /// Job completed successfully.
144    JobCompleted,
145    /// Job failed during validation or execution.
146    JobFailed,
147    /// A completed job was rolled back.
148    JobRolledBack,
149    /// Shell command executed (dev-only logging for error absorption).
150    CommandExecuted,
151}
152
/// Append-only WAL writer.
///
/// Each [`append`](WalWriter::append) call does the following:
/// 1. opens the file in append mode;
/// 2. takes an exclusive advisory lock (`flock` on Unix; locking is a no-op
///    on other platforms);
/// 3. writes one JSONL line;
/// 4. `fsync`s before returning.
///
/// The writer tracks the next expected sequence number, exposed through
/// [`seq`](WalWriter::seq), but it does not stamp that number onto events.
/// Callers fill in [`WalEvent::seq`] themselves.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// let seq = wal.seq();
/// wal.append(WalEvent {
///     seq,
///     ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None,
///     output: None,
///     error: None,
///     telemetry_before: None,
///     telemetry_after: None,
///     process_before: None,
///     process_after: None,
///     cmd: None,
///     cmd_stdout: None,
///     cmd_stderr: None,
///     cmd_exit_code: None,
///     cmd_corrected: None,
/// }).unwrap();
/// ```
pub struct WalWriter {
    /// Location of the WAL file. It is reopened on every append.
    path: std::path::PathBuf,
    /// Next expected sequence number. It is recovered from the file when the
    /// writer is created and incremented on each successful append.
    seq: u64,
}
177
178impl WalWriter {
179    /// Creates or opens a WAL file at the given path.
180    ///
181    /// The file is opened in append mode. Existing content is preserved.
182    ///
183    /// # Errors
184    ///
185    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
186    /// be created or opened.
187    pub fn create(path: &Path) -> Result<Self> {
188        // Ensure parent directory exists
189        if let Some(parent) = path.parent() {
190            if !parent.exists() {
191                std::fs::create_dir_all(parent).map_err(|e| {
192                    crate::Error::WalError(format!(
193                        "Failed to create WAL directory {}: {}",
194                        parent.display(),
195                        e
196                    ))
197                })?;
198            }
199        }
200
201        // Create the file if it doesn't exist
202        if !path.exists() {
203            std::fs::File::create(path).map_err(|e| {
204                crate::Error::WalError(format!(
205                    "Failed to create WAL file {}: {}",
206                    path.display(),
207                    e
208                ))
209            })?;
210        }
211
212        // Recover sequence from existing WAL content to ensure monotonic
213        // ordering across process restarts. Acquire lock to prevent reading
214        // during a concurrent write (P2 FIX).
215        //
216        // Optimized: reads only the last 8KB of the WAL file to find the
217        // max seq. Falls back to full read only if tail-read fails.
218        let seq = if path.exists() {
219            let lock_file = std::fs::File::open(path)
220                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
221            Self::lock_file(&lock_file)?;
222            let recovered = match read_last_seq(path, 8192) {
223                Some(last_seq) => last_seq + 1,
224                None => {
225                    // Fall back to full scan if tail-read failed
226                    let content = std::fs::read_to_string(path)
227                        .map_err(|e| crate::Error::WalError(format!("read WAL for seq recovery: {}", e)))?;
228                    content
229                        .lines()
230                        .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
231                        .map(|e| e.seq)
232                        .max()
233                        .map(|max| max + 1)
234                        .unwrap_or(0)
235                }
236            };
237            Self::unlock_file(&lock_file);
238            recovered
239        } else {
240            0
241        };
242
243        Ok(Self {
244            path: path.to_path_buf(),
245            seq,
246        })
247    }
248
249    /// Acquires an exclusive file lock for writing (FINDING #14).
250    #[cfg(unix)]
251    fn lock_file(file: &std::fs::File) -> Result<()> {
252        use std::os::unix::io::AsRawFd;
253        let fd = file.as_raw_fd();
254        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
255        if result != 0 {
256            return Err(crate::Error::WalError(format!(
257                "Failed to acquire WAL lock: {}",
258                std::io::Error::last_os_error()
259            )));
260        }
261        Ok(())
262    }
263
264    /// Acquires an exclusive file lock (no-op on non-unix).
265    #[cfg(not(unix))]
266    fn lock_file(_file: &std::fs::File) -> Result<()> {
267        Ok(())
268    }
269
270    /// Releases an exclusive file lock (FINDING #14).
271    #[cfg(unix)]
272    fn unlock_file(file: &std::fs::File) {
273        use std::os::unix::io::AsRawFd;
274        let fd = file.as_raw_fd();
275        unsafe { libc::flock(fd, libc::LOCK_UN) };
276    }
277
278    /// Releases an exclusive file lock (no-op on non-unix).
279    #[cfg(not(unix))]
280    fn unlock_file(_file: &std::fs::File) {}
281
282    /// Appends an event to the WAL using true append mode (P0 FIX).
283    ///
284    /// Opens the file in append mode, acquires an exclusive lock, writes the
285    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
286    /// instead of O(N) read-rewrite, and the lock is held during the entire
287    /// write operation preventing concurrent write loss.
288    ///
289    /// Increments the internal sequence counter after a successful write.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
294    /// or I/O failure.
295    pub fn append(&mut self, event: WalEvent) -> Result<()> {
296        use std::io::Write;
297        let line =
298            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
299
300        // Open in append mode — no read-rewrite, O(1) per write
301        let file = std::fs::OpenOptions::new()
302            .create(true)
303            .append(true)
304            .open(&self.path)
305            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
306
307        // Hold exclusive lock during entire write
308        Self::lock_file(&file)?;
309        {
310            let mut buf = std::io::BufWriter::new(&file);
311            writeln!(buf, "{}", line)
312                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
313            buf.flush()
314                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
315            file.sync_all()
316                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
317        }
318        Self::unlock_file(&file);
319
320        self.seq += 1;
321        Ok(())
322    }
323
324    /// Returns the current sequence number (next event will use this value).
325    pub fn seq(&self) -> u64 {
326        self.seq
327    }
328}
329
330/// Reads and parses a WAL file into a list of events.
331///
332/// Malformed lines are silently skipped. This is intentional — partial writes
333/// from crashes may leave incomplete JSON at the end of the file.
334///
335/// # Example
336///
337/// ```rust,ignore
338/// use runtimo_core::WalReader;
339/// use std::path::Path;
340///
341/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
342/// for event in reader.events() {
343///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
344/// }
345/// ```
346pub struct WalReader {
347    events: Vec<WalEvent>,
348}
349
350impl WalReader {
351    /// Loads and parses all events from a WAL file.
352    ///
353    /// # Errors
354    ///
355    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
356    /// be read. Individual malformed lines are skipped, not treated as errors.
357    pub fn load(path: &Path) -> Result<Self> {
358        let content =
359            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
360
361        let events: Vec<WalEvent> = content
362            .lines()
363            .filter_map(|line| serde_json::from_str(line).ok())
364            .collect();
365
366        Ok(Self { events })
367    }
368
369    /// Returns a slice of all parsed events.
370    pub fn events(&self) -> &[WalEvent] {
371        &self.events
372    }
373
374    /// Reads only the last `n` lines from the WAL file.
375    ///
376    /// More efficient than [`WalReader::load`] when only recent events are needed.
377    /// Malformed lines are silently skipped.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
382    /// be read.
383    pub fn tail(path: &Path, n: usize) -> Result<Self> {
384        use std::collections::VecDeque;
385        use std::io::{BufRead, BufReader};
386        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
387        let reader = BufReader::new(file);
388
389        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
390        for line in reader.lines() {
391            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
392            if let Ok(event) = serde_json::from_str(&line) {
393                window.push_back(event);
394                if window.len() > n {
395                    window.pop_front();
396                }
397            }
398        }
399
400        Ok(Self {
401            events: window.into(),
402        })
403    }
404}
405
406/// WAL cleanup and rotation utilities.
407impl WalWriter {
408    /// Returns the rotation path for a given WAL file and rotation index.
409    ///
410    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
411    /// original extension instead of replacing it).
412    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
413        let mut s = path.to_string_lossy().into_owned();
414        s.push('.');
415        s.push_str(&index.to_string());
416        std::path::PathBuf::from(s)
417    }
418
419    /// Rotates the WAL when it exceeds max_size_bytes.
420    ///
421    /// Moves the current WAL to `{path}.1` (shifting older rotations),
422    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
423    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
424    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
425        let metadata = match std::fs::metadata(path) {
426            Ok(m) => m,
427            Err(_) => return Ok(()), // No WAL to rotate
428        };
429
430        if metadata.len() < max_size_bytes {
431            return Ok(());
432        }
433
434        // Shift existing rotations (P1 FIX: proper naming preserves extension)
435        for i in (1..max_rotations).rev() {
436            let old = Self::rotation_path(path, i);
437            let new = Self::rotation_path(path, i + 1);
438            if old.exists() {
439                let _ = std::fs::rename(&old, &new);
440            }
441        }
442
443        // Move current to .1
444        let rotated = Self::rotation_path(path, 1);
445        std::fs::rename(path, &rotated)
446            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
447
448        // Create fresh empty WAL
449        std::fs::write(path, "")
450            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
451
452        // Remove oldest rotation if exceeding max
453        let oldest = Self::rotation_path(path, max_rotations + 1);
454        if oldest.exists() {
455            let _ = std::fs::remove_file(&oldest);
456        }
457
458        Ok(())
459    }
460
461    /// Cleans up WAL entries older than max_age_secs.
462    ///
463    /// Writes retained events to a temporary file, then atomically renames
464    /// it over the original WAL. This prevents event loss if another writer
465    /// appends during cleanup.
466    ///
467    /// # Arguments
468    /// * `path` - Path to WAL file
469    /// * `max_age_secs` - Maximum age in seconds
470    ///
471    /// # Returns
472    /// * `Ok(usize)` - Number of entries removed
473    /// * `Err(Error)` - Cleanup failure
474    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
475        use std::time::{SystemTime, UNIX_EPOCH};
476
477        let cutoff = SystemTime::now()
478            .duration_since(UNIX_EPOCH)
479            .map(|d| d.as_secs())
480            .unwrap_or(0)
481            .saturating_sub(max_age_secs);
482
483        // Read all events under lock to prevent concurrent append loss (P1 FIX)
484        let lock_file = std::fs::File::open(path)
485            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
486        Self::lock_file(&lock_file)?;
487        let content = std::fs::read_to_string(path)
488            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
489
490        let events: Vec<WalEvent> = content
491            .lines()
492            .filter_map(|line| serde_json::from_str(line).ok())
493            .collect();
494
495        // Filter out old events
496        let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
497
498        let total = content
499            .lines()
500            .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
501            .count();
502        let removed = total - retained.len();
503
504        if removed > 0 {
505            // Write retained events to temp file, then merge any events appended
506            // by concurrent writers during this cleanup window, then atomic rename.
507            let temp_path = path.with_extension("wal.tmp");
508            {
509                let mut new_wal = WalWriter::create(&temp_path)?;
510                for event in &retained {
511                    new_wal.append(event.clone())?;
512                }
513
514                // Re-read the original WAL to catch any events appended during cleanup.
515                // Lock is still held from above, so no concurrent writer can interleave.
516                let last_seq = retained.last().map(|e| e.seq).unwrap_or(0);
517                let current_content = std::fs::read_to_string(path)
518                    .map_err(|e| crate::Error::WalError(format!("re-read WAL during cleanup: {}", e)))?;
519                for line in current_content.lines() {
520                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
521                        if event.seq > last_seq {
522                            new_wal.append(event)?;
523                        }
524                    }
525                }
526            }
527            // Release lock before rename
528            Self::unlock_file(&lock_file);
529            // Atomic rename replaces original — no window for lost events
530            std::fs::rename(&temp_path, path).map_err(|e| {
531                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
532            })?;
533        } else {
534            Self::unlock_file(&lock_file);
535        }
536
537        Ok(removed)
538    }
539}
540
/// Shortens `s` to at most `max_bytes` bytes of its content, respecting
/// UTF-8 boundaries, and marks the cut.
///
/// - If `s` already fits, it is returned unchanged.
/// - Otherwise it is cut at the largest char boundary not exceeding
///   `max_bytes`, and `"...[truncated]"` is appended.
///
/// The marker is added after the cut, so a truncated result can be up to
/// `max_bytes + 14` bytes long.
///
/// Used to bound command output stored in WAL events. 1KB is sufficient
/// for error messages and pattern analysis while preventing WAL bloat.
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if s.len() <= max_bytes {
        return s.to_owned();
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let end = (0..=max_bytes)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);

    let mut truncated = String::with_capacity(end + MARKER.len());
    truncated.push_str(&s[..end]);
    truncated.push_str(MARKER);
    truncated
}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch WAL location in the system temp directory for test `name`.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with the given identity and every optional field unset.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let event = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        wal.append(event).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A second writer on the same file must continue the numbering.
        let reopened = WalWriter::create(&path).unwrap();
        assert_eq!(reopened.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        // Fill the WAL with enough records to give it a non-trivial size.
        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100u64 {
            let job = format!("job-{}", i);
            wal.append(bare_event(i, 1715800000 + i, WalEventType::JobStarted, &job))
                .unwrap();
        }

        // A threshold just below the current size forces a rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let first_rotation = WalWriter::rotation_path(&path, 1);
        assert!(first_rotation.exists());
        // The live WAL starts over empty.
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(&first_rotation);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // One record well outside the retention window, one fresh record.
        wal.append(bare_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        // FINDING #15: verify optional fields are skipped when None
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");
        let json = serde_json::to_string(&event).unwrap();
        for key in ["capability", "telemetry_before", "process_before", "cmd"] {
            assert!(!json.contains(key));
        }
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..bare_event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        let recorded = &events[0];
        assert_eq!(recorded.event_type, WalEventType::CommandExecuted);
        assert_eq!(recorded.cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(recorded.cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");

        let truncated = truncate_to(&"a".repeat(2000), 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}