1#![allow(clippy::arithmetic_side_effects)]
11use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34use serde::{Deserialize, Serialize};
35use std::path::Path;
36
37fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44 use std::io::{Read, Seek, SeekFrom};
45
46 let mut file = std::fs::File::open(path).ok()?;
47 let file_len = file.metadata().ok()?.len();
48 if file_len == 0 {
49 return None;
50 }
51
52 let start = file_len.saturating_sub(tail_bytes as u64);
53 file.seek(SeekFrom::Start(start)).ok()?;
54 let mut buf = vec![
55 0u8;
56 usize::try_from(file_len - start)
57 .unwrap_or(0)
58 .saturating_add(1)
59 ];
60 let n = file.read(&mut buf).ok()?;
61 buf.truncate(n);
62
63 let lines: Vec<&[u8]> = buf
65 .split(|&b| b == b'\n')
66 .filter(|l| !l.is_empty())
67 .collect();
68
69 for line in lines.iter().rev() {
70 if let Ok(line_str) = std::str::from_utf8(line) {
71 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
72 return Some(event.seq);
73 }
74 }
75 }
76 None
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, Default)]
92#[allow(clippy::exhaustive_structs)]
93pub struct WalEvent {
94 pub seq: u64,
96 pub ts: u64,
98 #[serde(rename = "type")]
100 pub event_type: WalEventType,
101 pub job_id: String,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 pub capability: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
108 pub output: Option<serde_json::Value>,
109 #[serde(skip_serializing_if = "Option::is_none")]
111 pub error: Option<String>,
112 #[serde(skip_serializing_if = "Option::is_none")]
114 pub telemetry_before: Option<Telemetry>,
115 #[serde(skip_serializing_if = "Option::is_none")]
117 pub telemetry_after: Option<Telemetry>,
118 #[serde(skip_serializing_if = "Option::is_none")]
120 pub process_before: Option<ProcessSummary>,
121 #[serde(skip_serializing_if = "Option::is_none")]
123 pub process_after: Option<ProcessSummary>,
124 #[serde(skip_serializing_if = "Option::is_none")]
126 pub cmd: Option<String>,
127 #[serde(skip_serializing_if = "Option::is_none")]
129 pub cmd_stdout: Option<String>,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub cmd_stderr: Option<String>,
133 #[serde(skip_serializing_if = "Option::is_none")]
135 pub cmd_exit_code: Option<i32>,
136 #[serde(skip_serializing_if = "Option::is_none")]
138 pub cmd_corrected: Option<String>,
139 #[serde(skip_serializing_if = "Option::is_none")]
141 pub oov_ratio: Option<u8>,
142 #[serde(skip_serializing_if = "Option::is_none")]
144 pub detection_flags: Option<u8>,
145}
146
147#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
150#[serde(rename_all = "snake_case")]
151#[allow(clippy::exhaustive_enums)]
154pub enum WalEventType {
155 #[default]
157 JobSubmitted,
158 JobValidated,
160 JobStarted,
162 JobCompleted,
164 JobFailed,
166 JobRolledBack,
168 CommandExecuted,
170}
171
172#[allow(clippy::exhaustive_structs)]
193pub struct WalWriter {
194 path: std::path::PathBuf,
195 seq: u64,
196}
197
198impl WalWriter {
199 pub fn create(path: &Path) -> Result<Self> {
208 if let Some(parent) = path.parent() {
210 if !parent.exists() {
211 std::fs::create_dir_all(parent).map_err(|e| {
212 crate::Error::WalError(format!(
213 "Failed to create WAL directory {}: {}",
214 parent.display(),
215 e
216 ))
217 })?;
218 }
219 }
220
221 if !path.exists() {
223 std::fs::File::create(path).map_err(|e| {
224 crate::Error::WalError(format!(
225 "Failed to create WAL file {}: {}",
226 path.display(),
227 e
228 ))
229 })?;
230 }
231
232 let seq = if path.exists() {
239 let lock_file = std::fs::File::open(path)
240 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
241 Self::lock_file(&lock_file)?;
242 let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
243 last_seq + 1
244 } else {
245 let content = std::fs::read_to_string(path).map_err(|e| {
247 crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
248 })?;
249 content
250 .lines()
251 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
252 .map(|e| e.seq)
253 .max()
254 .map_or(0, |max| max + 1)
255 };
256 Self::unlock_file(&lock_file);
257 recovered
258 } else {
259 0
260 };
261
262 Ok(Self {
263 path: path.to_path_buf(),
264 seq,
265 })
266 }
267
268 #[cfg(unix)]
270 fn lock_file(file: &std::fs::File) -> Result<()> {
271 use std::os::unix::io::AsRawFd;
272 let fd = file.as_raw_fd();
273 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
275 if result != 0 {
276 return Err(crate::Error::WalError(format!(
277 "Failed to acquire WAL lock: {}",
278 std::io::Error::last_os_error()
279 )));
280 }
281 Ok(())
282 }
283
284 #[cfg(not(unix))]
286 fn lock_file(_file: &std::fs::File) -> Result<()> {
287 Ok(())
288 }
289
290 #[cfg(unix)]
292 fn unlock_file(file: &std::fs::File) {
293 use std::os::unix::io::AsRawFd;
294 let fd = file.as_raw_fd();
295 unsafe { libc::flock(fd, libc::LOCK_UN) };
297 }
298
299 #[cfg(not(unix))]
301 fn unlock_file(_file: &std::fs::File) {}
302
303 pub fn append(&mut self, event: WalEvent) -> Result<()> {
317 use std::io::Write;
318 let line =
319 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
320
321 let file = std::fs::OpenOptions::new()
323 .create(true)
324 .append(true)
325 .open(&self.path)
326 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
327
328 Self::lock_file(&file)?;
330 {
331 let mut buf = std::io::BufWriter::new(&file);
332 writeln!(buf, "{}", line)
333 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
334 buf.flush()
335 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
336 file.sync_all()
337 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
338 }
339 Self::unlock_file(&file);
340
341 self.seq += 1;
342 Ok(())
343 }
344
345 #[must_use]
347 pub fn seq(&self) -> u64 {
348 self.seq
349 }
350}
351
352#[allow(clippy::exhaustive_structs)]
369pub struct WalReader {
370 events: Vec<WalEvent>,
371}
372
373impl WalReader {
374 pub fn load(path: &Path) -> Result<Self> {
381 let content =
382 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
383
384 let events: Vec<WalEvent> = content
385 .lines()
386 .filter_map(|line| serde_json::from_str(line).ok())
387 .collect();
388
389 Ok(Self { events })
390 }
391
392 #[must_use]
394 pub fn events(&self) -> &[WalEvent] {
395 &self.events
396 }
397
398 pub fn tail(path: &Path, n: usize) -> Result<Self> {
408 use std::collections::VecDeque;
409 use std::io::{BufRead, BufReader};
410 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
411 let reader = BufReader::new(file);
412
413 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
414 for line in reader.lines() {
415 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
416 if let Ok(event) = serde_json::from_str(&line) {
417 window.push_back(event);
418 if window.len() > n {
419 window.pop_front();
420 }
421 }
422 }
423
424 Ok(Self {
425 events: window.into(),
426 })
427 }
428}
429
430impl WalWriter {
432 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
437 let mut s = path.to_string_lossy().into_owned();
438 s.push('.');
439 s.push_str(&index.to_string());
440 std::path::PathBuf::from(s)
441 }
442
443 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
453 let Ok(metadata) = std::fs::metadata(path) else {
454 return Ok(()); };
456
457 if metadata.len() < max_size_bytes {
458 return Ok(());
459 }
460
461 let lock_file = std::fs::File::open(path)
463 .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
464 Self::lock_file(&lock_file)?;
465
466 for i in (1..max_rotations).rev() {
468 let old = Self::rotation_path(path, i);
469 let new = Self::rotation_path(path, i + 1);
470 if old.exists() {
471 let _ = std::fs::rename(&old, &new);
472 }
473 }
474
475 let rotated = Self::rotation_path(path, 1);
477 std::fs::rename(path, &rotated)
478 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
479
480 std::fs::write(path, "")
482 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
483
484 let oldest = Self::rotation_path(path, max_rotations + 1);
486 if oldest.exists() {
487 let _ = std::fs::remove_file(&oldest);
488 }
489
490 Self::unlock_file(&lock_file);
491 Ok(())
492 }
493
494 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
511 use std::time::{SystemTime, UNIX_EPOCH};
512
513 let cutoff = SystemTime::now()
514 .duration_since(UNIX_EPOCH)
515 .map_or(0, |d| d.as_secs())
516 .saturating_sub(max_age_secs);
517
518 let lock_file = std::fs::File::open(path)
520 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
521 Self::lock_file(&lock_file)?;
522 let content = std::fs::read_to_string(path)
523 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
524
525 let all_events: Vec<WalEvent> = content
526 .lines()
527 .filter_map(|line| serde_json::from_str(line).ok())
528 .collect();
529
530 let total = all_events.len();
531
532 let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
533 let removed = total - retained.len();
534
535 if removed > 0 {
536 let temp_path = path.with_extension("wal.tmp");
539 {
540 let mut new_wal = Self::create(&temp_path)?;
541 for event in &retained {
542 new_wal.append(event.clone())?;
543 }
544
545 let last_seq = retained.last().map_or(0, |e| e.seq);
548 let current_content = std::fs::read_to_string(path).map_err(|e| {
549 crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
550 })?;
551 for line in current_content.lines() {
552 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
553 if event.seq > last_seq {
554 new_wal.append(event)?;
555 }
556 }
557 }
558 }
559 Self::unlock_file(&lock_file);
561 std::fs::rename(&temp_path, path).map_err(|e| {
563 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
564 })?;
565 } else {
566 Self::unlock_file(&lock_file);
567 }
568
569 Ok(removed)
570 }
571}
572
/// Shortens `s` to at most `max_bytes` bytes of its own content and marks the cut.
///
/// A string that already fits is returned unchanged. Otherwise the cut point is
/// moved back to the nearest UTF-8 character boundary at or below `max_bytes`,
/// and the marker `...[truncated]` is appended, so the returned string may be
/// longer than `max_bytes`.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Offset 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&offset| s.is_char_boundary(offset))
        .unwrap_or(0);

    format!("{}...[truncated]", &s[..cut])
}
590
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Returns a per-test WAL path inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        let file_name = format!("runtimo_test_wal_{}.jsonl", name);
        std::env::temp_dir().join(file_name)
    }

    /// Builds an event with every optional field unset.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            ..Default::default()
        }
    }

    /// An appended event can be read back with its fields intact.
    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let event = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        wal.append(event).unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    /// Re-opening an existing WAL resumes the sequence cursor.
    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    /// Rotation moves the live file to `.1` and leaves an empty live file.
    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            let job_id = format!("job-{}", i);
            wal.append(bare_event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &job_id,
            ))
            .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let first_rotation = WalWriter::rotation_path(&path, 1);
        assert!(first_rotation.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(first_rotation);
    }

    /// Cleanup drops events older than the cutoff and keeps newer ones.
    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        wal.append(bare_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    /// Unset optional fields do not appear in the serialized JSON.
    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&event).unwrap();
        for absent_key in ["capability", "telemetry_before", "process_before", "cmd"] {
            assert!(!json.contains(absent_key));
        }
    }

    /// Command fields survive a write/read round trip.
    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let event = WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..bare_event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        };
        wal.append(event).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(events[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(events[0].cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    /// `truncate_to` leaves short strings alone and marks shortened ones.
    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }

    /// A torn final line is skipped while earlier events remain readable.
    #[test]
    fn test_wal_recovers_from_truncated_last_line() {
        use std::io::Write;

        let path = tmp_wal("truncated");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(bare_event(0, 1000, WalEventType::JobStarted, "job1"))
            .unwrap();
        wal.append(bare_event(1, 1001, WalEventType::JobCompleted, "job1"))
            .unwrap();

        let reader_before = WalReader::load(&path).unwrap();
        assert_eq!(reader_before.events().len(), 2);

        // Simulate a crash in the middle of a write: no closing quote, brace or newline.
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        file.write_all(b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated")
            .unwrap();
        file.flush().unwrap();

        let reader_after = WalReader::load(&path).unwrap();
        assert_eq!(
            reader_after.events().len(),
            2,
            "Should skip truncated last line and read 2 valid events, got {}",
            reader_after.events().len()
        );
        assert_eq!(reader_after.events()[0].job_id, "job1");
        assert_eq!(reader_after.events()[1].job_id, "job1");

        let _ = std::fs::remove_file(&path);
    }

    /// Lines that are not valid events are ignored by the reader.
    #[test]
    fn test_wal_skips_garbage_lines() {
        use std::io::Write;

        let path = tmp_wal("garbage");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(bare_event(0, 1000, WalEventType::JobStarted, "valid"))
            .unwrap();

        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        file.write_all(b"not valid json at all\n").unwrap();
        file.write_all(b"{\"seq\":999,\"type\":\"garbage\"}\n")
            .unwrap();
        file.flush().unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(
            reader.events().len(),
            1,
            "Should only find 1 valid event, got {}",
            reader.events().len()
        );
        assert_eq!(reader.events()[0].job_id, "valid");

        let _ = std::fs::remove_file(&path);
    }
}