1#![allow(clippy::arithmetic_side_effects)]
11use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34use serde::{Deserialize, Serialize};
35use std::path::Path;
36
37fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44 use std::io::{Read, Seek, SeekFrom};
45
46 let mut file = std::fs::File::open(path).ok()?;
47 let file_len = file.metadata().ok()?.len();
48 if file_len == 0 {
49 return None;
50 }
51
52 let start = file_len.saturating_sub(tail_bytes as u64);
53 file.seek(SeekFrom::Start(start)).ok()?;
54 let mut buf = vec![
55 0u8;
56 usize::try_from(file_len - start)
57 .unwrap_or(0)
58 .saturating_add(1)
59 ];
60 let n = file.read(&mut buf).ok()?;
61 buf.truncate(n);
62
63 let lines: Vec<&[u8]> = buf
65 .split(|&b| b == b'\n')
66 .filter(|l| !l.is_empty())
67 .collect();
68
69 for line in lines.iter().rev() {
70 if let Ok(line_str) = std::str::from_utf8(line) {
71 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
72 return Some(event.seq);
73 }
74 }
75 }
76 None
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
92#[allow(clippy::exhaustive_structs)]
93pub struct WalEvent {
94 pub seq: u64,
96 pub ts: u64,
98 #[serde(rename = "type")]
100 pub event_type: WalEventType,
101 pub job_id: String,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 pub capability: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
108 pub output: Option<serde_json::Value>,
109 #[serde(skip_serializing_if = "Option::is_none")]
111 pub error: Option<String>,
112 #[serde(skip_serializing_if = "Option::is_none")]
114 pub telemetry_before: Option<Telemetry>,
115 #[serde(skip_serializing_if = "Option::is_none")]
117 pub telemetry_after: Option<Telemetry>,
118 #[serde(skip_serializing_if = "Option::is_none")]
120 pub process_before: Option<ProcessSummary>,
121 #[serde(skip_serializing_if = "Option::is_none")]
123 pub process_after: Option<ProcessSummary>,
124 #[serde(skip_serializing_if = "Option::is_none")]
126 pub cmd: Option<String>,
127 #[serde(skip_serializing_if = "Option::is_none")]
129 pub cmd_stdout: Option<String>,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub cmd_stderr: Option<String>,
133 #[serde(skip_serializing_if = "Option::is_none")]
135 pub cmd_exit_code: Option<i32>,
136 #[serde(skip_serializing_if = "Option::is_none")]
138 pub cmd_corrected: Option<String>,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144#[serde(rename_all = "snake_case")]
145#[allow(clippy::exhaustive_enums)]
148pub enum WalEventType {
149 JobSubmitted,
151 JobValidated,
153 JobStarted,
155 JobCompleted,
157 JobFailed,
159 JobRolledBack,
161 CommandExecuted,
163}
164
/// Appends [`WalEvent`]s to a JSON-lines write-ahead log file and tracks the
/// writer's next sequence number.
#[allow(clippy::exhaustive_structs)]
#[derive(Debug)]
pub struct WalWriter {
    /// Location of the WAL file; reopened in append mode for every write.
    path: std::path::PathBuf,
    /// Next sequence number: recovered from the file when the writer is
    /// created and advanced after each successful append.
    seq: u64,
}
190
191impl WalWriter {
192 pub fn create(path: &Path) -> Result<Self> {
201 if let Some(parent) = path.parent() {
203 if !parent.exists() {
204 std::fs::create_dir_all(parent).map_err(|e| {
205 crate::Error::WalError(format!(
206 "Failed to create WAL directory {}: {}",
207 parent.display(),
208 e
209 ))
210 })?;
211 }
212 }
213
214 if !path.exists() {
216 std::fs::File::create(path).map_err(|e| {
217 crate::Error::WalError(format!(
218 "Failed to create WAL file {}: {}",
219 path.display(),
220 e
221 ))
222 })?;
223 }
224
225 let seq = if path.exists() {
232 let lock_file = std::fs::File::open(path)
233 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
234 Self::lock_file(&lock_file)?;
235 let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
236 last_seq + 1
237 } else {
238 let content = std::fs::read_to_string(path).map_err(|e| {
240 crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
241 })?;
242 content
243 .lines()
244 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
245 .map(|e| e.seq)
246 .max()
247 .map_or(0, |max| max + 1)
248 };
249 Self::unlock_file(&lock_file);
250 recovered
251 } else {
252 0
253 };
254
255 Ok(Self {
256 path: path.to_path_buf(),
257 seq,
258 })
259 }
260
261 #[cfg(unix)]
263 fn lock_file(file: &std::fs::File) -> Result<()> {
264 use std::os::unix::io::AsRawFd;
265 let fd = file.as_raw_fd();
266 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
268 if result != 0 {
269 return Err(crate::Error::WalError(format!(
270 "Failed to acquire WAL lock: {}",
271 std::io::Error::last_os_error()
272 )));
273 }
274 Ok(())
275 }
276
277 #[cfg(not(unix))]
279 fn lock_file(_file: &std::fs::File) -> Result<()> {
280 Ok(())
281 }
282
283 #[cfg(unix)]
285 fn unlock_file(file: &std::fs::File) {
286 use std::os::unix::io::AsRawFd;
287 let fd = file.as_raw_fd();
288 unsafe { libc::flock(fd, libc::LOCK_UN) };
290 }
291
292 #[cfg(not(unix))]
294 fn unlock_file(_file: &std::fs::File) {}
295
296 pub fn append(&mut self, event: WalEvent) -> Result<()> {
310 use std::io::Write;
311 let line =
312 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
313
314 let file = std::fs::OpenOptions::new()
316 .create(true)
317 .append(true)
318 .open(&self.path)
319 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
320
321 Self::lock_file(&file)?;
323 {
324 let mut buf = std::io::BufWriter::new(&file);
325 writeln!(buf, "{}", line)
326 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
327 buf.flush()
328 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
329 file.sync_all()
330 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
331 }
332 Self::unlock_file(&file);
333
334 self.seq += 1;
335 Ok(())
336 }
337
338 #[must_use]
340 pub fn seq(&self) -> u64 {
341 self.seq
342 }
343}
344
345#[allow(clippy::exhaustive_structs)]
362pub struct WalReader {
363 events: Vec<WalEvent>,
364}
365
366impl WalReader {
367 pub fn load(path: &Path) -> Result<Self> {
374 let content =
375 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
376
377 let events: Vec<WalEvent> = content
378 .lines()
379 .filter_map(|line| serde_json::from_str(line).ok())
380 .collect();
381
382 Ok(Self { events })
383 }
384
385 #[must_use]
387 pub fn events(&self) -> &[WalEvent] {
388 &self.events
389 }
390
391 pub fn tail(path: &Path, n: usize) -> Result<Self> {
401 use std::collections::VecDeque;
402 use std::io::{BufRead, BufReader};
403 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
404 let reader = BufReader::new(file);
405
406 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
407 for line in reader.lines() {
408 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
409 if let Ok(event) = serde_json::from_str(&line) {
410 window.push_back(event);
411 if window.len() > n {
412 window.pop_front();
413 }
414 }
415 }
416
417 Ok(Self {
418 events: window.into(),
419 })
420 }
421}
422
423impl WalWriter {
425 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
430 let mut s = path.to_string_lossy().into_owned();
431 s.push('.');
432 s.push_str(&index.to_string());
433 std::path::PathBuf::from(s)
434 }
435
436 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
445 let Ok(metadata) = std::fs::metadata(path) else {
446 return Ok(()); };
448
449 if metadata.len() < max_size_bytes {
450 return Ok(());
451 }
452
453 for i in (1..max_rotations).rev() {
455 let old = Self::rotation_path(path, i);
456 let new = Self::rotation_path(path, i + 1);
457 if old.exists() {
458 let _ = std::fs::rename(&old, &new);
459 }
460 }
461
462 let rotated = Self::rotation_path(path, 1);
464 std::fs::rename(path, &rotated)
465 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
466
467 std::fs::write(path, "")
469 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
470
471 let oldest = Self::rotation_path(path, max_rotations + 1);
473 if oldest.exists() {
474 let _ = std::fs::remove_file(&oldest);
475 }
476
477 Ok(())
478 }
479
480 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
497 use std::time::{SystemTime, UNIX_EPOCH};
498
499 let cutoff = SystemTime::now()
500 .duration_since(UNIX_EPOCH)
501 .map_or(0, |d| d.as_secs())
502 .saturating_sub(max_age_secs);
503
504 let lock_file = std::fs::File::open(path)
506 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
507 Self::lock_file(&lock_file)?;
508 let content = std::fs::read_to_string(path)
509 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
510
511 let all_events: Vec<WalEvent> = content
512 .lines()
513 .filter_map(|line| serde_json::from_str(line).ok())
514 .collect();
515
516 let total = all_events.len();
517
518 let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
519 let removed = total - retained.len();
520
521 if removed > 0 {
522 let temp_path = path.with_extension("wal.tmp");
525 {
526 let mut new_wal = Self::create(&temp_path)?;
527 for event in &retained {
528 new_wal.append(event.clone())?;
529 }
530
531 let last_seq = retained.last().map_or(0, |e| e.seq);
534 let current_content = std::fs::read_to_string(path).map_err(|e| {
535 crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
536 })?;
537 for line in current_content.lines() {
538 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
539 if event.seq > last_seq {
540 new_wal.append(event)?;
541 }
542 }
543 }
544 }
545 Self::unlock_file(&lock_file);
547 std::fs::rename(&temp_path, path).map_err(|e| {
549 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
550 })?;
551 } else {
552 Self::unlock_file(&lock_file);
553 }
554
555 Ok(removed)
556 }
557}
558
/// Shortens `s` to a prefix of at most `max_bytes` bytes, followed by the
/// marker `"...[truncated]"`, whenever `s` is longer than `max_bytes`.
///
/// The cut point moves back to the nearest UTF-8 character boundary, so
/// multi-byte characters are never split. A string that already fits is
/// returned unchanged.
///
/// The marker is added on top of the kept prefix, so a truncated result is
/// longer than `max_bytes`.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    const MARKER: &str = "...[truncated]";

    if max_bytes >= s.len() {
        return s.to_owned();
    }

    // Byte 0 is always a character boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    let mut shortened = String::with_capacity(cut + MARKER.len());
    shortened.push_str(&s[..cut]);
    shortened.push_str(MARKER);
    shortened
}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test WAL path in the system temp directory.
    ///
    /// The process id is part of the name, so concurrent test processes (e.g.
    /// two `cargo test` runs at once) never share or delete each other's
    /// files.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "runtimo_test_wal_{}_{}.jsonl",
            name,
            std::process::id()
        ))
    }

    /// Builds an event with the required fields set and every optional field
    /// `None`; tests override individual fields with struct-update syntax.
    fn event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.to_string(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    /// Best-effort removal of a test WAL and its first few rotated generations.
    fn remove_wal_files(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        for i in 1..=4 {
            let _ = std::fs::remove_file(WalWriter::rotation_path(path, i));
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 1715800000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");
        assert_eq!(reader.events()[0].capability.as_deref(), Some("FileRead"));

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A fresh writer on the same file resumes after the last recorded seq.
        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        assert!(WalWriter::rotation_path(&path, 1).exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_rotation_keeps_newest_generations() {
        let path = tmp_wal("rotation_generations");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for (seq, job) in [(0, "a"), (1, "b"), (2, "c")] {
            wal.append(event(seq, 1715800000, WalEventType::JobStarted, job))
                .unwrap();
            WalWriter::rotate(&path, 1, 2).unwrap();
        }

        let job_in = |generation: usize| {
            WalReader::load(&WalWriter::rotation_path(&path, generation))
                .unwrap()
                .events()[0]
                .job_id
                .clone()
        };
        assert_eq!(job_in(1), "c");
        assert_eq!(job_in(2), "b");
        assert!(!WalWriter::rotation_path(&path, 3).exists());

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        wal.append(event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");
        assert!(!path.with_extension("wal.tmp").exists());

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_tail() {
        let path = tmp_wal("tail");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for seq in 0..5 {
            wal.append(event(
                seq,
                1715800000 + seq,
                WalEventType::JobStarted,
                &format!("job-{}", seq),
            ))
            .unwrap();
        }

        let tail = WalReader::tail(&path, 2).unwrap();
        let ids: Vec<&str> = tail.events().iter().map(|e| e.job_id.as_str()).collect();
        assert_eq!(ids, ["job-3", "job-4"]);
        assert!(WalReader::tail(&path, 0).unwrap().events().is_empty());
        assert_eq!(WalReader::tail(&path, 10).unwrap().events().len(), 5);

        remove_wal_files(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let json =
            serde_json::to_string(&event(0, 1715800000, WalEventType::JobStarted, "test"))
                .unwrap();
        assert!(!json.contains("capability"));
        assert!(!json.contains("telemetry_before"));
        assert!(!json.contains("process_before"));
        assert!(!json.contains("cmd"));
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        remove_wal_files(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(reader.events()[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(reader.events()[0].cmd_exit_code, Some(127));

        remove_wal_files(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        // Never splits a multi-byte character: 'é' occupies bytes 1..3.
        assert_eq!(truncate_to("h\u{e9}llo", 2), "h...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}