Skip to main content

runtimo_core/
wal.rs

1//! Write-Ahead Log (WAL) — Append-only, crash-resistant event log.
2//!
3//! Events are written as JSONL (one JSON object per line) with `fsync` after
4//! each write to guarantee durability. The WAL supports crash recovery by
5//! replaying events to reconstruct system state.
6//!
7//! Sequence numbers and rotation indices use explicit arithmetic — these
8//! are intentional, increment-by-one operations with known-safe ranges.
9
10#![allow(clippy::arithmetic_side_effects)]
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use runtimo_core::{WalWriter, WalReader, WalEvent, WalEventType};
16//! use std::path::Path;
17//!
18//! let mut wal = WalWriter::create(Path::new("/tmp/test.wal")).unwrap();
//! wal.append(WalEvent {
//!     seq: 0, ts: 1715800000,
//!     event_type: WalEventType::JobStarted,
//!     job_id: "abc123".into(),
//!     capability: Some("FileRead".into()),
//!     ..Default::default()
//! }).unwrap();
26//!
27//! let reader = WalReader::load(Path::new("/tmp/test.wal")).unwrap();
28//! assert_eq!(reader.events().len(), 1);
29//! ```
30
31use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34
35use serde::{Deserialize, Serialize};
36use std::path::Path;
37
38/// Reads the last event's sequence number from a WAL file without loading
39/// the entire file into memory. Reads only the trailing tail_bytes of the
40/// file to find the last valid JSON line.
41///
42/// Falls back gracefully to `None` on any parse error, partial line, or
43/// I/O failure — the caller falls back to full scan.
44fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
45    use std::io::{Read, Seek, SeekFrom};
46
47    let mut file = std::fs::File::open(path).ok()?;
48    let file_len = file.metadata().ok()?.len();
49    if file_len == 0 {
50        return None;
51    }
52
53    let start = file_len.saturating_sub(tail_bytes as u64);
54    file.seek(SeekFrom::Start(start)).ok()?;
55    let mut buf = vec![
56        0u8;
57        usize::try_from(file_len - start)
58            .unwrap_or(0)
59            .saturating_add(1)
60    ];
61    let n = file.read(&mut buf).ok()?;
62    buf.truncate(n);
63
64    // Split into lines, find the last non-empty line, trailing newline stripped
65    let lines: Vec<&[u8]> = buf
66        .split(|&b| b == b'\n')
67        .filter(|l| !l.is_empty())
68        .collect();
69
70    for line in lines.iter().rev() {
71        if let Ok(line_str) = std::str::from_utf8(line) {
72            if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
73                return Some(event.seq);
74            }
75        }
76    }
77    None
78}
79
/// A single WAL event record, stored as one JSON object per line.
///
/// Events are appended sequentially and identified by `seq`. The `ts` field
/// is a Unix timestamp in seconds. Every `Option` field is omitted from the
/// serialized JSON when it is `None`, and `event_type` is written under the
/// JSON key `"type"`.
///
/// # Command Execution Events
///
/// When `event_type` is [`WalEventType::CommandExecuted`], the `cmd*` fields
/// capture the shell command, its output, and any auto-correction applied.
/// NOTE(review): earlier documentation states these events are only written
/// in debug builds (`#[cfg(debug_assertions)]`); that gating is not visible
/// in this file — confirm at the call sites.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
// The struct is deliberately left without `#[non_exhaustive]`, so callers can
// build it with a struct literal; this silences the corresponding clippy lint.
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number of the event. It is written to disk exactly as the
    /// caller supplied it.
    pub seq: u64,
    /// Unix timestamp (seconds) when the event occurred.
    pub ts: u64,
    /// Type of the event (job lifecycle stage or command execution).
    /// Serialized under the key `"type"`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// The job ID this event relates to.
    pub job_id: String,
    /// Capability name, if applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Output data from the capability, if applicable (arbitrary JSON).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error message, if the event represents a failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Hardware telemetry snapshot before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Hardware telemetry snapshot after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Process summary before execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Process summary after execution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Shell command string (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Captured stdout (CommandExecuted events only). The struct does not
    /// enforce a size limit; producers are expected to truncate to about 1KB
    /// before storing the value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Captured stderr (CommandExecuted events only). Same size expectation
    /// as `cmd_stdout`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Exit code of the command (CommandExecuted events only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Auto-corrected command, if correction was applied (future Phase 2).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
    /// OOV ratio from LLMOSafe (for CognitiveSafetyViolation).
    /// NOTE(review): no `CognitiveSafetyViolation` variant exists in
    /// `WalEventType` in this file, and the unit/scale of this `u8` is not
    /// shown here — confirm with the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oov_ratio: Option<u8>,
    /// Detection flags from LLMOSafe (for CognitiveSafetyViolation).
    /// NOTE(review): presumably a bit set; the bit meanings are not defined
    /// in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detection_flags: Option<u8>,
}
147
/// Types of WAL events, corresponding to job lifecycle stages
/// and command executions.
///
/// Variants are serialized in `snake_case` (for example `JobStarted` is
/// written as `"job_started"`). `JobSubmitted` is the `Default` variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
// NOTE(review): an earlier comment called this enum "intentionally
// non-exhaustive", but it carries no `#[non_exhaustive]` attribute; the allow
// below only silences clippy's `exhaustive_enums` lint. As written, adding a
// variant is visible to every exhaustive `match` on this type, and a WAL line
// with an unknown type name fails to deserialize. Confirm which behaviour is
// intended.
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Job has been submitted to the system.
    #[default]
    JobSubmitted,
    /// Job arguments passed validation.
    JobValidated,
    /// Job execution has started.
    JobStarted,
    /// Job completed successfully.
    JobCompleted,
    /// Job failed during validation or execution.
    JobFailed,
    /// A completed job was rolled back.
    JobRolledBack,
    /// Shell command executed (dev-only logging for error absorption).
    CommandExecuted,
}
172
/// Append-only WAL writer.
///
/// The writer stores only the WAL path and an in-memory sequence counter.
/// Each call to `append` reopens the file in append mode, takes an exclusive
/// file lock, writes one JSONL line, and calls `fsync` before releasing the
/// lock.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::{WalWriter, WalEvent, WalEventType};
/// use std::path::Path;
///
/// let mut wal = WalWriter::create(Path::new("/tmp/app.wal")).unwrap();
/// wal.append(WalEvent {
///     seq: 0, ts: 1715800000,
///     event_type: WalEventType::JobStarted,
///     job_id: "job1".into(),
///     ..Default::default()
/// }).unwrap();
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the active WAL file; reopened on every append.
    path: std::path::PathBuf,
    /// In-memory counter: recovered from the file on creation, then
    /// incremented after each successful append.
    seq: u64,
}
198
199impl WalWriter {
200    /// Creates or opens a WAL file at the given path.
201    ///
202    /// The file is opened in append mode. Existing content is preserved.
203    ///
204    /// # Errors
205    ///
206    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
207    /// be created or opened.
208    pub fn create(path: &Path) -> Result<Self> {
209        // Ensure parent directory exists
210        if let Some(parent) = path.parent() {
211            if !parent.exists() {
212                std::fs::create_dir_all(parent).map_err(|e| {
213                    crate::Error::WalError(format!(
214                        "Failed to create WAL directory {}: {}",
215                        parent.display(),
216                        e
217                    ))
218                })?;
219            }
220        }
221
222        // Create the file if it doesn't exist
223        if !path.exists() {
224            std::fs::File::create(path).map_err(|e| {
225                crate::Error::WalError(format!(
226                    "Failed to create WAL file {}: {}",
227                    path.display(),
228                    e
229                ))
230            })?;
231        }
232
233        // Recover sequence from existing WAL content to ensure monotonic
234        // ordering across process restarts. Acquire lock to prevent reading
235        // during a concurrent write (P2 FIX).
236        //
237        // Optimized: reads only the last 8KB of the WAL file to find the
238        // max seq. Falls back to full read only if tail-read fails.
239        let seq = if path.exists() {
240            let lock_file = std::fs::File::open(path)
241                .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
242            Self::lock_file(&lock_file)?;
243            let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
244                last_seq + 1
245            } else {
246                // Fall back to full scan if tail-read failed
247                let content = std::fs::read_to_string(path).map_err(|e| {
248                    crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
249                })?;
250                content
251                    .lines()
252                    .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
253                    .map(|e| e.seq)
254                    .max()
255                    .map_or(0, |max| max + 1)
256            };
257            Self::unlock_file(&lock_file);
258            recovered
259        } else {
260            0
261        };
262
263        Ok(Self {
264            path: path.to_path_buf(),
265            seq,
266        })
267    }
268
269    /// Acquires an exclusive file lock for writing (FINDING #14).
270    #[cfg(unix)]
271    fn lock_file(file: &std::fs::File) -> Result<()> {
272        use std::os::unix::io::AsRawFd;
273        let fd = file.as_raw_fd();
274        // SAFETY: fd is a valid open file descriptor; LOCK_EX is a well-defined operation
275        let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
276        if result != 0 {
277            return Err(crate::Error::WalError(format!(
278                "Failed to acquire WAL lock: {}",
279                std::io::Error::last_os_error()
280            )));
281        }
282        Ok(())
283    }
284
285    /// Acquires an exclusive file lock (no-op on non-unix).
286    #[cfg(not(unix))]
287    fn lock_file(_file: &std::fs::File) -> Result<()> {
288        Ok(())
289    }
290
291    /// Releases an exclusive file lock (FINDING #14).
292    #[cfg(unix)]
293    fn unlock_file(file: &std::fs::File) {
294        use std::os::unix::io::AsRawFd;
295        let fd = file.as_raw_fd();
296        // SAFETY: fd is a valid open file descriptor; LOCK_UN is a well-defined operation
297        unsafe { libc::flock(fd, libc::LOCK_UN) };
298    }
299
300    /// Releases an exclusive file lock (no-op on non-unix).
301    #[cfg(not(unix))]
302    fn unlock_file(_file: &std::fs::File) {}
303
304    /// Appends an event to the WAL using true append mode (P0 FIX).
305    ///
306    /// Opens the file in append mode, acquires an exclusive lock, writes the
307    /// JSONL line, fsyncs, then releases the lock. This is O(1) per write
308    /// instead of O(N) read-rewrite, and the lock is held during the entire
309    /// write operation preventing concurrent write loss.
310    ///
311    /// Increments the internal sequence counter after a successful write.
312    ///
313    /// # Errors
314    ///
315    /// Returns [`Error::WalError`](crate::Error::WalError) on serialization
316    /// or I/O failure.
317    pub fn append(&mut self, event: WalEvent) -> Result<()> {
318        use std::io::Write;
319        let line =
320            serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
321
322        // Open in append mode — no read-rewrite, O(1) per write
323        let file = std::fs::OpenOptions::new()
324            .create(true)
325            .append(true)
326            .open(&self.path)
327            .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
328
329        // Hold exclusive lock during entire write
330        Self::lock_file(&file)?;
331        {
332            let mut buf = std::io::BufWriter::new(&file);
333            writeln!(buf, "{}", line)
334                .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
335            buf.flush()
336                .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
337            file.sync_all()
338                .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
339        }
340        Self::unlock_file(&file);
341
342        self.seq += 1;
343        Ok(())
344    }
345
    /// Returns the writer's in-memory sequence counter.
    ///
    /// The counter starts at the value recovered by [`WalWriter::create`] and
    /// is incremented by one after every successful [`WalWriter::append`].
    /// `append` writes the event's own `seq` field unchanged, so callers that
    /// want on-disk sequence numbers to follow this counter must copy it into
    /// `event.seq` themselves.
    #[must_use]
    pub fn seq(&self) -> u64 {
        self.seq
    }
351}
352
/// Reads and parses a WAL file into a list of events.
///
/// Malformed lines are silently skipped. This is intentional — partial writes
/// from crashes may leave incomplete JSON at the end of the file.
///
/// # Single-File vs Multi-File Loading
///
/// - [`WalReader::load`] reads only the current (active) WAL file.
/// - [`WalReader::load_all`] reads the current file PLUS all archived rotation
///   files (`{path}.1`, `{path}.2`, ...). Use `load_all` when consumers need
///   event history across WAL rotations; see that method for the ordering of
///   the returned events.
/// - [`WalReader::tail`] keeps only the last `n` parseable events of the
///   current file.
///
/// # Example
///
/// ```rust,ignore
/// use runtimo_core::WalReader;
/// use std::path::Path;
///
/// let reader = WalReader::load(Path::new("/tmp/app.wal")).unwrap();
/// for event in reader.events() {
///     println!("Event: {:?} for job {}", event.event_type, event.job_id);
/// }
/// ```
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    /// Parsed events, in the order the loading function produced them.
    events: Vec<WalEvent>,
}
381
382impl WalReader {
383    /// Loads and parses all events from a single WAL file.
384    ///
385    /// Reads only the file at `path`. If WAL rotation has occurred, events
386    /// in archived files (`{path}.1`, `{path}.2`, ...) are NOT included.
387    /// Use [`WalReader::load_all`] when complete event history is needed.
388    ///
389    /// # Errors
390    ///
391    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
392    /// be read. Individual malformed lines are skipped, not treated as errors.
393    pub fn load(path: &Path) -> Result<Self> {
394        let content =
395            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
396
397        let events: Vec<WalEvent> = content
398            .lines()
399            .filter_map(|line| serde_json::from_str(line).ok())
400            .collect();
401
402        Ok(Self { events })
403    }
404
405    /// Loads and parses all events from the current WAL file and all archived
406    /// rotation files.
407    ///
408    /// Reads archived files (`{path}.1`, `{path}.2`, ..., `{path}.N`) first
409    /// in reverse index order (newest archive → oldest archive), then the
410    /// current file. This produces events in chronological order (oldest first,
411    /// newest last) — the order needed by consumers that must see the full
412    /// event history across WAL rotations.
413    ///
414    /// Archived files that do not exist are silently skipped. Malformed lines
415    /// within any file are silently skipped (same behavior as [`Self::load`]).
416    ///
417    /// # Errors
418    ///
419    /// Returns [`Error::WalError`](crate::Error::WalError) if the current WAL
420    /// file at `path` cannot be read. Archived file read failures are logged
421    /// and skipped (non-fatal).
422    pub fn load_all(path: &Path) -> Result<Self> {
423        // Discover archived rotation files by scanning for numeric suffixes.
424        // Rotation naming: path.1, path.2, ..., path.N
425        // We read in reverse order (newest archive first) so events end up
426        // in chronological order after we append the current file last.
427        let mut archived_indices: Vec<usize> = Vec::new();
428        if let Some(parent) = path.parent() {
429            let dir_name = path
430                .file_name()
431                .map(|n| n.to_string_lossy().into_owned())
432                .unwrap_or_default();
433            if let Ok(entries) = std::fs::read_dir(parent) {
434                for entry in entries.flatten() {
435                    let name = entry.file_name().to_string_lossy().into_owned();
436                    if name.starts_with(&dir_name) {
437                        let suffix = &name[dir_name.len()..];
438                        if let Some(index_str) = suffix.strip_prefix('.') {
439                            if let Ok(index) = index_str.parse::<usize>() {
440                                if index > 0 {
441                                    archived_indices.push(index);
442                                }
443                            }
444                        }
445                    }
446                }
447            }
448        }
449        // Sort descending so we read newest archive first, oldest last.
450        // After prepending these to the event list, the current file's
451        // events are appended last, preserving chronological order.
452        archived_indices.sort_unstable_by(|a, b| b.cmp(a));
453
454        let mut events: Vec<WalEvent> = Vec::new();
455
456        // Read archived files (newest archive first → oldest archive last)
457        for index in &archived_indices {
458            let rotated = rotation_path_for(path, *index);
459            if let Ok(content) = std::fs::read_to_string(&rotated) {
460                let file_events: Vec<WalEvent> = content
461                    .lines()
462                    .filter_map(|line| serde_json::from_str(line).ok())
463                    .collect();
464                // Prepend: archived events are older, so they go before
465                // whatever we've already collected.
466                let mut combined = file_events;
467                combined.extend(events);
468                events = combined;
469            }
470        }
471
472        // Read the current (active) WAL file
473        let content =
474            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
475        let current_events: Vec<WalEvent> = content
476            .lines()
477            .filter_map(|line| serde_json::from_str(line).ok())
478            .collect();
479        events.extend(current_events);
480
481        Ok(Self { events })
482    }
483
    /// Returns a slice of all parsed events, in the order they were produced
    /// by the loading function that built this reader.
    #[must_use]
    pub fn events(&self) -> &[WalEvent] {
        &self.events
    }
489
490    /// Reads only the last `n` lines from the WAL file.
491    ///
492    /// More efficient than [`WalReader::load`] when only recent events are needed.
493    /// Malformed lines are silently skipped.
494    ///
495    /// # Errors
496    ///
497    /// Returns [`Error::WalError`](crate::Error::WalError) if the file cannot
498    /// be read.
499    pub fn tail(path: &Path, n: usize) -> Result<Self> {
500        use std::collections::VecDeque;
501        use std::io::{BufRead, BufReader};
502        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
503        let reader = BufReader::new(file);
504
505        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
506        for line in reader.lines() {
507            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
508            if let Ok(event) = serde_json::from_str(&line) {
509                window.push_back(event);
510                if window.len() > n {
511                    window.pop_front();
512                }
513            }
514        }
515
516        Ok(Self {
517            events: window.into(),
518        })
519    }
520}
521
/// Computes the rotation path for a WAL file and rotation index.
///
/// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
/// original extension instead of replacing it).
///
/// The suffix is appended to the path's `OsString`, so paths that are not
/// valid UTF-8 are preserved byte-for-byte. A lossy string conversion would
/// replace the invalid bytes and point at a different file.
fn rotation_path_for(path: &Path, index: usize) -> std::path::PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(".");
    rotated.push(index.to_string());
    std::path::PathBuf::from(rotated)
}
532
533/// WAL cleanup and rotation utilities.
534impl WalWriter {
    /// Returns the rotation path for a given WAL file and rotation index.
    ///
    /// For `foo.jsonl` with index `1`, returns `foo.jsonl.1` (preserves the
    /// original extension instead of replacing it). This is a thin wrapper
    /// that delegates to the free function `rotation_path_for`.
    fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
        rotation_path_for(path, index)
    }
542
543    /// Rotates the WAL when it exceeds max_size_bytes.
544    ///
545    /// Moves the current WAL to `{path}.1` (shifting older rotations),
546    /// then creates a fresh empty WAL. Keeps at most `max_rotations` old files.
547    /// FINDING #15: basic WAL rotation to prevent unbounded growth.
548    /// P1 FIX: Acquires exclusive lock to prevent concurrent append loss during rotation.
549    ///
550    /// # Errors
551    /// Returns `IoError` or `BackupError` if WAL file operations fail.
552    pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
553        let Ok(metadata) = std::fs::metadata(path) else {
554            return Ok(()); // No WAL to rotate
555        };
556
557        if metadata.len() < max_size_bytes {
558            return Ok(());
559        }
560
561        // Acquire lock to prevent concurrent append during rotation
562        let lock_file = std::fs::File::open(path)
563            .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
564        Self::lock_file(&lock_file)?;
565
566        // Shift existing rotations (P1 FIX: proper naming preserves extension)
567        for i in (1..max_rotations).rev() {
568            let old = Self::rotation_path(path, i);
569            let new = Self::rotation_path(path, i + 1);
570            if old.exists() {
571                if let Err(e) = std::fs::rename(&old, &new) {
572                    log::error!(
573                        "WAL rotation shift failed: {} -> {}: {}",
574                        old.display(),
575                        new.display(),
576                        e
577                    );
578                }
579            }
580        }
581
582        // Move current to .1
583        let rotated = Self::rotation_path(path, 1);
584        std::fs::rename(path, &rotated)
585            .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
586
587        // Create fresh empty WAL
588        std::fs::write(path, "")
589            .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
590
591        // Remove oldest rotation if exceeding max
592        let oldest = Self::rotation_path(path, max_rotations + 1);
593        if oldest.exists() {
594            if let Err(e) = std::fs::remove_file(&oldest) {
595                log::error!("WAL cleanup failed: {}: {}", oldest.display(), e);
596            }
597        }
598
599        Self::unlock_file(&lock_file);
600        Ok(())
601    }
602
603    /// Cleans up WAL entries older than max_age_secs.
604    ///
605    /// Writes retained events to a temporary file, then atomically renames
606    /// it over the original WAL. This prevents event loss if another writer
607    /// appends during cleanup.
608    ///
609    /// # Arguments
610    /// * `path` - Path to WAL file
611    /// * `max_age_secs` - Maximum age in seconds
612    ///
613    /// # Returns
614    /// * `Ok(usize)` - Number of entries removed
615    ///
616    /// # Errors
617    /// Returns `IoError` if the WAL file cannot be read, the temp file cannot
618    /// be written, or the atomic rename fails.
619    pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
620        use std::time::{SystemTime, UNIX_EPOCH};
621
622        let cutoff = SystemTime::now()
623            .duration_since(UNIX_EPOCH)
624            .map_or(0, |d| d.as_secs())
625            .saturating_sub(max_age_secs);
626
627        // Read all events under lock to prevent concurrent append loss (P1 FIX)
628        let lock_file = std::fs::File::open(path)
629            .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
630        Self::lock_file(&lock_file)?;
631        let content = std::fs::read_to_string(path)
632            .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
633
634        let all_events: Vec<WalEvent> = content
635            .lines()
636            .filter_map(|line| serde_json::from_str(line).ok())
637            .collect();
638
639        let total = all_events.len();
640
641        let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
642        let removed = total - retained.len();
643
644        if removed > 0 {
645            // Write retained events to temp file, then merge any events appended
646            // by concurrent writers during this cleanup window, then atomic rename.
647            let temp_path = path.with_extension("wal.tmp");
648            {
649                let mut new_wal = Self::create(&temp_path)?;
650                for event in &retained {
651                    new_wal.append(event.clone())?;
652                }
653
654                // Re-read the original WAL to catch any events appended during cleanup.
655                // Lock is still held from above, so no concurrent writer can interleave.
656                let last_seq = retained.last().map_or(0, |e| e.seq);
657                let current_content = std::fs::read_to_string(path).map_err(|e| {
658                    crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
659                })?;
660                for line in current_content.lines() {
661                    if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
662                        if event.seq > last_seq {
663                            new_wal.append(event)?;
664                        }
665                    }
666                }
667            }
668            // Release lock before rename
669            Self::unlock_file(&lock_file);
670            // Atomic rename replaces original — no window for lost events
671            std::fs::rename(&temp_path, path).map_err(|e| {
672                crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
673            })?;
674        } else {
675            Self::unlock_file(&lock_file);
676        }
677
678        Ok(removed)
679    }
680}
681
/// Shortens `s` so that at most `max_bytes` bytes of its content are kept,
/// cutting only on a UTF-8 character boundary.
///
/// Strings that already fit are returned unchanged. Otherwise the kept prefix
/// is followed by the marker `...[truncated]`, so the returned string can be
/// up to 14 bytes longer than `max_bytes`.
///
/// Used to bound command output stored in WAL events.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Largest boundary at or below the limit; index 0 is always a boundary,
    // so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    let (kept, _) = s.split_at(cut);
    let mut out = String::with_capacity(kept.len() + MARKER.len());
    out.push_str(kept);
    out.push_str(MARKER);
    out
}
699
700#[cfg(test)]
701// Allow unwrap_used and indexing_slicing in tests as panicking on failure is desired test behavior.
702#[allow(
703    clippy::unwrap_used,
704    clippy::indexing_slicing,
705    clippy::items_after_statements
706)]
707mod tests {
708    use super::*;
709
710    fn tmp_wal(name: &str) -> std::path::PathBuf {
711        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
712    }
713
714    #[test]
715    fn test_wal_write_and_read() {
716        let path = tmp_wal("write_read");
717        let _ = std::fs::remove_file(&path);
718
719        let mut wal = WalWriter::create(&path).unwrap();
720        wal.append(WalEvent {
721            seq: 0,
722            ts: 1715800000,
723            event_type: WalEventType::JobStarted,
724            job_id: "test-job".into(),
725            capability: Some("FileRead".into()),
726            output: None,
727            error: None,
728            telemetry_before: None,
729            telemetry_after: None,
730            process_before: None,
731            process_after: None,
732            cmd: None,
733            cmd_stdout: None,
734            cmd_stderr: None,
735            cmd_exit_code: None,
736            cmd_corrected: None,
737            ..Default::default()
738        })
739        .unwrap();
740
741        let reader = WalReader::load(&path).unwrap();
742        assert_eq!(reader.events().len(), 1);
743        assert_eq!(reader.events()[0].job_id, "test-job");
744
745        let _ = std::fs::remove_file(&path);
746    }
747
748    #[test]
749    fn test_wal_seq_recovery() {
750        let path = tmp_wal("seq_recovery");
751        let _ = std::fs::remove_file(&path);
752
753        let mut wal = WalWriter::create(&path).unwrap();
754        assert_eq!(wal.seq(), 0);
755        wal.append(WalEvent {
756            seq: 0,
757            ts: 1715800000,
758            event_type: WalEventType::JobStarted,
759            job_id: "job1".into(),
760            capability: None,
761            output: None,
762            error: None,
763            telemetry_before: None,
764            telemetry_after: None,
765            process_before: None,
766            process_after: None,
767            cmd: None,
768            cmd_stdout: None,
769            cmd_stderr: None,
770            cmd_exit_code: None,
771            cmd_corrected: None,
772            ..Default::default()
773        })
774        .unwrap();
775        assert_eq!(wal.seq(), 1);
776
777        // Create new writer — should recover seq from file
778        let wal2 = WalWriter::create(&path).unwrap();
779        assert_eq!(wal2.seq(), 1);
780
781        let _ = std::fs::remove_file(&path);
782    }
783
/// Rotating a WAL whose size exceeds the threshold must produce the archive
/// at rotation index 1 and leave an empty file at the original path.
#[test]
fn test_wal_rotation() {
    let path = tmp_wal("rotation");
    let archive = WalWriter::rotation_path(&path, 1);
    // Remove leftovers from an earlier aborted run. Without clearing the
    // archive too, the `exists()` check below could pass on a stale file
    // even if this run's rotation produced nothing.
    let _ = std::fs::remove_file(&path);
    let _ = std::fs::remove_file(&archive);

    // Write enough data that the file is comfortably non-empty.
    let mut wal = WalWriter::create(&path).unwrap();
    for i in 0..100 {
        wal.append(WalEvent {
            seq: i,
            ts: 1715800000 + i,
            event_type: WalEventType::JobStarted,
            job_id: format!("job-{}", i),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
            ..Default::default()
        })
        .unwrap();
    }

    let size = std::fs::metadata(&path).unwrap().len();
    // Rotate with a threshold one byte below the current size so the file
    // is guaranteed to be over it.
    WalWriter::rotate(&path, size - 1, 3).unwrap();

    assert!(archive.exists());
    // New WAL should be empty
    assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

    let _ = std::fs::remove_file(&path);
    let _ = std::fs::remove_file(&archive);
}
825
/// `WalWriter::cleanup` with a 500-second limit must drop the event stamped
/// 1000 seconds in the past and keep the one stamped "now".
#[test]
fn test_wal_cleanup() {
    let wal_file = tmp_wal("cleanup");
    let _ = std::fs::remove_file(&wal_file);

    // Both events differ only in sequence, timestamp, type and job id.
    let make_event = |seq, ts, event_type, job_id: &str| WalEvent {
        seq,
        ts,
        event_type,
        job_id: job_id.into(),
        capability: None,
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };

    let mut writer = WalWriter::create(&wal_file).unwrap();
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // Stale event first, fresh event second.
    writer
        .append(make_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
        .unwrap();
    writer
        .append(make_event(1, now, WalEventType::JobCompleted, "new-job"))
        .unwrap();

    // Exactly the stale event is reported as removed.
    let removed = WalWriter::cleanup(&wal_file, 500).unwrap();
    assert_eq!(removed, 1);

    // Only the fresh event is still readable afterwards.
    let reader = WalReader::load(&wal_file).unwrap();
    let survivors = reader.events();
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].job_id, "new-job");

    let _ = std::fs::remove_file(&wal_file);
}
890
/// FINDING #15: optional fields left as `None` must not show up in the
/// serialized JSON at all.
#[test]
fn test_wal_skip_serializing_optional_fields() {
    let bare_event = WalEvent {
        seq: 0,
        ts: 1715800000,
        event_type: WalEventType::JobStarted,
        job_id: "test".into(),
        capability: None,
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };

    let encoded = serde_json::to_string(&bare_event).unwrap();

    // "cmd" is a substring check, so it also covers every `cmd_*` key.
    for absent_key in ["capability", "telemetry_before", "process_before", "cmd"] {
        assert!(!encoded.contains(absent_key));
    }
}
920
/// A `CommandExecuted` event must round-trip through the WAL with its
/// command text and exit code intact.
#[test]
fn test_command_executed_event() {
    let wal_file = tmp_wal("cmd_exec");
    let _ = std::fs::remove_file(&wal_file);

    // A failed shell command: misspelled `head`, exit code 127.
    let failed_command = WalEvent {
        seq: 0,
        ts: 1715800000,
        event_type: WalEventType::CommandExecuted,
        job_id: "job-cmd".into(),
        capability: None,
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: Some("ls | hed -3".into()),
        cmd_stdout: None,
        cmd_stderr: Some("hed: command not found".into()),
        cmd_exit_code: Some(127),
        cmd_corrected: None,
        ..Default::default()
    };

    let mut writer = WalWriter::create(&wal_file).unwrap();
    writer.append(failed_command).unwrap();

    let reader = WalReader::load(&wal_file).unwrap();
    assert_eq!(reader.events().len(), 1);

    let logged = &reader.events()[0];
    assert_eq!(logged.event_type, WalEventType::CommandExecuted);
    assert_eq!(logged.cmd.as_deref(), Some("ls | hed -3"));
    assert_eq!(logged.cmd_exit_code, Some(127));

    let _ = std::fs::remove_file(&wal_file);
}
956
/// Checks `truncate_to` on a string below the limit, a string above it, and
/// a long input.
#[test]
fn test_truncate_to() {
    // Shorter than the limit: returned unchanged.
    assert_eq!(truncate_to("hello", 1024), "hello");
    // Longer than the limit: cut to the limit and tagged with the marker.
    assert_eq!(truncate_to("hello", 3), "hel...[truncated]");

    // A 2000-byte input stays close to the 1024 limit and carries the marker.
    let oversized = "a".repeat(2000);
    let shortened = truncate_to(&oversized, 1024);
    assert!(shortened.len() < 1100);
    assert!(shortened.ends_with("...[truncated]"));
}
966
967    // ── GAP 9: WAL recovery from truncated file ──────────────────────
968
/// A partially written final line (as left behind by a crash mid-write) must
/// be skipped by `WalReader::load` without losing the complete events
/// before it.
#[test]
fn test_wal_recovers_from_truncated_last_line() {
    use std::io::Write;

    let wal_file = tmp_wal("truncated");
    let _ = std::fs::remove_file(&wal_file);

    // Both valid events belong to "job1" and differ only in seq, ts and type.
    let make_event = |seq, ts, event_type| WalEvent {
        seq,
        ts,
        event_type,
        job_id: "job1".into(),
        capability: None,
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };

    // Two complete events written through the regular writer.
    let mut writer = WalWriter::create(&wal_file).unwrap();
    writer
        .append(make_event(0, 1000, WalEventType::JobStarted))
        .unwrap();
    writer
        .append(make_event(1, 1001, WalEventType::JobCompleted))
        .unwrap();

    // Baseline: both events are readable before the file is damaged.
    let intact = WalReader::load(&wal_file).unwrap();
    assert_eq!(intact.events().len(), 2);

    // Simulate the crash: bypass WalWriter and append raw bytes that stop
    // in the middle of a JSON object, with no trailing newline.
    let mut raw = std::fs::OpenOptions::new()
        .append(true)
        .open(&wal_file)
        .unwrap();
    raw.write_all(b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated")
        .unwrap();
    raw.flush().unwrap();

    // Reloading must ignore the broken tail and still yield the two events.
    let recovered = WalReader::load(&wal_file).unwrap();
    assert_eq!(
        recovered.events().len(),
        2,
        "Should skip truncated last line and read 2 valid events, got {}",
        recovered.events().len()
    );
    assert_eq!(recovered.events()[0].job_id, "job1");
    assert_eq!(recovered.events()[1].job_id, "job1");

    let _ = std::fs::remove_file(&wal_file);
}
1048
/// Lines that are not valid events must be ignored by `WalReader::load`
/// while the valid event in the same file is still returned.
#[test]
fn test_wal_skips_garbage_lines() {
    use std::io::Write;

    let wal_file = tmp_wal("garbage");
    let _ = std::fs::remove_file(&wal_file);

    // One well-formed event goes in through the regular writer.
    let valid_event = WalEvent {
        seq: 0,
        ts: 1000,
        event_type: WalEventType::JobStarted,
        job_id: "valid".into(),
        capability: None,
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };
    let mut writer = WalWriter::create(&wal_file).unwrap();
    writer.append(valid_event).unwrap();

    // Two bad lines follow: plain text that is not JSON, then a JSON object
    // that is partial (missing required fields).
    let mut raw = std::fs::OpenOptions::new()
        .append(true)
        .open(&wal_file)
        .unwrap();
    raw.write_all(b"not valid json at all\n").unwrap();
    raw.write_all(b"{\"seq\":999,\"type\":\"garbage\"}\n")
        .unwrap();
    raw.flush().unwrap();

    // Only the well-formed event may come back.
    let reader = WalReader::load(&wal_file).unwrap();
    assert_eq!(
        reader.events().len(),
        1,
        "Should only find 1 valid event, got {}",
        reader.events().len()
    );
    assert_eq!(reader.events()[0].job_id, "valid");

    let _ = std::fs::remove_file(&wal_file);
}
1098
/// Regression test for the WAL rotation ↔ consumer mismatch: once the log
/// has been rotated, `WalReader::load_all` must still return the events
/// that now live in the archive at rotation index 1, ahead of the events in
/// the current file.
#[test]
fn test_wal_load_all_reads_archived_files() {
    let current = tmp_wal("load_all");
    let archive = WalWriter::rotation_path(&current, 1);
    // Start from a clean slate for both the live file and its archive.
    let _ = std::fs::remove_file(&current);
    let _ = std::fs::remove_file(&archive);

    // Event destined for the archive; its output carries a backup mapping.
    let archived_event = WalEvent {
        seq: 0,
        ts: 1000,
        event_type: WalEventType::JobStarted,
        job_id: "archived-job".into(),
        capability: None,
        output: Some(serde_json::json!({
            "data": {
                "path": "/tmp/original.txt",
                "backup_path": "/tmp/backups/archived-job/test.txt"
            }
        })),
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };
    let mut writer = WalWriter::create(&current).unwrap();
    writer.append(archived_event).unwrap();

    // Rotate with a threshold just under the file size: the archive must
    // appear and the live file must be empty afterwards.
    let size_before = std::fs::metadata(&current).unwrap().len();
    WalWriter::rotate(&current, size_before - 1, 3).unwrap();
    assert!(archive.exists());
    assert_eq!(std::fs::read_to_string(&current).unwrap(), "");

    // Event written after the rotation, through the same writer.
    let current_event = WalEvent {
        seq: 0,
        ts: 2000,
        event_type: WalEventType::JobStarted,
        job_id: "current-job".into(),
        capability: Some("FileRead".into()),
        output: None,
        error: None,
        telemetry_before: None,
        telemetry_after: None,
        process_before: None,
        process_after: None,
        cmd: None,
        cmd_stdout: None,
        cmd_stderr: None,
        cmd_exit_code: None,
        cmd_corrected: None,
        ..Default::default()
    };
    writer.append(current_event).unwrap();

    // `load` looks at the live file only, so it sees just the new event.
    let live_only = WalReader::load(&current).unwrap();
    assert_eq!(live_only.events().len(), 1);
    assert_eq!(live_only.events()[0].job_id, "current-job");

    // `load_all` must return the archived and the live event.
    let all = WalReader::load_all(&current).unwrap();
    assert_eq!(
        all.events().len(),
        2,
        "load_all should see events from both current and .1 archived file"
    );

    // Chronological order: archived event first, live event second.
    let oldest = &all.events()[0];
    assert_eq!(oldest.job_id, "archived-job");
    assert_eq!(oldest.ts, 1000);
    let newest = &all.events()[1];
    assert_eq!(newest.job_id, "current-job");
    assert_eq!(newest.ts, 2000);

    // Rebuild the backup → original path mapping from every event whose
    // output has both `data.path` and `data.backup_path` as strings.
    let mut backup_mapping = std::collections::HashMap::new();
    for event in all.events().iter() {
        let pair = event.output.as_ref().and_then(|out| {
            let data = out.get("data")?;
            let original = data.get("path")?.as_str()?;
            let backup = data.get("backup_path")?.as_str()?;
            Some((backup.to_string(), original.to_string()))
        });
        if let Some((backup, original)) = pair {
            backup_mapping.insert(backup, original);
        }
    }
    assert_eq!(
        backup_mapping
            .get("/tmp/backups/archived-job/test.txt")
            .map(String::as_str),
        Some("/tmp/original.txt"),
        "load_all must expose backup→original mapping from archived WAL"
    );

    let _ = std::fs::remove_file(&current);
    let _ = std::fs::remove_file(&archive);
}
1208}