Skip to main content

runtimo_core/
wal.rs

1//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
2//!
3//! Events are written as JSONL (one JSON object per line) with `fsync` after
4//! each write to guarantee durability. The WAL supports crash recovery by
5//! replaying events to reconstruct system state.
6//!
7//! Sequence numbers and rotation indices use explicit arithmetic — these
8//! are intentional, increment-by-one operations with known-safe ranges.
9
10#![allow(clippy::arithmetic_side_effects)]
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
16//! use std::path::Path;
17//!
18//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None, error: None,
//!     // Remaining optional fields (telemetry, process, cmd_*) default to None.
//!     ..Default::default()
//! }).unwrap();
26//!
27//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
28//! assert_eq!(reader.events().len(), 1);
29//! ```
30
31use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34
35use serde::{Deserialize, Serialize};
36use std::path::Path;
37
38/// Reads the last event's sequence number from a WAL file without loading
39/// the entire file into memory. Reads only the trailing tail_bytes of the
40/// file to find the last valid JSON line.
41///
42/// Falls back gracefully to `None` on any parse error, partial line, or
43/// I/O failure — the caller falls back to full scan.
44fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
45    use std::io::{Read, Seek, SeekFrom};
46
47    let mut file = std::fs::File::open(path).ok()?;
48    let file_len = file.metadata().ok()?.len();
49    if file_len == 0 {
50        return None;
51    }
52
53    let start = file_len.saturating_sub(tail_bytes as u64);
54    file.seek(SeekFrom::Start(start)).ok()?;
55    let mut buf = vec![
56        0u8;
57        usize::try_from(file_len - start)
58            .unwrap_or(0)
59            .saturating_add(1)
60    ];
61    let n = file.read(&mut buf).ok()?;
62    buf.truncate(n);
63
64    // Split into lines, find the last non-empty line, trailing newline stripped
65    let lines: Vec<&[u8]> = buf
66        .split(|&b| b == b'\n')
67        .filter(|l| !l.is_empty())
68        .collect();
69
70    for line in lines.iter().rev() {
71        if let Ok(line_str) = std::str::from_utf8(line) {
72            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
73                return Some(event.seq);
74            }
75        }
76    }
77    None
78}
79
80/// A single WAL event record.
81///
82/// Events are appended sequentially and identified by `seq`. The `ts` field
83/// is a Unix timestamp in seconds. Optional fields (`capability`, `output`,
84/// `error`, `cmd_*`) are skipped during serialization when `None`.
85///
86/// # Command Execution Events
87///
88/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
89/// capture the shell command, its output, and any auto-correction applied.
90/// These events are only written in debug builds (`#[cfg(debug_assertions)]`),
91/// but the variant exists in release builds for reading old WALs.
92#[derive(Debug, Clone, Serialize, Deserialize, Default)]
93#[allow(clippy::exhaustive_structs)]
94pub struct WalEvent {
95    /// Sequence number (monotonically increasing within a writer session).
96    pub seq: u64,
97    /// Unix timestamp (seconds) when the event occurred.
98    pub ts: u64,
99    /// Type of the event (job lifecycle stage or command execution).
100    #[serde(rename = "type")]
101    pub event_type: WalEventType,
102    /// The job ID this event relates to.
103    pub job_id: String,
104    /// Capability name, if applicable.
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub capability: Option<String>,
107    /// Output data from the capability, if applicable.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub output: Option<serde_json::Value>,
110    /// Error message, if the event represents a failure.
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub error: Option<String>,
113    /// Hardware telemetry snapshot before execution.
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub telemetry_before: Option<Telemetry>,
116    /// Hardware telemetry snapshot after execution.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub telemetry_after: Option<Telemetry>,
119    /// Process summary before execution.
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub process_before: Option<ProcessSummary>,
122    /// Process summary after execution.
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub process_after: Option<ProcessSummary>,
125    /// Shell command string (CommandExecuted events only).
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub cmd: Option<String>,
128    /// Captured stdout, truncated to 1KB (CommandExecuted events only).
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub cmd_stdout: Option<String>,
131    /// Captured stderr, truncated to 1KB (CommandExecuted events only).
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub cmd_stderr: Option<String>,
134    /// Exit code of the command (CommandExecuted events only).
135    #[serde(skip_serializing_if = "Option::is_none")]
136    pub cmd_exit_code: Option<i32>,
137    /// Auto-corrected command, if correction was applied (future Phase 2).
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub cmd_corrected: Option<String>,
140    /// OOV ratio from LLMOSafe (for CognitiveSafetyViolation).
141    #[serde(skip_serializing_if = "Option::is_none")]
142    pub oov_ratio: Option<u8>,
143    /// Detection flags from LLMOSafe (for CognitiveSafetyViolation).
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub detection_flags: Option<u8>,
146}
147
148/// Types of WAL events, corresponding to job lifecycle stages
149/// and command executions.
150#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
151#[serde(rename_all = "snake_case")]
152// Additional variants may be added in future versions; the enum is
153// intentionally non-exhaustive for forward compatibility.
154#[allow(clippy::exhaustive_enums)]
155pub enum WalEventType {
156    /// Job has been submitted to the system.
157    #[default]
158    JobSubmitted,
159    /// Job arguments passed validation.
160    JobValidated,
161    /// Job execution has started.
162    JobStarted,
163    /// Job completed successfully.
164    JobCompleted,
165    /// Job failed during validation or execution.
166    JobFailed,
167    /// A completed job was rolled back.
168    JobRolledBack,
169    /// Shell command executed (dev-only logging for error absorption).
170    CommandExecuted,
171}
172
/// Append-only WAL writer.
///
/// Every [`append`](WalWriter::append) opens the file in append mode, takes
/// an exclusive advisory `flock` on it, writes one JSONL line, and `fsync`s
/// before returning. Locking is implemented on Unix only; on other platforms
/// it is a no-op.
///
/// The writer stores only the WAL path and a local sequence counter. It does
/// not keep the file open between appends.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: wal.seq(), ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None, output: None, error: None,
///     ..Default::default()
/// }).unwrap();
/// ```
#[derive(Debug)]
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the WAL file on disk.
    path: std::path::PathBuf,
    /// Local sequence counter exposed through `WalWriter::seq`.
    seq: u64,
}
198
199impl WalWriter {
200    /// Creates or opens a WAL file at the given path.
201    ///
202    /// The file is opened in append mode. Existing content is preserved.
203    ///
204    /// # Errors
205    ///
206    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
207    /// be created or opened.
208    pub fn create(path: &Path) -> Result<Self> {
209        // Ensure parent directory exists
210        if let Some(parent) = path.parent() {
211            if !parent.exists() {
212                std::fs::create_dir_all(parent).map_err(|e| {
213                    crate::Error::WalError(format!(
214                        "Failed to create WAL directory {}: {}",
215                        parent.display(),
216                        e
217                    ))
218                })?;
219            }
220        }
221
222        // Create the file if it doesn't exist
223        if !path.exists() {
224            std::fs::File::create(path).map_err(|e| {
225                crate::Error::WalError(format!(
226                    "Failed to create WAL file {}: {}",
227                    path.display(),
228                    e
229                ))
230            })?;
231        }
232
233        // Recover sequence from existing WAL content to ensure monotonic
234        // ordering across process restarts. Acquire lock to prevent reading
235        // during a concurrent write (P2 FIX).
236        //
237        // Optimized: reads only the last 8KB of the WAL file to find the
238        // max seq. Falls back to full read only if tail-read fails.
239        let seq = if path.exists() {
240            let lock_file = std::fs::File::open(path)
241                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
242            Self::lock_file(&lock_file)?;
243            let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
244                last_seq + 1
245            } else {
246                // Fall back to full scan if tail-read failed
247                let content = std::fs::read_to_string(path).map_err(|e| {
248                    crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
249                })?;
250                content
251                    .lines()
252                    .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
253                    .map(|e| e.seq)
254                    .max()
255                    .map_or(0, |max| max + 1)
256            };
257            Self::unlock_file(&lock_file);
258            recovered
259        } else {
260            0
261        };
262
263        Ok(Self {
264            path: path.to_path_buf(),
265            seq,
266        })
267    }
268
269    /// Acquires an exclusive file lock for writing (FINDING #14).
270    #[cfg(unix)]
271    fn lock_file(file: &std::fs::File) -> Result<()> {
272        use std::os::unix::io::AsRawFd;
273        let fd = file.as_raw_fd();
274        // SAFETY: fd is a valid open file descriptor; LOCK_EX is a well-defined operation
275        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
276        if result != 0 {
277            return Err(crate::Error::WalError(format!(
278                "Failed to acquire WAL lock: {}",
279                std::io::Error::last_os_error()
280            )));
281        }
282        Ok(())
283    }
284
285    /// Acquires an exclusive file lock (no-op on non-unix).
286    #[cfg(not(unix))]
287    fn lock_file(_file: &std::fs::File) -> Result<()> {
288        Ok(())
289    }
290
291    /// Releases an exclusive file lock (FINDING #14).
292    #[cfg(unix)]
293    fn unlock_file(file: &std::fs::File) {
294        use std::os::unix::io::AsRawFd;
295        let fd = file.as_raw_fd();
296        // SAFETY: fd is a valid open file descriptor; LOCK_UN is a well-defined operation
297        unsafe { libc::flock(fd, libc::LOCK_UN) };
298    }
299
300    /// Releases an exclusive file lock (no-op on non-unix).
301    #[cfg(not(unix))]
302    fn unlock_file(_file: &std::fs::File) {}
303
304    /// Appends an event to the WAL using true append mode (P0 FIX).
305    ///
306    /// Opens the file in append mode, acquires an exclusive lock, writes the
307    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
308    /// instead of O(N) read-rewrite, and the lock is held during the entire
309    /// write operation preventing concurrent write loss.
310    ///
311    /// Increments the internal sequence counter after a successful write.
312    ///
313    /// # Errors
314    ///
315    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
316    /// or I/O failure.
317    pub fn append(&mut self, event: WalEvent) -> Result<()> {
318        use std::io::Write;
319        let line =
320            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
321
322        // Open in append mode — no read-rewrite, O(1) per write
323        let file = std::fs::OpenOptions::new()
324            .create(true)
325            .append(true)
326            .open(&self.path)
327            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
328
329        // Hold exclusive lock during entire write
330        Self::lock_file(&file)?;
331        {
332            let mut buf = std::io::BufWriter::new(&file);
333            writeln!(buf, "{}", line)
334                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
335            buf.flush()
336                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
337            file.sync_all()
338                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
339        }
340        Self::unlock_file(&file);
341
342        self.seq += 1;
343        Ok(())
344    }
345
346    /// Returns the current sequence number (next event will use this value).
347    #[must_use]
348    pub fn seq(&self) -> u64 {
349        self.seq
350    }
351}
352
353/// Reads and parses a WAL file into a list of events.
354///
355/// Malformed lines are silently skipped. This is intentional — partial writes
356/// from crashes may leave incomplete JSON at the end of the file.
357///
358/// # Example
359///
360/// ```rust,ignore
361/// use runtimo_core::WalReader;
362/// use std::path::Path;
363///
364/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
365/// for event in reader.events() {
366///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
367/// }
368/// ```
369#[allow(clippy::exhaustive_structs)]
370pub struct WalReader {
371    events: Vec<WalEvent>,
372}
373
374impl WalReader {
375    /// Loads and parses all events from a WAL file.
376    ///
377    /// # Errors
378    ///
379    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
380    /// be read. Individual malformed lines are skipped, not treated as errors.
381    pub fn load(path: &Path) -> Result<Self> {
382        let content =
383            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
384
385        let events: Vec<WalEvent> = content
386            .lines()
387            .filter_map(|line| serde_json::from_str(line).ok())
388            .collect();
389
390        Ok(Self { events })
391    }
392
393    /// Returns a slice of all parsed events.
394    #[must_use]
395    pub fn events(&self) -> &[WalEvent] {
396        &self.events
397    }
398
399    /// Reads only the last `n` lines from the WAL file.
400    ///
401    /// More efficient than [`WalReader::load`] when only recent events are needed.
402    /// Malformed lines are silently skipped.
403    ///
404    /// # Errors
405    ///
406    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
407    /// be read.
408    pub fn tail(path: &Path, n: usize) -> Result<Self> {
409        use std::collections::VecDeque;
410        use std::io::{BufRead, BufReader};
411        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
412        let reader = BufReader::new(file);
413
414        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
415        for line in reader.lines() {
416            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
417            if let Ok(event) = serde_json::from_str(&line) {
418                window.push_back(event);
419                if window.len() > n {
420                    window.pop_front();
421                }
422            }
423        }
424
425        Ok(Self {
426            events: window.into(),
427        })
428    }
429}
430
431/// WAL cleanup and rotation utilities.
432impl WalWriter {
433    /// Returns the rotation path for a given WAL file and rotation index.
434    ///
435    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
436    /// original extension instead of replacing it).
437    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
438        let mut s = path.to_string_lossy().into_owned();
439        s.push('.');
440        s.push_str(&index.to_string());
441        std::path::PathBuf::from(s)
442    }
443
444    /// Rotates the WAL when it exceeds max_size_bytes.
445    ///
446    /// Moves the current WAL to `{path}.1` (shifting older rotations),
447    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
448    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
449    /// P1 FIX: Acquires exclusive lock to prevent concurrent append loss during rotation.
450    ///
451    /// # Errors
452    /// Returns `IoError` or `BackupError` if WAL file operations fail.
453    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
454        let Ok(metadata) = std::fs::metadata(path) else {
455            return Ok(()); // No WAL to rotate
456        };
457
458        if metadata.len() < max_size_bytes {
459            return Ok(());
460        }
461
462        // Acquire lock to prevent concurrent append during rotation
463        let lock_file = std::fs::File::open(path)
464            .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
465        Self::lock_file(&lock_file)?;
466
467        // Shift existing rotations (P1 FIX: proper naming preserves extension)
468        for i in (1..max_rotations).rev() {
469            let old = Self::rotation_path(path, i);
470            let new = Self::rotation_path(path, i + 1);
471            if old.exists() {
472                if let Err(e) = std::fs::rename(&old, &new) {
473                    log::error!(
474                        "WAL rotation shift failed: {} -> {}: {}",
475                        old.display(),
476                        new.display(),
477                        e
478                    );
479                }
480            }
481        }
482
483        // Move current to .1
484        let rotated = Self::rotation_path(path, 1);
485        std::fs::rename(path, &rotated)
486            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
487
488        // Create fresh empty WAL
489        std::fs::write(path, "")
490            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
491
492        // Remove oldest rotation if exceeding max
493        let oldest = Self::rotation_path(path, max_rotations + 1);
494        if oldest.exists() {
495            if let Err(e) = std::fs::remove_file(&oldest) {
496                log::error!("WAL cleanup failed: {}: {}", oldest.display(), e);
497            }
498        }
499
500        Self::unlock_file(&lock_file);
501        Ok(())
502    }
503
504    /// Cleans up WAL entries older than max_age_secs.
505    ///
506    /// Writes retained events to a temporary file, then atomically renames
507    /// it over the original WAL. This prevents event loss if another writer
508    /// appends during cleanup.
509    ///
510    /// # Arguments
511    /// * `path` - Path to WAL file
512    /// * `max_age_secs` - Maximum age in seconds
513    ///
514    /// # Returns
515    /// * `Ok(usize)` - Number of entries removed
516    ///
517    /// # Errors
518    /// Returns `IoError` if the WAL file cannot be read, the temp file cannot
519    /// be written, or the atomic rename fails.
520    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
521        use std::time::{SystemTime, UNIX_EPOCH};
522
523        let cutoff = SystemTime::now()
524            .duration_since(UNIX_EPOCH)
525            .map_or(0, |d| d.as_secs())
526            .saturating_sub(max_age_secs);
527
528        // Read all events under lock to prevent concurrent append loss (P1 FIX)
529        let lock_file = std::fs::File::open(path)
530            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
531        Self::lock_file(&lock_file)?;
532        let content = std::fs::read_to_string(path)
533            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
534
535        let all_events: Vec<WalEvent> = content
536            .lines()
537            .filter_map(|line| serde_json::from_str(line).ok())
538            .collect();
539
540        let total = all_events.len();
541
542        let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
543        let removed = total - retained.len();
544
545        if removed > 0 {
546            // Write retained events to temp file, then merge any events appended
547            // by concurrent writers during this cleanup window, then atomic rename.
548            let temp_path = path.with_extension("wal.tmp");
549            {
550                let mut new_wal = Self::create(&temp_path)?;
551                for event in &retained {
552                    new_wal.append(event.clone())?;
553                }
554
555                // Re-read the original WAL to catch any events appended during cleanup.
556                // Lock is still held from above, so no concurrent writer can interleave.
557                let last_seq = retained.last().map_or(0, |e| e.seq);
558                let current_content = std::fs::read_to_string(path).map_err(|e| {
559                    crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
560                })?;
561                for line in current_content.lines() {
562                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
563                        if event.seq > last_seq {
564                            new_wal.append(event)?;
565                        }
566                    }
567                }
568            }
569            // Release lock before rename
570            Self::unlock_file(&lock_file);
571            // Atomic rename replaces original — no window for lost events
572            std::fs::rename(&temp_path, path).map_err(|e| {
573                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
574            })?;
575        } else {
576            Self::unlock_file(&lock_file);
577        }
578
579        Ok(removed)
580    }
581}
582
/// Truncates `s` to at most `max_bytes` bytes of original content, respecting
/// UTF-8 character boundaries, and marks the cut with a `...[truncated]`
/// suffix.
///
/// Strings that already fit are returned unchanged. When a cut is needed, it
/// is moved back to the nearest character boundary at or before `max_bytes`,
/// and the 14-byte `"...[truncated]"` marker is appended. The returned string
/// can therefore be up to `max_bytes + 14` bytes long.
///
/// Used to bound command output stored in WAL events. 1KB is sufficient for
/// error messages and pattern analysis while preventing WAL bloat.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if s.len() <= max_bytes {
        return s.to_owned();
    }
    // Index 0 is always a char boundary, so the search always succeeds.
    let end = (0..=max_bytes)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);
    let head = s.get(..end).unwrap_or_default();

    let mut truncated = String::with_capacity(head.len().saturating_add(MARKER.len()));
    truncated.push_str(head);
    truncated.push_str(MARKER);
    truncated
}
600
601#[cfg(test)]
602// Allow unwrap_used and indexing_slicing in tests as panicking on failure is desired test behavior.
603#[allow(
604    clippy::unwrap_used,
605    clippy::indexing_slicing,
606    clippy::items_after_statements
607)]
608mod tests {
609    use super::*;
610
611    fn tmp_wal(name: &str) -> std::path::PathBuf {
612        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
613    }
614
615    #[test]
616    fn test_wal_write_and_read() {
617        let path = tmp_wal("write_read");
618        let _ = std::fs::remove_file(&path);
619
620        let mut wal = WalWriter::create(&path).unwrap();
621        wal.append(WalEvent {
622            seq: 0,
623            ts: 1715800000,
624            event_type: WalEventType::JobStarted,
625            job_id: "test-job".into(),
626            capability: Some("FileRead".into()),
627            output: None,
628            error: None,
629            telemetry_before: None,
630            telemetry_after: None,
631            process_before: None,
632            process_after: None,
633            cmd: None,
634            cmd_stdout: None,
635            cmd_stderr: None,
636            cmd_exit_code: None,
637            cmd_corrected: None,
638            ..Default::default()
639        })
640        .unwrap();
641
642        let reader = WalReader::load(&path).unwrap();
643        assert_eq!(reader.events().len(), 1);
644        assert_eq!(reader.events()[0].job_id, "test-job");
645
646        let _ = std::fs::remove_file(&path);
647    }
648
649    #[test]
650    fn test_wal_seq_recovery() {
651        let path = tmp_wal("seq_recovery");
652        let _ = std::fs::remove_file(&path);
653
654        let mut wal = WalWriter::create(&path).unwrap();
655        assert_eq!(wal.seq(), 0);
656        wal.append(WalEvent {
657            seq: 0,
658            ts: 1715800000,
659            event_type: WalEventType::JobStarted,
660            job_id: "job1".into(),
661            capability: None,
662            output: None,
663            error: None,
664            telemetry_before: None,
665            telemetry_after: None,
666            process_before: None,
667            process_after: None,
668            cmd: None,
669            cmd_stdout: None,
670            cmd_stderr: None,
671            cmd_exit_code: None,
672            cmd_corrected: None,
673            ..Default::default()
674        })
675        .unwrap();
676        assert_eq!(wal.seq(), 1);
677
678        // Create new writer — should recover seq from file
679        let wal2 = WalWriter::create(&path).unwrap();
680        assert_eq!(wal2.seq(), 1);
681
682        let _ = std::fs::remove_file(&path);
683    }
684
685    #[test]
686    fn test_wal_rotation() {
687        let path = tmp_wal("rotation");
688        let _ = std::fs::remove_file(&path);
689
690        // Write enough data to trigger rotation
691        let mut wal = WalWriter::create(&path).unwrap();
692        for i in 0..100 {
693            wal.append(WalEvent {
694                seq: i,
695                ts: 1715800000 + i,
696                event_type: WalEventType::JobStarted,
697                job_id: format!("job-{}", i),
698                capability: None,
699                output: None,
700                error: None,
701                telemetry_before: None,
702                telemetry_after: None,
703                process_before: None,
704                process_after: None,
705                cmd: None,
706                cmd_stdout: None,
707                cmd_stderr: None,
708                cmd_exit_code: None,
709                cmd_corrected: None,
710                ..Default::default()
711            })
712            .unwrap();
713        }
714
715        let size = std::fs::metadata(&path).unwrap().len();
716        // Rotate with a threshold smaller than current size
717        WalWriter::rotate(&path, size - 1, 3).unwrap();
718
719        assert!(WalWriter::rotation_path(&path, 1).exists());
720        // New WAL should be empty
721        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");
722
723        let _ = std::fs::remove_file(&path);
724        let _ = std::fs::remove_file(WalWriter::rotation_path(&path, 1));
725    }
726
727    #[test]
728    fn test_wal_cleanup() {
729        let path = tmp_wal("cleanup");
730        let _ = std::fs::remove_file(&path);
731
732        let mut wal = WalWriter::create(&path).unwrap();
733        let now = std::time::SystemTime::now()
734            .duration_since(std::time::UNIX_EPOCH)
735            .unwrap()
736            .as_secs();
737
738        // Write old event
739        wal.append(WalEvent {
740            seq: 0,
741            ts: now - 1000,
742            event_type: WalEventType::JobStarted,
743            job_id: "old-job".into(),
744            capability: None,
745            output: None,
746            error: None,
747            telemetry_before: None,
748            telemetry_after: None,
749            process_before: None,
750            process_after: None,
751            cmd: None,
752            cmd_stdout: None,
753            cmd_stderr: None,
754            cmd_exit_code: None,
755            cmd_corrected: None,
756            ..Default::default()
757        })
758        .unwrap();
759
760        // Write recent event
761        wal.append(WalEvent {
762            seq: 1,
763            ts: now,
764            event_type: WalEventType::JobCompleted,
765            job_id: "new-job".into(),
766            capability: None,
767            output: None,
768            error: None,
769            telemetry_before: None,
770            telemetry_after: None,
771            process_before: None,
772            process_after: None,
773            cmd: None,
774            cmd_stdout: None,
775            cmd_stderr: None,
776            cmd_exit_code: None,
777            cmd_corrected: None,
778            ..Default::default()
779        })
780        .unwrap();
781
782        let removed = WalWriter::cleanup(&path, 500).unwrap();
783        assert_eq!(removed, 1); // Old event removed
784
785        let reader = WalReader::load(&path).unwrap();
786        assert_eq!(reader.events().len(), 1);
787        assert_eq!(reader.events()[0].job_id, "new-job");
788
789        let _ = std::fs::remove_file(&path);
790    }
791
792    #[test]
793    fn test_wal_skip_serializing_optional_fields() {
794        // FINDING #15: verify optional fields are skipped when None
795        let event = WalEvent {
796            seq: 0,
797            ts: 1715800000,
798            event_type: WalEventType::JobStarted,
799            job_id: "test".into(),
800            capability: None,
801            output: None,
802            error: None,
803            telemetry_before: None,
804            telemetry_after: None,
805            process_before: None,
806            process_after: None,
807            cmd: None,
808            cmd_stdout: None,
809            cmd_stderr: None,
810            cmd_exit_code: None,
811            cmd_corrected: None,
812            ..Default::default()
813        };
814
815        let json = serde_json::to_string(&event).unwrap();
816        assert!(!json.contains("capability"));
817        assert!(!json.contains("telemetry_before"));
818        assert!(!json.contains("process_before"));
819        assert!(!json.contains("cmd"));
820    }
821
822    #[test]
823    fn test_command_executed_event() {
824        let path = tmp_wal("cmd_exec");
825        let _ = std::fs::remove_file(&path);
826
827        let mut wal = WalWriter::create(&path).unwrap();
828        wal.append(WalEvent {
829            seq: 0,
830            ts: 1715800000,
831            event_type: WalEventType::CommandExecuted,
832            job_id: "job-cmd".into(),
833            capability: None,
834            output: None,
835            error: None,
836            telemetry_before: None,
837            telemetry_after: None,
838            process_before: None,
839            process_after: None,
840            cmd: Some("ls | hed -3".into()),
841            cmd_stdout: None,
842            cmd_stderr: Some("hed: command not found".into()),
843            cmd_exit_code: Some(127),
844            cmd_corrected: None,
845            ..Default::default()
846        })
847        .unwrap();
848
849        let reader = WalReader::load(&path).unwrap();
850        assert_eq!(reader.events().len(), 1);
851        assert_eq!(reader.events()[0].event_type, WalEventType::CommandExecuted);
852        assert_eq!(reader.events()[0].cmd.as_deref(), Some("ls | hed -3"));
853        assert_eq!(reader.events()[0].cmd_exit_code, Some(127));
854
855        let _ = std::fs::remove_file(&path);
856    }
857
858    #[test]
859    fn test_truncate_to() {
860        assert_eq!(truncate_to("hello", 1024), "hello");
861        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
862        let long = "a".repeat(2000);
863        let truncated = truncate_to(&long, 1024);
864        assert!(truncated.len() < 1100);
865        assert!(truncated.ends_with("...[truncated]"));
866    }
867
868    // ── GAP 9: WAL recovery from truncated file ──────────────────────
869
870    #[test]
871    fn test_wal_recovers_from_truncated_last_line() {
872        let path = tmp_wal("truncated");
873        let _ = std::fs::remove_file(&path);
874
875        // Write valid events
876        let mut wal = WalWriter::create(&path).unwrap();
877        wal.append(WalEvent {
878            seq: 0,
879            ts: 1000,
880            event_type: WalEventType::JobStarted,
881            job_id: "job1".into(),
882            capability: None,
883            output: None,
884            error: None,
885            telemetry_before: None,
886            telemetry_after: None,
887            process_before: None,
888            process_after: None,
889            cmd: None,
890            cmd_stdout: None,
891            cmd_stderr: None,
892            cmd_exit_code: None,
893            cmd_corrected: None,
894            ..Default::default()
895        })
896        .unwrap();
897
898        // Append a valid second event
899        wal.append(WalEvent {
900            seq: 1,
901            ts: 1001,
902            event_type: WalEventType::JobCompleted,
903            job_id: "job1".into(),
904            capability: None,
905            output: None,
906            error: None,
907            telemetry_before: None,
908            telemetry_after: None,
909            process_before: None,
910            process_after: None,
911            cmd: None,
912            cmd_stdout: None,
913            cmd_stderr: None,
914            cmd_exit_code: None,
915            cmd_corrected: None,
916            ..Default::default()
917        })
918        .unwrap();
919
920        // Verify 2 events exist
921        let reader_before = WalReader::load(&path).unwrap();
922        assert_eq!(reader_before.events().len(), 2);
923
924        // Simulate a crash by appending a partial line (truncated JSON)
925        // Open file in append mode without WalWriter to write raw bytes
926        use std::io::Write;
927        let mut file = std::fs::OpenOptions::new()
928            .append(true)
929            .open(&path)
930            .unwrap();
931        // Write a partial JSON line (no newline, incomplete object)
932        file.write_all(b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated")
933            .unwrap();
934        file.flush().unwrap();
935
936        // Now load again — should skip the truncated last line and still read 2 valid events
937        let reader_after = WalReader::load(&path).unwrap();
938        assert_eq!(
939            reader_after.events().len(),
940            2,
941            "Should skip truncated last line and read 2 valid events, got {}",
942            reader_after.events().len()
943        );
944        assert_eq!(reader_after.events()[0].job_id, "job1");
945        assert_eq!(reader_after.events()[1].job_id, "job1");
946
947        let _ = std::fs::remove_file(&path);
948    }
949
950    #[test]
951    fn test_wal_skips_garbage_lines() {
952        let path = tmp_wal("garbage");
953        let _ = std::fs::remove_file(&path);
954
955        // Write one valid event followed by a garbage line
956        let mut wal = WalWriter::create(&path).unwrap();
957        wal.append(WalEvent {
958            seq: 0,
959            ts: 1000,
960            event_type: WalEventType::JobStarted,
961            job_id: "valid".into(),
962            capability: None,
963            output: None,
964            error: None,
965            telemetry_before: None,
966            telemetry_after: None,
967            process_before: None,
968            process_after: None,
969            cmd: None,
970            cmd_stdout: None,
971            cmd_stderr: None,
972            cmd_exit_code: None,
973            cmd_corrected: None,
974            ..Default::default()
975        })
976        .unwrap();
977
978        use std::io::Write;
979        let mut file = std::fs::OpenOptions::new()
980            .append(true)
981            .open(&path)
982            .unwrap();
983        file.write_all(b"not valid json at all\n").unwrap();
984        file.write_all(b"{\"seq\":999,\"type\":\"garbage\"}\n")
985            .unwrap(); // partial, missing required fields
986        file.flush().unwrap();
987
988        let reader = WalReader::load(&path).unwrap();
989        assert_eq!(
990            reader.events().len(),
991            1,
992            "Should only find 1 valid event, got {}",
993            reader.events().len()
994        );
995        assert_eq!(reader.events()[0].job_id, "valid");
996
997        let _ = std::fs::remove_file(&path);
998    }
999}