1#![allow(clippy::arithmetic_side_effects)]
11use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34
35use serde::{Deserialize, Serialize};
36use std::path::Path;
37
38fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
45 use std::io::{Read, Seek, SeekFrom};
46
47 let mut file = std::fs::File::open(path).ok()?;
48 let file_len = file.metadata().ok()?.len();
49 if file_len == 0 {
50 return None;
51 }
52
53 let start = file_len.saturating_sub(tail_bytes as u64);
54 file.seek(SeekFrom::Start(start)).ok()?;
55 let mut buf = vec![
56 0u8;
57 usize::try_from(file_len - start)
58 .unwrap_or(0)
59 .saturating_add(1)
60 ];
61 let n = file.read(&mut buf).ok()?;
62 buf.truncate(n);
63
64 let lines: Vec<&[u8]> = buf
66 .split(|&b| b == b'\n')
67 .filter(|l| !l.is_empty())
68 .collect();
69
70 for line in lines.iter().rev() {
71 if let Ok(line_str) = std::str::from_utf8(line) {
72 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
73 return Some(event.seq);
74 }
75 }
76 }
77 None
78}
79
/// One record of the write-ahead log.
///
/// `WalWriter::append` serializes an event as a single JSON object followed by
/// a newline, and the readers in this file parse the log one line at a time.
/// Every `Option` field is left out of the JSON entirely when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number of the event. `WalWriter::create` resumes its counter
    /// from the last recorded `seq` plus one.
    pub seq: u64,
    /// Event timestamp. `WalWriter::cleanup` compares it against a cutoff
    /// expressed in seconds since the UNIX epoch.
    pub ts: u64,
    /// Kind of event; stored under the JSON key `type`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// Job identifier attached to the event.
    pub job_id: String,
    /// Optional capability name (the tests in this file use `"FileRead"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Optional free-form JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Optional error text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Optional telemetry snapshot — presumably captured before the job ran;
    /// TODO confirm against the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Optional telemetry snapshot — presumably captured after the job ran;
    /// TODO confirm against the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Optional process summary — presumably captured before the job ran;
    /// TODO confirm against the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Optional process summary — presumably captured after the job ran;
    /// TODO confirm against the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Optional command line (the tests store a shell command here).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Optional captured standard output of `cmd`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Optional captured standard error of `cmd`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Optional exit code of `cmd`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Optional corrected form of `cmd` — NOTE(review): how the correction is
    /// produced is not visible in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
    /// NOTE(review): presumably an out-of-vocabulary ratio; units and range
    /// are not visible in this file — confirm with the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oov_ratio: Option<u8>,
    /// NOTE(review): presumably a bit set of detection flags; the bit meanings
    /// are not visible in this file — confirm with the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detection_flags: Option<u8>,
}
147
/// Kind of a [`WalEvent`].
///
/// Variants are (de)serialized in `snake_case`, e.g. `JobStarted` is written
/// as `"job_started"`. The default variant is `JobSubmitted`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Default variant; serialized as `"job_submitted"`.
    #[default]
    JobSubmitted,
    /// Serialized as `"job_validated"`.
    JobValidated,
    /// Serialized as `"job_started"`.
    JobStarted,
    /// Serialized as `"job_completed"`.
    JobCompleted,
    /// Serialized as `"job_failed"`.
    JobFailed,
    /// Serialized as `"job_rolled_back"`.
    JobRolledBack,
    /// Serialized as `"command_executed"`; the tests pair it with the `cmd*`
    /// fields of [`WalEvent`].
    CommandExecuted,
}
172
/// Appender for a JSON-lines write-ahead log file.
///
/// The file is reopened for every append rather than being kept open, so the
/// writer itself only holds the path and a counter.
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the WAL file that `append` writes to.
    path: std::path::PathBuf,
    /// Counter exposed through `seq()`: recovered from the file by `create`
    /// and advanced by one on every successful `append`.
    seq: u64,
}
198
199impl WalWriter {
200 pub fn create(path: &Path) -> Result<Self> {
209 if let Some(parent) = path.parent() {
211 if !parent.exists() {
212 std::fs::create_dir_all(parent).map_err(|e| {
213 crate::Error::WalError(format!(
214 "Failed to create WAL directory {}: {}",
215 parent.display(),
216 e
217 ))
218 })?;
219 }
220 }
221
222 if !path.exists() {
224 std::fs::File::create(path).map_err(|e| {
225 crate::Error::WalError(format!(
226 "Failed to create WAL file {}: {}",
227 path.display(),
228 e
229 ))
230 })?;
231 }
232
233 let seq = if path.exists() {
240 let lock_file = std::fs::File::open(path)
241 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
242 Self::lock_file(&lock_file)?;
243 let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
244 last_seq + 1
245 } else {
246 let content = std::fs::read_to_string(path).map_err(|e| {
248 crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
249 })?;
250 content
251 .lines()
252 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
253 .map(|e| e.seq)
254 .max()
255 .map_or(0, |max| max + 1)
256 };
257 Self::unlock_file(&lock_file);
258 recovered
259 } else {
260 0
261 };
262
263 Ok(Self {
264 path: path.to_path_buf(),
265 seq,
266 })
267 }
268
269 #[cfg(unix)]
271 fn lock_file(file: &std::fs::File) -> Result<()> {
272 use std::os::unix::io::AsRawFd;
273 let fd = file.as_raw_fd();
274 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
276 if result != 0 {
277 return Err(crate::Error::WalError(format!(
278 "Failed to acquire WAL lock: {}",
279 std::io::Error::last_os_error()
280 )));
281 }
282 Ok(())
283 }
284
285 #[cfg(not(unix))]
287 fn lock_file(_file: &std::fs::File) -> Result<()> {
288 Ok(())
289 }
290
291 #[cfg(unix)]
293 fn unlock_file(file: &std::fs::File) {
294 use std::os::unix::io::AsRawFd;
295 let fd = file.as_raw_fd();
296 unsafe { libc::flock(fd, libc::LOCK_UN) };
298 }
299
300 #[cfg(not(unix))]
302 fn unlock_file(_file: &std::fs::File) {}
303
304 pub fn append(&mut self, event: WalEvent) -> Result<()> {
318 use std::io::Write;
319 let line =
320 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
321
322 let file = std::fs::OpenOptions::new()
324 .create(true)
325 .append(true)
326 .open(&self.path)
327 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
328
329 Self::lock_file(&file)?;
331 {
332 let mut buf = std::io::BufWriter::new(&file);
333 writeln!(buf, "{}", line)
334 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
335 buf.flush()
336 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
337 file.sync_all()
338 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
339 }
340 Self::unlock_file(&file);
341
342 self.seq += 1;
343 Ok(())
344 }
345
346 #[must_use]
348 pub fn seq(&self) -> u64 {
349 self.seq
350 }
351}
352
/// In-memory view of the events parsed from one or more WAL files.
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    /// Parsed events, in the order the loader produced them.
    events: Vec<WalEvent>,
}
381
382impl WalReader {
383 pub fn load(path: &Path) -> Result<Self> {
394 let content =
395 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
396
397 let events: Vec<WalEvent> = content
398 .lines()
399 .filter_map(|line| serde_json::from_str(line).ok())
400 .collect();
401
402 Ok(Self { events })
403 }
404
405 pub fn load_all(path: &Path) -> Result<Self> {
423 let mut archived_indices: Vec<usize> = Vec::new();
428 if let Some(parent) = path.parent() {
429 let dir_name = path
430 .file_name()
431 .map(|n| n.to_string_lossy().into_owned())
432 .unwrap_or_default();
433 if let Ok(entries) = std::fs::read_dir(parent) {
434 for entry in entries.flatten() {
435 let name = entry.file_name().to_string_lossy().into_owned();
436 if name.starts_with(&dir_name) {
437 let suffix = &name[dir_name.len()..];
438 if let Some(index_str) = suffix.strip_prefix('.') {
439 if let Ok(index) = index_str.parse::<usize>() {
440 if index > 0 {
441 archived_indices.push(index);
442 }
443 }
444 }
445 }
446 }
447 }
448 }
449 archived_indices.sort_unstable_by(|a, b| b.cmp(a));
453
454 let mut events: Vec<WalEvent> = Vec::new();
455
456 for index in &archived_indices {
458 let rotated = rotation_path_for(path, *index);
459 if let Ok(content) = std::fs::read_to_string(&rotated) {
460 let file_events: Vec<WalEvent> = content
461 .lines()
462 .filter_map(|line| serde_json::from_str(line).ok())
463 .collect();
464 let mut combined = file_events;
467 combined.extend(events);
468 events = combined;
469 }
470 }
471
472 let content =
474 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
475 let current_events: Vec<WalEvent> = content
476 .lines()
477 .filter_map(|line| serde_json::from_str(line).ok())
478 .collect();
479 events.extend(current_events);
480
481 Ok(Self { events })
482 }
483
484 #[must_use]
486 pub fn events(&self) -> &[WalEvent] {
487 &self.events
488 }
489
490 pub fn tail(path: &Path, n: usize) -> Result<Self> {
500 use std::collections::VecDeque;
501 use std::io::{BufRead, BufReader};
502 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
503 let reader = BufReader::new(file);
504
505 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
506 for line in reader.lines() {
507 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
508 if let Ok(event) = serde_json::from_str(&line) {
509 window.push_back(event);
510 if window.len() > n {
511 window.pop_front();
512 }
513 }
514 }
515
516 Ok(Self {
517 events: window.into(),
518 })
519 }
520}
521
/// Builds the path of the rotated archive number `index` for the WAL at
/// `path` by appending `.<index>` to the full path (`wal.jsonl` becomes
/// `wal.jsonl.1`).
///
/// The suffix is appended to the underlying `OsString`, so paths that are not
/// valid UTF-8 are preserved byte for byte instead of being altered by a
/// lossy string conversion.
fn rotation_path_for(path: &Path, index: usize) -> std::path::PathBuf {
    let mut raw = path.as_os_str().to_os_string();
    raw.push(".");
    raw.push(index.to_string());
    std::path::PathBuf::from(raw)
}
532
533impl WalWriter {
535 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
540 rotation_path_for(path, index)
541 }
542
543 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
553 let Ok(metadata) = std::fs::metadata(path) else {
554 return Ok(()); };
556
557 if metadata.len() < max_size_bytes {
558 return Ok(());
559 }
560
561 let lock_file = std::fs::File::open(path)
563 .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
564 Self::lock_file(&lock_file)?;
565
566 for i in (1..max_rotations).rev() {
568 let old = Self::rotation_path(path, i);
569 let new = Self::rotation_path(path, i + 1);
570 if old.exists() {
571 if let Err(e) = std::fs::rename(&old, &new) {
572 log::error!(
573 "WAL rotation shift failed: {} -> {}: {}",
574 old.display(),
575 new.display(),
576 e
577 );
578 }
579 }
580 }
581
582 let rotated = Self::rotation_path(path, 1);
584 std::fs::rename(path, &rotated)
585 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
586
587 std::fs::write(path, "")
589 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
590
591 let oldest = Self::rotation_path(path, max_rotations + 1);
593 if oldest.exists() {
594 if let Err(e) = std::fs::remove_file(&oldest) {
595 log::error!("WAL cleanup failed: {}: {}", oldest.display(), e);
596 }
597 }
598
599 Self::unlock_file(&lock_file);
600 Ok(())
601 }
602
603 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
620 use std::time::{SystemTime, UNIX_EPOCH};
621
622 let cutoff = SystemTime::now()
623 .duration_since(UNIX_EPOCH)
624 .map_or(0, |d| d.as_secs())
625 .saturating_sub(max_age_secs);
626
627 let lock_file = std::fs::File::open(path)
629 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
630 Self::lock_file(&lock_file)?;
631 let content = std::fs::read_to_string(path)
632 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
633
634 let all_events: Vec<WalEvent> = content
635 .lines()
636 .filter_map(|line| serde_json::from_str(line).ok())
637 .collect();
638
639 let total = all_events.len();
640
641 let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
642 let removed = total - retained.len();
643
644 if removed > 0 {
645 let temp_path = path.with_extension("wal.tmp");
648 {
649 let mut new_wal = Self::create(&temp_path)?;
650 for event in &retained {
651 new_wal.append(event.clone())?;
652 }
653
654 let last_seq = retained.last().map_or(0, |e| e.seq);
657 let current_content = std::fs::read_to_string(path).map_err(|e| {
658 crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
659 })?;
660 for line in current_content.lines() {
661 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
662 if event.seq > last_seq {
663 new_wal.append(event)?;
664 }
665 }
666 }
667 }
668 Self::unlock_file(&lock_file);
670 std::fs::rename(&temp_path, path).map_err(|e| {
672 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
673 })?;
674 } else {
675 Self::unlock_file(&lock_file);
676 }
677
678 Ok(removed)
679 }
680}
681
/// Shortens `s` to at most `max_bytes` bytes of its own content and marks the
/// cut with a trailing `...[truncated]`.
///
/// Strings that already fit are returned unchanged. The cut never splits a
/// UTF-8 character: it moves back to the nearest character boundary at or
/// below `max_bytes`. The marker is added on top of the kept bytes, so a
/// truncated result can be longer than `max_bytes`.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if max_bytes >= s.len() {
        return s.to_owned();
    }
    // Index 0 is always a character boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    format!("{}...[truncated]", &s[..cut])
}
699
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::items_after_statements
)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Path of a per-test WAL file inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with the given core fields and every optional field
    /// left as `None`.
    fn event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.to_owned(),
            ..WalEvent::default()
        }
    }

    /// Appends raw bytes to an existing file, bypassing `WalWriter`.
    fn append_raw(path: &std::path::Path, chunks: &[&[u8]]) {
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .unwrap();
        for chunk in chunks {
            file.write_all(chunk).unwrap();
        }
        file.flush().unwrap();
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 1_715_800_000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut first = WalWriter::create(&path).unwrap();
        assert_eq!(first.seq(), 0);
        first
            .append(event(0, 1_715_800_000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(first.seq(), 1);

        // A second writer on the same file resumes after the recorded event.
        let second = WalWriter::create(&path).unwrap();
        assert_eq!(second.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            let job_id = format!("job-{}", i);
            wal.append(event(
                i,
                1_715_800_000 + i,
                WalEventType::JobStarted,
                &job_id,
            ))
            .unwrap();
        }

        // A threshold just below the current size forces a rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let archive = WalWriter::rotation_path(&path, 1);
        assert!(archive.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(archive);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        wal.append(event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        // Only the event older than 500 seconds must go.
        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let plain = event(0, 1_715_800_000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&plain).unwrap();
        for absent in ["capability", "telemetry_before", "process_before", "cmd"] {
            assert!(!json.contains(absent));
        }
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..event(0, 1_715_800_000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(events[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(events[0].cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");

        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }

    #[test]
    fn test_wal_recovers_from_truncated_last_line() {
        let path = tmp_wal("truncated");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(event(0, 1000, WalEventType::JobStarted, "job1"))
            .unwrap();
        wal.append(event(1, 1001, WalEventType::JobCompleted, "job1"))
            .unwrap();

        let reader_before = WalReader::load(&path).unwrap();
        assert_eq!(reader_before.events().len(), 2);

        // Simulate a crash in the middle of a write: a record with no closing
        // quote, brace or newline.
        append_raw(
            &path,
            &[b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated"],
        );

        let reader_after = WalReader::load(&path).unwrap();
        assert_eq!(
            reader_after.events().len(),
            2,
            "Should skip truncated last line and read 2 valid events, got {}",
            reader_after.events().len()
        );
        assert_eq!(reader_after.events()[0].job_id, "job1");
        assert_eq!(reader_after.events()[1].job_id, "job1");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skips_garbage_lines() {
        let path = tmp_wal("garbage");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(event(0, 1000, WalEventType::JobStarted, "valid"))
            .unwrap();

        // One line that is not JSON, one that is JSON but not a valid event.
        append_raw(
            &path,
            &[
                b"not valid json at all\n",
                b"{\"seq\":999,\"type\":\"garbage\"}\n",
            ],
        );

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(
            reader.events().len(),
            1,
            "Should only find 1 valid event, got {}",
            reader.events().len()
        );
        assert_eq!(reader.events()[0].job_id, "valid");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_load_all_reads_archived_files() {
        let path = tmp_wal("load_all");
        let rotated = WalWriter::rotation_path(&path, 1);
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(&rotated);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            output: Some(serde_json::json!({
                "data": {
                    "path": "/tmp/original.txt",
                    "backup_path": "/tmp/backups/archived-job/test.txt"
                }
            })),
            ..event(0, 1000, WalEventType::JobStarted, "archived-job")
        })
        .unwrap();

        // Push the first event into the `.1` archive.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();
        assert!(rotated.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 2000, WalEventType::JobStarted, "current-job")
        })
        .unwrap();

        // `load` only sees the live file.
        let single = WalReader::load(&path).unwrap();
        assert_eq!(single.events().len(), 1);
        assert_eq!(single.events()[0].job_id, "current-job");

        // `load_all` sees the archive followed by the live file.
        let all = WalReader::load_all(&path).unwrap();
        assert_eq!(
            all.events().len(),
            2,
            "load_all should see events from both current and .1 archived file"
        );

        assert_eq!(all.events()[0].job_id, "archived-job");
        assert_eq!(all.events()[0].ts, 1000);
        assert_eq!(all.events()[1].job_id, "current-job");
        assert_eq!(all.events()[1].ts, 2000);

        let backup_mapping: std::collections::HashMap<_, _> = all
            .events()
            .iter()
            .filter_map(|e| {
                let data = e.output.as_ref()?.get("data")?;
                let original = data.get("path")?.as_str()?;
                let backup = data.get("backup_path")?.as_str()?;
                Some((backup.to_string(), original.to_string()))
            })
            .collect();
        assert_eq!(
            backup_mapping
                .get("/tmp/backups/archived-job/test.txt")
                .map(String::as_str),
            Some("/tmp/original.txt"),
            "load_all must expose backup→original mapping from archived WAL"
        );

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(&rotated);
    }
}