1#![allow(clippy::arithmetic_side_effects)]
11use crate::processes::ProcessSummary;
32use crate::telemetry::Telemetry;
33use crate::Result;
34use serde::{Deserialize, Serialize};
35use std::path::Path;
36
37fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
44 use std::io::{Read, Seek, SeekFrom};
45
46 let mut file = std::fs::File::open(path).ok()?;
47 let file_len = file.metadata().ok()?.len();
48 if file_len == 0 {
49 return None;
50 }
51
52 let start = file_len.saturating_sub(tail_bytes as u64);
53 file.seek(SeekFrom::Start(start)).ok()?;
54 let mut buf = vec![0u8; usize::try_from(file_len - start).unwrap_or(0).saturating_add(1)];
55 let n = file.read(&mut buf).ok()?;
56 buf.truncate(n);
57
58 let lines: Vec<&[u8]> = buf
60 .split(|&b| b == b'\n')
61 .filter(|l| !l.is_empty())
62 .collect();
63
64 for line in lines.iter().rev() {
65 if let Ok(line_str) = std::str::from_utf8(line) {
66 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
67 return Some(event.seq);
68 }
69 }
70 }
71 None
72}
73
/// One record of the write-ahead log, stored as a single JSON object per line.
///
/// Every `Option` field is left out of the serialized JSON when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct WalEvent {
    /// Sequence number stored with the event; `WalWriter::create` derives its next
    /// sequence number from the values found in an existing file.
    pub seq: u64,
    /// Event timestamp. `WalWriter::cleanup` compares it against seconds since the
    /// Unix epoch.
    pub ts: u64,
    /// Kind of event; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// Identifier of the job this event belongs to.
    pub job_id: String,
    /// Optional capability name (presumably the capability the job invoked — confirm
    /// against callers).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Optional arbitrary JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Optional error text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Optional telemetry snapshot (by its name, taken before the job ran).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Optional telemetry snapshot (by its name, taken after the job ran).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Optional process summary (by its name, taken before the job ran).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Optional process summary (by its name, taken after the job ran).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Optional command line associated with the event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Optional captured standard output of the command.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Optional captured standard error of the command.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Optional exit code of the command.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// Optional corrected form of the command (semantics defined by the producer —
    /// confirm against callers).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
}
135
/// Kind of a [`WalEvent`].
///
/// Variants are serialized as snake_case strings (for example `JobSubmitted` becomes
/// `"job_submitted"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::exhaustive_enums)]
pub enum WalEventType {
    /// Serialized as `job_submitted`.
    JobSubmitted,
    /// Serialized as `job_validated`.
    JobValidated,
    /// Serialized as `job_started`.
    JobStarted,
    /// Serialized as `job_completed`.
    JobCompleted,
    /// Serialized as `job_failed`.
    JobFailed,
    /// Serialized as `job_rolled_back`.
    JobRolledBack,
    /// Serialized as `command_executed`.
    CommandExecuted,
}
159
/// Append-only writer for a JSON-lines write-ahead log file.
#[allow(clippy::exhaustive_structs)]
pub struct WalWriter {
    /// Location of the WAL file that `append` writes to.
    path: std::path::PathBuf,
    /// Sequence counter: recovered from the file in `create` and incremented by one
    /// after every successful `append`.
    seq: u64,
}
185
186impl WalWriter {
187 pub fn create(path: &Path) -> Result<Self> {
196 if let Some(parent) = path.parent() {
198 if !parent.exists() {
199 std::fs::create_dir_all(parent).map_err(|e| {
200 crate::Error::WalError(format!(
201 "Failed to create WAL directory {}: {}",
202 parent.display(),
203 e
204 ))
205 })?;
206 }
207 }
208
209 if !path.exists() {
211 std::fs::File::create(path).map_err(|e| {
212 crate::Error::WalError(format!(
213 "Failed to create WAL file {}: {}",
214 path.display(),
215 e
216 ))
217 })?;
218 }
219
220 let seq = if path.exists() {
227 let lock_file = std::fs::File::open(path)
228 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
229 Self::lock_file(&lock_file)?;
230 let recovered = if let Some(last_seq) = read_last_seq(path, 8192) {
231 last_seq + 1
232 } else {
233 let content = std::fs::read_to_string(path).map_err(|e| {
235 crate::Error::WalError(format!("read WAL for seq recovery: {}", e))
236 })?;
237 content
238 .lines()
239 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
240 .map(|e| e.seq)
241 .max()
242 .map_or(0, |max| max + 1)
243 };
244 Self::unlock_file(&lock_file);
245 recovered
246 } else {
247 0
248 };
249
250 Ok(Self {
251 path: path.to_path_buf(),
252 seq,
253 })
254 }
255
256 #[cfg(unix)]
258 fn lock_file(file: &std::fs::File) -> Result<()> {
259 use std::os::unix::io::AsRawFd;
260 let fd = file.as_raw_fd();
261 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
263 if result != 0 {
264 return Err(crate::Error::WalError(format!(
265 "Failed to acquire WAL lock: {}",
266 std::io::Error::last_os_error()
267 )));
268 }
269 Ok(())
270 }
271
272 #[cfg(not(unix))]
274 fn lock_file(_file: &std::fs::File) -> Result<()> {
275 Ok(())
276 }
277
278 #[cfg(unix)]
280 fn unlock_file(file: &std::fs::File) {
281 use std::os::unix::io::AsRawFd;
282 let fd = file.as_raw_fd();
283 unsafe { libc::flock(fd, libc::LOCK_UN) };
285 }
286
287 #[cfg(not(unix))]
289 fn unlock_file(_file: &std::fs::File) {}
290
291 pub fn append(&mut self, event: WalEvent) -> Result<()> {
305 use std::io::Write;
306 let line =
307 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
308
309 let file = std::fs::OpenOptions::new()
311 .create(true)
312 .append(true)
313 .open(&self.path)
314 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
315
316 Self::lock_file(&file)?;
318 {
319 let mut buf = std::io::BufWriter::new(&file);
320 writeln!(buf, "{}", line)
321 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
322 buf.flush()
323 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
324 file.sync_all()
325 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
326 }
327 Self::unlock_file(&file);
328
329 self.seq += 1;
330 Ok(())
331 }
332
333 #[must_use]
335 pub fn seq(&self) -> u64 {
336 self.seq
337 }
338}
339
/// In-memory view of the events parsed from a WAL file.
#[allow(clippy::exhaustive_structs)]
pub struct WalReader {
    /// Events in file order, as collected by `load` or `tail`.
    events: Vec<WalEvent>,
}
360
361impl WalReader {
362 pub fn load(path: &Path) -> Result<Self> {
369 let content =
370 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
371
372 let events: Vec<WalEvent> = content
373 .lines()
374 .filter_map(|line| serde_json::from_str(line).ok())
375 .collect();
376
377 Ok(Self { events })
378 }
379
380 #[must_use]
382 pub fn events(&self) -> &[WalEvent] {
383 &self.events
384 }
385
386 pub fn tail(path: &Path, n: usize) -> Result<Self> {
396 use std::collections::VecDeque;
397 use std::io::{BufRead, BufReader};
398 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
399 let reader = BufReader::new(file);
400
401 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
402 for line in reader.lines() {
403 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
404 if let Ok(event) = serde_json::from_str(&line) {
405 window.push_back(event);
406 if window.len() > n {
407 window.pop_front();
408 }
409 }
410 }
411
412 Ok(Self {
413 events: window.into(),
414 })
415 }
416}
417
418impl WalWriter {
420 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
425 let mut s = path.to_string_lossy().into_owned();
426 s.push('.');
427 s.push_str(&index.to_string());
428 std::path::PathBuf::from(s)
429 }
430
431 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
440 let Ok(metadata) = std::fs::metadata(path) else {
441 return Ok(()); };
443
444 if metadata.len() < max_size_bytes {
445 return Ok(());
446 }
447
448 for i in (1..max_rotations).rev() {
450 let old = Self::rotation_path(path, i);
451 let new = Self::rotation_path(path, i + 1);
452 if old.exists() {
453 let _ = std::fs::rename(&old, &new);
454 }
455 }
456
457 let rotated = Self::rotation_path(path, 1);
459 std::fs::rename(path, &rotated)
460 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
461
462 std::fs::write(path, "")
464 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
465
466 let oldest = Self::rotation_path(path, max_rotations + 1);
468 if oldest.exists() {
469 let _ = std::fs::remove_file(&oldest);
470 }
471
472 Ok(())
473 }
474
475 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
492 use std::time::{SystemTime, UNIX_EPOCH};
493
494 let cutoff = SystemTime::now()
495 .duration_since(UNIX_EPOCH)
496 .map_or(0, |d| d.as_secs())
497 .saturating_sub(max_age_secs);
498
499 let lock_file = std::fs::File::open(path)
501 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
502 Self::lock_file(&lock_file)?;
503 let content = std::fs::read_to_string(path)
504 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
505
506 let all_events: Vec<WalEvent> = content
507 .lines()
508 .filter_map(|line| serde_json::from_str(line).ok())
509 .collect();
510
511 let total = all_events.len();
512
513 let retained: Vec<_> = all_events.into_iter().filter(|e| e.ts >= cutoff).collect();
514 let removed = total - retained.len();
515
516 if removed > 0 {
517 let temp_path = path.with_extension("wal.tmp");
520 {
521 let mut new_wal = Self::create(&temp_path)?;
522 for event in &retained {
523 new_wal.append(event.clone())?;
524 }
525
526 let last_seq = retained.last().map_or(0, |e| e.seq);
529 let current_content = std::fs::read_to_string(path).map_err(|e| {
530 crate::Error::WalError(format!("re-read WAL during cleanup: {}", e))
531 })?;
532 for line in current_content.lines() {
533 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
534 if event.seq > last_seq {
535 new_wal.append(event)?;
536 }
537 }
538 }
539 }
540 Self::unlock_file(&lock_file);
542 std::fs::rename(&temp_path, path).map_err(|e| {
544 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
545 })?;
546 } else {
547 Self::unlock_file(&lock_file);
548 }
549
550 Ok(removed)
551 }
552}
553
/// Shortens `s` to at most `max_bytes` bytes of its original content.
///
/// A string that already fits is returned unchanged. Otherwise the cut is moved back to
/// the nearest UTF-8 character boundary at or below `max_bytes`, and the marker
/// `...[truncated]` is appended, so the result may be longer than `max_bytes`.
#[must_use]
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Offset 0 is always a character boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    format!("{}...[truncated]", &s[..cut])
}
571
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a per-test WAL path inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with the given core fields and every optional field unset.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A second writer on the same file must pick up where the first left off.
        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            wal.append(bare_event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        // A threshold just below the current size forces a rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        assert!(WalWriter::rotation_path(&path, 1).exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(WalWriter::rotation_path(&path, 1));
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // One event older than the 500 s retention window, one inside it.
        wal.append(bare_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");

        let json = serde_json::to_string(&event).unwrap();
        assert!(!json.contains("capability"));
        assert!(!json.contains("telemetry_before"));
        assert!(!json.contains("process_before"));
        assert!(!json.contains("cmd"));
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..bare_event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        let events = reader.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(events[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(events[0].cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");

        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}