Skip to main content

runtimo_core/
wal.rs

1//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
2//!
3//! Events are written as JSONL (one JSON object per line) with `fsync` after
4//! each write to guarantee durability. The WAL supports crash recovery by
5//! replaying events to reconstruct system state.
6//!
7//! Sequence numbers and rotation indices use explicit arithmetic — these
8//! are intentional, increment-by-one operations with known-safe ranges.
9
10#![allow(clippy::arithmetic_side_effects)]
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
16//! use std::path::Path;
17//!
18//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     output: None, error: None,
//!     ..Default::default()
//! }).unwrap();
26//!
27//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
28//! assert_eq!(reader.events().len(), 1);
29//! ```
30
31use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34use serde::{Deserialize, Serialize};
35use std::path::Path;
36
37/// Reads the last event's sequence number from a WAL file without loading
38/// the entire file into memory. Reads only the trailing tail_bytes of the
39/// file to find the last valid JSON line.
40///
41/// Falls back gracefully to `None` on any parse error, partial line, or
42/// I/O failure — the caller falls back to full scan.
43fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44    use std::io::{Read, Seek, SeekFrom};
45
46    let mut file = std::fs::File::open(path).ok()?;
47    let file_len = file.metadata().ok()?.len();
48    if file_len == 0 {
49        return None;
50    }
51
52    let start = file_len.saturating_sub(tail_bytes as u64);
53    file.seek(SeekFrom::Start(start)).ok()?;
54    let mut buf = vec![
55        0u8;
56        usize::try_from(file_len - start)
57            .unwrap_or(0)
58            .saturating_add(1)
59    ];
60    let n = file.read(&mut buf).ok()?;
61    buf.truncate(n);
62
63    // Split into lines, find the last non-empty line, trailing newline stripped
64    let lines: Vec<&[u8]> = buf
65        .split(|&b| b == b'\n')
66        .filter(|l| !l.is_empty())
67        .collect();
68
69    for line in lines.iter().rev() {
70        if let Ok(line_str) = std::str::from_utf8(line) {
71            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
72                return Some(event.seq);
73            }
74        }
75    }
76    None
77}
78
/// A single WAL event record, serialized as one JSON object per line.
///
/// Events are appended sequentially and identified by `seq`. The `ts` field
/// is a Unix timestamp in seconds (cleanup compares it against the current
/// system time in seconds). Every `Option` field below is skipped during
/// serialization when `None`; because serde treats a missing `Option` field
/// as `None`, such fields may also be absent from lines written by older
/// versions.
///
/// The struct derives [`Default`], so producers can set only the fields they
/// need and fill the rest with `..Default::default()`.
///
/// # Command Execution Events
///
/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
/// capture the shell command, its output, and any auto-correction applied.
/// NOTE(review): these events are described as written only in debug builds
/// (`#[cfg(debug_assertions)]`); that gating is not in this file — confirm at
/// the call site. The variant always exists so release builds can read them.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number. Chosen by the producer: `WalWriter::append` writes
    /// it exactly as given (use `WalWriter::seq()` for the next expected value).
    pub seq: u64,
    /// Unix timestamp (seconds) when the event occurred.
    pub ts: u64,
    /// Type of the event (job lifecycle stage or command execution).
    /// Serialized under the JSON key `"type"`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// The job ID this event relates to.
    pub job_id: String,
    /// Capability name, if applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Output data from the capability, if applicable (arbitrary JSON).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error message, if the event represents a failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Hardware telemetry snapshot before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Hardware telemetry snapshot after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Process summary before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Process summary after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Shell command string (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Captured stdout (CommandExecuted events only). Producers are expected
    /// to bound it to ~1KB (e.g. with `truncate_to`); not enforced here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Captured stderr (CommandExecuted events only). Producers are expected
    /// to bound it to ~1KB (e.g. with `truncate_to`); not enforced here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Exit code of the command (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Auto-corrected command, if correction was applied (future Phase 2).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
    /// OOV ratio from LLMOSafe.
    /// NOTE(review): described as belonging to a "CognitiveSafetyViolation"
    /// event, but `WalEventType` has no such variant — confirm which event
    /// type carries this field.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oov_ratio: Option<u8>,
    /// Detection flags from LLMOSafe.
    /// NOTE(review): see `oov_ratio` — the "CognitiveSafetyViolation" event
    /// type referenced for this field does not exist in `WalEventType`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detection_flags: Option<u8>,
}
146
/// Types of WAL events, corresponding to job lifecycle stages
/// and command executions.
///
/// Serialized in `snake_case` (e.g. `JobStarted` <-> `"job_started"`) under
/// the `"type"` key of [`WalEvent`]. `JobSubmitted` is the [`Default`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
// The enum is NOT `#[non_exhaustive]`: downstream code may match it
// exhaustively (hence the `exhaustive_enums` allowance), so adding a variant
// is a breaking change for such matches. A WAL line whose `type` names a
// variant unknown to this build fails to deserialize, and the readers in
// this module skip such lines rather than erroring.
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Job has been submitted to the system.
    #[default]
    JobSubmitted,
    /// Job arguments passed validation.
    JobValidated,
    /// Job execution has started.
    JobStarted,
    /// Job completed successfully.
    JobCompleted,
    /// Job failed during validation or execution.
    JobFailed,
    /// A completed job was rolled back.
    JobRolledBack,
    /// Shell command executed (dev-only logging for error absorption).
    CommandExecuted,
}
171
/// Append-only WAL writer.
///
/// Each [`append`](Self::append) opens the file in append mode, takes an
/// exclusive advisory lock (`flock` on Unix; locking is a no-op on other
/// platforms), writes one JSONL line, and `fsync`s before releasing the lock.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: wal.seq(), ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     capability: None, output: None, error: None,
///     ..Default::default()
/// }).unwrap();
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    // Path of the WAL file; it is reopened on every append.
    path: std::path::PathBuf,
    // Next sequence number: recovered in `create`, incremented per append.
    seq: u64,
}
197
198impl WalWriter {
199    /// Creates or opens a WAL file at the given path.
200    ///
201    /// The file is opened in append mode. Existing content is preserved.
202    ///
203    /// # Errors
204    ///
205    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
206    /// be created or opened.
207    pub fn create(path: &Path) -> Result<Self> {
208        // Ensure parent directory exists
209        if let Some(parent) = path.parent() {
210            if !parent.exists() {
211                std::fs::create_dir_all(parent).map_err(|e| {
212                    crate::Error::WalError(format!(
213                        "Failed to create WAL directory {}: {}",
214                        parent.display(),
215                        e
216                    ))
217                })?;
218            }
219        }
220
221        // Create the file if it doesn't exist
222        if !path.exists() {
223            std::fs::File::create(path).map_err(|e| {
224                crate::Error::WalError(format!(
225                    "Failed to create WAL file {}: {}",
226                    path.display(),
227                    e
228                ))
229            })?;
230        }
231
232        // Recover sequence from existing WAL content to ensure monotonic
233        // ordering across process restarts. Acquire lock to prevent reading
234        // during a concurrent write (P2 FIX).
235        //
236        // Optimized: reads only the last 8KB of the WAL file to find the
237        // max seq. Falls back to full read only if tail-read fails.
238        let seq = if path.exists() {
239            let lock_file = std::fs::File::open(path)
240                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
241            Self::lock_file(&lock_file)?;
242            let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
243                last_seq + 1
244            } else {
245                // Fall back to full scan if tail-read failed
246                let content = std::fs::read_to_string(path).map_err(|e| {
247                    crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
248                })?;
249                content
250                    .lines()
251                    .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
252                    .map(|e| e.seq)
253                    .max()
254                    .map_or(0, |max| max + 1)
255            };
256            Self::unlock_file(&lock_file);
257            recovered
258        } else {
259            0
260        };
261
262        Ok(Self {
263            path: path.to_path_buf(),
264            seq,
265        })
266    }
267
268    /// Acquires an exclusive file lock for writing (FINDING #14).
269    #[cfg(unix)]
270    fn lock_file(file: &std::fs::File) -> Result<()> {
271        use std::os::unix::io::AsRawFd;
272        let fd = file.as_raw_fd();
273        // SAFETY: fd is a valid open file descriptor; LOCK_EX is a well-defined operation
274        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
275        if result != 0 {
276            return Err(crate::Error::WalError(format!(
277                "Failed to acquire WAL lock: {}",
278                std::io::Error::last_os_error()
279            )));
280        }
281        Ok(())
282    }
283
284    /// Acquires an exclusive file lock (no-op on non-unix).
285    #[cfg(not(unix))]
286    fn lock_file(_file: &std::fs::File) -> Result<()> {
287        Ok(())
288    }
289
290    /// Releases an exclusive file lock (FINDING #14).
291    #[cfg(unix)]
292    fn unlock_file(file: &std::fs::File) {
293        use std::os::unix::io::AsRawFd;
294        let fd = file.as_raw_fd();
295        // SAFETY: fd is a valid open file descriptor; LOCK_UN is a well-defined operation
296        unsafe { libc::flock(fd, libc::LOCK_UN) };
297    }
298
299    /// Releases an exclusive file lock (no-op on non-unix).
300    #[cfg(not(unix))]
301    fn unlock_file(_file: &std::fs::File) {}
302
303    /// Appends an event to the WAL using true append mode (P0 FIX).
304    ///
305    /// Opens the file in append mode, acquires an exclusive lock, writes the
306    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
307    /// instead of O(N) read-rewrite, and the lock is held during the entire
308    /// write operation preventing concurrent write loss.
309    ///
310    /// Increments the internal sequence counter after a successful write.
311    ///
312    /// # Errors
313    ///
314    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
315    /// or I/O failure.
316    pub fn append(&mut self, event: WalEvent) -> Result<()> {
317        use std::io::Write;
318        let line =
319            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
320
321        // Open in append mode — no read-rewrite, O(1) per write
322        let file = std::fs::OpenOptions::new()
323            .create(true)
324            .append(true)
325            .open(&self.path)
326            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
327
328        // Hold exclusive lock during entire write
329        Self::lock_file(&file)?;
330        {
331            let mut buf = std::io::BufWriter::new(&file);
332            writeln!(buf, "{}", line)
333                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
334            buf.flush()
335                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
336            file.sync_all()
337                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
338        }
339        Self::unlock_file(&file);
340
341        self.seq += 1;
342        Ok(())
343    }
344
345    /// Returns the current sequence number (next event will use this value).
346    #[must_use]
347    pub fn seq(&self) -> u64 {
348        self.seq
349    }
350}
351
/// Reads and parses a WAL file into a list of events.
///
/// Lines that do not parse as a [`WalEvent`] are silently skipped. This is
/// intentional: partial writes from crashes may leave incomplete JSON at the
/// end of the file. Lines whose `type` is unknown to this build are skipped
/// the same way. Readers do not take the WAL lock.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::WalReader;
/// use std::path::Path;
///
/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
/// for event in reader.events() {
///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
/// }
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    // Successfully parsed events, in file order.
    events: Vec<WalEvent>,
}
372
373impl WalReader {
374    /// Loads and parses all events from a WAL file.
375    ///
376    /// # Errors
377    ///
378    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
379    /// be read. Individual malformed lines are skipped, not treated as errors.
380    pub fn load(path: &Path) -> Result<Self> {
381        let content =
382            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
383
384        let events: Vec<WalEvent> = content
385            .lines()
386            .filter_map(|line| serde_json::from_str(line).ok())
387            .collect();
388
389        Ok(Self { events })
390    }
391
392    /// Returns a slice of all parsed events.
393    #[must_use]
394    pub fn events(&self) -> &[WalEvent] {
395        &self.events
396    }
397
398    /// Reads only the last `n` lines from the WAL file.
399    ///
400    /// More efficient than [`WalReader::load`] when only recent events are needed.
401    /// Malformed lines are silently skipped.
402    ///
403    /// # Errors
404    ///
405    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
406    /// be read.
407    pub fn tail(path: &Path, n: usize) -> Result<Self> {
408        use std::collections::VecDeque;
409        use std::io::{BufRead, BufReader};
410        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
411        let reader = BufReader::new(file);
412
413        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
414        for line in reader.lines() {
415            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
416            if let Ok(event) = serde_json::from_str(&line) {
417                window.push_back(event);
418                if window.len() > n {
419                    window.pop_front();
420                }
421            }
422        }
423
424        Ok(Self {
425            events: window.into(),
426        })
427    }
428}
429
430/// WAL cleanup and rotation utilities.
431impl WalWriter {
432    /// Returns the rotation path for a given WAL file and rotation index.
433    ///
434    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
435    /// original extension instead of replacing it).
436    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
437        let mut s = path.to_string_lossy().into_owned();
438        s.push('.');
439        s.push_str(&index.to_string());
440        std::path::PathBuf::from(s)
441    }
442
443    /// Rotates the WAL when it exceeds max_size_bytes.
444    ///
445    /// Moves the current WAL to `{path}.1` (shifting older rotations),
446    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
447    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
448    /// P1 FIX: Acquires exclusive lock to prevent concurrent append loss during rotation.
449    ///
450    /// # Errors
451    /// Returns `IoError` or `BackupError` if WAL file operations fail.
452    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
453        let Ok(metadata) = std::fs::metadata(path) else {
454            return Ok(()); // No WAL to rotate
455        };
456
457        if metadata.len() < max_size_bytes {
458            return Ok(());
459        }
460
461        // Acquire lock to prevent concurrent append during rotation
462        let lock_file = std::fs::File::open(path)
463            .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
464        Self::lock_file(&lock_file)?;
465
466        // Shift existing rotations (P1 FIX: proper naming preserves extension)
467        for i in (1..max_rotations).rev() {
468            let old = Self::rotation_path(path, i);
469            let new = Self::rotation_path(path, i + 1);
470            if old.exists() {
471                let _ = std::fs::rename(&old, &new);
472            }
473        }
474
475        // Move current to .1
476        let rotated = Self::rotation_path(path, 1);
477        std::fs::rename(path, &rotated)
478            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
479
480        // Create fresh empty WAL
481        std::fs::write(path, "")
482            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
483
484        // Remove oldest rotation if exceeding max
485        let oldest = Self::rotation_path(path, max_rotations + 1);
486        if oldest.exists() {
487            let _ = std::fs::remove_file(&oldest);
488        }
489
490        Self::unlock_file(&lock_file);
491        Ok(())
492    }
493
494    /// Cleans up WAL entries older than max_age_secs.
495    ///
496    /// Writes retained events to a temporary file, then atomically renames
497    /// it over the original WAL. This prevents event loss if another writer
498    /// appends during cleanup.
499    ///
500    /// # Arguments
501    /// * `path` - Path to WAL file
502    /// * `max_age_secs` - Maximum age in seconds
503    ///
504    /// # Returns
505    /// * `Ok(usize)` - Number of entries removed
506    ///
507    /// # Errors
508    /// Returns `IoError` if the WAL file cannot be read, the temp file cannot
509    /// be written, or the atomic rename fails.
510    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
511        use std::time::{SystemTime, UNIX_EPOCH};
512
513        let cutoff = SystemTime::now()
514            .duration_since(UNIX_EPOCH)
515            .map_or(0, |d| d.as_secs())
516            .saturating_sub(max_age_secs);
517
518        // Read all events under lock to prevent concurrent append loss (P1 FIX)
519        let lock_file = std::fs::File::open(path)
520            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
521        Self::lock_file(&lock_file)?;
522        let content = std::fs::read_to_string(path)
523            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
524
525        let all_events: Vec<WalEvent> = content
526            .lines()
527            .filter_map(|line| serde_json::from_str(line).ok())
528            .collect();
529
530        let total = all_events.len();
531
532        let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
533        let removed = total - retained.len();
534
535        if removed > 0 {
536            // Write retained events to temp file, then merge any events appended
537            // by concurrent writers during this cleanup window, then atomic rename.
538            let temp_path = path.with_extension("wal.tmp");
539            {
540                let mut new_wal = Self::create(&temp_path)?;
541                for event in &retained {
542                    new_wal.append(event.clone())?;
543                }
544
545                // Re-read the original WAL to catch any events appended during cleanup.
546                // Lock is still held from above, so no concurrent writer can interleave.
547                let last_seq = retained.last().map_or(0, |e| e.seq);
548                let current_content = std::fs::read_to_string(path).map_err(|e| {
549                    crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
550                })?;
551                for line in current_content.lines() {
552                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
553                        if event.seq > last_seq {
554                            new_wal.append(event)?;
555                        }
556                    }
557                }
558            }
559            // Release lock before rename
560            Self::unlock_file(&lock_file);
561            // Atomic rename replaces original — no window for lost events
562            std::fs::rename(&temp_path, path).map_err(|e| {
563                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
564            })?;
565        } else {
566            Self::unlock_file(&lock_file);
567        }
568
569        Ok(removed)
570    }
571}
572
/// Shortens `s` to its first `max_bytes` bytes, never splitting a UTF-8
/// code point, and marks the cut.
///
/// If `s` already fits in `max_bytes`, it is returned unchanged. Otherwise the
/// cut point is moved back to the nearest character boundary at or below
/// `max_bytes`, and the marker `"...[truncated]"` is appended. The result can
/// therefore exceed `max_bytes` by the marker's 14 bytes.
///
/// Used to bound command output stored in WAL events. 1KB is sufficient
/// for error messages and pattern analysis while preventing WAL bloat.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if max_bytes >= s.len() {
        return s.to_owned();
    }

    // Offset 0 is always a char boundary, so this search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    let kept = s.get(..cut).unwrap_or_default();

    let mut result = String::with_capacity(kept.len() + MARKER.len());
    result.push_str(kept);
    result.push_str(MARKER);
    result
}
590
591#[cfg(test)]
592// Allow unwrap_used and indexing_slicing in tests as panicking on failure is desired test behavior.
593#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
594mod tests {
595    use super::*;
596
597    fn tmp_wal(name: &str) -> std::path::PathBuf {
598        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
599    }
600
601    #[test]
602    fn test_wal_write_and_read() {
603        let path = tmp_wal("write_read");
604        let _ = std::fs::remove_file(&path);
605
606        let mut wal = WalWriter::create(&path).unwrap();
607        wal.append(WalEvent {
608            seq: 0,
609            ts: 1715800000,
610            event_type: WalEventType::JobStarted,
611            job_id: "test-job".into(),
612            capability: Some("FileRead".into()),
613            output: None,
614            error: None,
615            telemetry_before: None,
616            telemetry_after: None,
617            process_before: None,
618            process_after: None,
619            cmd: None,
620            cmd_stdout: None,
621            cmd_stderr: None,
622            cmd_exit_code: None,
623            cmd_corrected: None,
624            ..Default::default()
625        })
626        .unwrap();
627
628        let reader = WalReader::load(&path).unwrap();
629        assert_eq!(reader.events().len(), 1);
630        assert_eq!(reader.events()[0].job_id, "test-job");
631
632        let _ = std::fs::remove_file(&path);
633    }
634
635    #[test]
636    fn test_wal_seq_recovery() {
637        let path = tmp_wal("seq_recovery");
638        let _ = std::fs::remove_file(&path);
639
640        let mut wal = WalWriter::create(&path).unwrap();
641        assert_eq!(wal.seq(), 0);
642        wal.append(WalEvent {
643            seq: 0,
644            ts: 1715800000,
645            event_type: WalEventType::JobStarted,
646            job_id: "job1".into(),
647            capability: None,
648            output: None,
649            error: None,
650            telemetry_before: None,
651            telemetry_after: None,
652            process_before: None,
653            process_after: None,
654            cmd: None,
655            cmd_stdout: None,
656            cmd_stderr: None,
657            cmd_exit_code: None,
658            cmd_corrected: None,
659            ..Default::default()
660        })
661        .unwrap();
662        assert_eq!(wal.seq(), 1);
663
664        // Create new writer — should recover seq from file
665        let wal2 = WalWriter::create(&path).unwrap();
666        assert_eq!(wal2.seq(), 1);
667
668        let _ = std::fs::remove_file(&path);
669    }
670
671    #[test]
672    fn test_wal_rotation() {
673        let path = tmp_wal("rotation");
674        let _ = std::fs::remove_file(&path);
675
676        // Write enough data to trigger rotation
677        let mut wal = WalWriter::create(&path).unwrap();
678        for i in 0..100 {
679            wal.append(WalEvent {
680                seq: i,
681                ts: 1715800000 + i,
682                event_type: WalEventType::JobStarted,
683                job_id: format!("job-{}", i),
684                capability: None,
685                output: None,
686                error: None,
687                telemetry_before: None,
688                telemetry_after: None,
689                process_before: None,
690                process_after: None,
691                cmd: None,
692                cmd_stdout: None,
693                cmd_stderr: None,
694                cmd_exit_code: None,
695                cmd_corrected: None,
696                ..Default::default()
697            })
698            .unwrap();
699        }
700
701        let size = std::fs::metadata(&path).unwrap().len();
702        // Rotate with a threshold smaller than current size
703        WalWriter::rotate(&path, size - 1, 3).unwrap();
704
705        assert!(WalWriter::rotation_path(&path, 1).exists());
706        // New WAL should be empty
707        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");
708
709        let _ = std::fs::remove_file(&path);
710        let _ = std::fs::remove_file(WalWriter::rotation_path(&path, 1));
711    }
712
713    #[test]
714    fn test_wal_cleanup() {
715        let path = tmp_wal("cleanup");
716        let _ = std::fs::remove_file(&path);
717
718        let mut wal = WalWriter::create(&path).unwrap();
719        let now = std::time::SystemTime::now()
720            .duration_since(std::time::UNIX_EPOCH)
721            .unwrap()
722            .as_secs();
723
724        // Write old event
725        wal.append(WalEvent {
726            seq: 0,
727            ts: now - 1000,
728            event_type: WalEventType::JobStarted,
729            job_id: "old-job".into(),
730            capability: None,
731            output: None,
732            error: None,
733            telemetry_before: None,
734            telemetry_after: None,
735            process_before: None,
736            process_after: None,
737            cmd: None,
738            cmd_stdout: None,
739            cmd_stderr: None,
740            cmd_exit_code: None,
741            cmd_corrected: None,
742            ..Default::default()
743        })
744        .unwrap();
745
746        // Write recent event
747        wal.append(WalEvent {
748            seq: 1,
749            ts: now,
750            event_type: WalEventType::JobCompleted,
751            job_id: "new-job".into(),
752            capability: None,
753            output: None,
754            error: None,
755            telemetry_before: None,
756            telemetry_after: None,
757            process_before: None,
758            process_after: None,
759            cmd: None,
760            cmd_stdout: None,
761            cmd_stderr: None,
762            cmd_exit_code: None,
763            cmd_corrected: None,
764            ..Default::default()
765        })
766        .unwrap();
767
768        let removed = WalWriter::cleanup(&path, 500).unwrap();
769        assert_eq!(removed, 1); // Old event removed
770
771        let reader = WalReader::load(&path).unwrap();
772        assert_eq!(reader.events().len(), 1);
773        assert_eq!(reader.events()[0].job_id, "new-job");
774
775        let _ = std::fs::remove_file(&path);
776    }
777
778    #[test]
779    fn test_wal_skip_serializing_optional_fields() {
780        // FINDING #15: verify optional fields are skipped when None
781        let event = WalEvent {
782            seq: 0,
783            ts: 1715800000,
784            event_type: WalEventType::JobStarted,
785            job_id: "test".into(),
786            capability: None,
787            output: None,
788            error: None,
789            telemetry_before: None,
790            telemetry_after: None,
791            process_before: None,
792            process_after: None,
793            cmd: None,
794            cmd_stdout: None,
795            cmd_stderr: None,
796            cmd_exit_code: None,
797            cmd_corrected: None,
798            ..Default::default()
799        };
800
801        let json = serde_json::to_string(&event).unwrap();
802        assert!(!json.contains("capability"));
803        assert!(!json.contains("telemetry_before"));
804        assert!(!json.contains("process_before"));
805        assert!(!json.contains("cmd"));
806    }
807
808    #[test]
809    fn test_command_executed_event() {
810        let path = tmp_wal("cmd_exec");
811        let _ = std::fs::remove_file(&path);
812
813        let mut wal = WalWriter::create(&path).unwrap();
814        wal.append(WalEvent {
815            seq: 0,
816            ts: 1715800000,
817            event_type: WalEventType::CommandExecuted,
818            job_id: "job-cmd".into(),
819            capability: None,
820            output: None,
821            error: None,
822            telemetry_before: None,
823            telemetry_after: None,
824            process_before: None,
825            process_after: None,
826            cmd: Some("ls | hed -3".into()),
827            cmd_stdout: None,
828            cmd_stderr: Some("hed: command not found".into()),
829            cmd_exit_code: Some(127),
830            cmd_corrected: None,
831            ..Default::default()
832        })
833        .unwrap();
834
835        let reader = WalReader::load(&path).unwrap();
836        assert_eq!(reader.events().len(), 1);
837        assert_eq!(reader.events()[0].event_type, WalEventType::CommandExecuted);
838        assert_eq!(reader.events()[0].cmd.as_deref(), Some("ls | hed -3"));
839        assert_eq!(reader.events()[0].cmd_exit_code, Some(127));
840
841        let _ = std::fs::remove_file(&path);
842    }
843
844    #[test]
845    fn test_truncate_to() {
846        assert_eq!(truncate_to("hello", 1024), "hello");
847        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
848        let long = "a".repeat(2000);
849        let truncated = truncate_to(&long, 1024);
850        assert!(truncated.len() < 1100);
851        assert!(truncated.ends_with("...[truncated]"));
852    }
853
854    // ── GAP 9: WAL recovery from truncated file ──────────────────────
855
856    #[test]
857    fn test_wal_recovers_from_truncated_last_line() {
858        let path = tmp_wal("truncated");
859        let _ = std::fs::remove_file(&path);
860
861        // Write valid events
862        let mut wal = WalWriter::create(&path).unwrap();
863        wal.append(WalEvent {
864            seq: 0,
865            ts: 1000,
866            event_type: WalEventType::JobStarted,
867            job_id: "job1".into(),
868            capability: None,
869            output: None,
870            error: None,
871            telemetry_before: None,
872            telemetry_after: None,
873            process_before: None,
874            process_after: None,
875            cmd: None,
876            cmd_stdout: None,
877            cmd_stderr: None,
878            cmd_exit_code: None,
879            cmd_corrected: None,
880            ..Default::default()
881        })
882        .unwrap();
883
884        // Append a valid second event
885        wal.append(WalEvent {
886            seq: 1,
887            ts: 1001,
888            event_type: WalEventType::JobCompleted,
889            job_id: "job1".into(),
890            capability: None,
891            output: None,
892            error: None,
893            telemetry_before: None,
894            telemetry_after: None,
895            process_before: None,
896            process_after: None,
897            cmd: None,
898            cmd_stdout: None,
899            cmd_stderr: None,
900            cmd_exit_code: None,
901            cmd_corrected: None,
902            ..Default::default()
903        })
904        .unwrap();
905
906        // Verify 2 events exist
907        let reader_before = WalReader::load(&path).unwrap();
908        assert_eq!(reader_before.events().len(), 2);
909
910        // Simulate a crash by appending a partial line (truncated JSON)
911        // Open file in append mode without WalWriter to write raw bytes
912        use std::io::Write;
913        let mut file = std::fs::OpenOptions::new()
914            .append(true)
915            .open(&path)
916            .unwrap();
917        // Write a partial JSON line (no newline, incomplete object)
918        file.write_all(b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated")
919            .unwrap();
920        file.flush().unwrap();
921
922        // Now load again — should skip the truncated last line and still read 2 valid events
923        let reader_after = WalReader::load(&path).unwrap();
924        assert_eq!(
925            reader_after.events().len(),
926            2,
927            "Should skip truncated last line and read 2 valid events, got {}",
928            reader_after.events().len()
929        );
930        assert_eq!(reader_after.events()[0].job_id, "job1");
931        assert_eq!(reader_after.events()[1].job_id, "job1");
932
933        let _ = std::fs::remove_file(&path);
934    }
935
936    #[test]
937    fn test_wal_skips_garbage_lines() {
938        let path = tmp_wal("garbage");
939        let _ = std::fs::remove_file(&path);
940
941        // Write one valid event followed by a garbage line
942        let mut wal = WalWriter::create(&path).unwrap();
943        wal.append(WalEvent {
944            seq: 0,
945            ts: 1000,
946            event_type: WalEventType::JobStarted,
947            job_id: "valid".into(),
948            capability: None,
949            output: None,
950            error: None,
951            telemetry_before: None,
952            telemetry_after: None,
953            process_before: None,
954            process_after: None,
955            cmd: None,
956            cmd_stdout: None,
957            cmd_stderr: None,
958            cmd_exit_code: None,
959            cmd_corrected: None,
960            ..Default::default()
961        })
962        .unwrap();
963
964        use std::io::Write;
965        let mut file = std::fs::OpenOptions::new()
966            .append(true)
967            .open(&path)
968            .unwrap();
969        file.write_all(b"not valid json at all\n").unwrap();
970        file.write_all(b"{\"seq\":999,\"type\":\"garbage\"}\n")
971            .unwrap(); // partial, missing required fields
972        file.flush().unwrap();
973
974        let reader = WalReader::load(&path).unwrap();
975        assert_eq!(
976            reader.events().len(),
977            1,
978            "Should only find 1 valid event, got {}",
979            reader.events().len()
980        );
981        assert_eq!(reader.events()[0].job_id, "valid");
982
983        let _ = std::fs::remove_file(&path);
984    }
985}