1#![allow(clippy::arithmetic_side_effects)]
11use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34
35use serde::{Deserialize, Serialize};
36use std::path::Path;
37
38fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
45 use std::io::{Read, Seek, SeekFrom};
46
47 let mut file = std::fs::File::open(path).ok()?;
48 let file_len = file.metadata().ok()?.len();
49 if file_len == 0 {
50 return None;
51 }
52
53 let start = file_len.saturating_sub(tail_bytes as u64);
54 file.seek(SeekFrom::Start(start)).ok()?;
55 let mut buf = vec![
56 0u8;
57 usize::try_from(file_len - start)
58 .unwrap_or(0)
59 .saturating_add(1)
60 ];
61 let n = file.read(&mut buf).ok()?;
62 buf.truncate(n);
63
64 let lines: Vec<&[u8]> = buf
66 .split(|&b| b == b'\n')
67 .filter(|l| !l.is_empty())
68 .collect();
69
70 for line in lines.iter().rev() {
71 if let Ok(line_str) = std::str::from_utf8(line) {
72 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
73 return Some(event.seq);
74 }
75 }
76 }
77 None
78}
79
/// A single record of the write-ahead log.
///
/// Events are stored as one JSON object per line. Every `Option` field is
/// omitted from the serialized JSON when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number of the event. `WalWriter::create` derives its starting
    /// counter from the values found in an existing file.
    pub seq: u64,
    /// Event timestamp. `WalWriter::cleanup` compares it against a cutoff
    /// expressed in seconds since the Unix epoch.
    pub ts: u64,
    /// Kind of event; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// Identifier of the job this event refers to.
    pub job_id: String,
    /// Capability name attached to the event, if any.
    // NOTE(review): presumably the capability the job invoked — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Arbitrary JSON output attached to the event, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error text attached to the event, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Telemetry snapshot attached to the event.
    // NOTE(review): presumably captured before the job ran — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Telemetry snapshot attached to the event.
    // NOTE(review): presumably captured after the job ran — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Process summary attached to the event.
    // NOTE(review): presumably captured before the job ran — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Process summary attached to the event.
    // NOTE(review): presumably captured after the job ran — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Command string attached to the event, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Captured standard output of the command, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Captured standard error of the command, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Exit code of the command, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Corrected form of the command, if any.
    // NOTE(review): how the correction is produced is not visible in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
    // NOTE(review): meaning and unit of `oov_ratio` are not visible in this
    // file — document once confirmed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oov_ratio: Option<u8>,
    // NOTE(review): the bit layout of `detection_flags` is not visible in
    // this file — document once confirmed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detection_flags: Option<u8>,
}
147
/// Kind of a [`WalEvent`].
///
/// Variants are serialized in `snake_case` (e.g. `JobStarted` becomes
/// `"job_started"`). The default variant is [`WalEventType::JobSubmitted`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Serialized as `"job_submitted"`; used as the `Default` value.
    #[default]
    JobSubmitted,
    /// Serialized as `"job_validated"`.
    JobValidated,
    /// Serialized as `"job_started"`.
    JobStarted,
    /// Serialized as `"job_completed"`.
    JobCompleted,
    /// Serialized as `"job_failed"`.
    JobFailed,
    /// Serialized as `"job_rolled_back"`.
    JobRolledBack,
    /// Serialized as `"command_executed"`.
    CommandExecuted,
}
172
/// Appends [`WalEvent`]s to a JSON-lines write-ahead log file.
///
/// The writer does not keep the file open: it stores the path and re-opens
/// the file for every append.
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the WAL file.
    path: std::path::PathBuf,
    /// Counter reported by `seq()`; initialised by `create` from the file's
    /// contents and advanced by one on every successful `append`.
    seq: u64,
}
198
199impl WalWriter {
200 pub fn create(path: &Path) -> Result<Self> {
209 if let Some(parent) = path.parent() {
211 if !parent.exists() {
212 std::fs::create_dir_all(parent).map_err(|e| {
213 crate::Error::WalError(format!(
214 "Failed to create WAL directory {}: {}",
215 parent.display(),
216 e
217 ))
218 })?;
219 }
220 }
221
222 if !path.exists() {
224 std::fs::File::create(path).map_err(|e| {
225 crate::Error::WalError(format!(
226 "Failed to create WAL file {}: {}",
227 path.display(),
228 e
229 ))
230 })?;
231 }
232
233 let seq = if path.exists() {
240 let lock_file = std::fs::File::open(path)
241 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
242 Self::lock_file(&lock_file)?;
243 let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
244 last_seq + 1
245 } else {
246 let content = std::fs::read_to_string(path).map_err(|e| {
248 crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
249 })?;
250 content
251 .lines()
252 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
253 .map(|e| e.seq)
254 .max()
255 .map_or(0, |max| max + 1)
256 };
257 Self::unlock_file(&lock_file);
258 recovered
259 } else {
260 0
261 };
262
263 Ok(Self {
264 path: path.to_path_buf(),
265 seq,
266 })
267 }
268
269 #[cfg(unix)]
271 fn lock_file(file: &std::fs::File) -> Result<()> {
272 use std::os::unix::io::AsRawFd;
273 let fd = file.as_raw_fd();
274 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
276 if result != 0 {
277 return Err(crate::Error::WalError(format!(
278 "Failed to acquire WAL lock: {}",
279 std::io::Error::last_os_error()
280 )));
281 }
282 Ok(())
283 }
284
285 #[cfg(not(unix))]
287 fn lock_file(_file: &std::fs::File) -> Result<()> {
288 Ok(())
289 }
290
291 #[cfg(unix)]
293 fn unlock_file(file: &std::fs::File) {
294 use std::os::unix::io::AsRawFd;
295 let fd = file.as_raw_fd();
296 unsafe { libc::flock(fd, libc::LOCK_UN) };
298 }
299
300 #[cfg(not(unix))]
302 fn unlock_file(_file: &std::fs::File) {}
303
304 pub fn append(&mut self, event: WalEvent) -> Result<()> {
318 use std::io::Write;
319 let line =
320 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
321
322 let file = std::fs::OpenOptions::new()
324 .create(true)
325 .append(true)
326 .open(&self.path)
327 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
328
329 Self::lock_file(&file)?;
331 {
332 let mut buf = std::io::BufWriter::new(&file);
333 writeln!(buf, "{}", line)
334 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
335 buf.flush()
336 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
337 file.sync_all()
338 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
339 }
340 Self::unlock_file(&file);
341
342 self.seq += 1;
343 Ok(())
344 }
345
346 #[must_use]
348 pub fn seq(&self) -> u64 {
349 self.seq
350 }
351}
352
/// In-memory view of the events read from a WAL file.
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    /// Parsed events, in the order they appear in the file.
    events: Vec<WalEvent>,
}
373
374impl WalReader {
375 pub fn load(path: &Path) -> Result<Self> {
382 let content =
383 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
384
385 let events: Vec<WalEvent> = content
386 .lines()
387 .filter_map(|line| serde_json::from_str(line).ok())
388 .collect();
389
390 Ok(Self { events })
391 }
392
393 #[must_use]
395 pub fn events(&self) -> &[WalEvent] {
396 &self.events
397 }
398
399 pub fn tail(path: &Path, n: usize) -> Result<Self> {
409 use std::collections::VecDeque;
410 use std::io::{BufRead, BufReader};
411 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
412 let reader = BufReader::new(file);
413
414 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
415 for line in reader.lines() {
416 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
417 if let Ok(event) = serde_json::from_str(&line) {
418 window.push_back(event);
419 if window.len() > n {
420 window.pop_front();
421 }
422 }
423 }
424
425 Ok(Self {
426 events: window.into(),
427 })
428 }
429}
430
431impl WalWriter {
433 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
438 let mut s = path.to_string_lossy().into_owned();
439 s.push('.');
440 s.push_str(&index.to_string());
441 std::path::PathBuf::from(s)
442 }
443
444 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
454 let Ok(metadata) = std::fs::metadata(path) else {
455 return Ok(()); };
457
458 if metadata.len() < max_size_bytes {
459 return Ok(());
460 }
461
462 let lock_file = std::fs::File::open(path)
464 .map_err(|e| crate::Error::WalError(format!("open WAL for rotation: {}", e)))?;
465 Self::lock_file(&lock_file)?;
466
467 for i in (1..max_rotations).rev() {
469 let old = Self::rotation_path(path, i);
470 let new = Self::rotation_path(path, i + 1);
471 if old.exists() {
472 if let Err(e) = std::fs::rename(&old, &new) {
473 log::error!(
474 "WAL rotation shift failed: {} -> {}: {}",
475 old.display(),
476 new.display(),
477 e
478 );
479 }
480 }
481 }
482
483 let rotated = Self::rotation_path(path, 1);
485 std::fs::rename(path, &rotated)
486 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
487
488 std::fs::write(path, "")
490 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
491
492 let oldest = Self::rotation_path(path, max_rotations + 1);
494 if oldest.exists() {
495 if let Err(e) = std::fs::remove_file(&oldest) {
496 log::error!("WAL cleanup failed: {}: {}", oldest.display(), e);
497 }
498 }
499
500 Self::unlock_file(&lock_file);
501 Ok(())
502 }
503
504 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
521 use std::time::{SystemTime, UNIX_EPOCH};
522
523 let cutoff = SystemTime::now()
524 .duration_since(UNIX_EPOCH)
525 .map_or(0, |d| d.as_secs())
526 .saturating_sub(max_age_secs);
527
528 let lock_file = std::fs::File::open(path)
530 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
531 Self::lock_file(&lock_file)?;
532 let content = std::fs::read_to_string(path)
533 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
534
535 let all_events: Vec<WalEvent> = content
536 .lines()
537 .filter_map(|line| serde_json::from_str(line).ok())
538 .collect();
539
540 let total = all_events.len();
541
542 let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
543 let removed = total - retained.len();
544
545 if removed > 0 {
546 let temp_path = path.with_extension("wal.tmp");
549 {
550 let mut new_wal = Self::create(&temp_path)?;
551 for event in &retained {
552 new_wal.append(event.clone())?;
553 }
554
555 let last_seq = retained.last().map_or(0, |e| e.seq);
558 let current_content = std::fs::read_to_string(path).map_err(|e| {
559 crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
560 })?;
561 for line in current_content.lines() {
562 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
563 if event.seq > last_seq {
564 new_wal.append(event)?;
565 }
566 }
567 }
568 }
569 Self::unlock_file(&lock_file);
571 std::fs::rename(&temp_path, path).map_err(|e| {
573 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
574 })?;
575 } else {
576 Self::unlock_file(&lock_file);
577 }
578
579 Ok(removed)
580 }
581}
582
/// Shortens `s` to at most `max_bytes` bytes of its original content.
///
/// A string that already fits is returned unchanged. Otherwise the string is
/// cut at the last UTF-8 character boundary at or below `max_bytes` and the
/// marker `...[truncated]` is appended, so the result can be longer than
/// `max_bytes`.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Index 0 is always a character boundary, so the search cannot fail.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    format!("{}...[truncated]", &s[..cut])
}
600
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::items_after_statements
)]
mod tests {
    use super::*;

    /// Path of a per-test WAL file inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        let file_name = format!("runtimo_test_wal_{}.jsonl", name);
        std::env::temp_dir().join(file_name)
    }

    /// Event with the given core fields and every optional field unset.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            ..Default::default()
        }
    }

    /// An appended event can be loaded back.
    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let started = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(started).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    /// Re-opening a WAL continues from the sequence number after the last event.
    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    /// Rotation moves the live file to `.1` and leaves an empty live file.
    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            let job_id = format!("job-{}", i);
            wal.append(bare_event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &job_id,
            ))
            .unwrap();
        }

        // A threshold just below the current size forces the rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let first_rotation = WalWriter::rotation_path(&path, 1);
        assert!(first_rotation.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(first_rotation);
    }

    /// Cleanup drops the expired event and keeps the recent one.
    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        wal.append(bare_event(
            0,
            now - 1000,
            WalEventType::JobStarted,
            "old-job",
        ))
        .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    /// Unset optional fields do not appear in the serialized JSON.
    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&event).unwrap();
        for absent in ["capability", "telemetry_before", "process_before", "cmd"] {
            assert!(!json.contains(absent));
        }
    }

    /// Command fields survive a write/read round trip.
    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let executed = WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..bare_event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        };
        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(executed).unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        let loaded = &reader.events()[0];
        assert_eq!(loaded.event_type, WalEventType::CommandExecuted);
        assert_eq!(loaded.cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(loaded.cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    /// Short input is returned as is; long input is cut and marked.
    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }

    /// A torn final line is ignored when loading.
    #[test]
    fn test_wal_recovers_from_truncated_last_line() {
        use std::io::Write;

        let path = tmp_wal("truncated");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(bare_event(0, 1000, WalEventType::JobStarted, "job1"))
            .unwrap();
        wal.append(bare_event(1, 1001, WalEventType::JobCompleted, "job1"))
            .unwrap();

        let reader_before = WalReader::load(&path).unwrap();
        assert_eq!(reader_before.events().len(), 2);

        // Simulate a crash in the middle of a write: no closing quote, brace or newline.
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        file.write_all(b"{\"seq\":2,\"ts\":1002,\"type\":\"job_started\",\"job_id\":\"truncated")
            .unwrap();
        file.flush().unwrap();

        let reader_after = WalReader::load(&path).unwrap();
        assert_eq!(
            reader_after.events().len(),
            2,
            "Should skip truncated last line and read 2 valid events, got {}",
            reader_after.events().len()
        );
        assert_eq!(reader_after.events()[0].job_id, "job1");
        assert_eq!(reader_after.events()[1].job_id, "job1");

        let _ = std::fs::remove_file(&path);
    }

    /// Lines that are not valid events are ignored when loading.
    #[test]
    fn test_wal_skips_garbage_lines() {
        use std::io::Write;

        let path = tmp_wal("garbage");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(bare_event(0, 1000, WalEventType::JobStarted, "valid"))
            .unwrap();

        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        file.write_all(b"not valid json at all\n").unwrap();
        file.write_all(b"{\"seq\":999,\"type\":\"garbage\"}\n")
            .unwrap();
        file.flush().unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(
            reader.events().len(),
            1,
            "Should only find 1 valid event, got {}",
            reader.events().len()
        );
        assert_eq!(reader.events()[0].job_id, "valid");

        let _ = std::fs::remove_file(&path);
    }
}