1use crate::processes::ProcessSummary;
27use crate::telemetry::Telemetry;
28use crate::Result;
29use serde::{Deserialize, Serialize};
30use std::path::Path;
31
32
33
34fn read_last_seq(path: &Path, tail_bytes: usize) -> Option<u64> {
41 use std::io::{Read, Seek, SeekFrom};
42
43 let mut file = std::fs::File::open(path).ok()?;
44 let file_len = file.metadata().ok()?.len();
45 if file_len == 0 {
46 return None;
47 }
48
49 let start = file_len.saturating_sub(tail_bytes as u64);
50 file.seek(SeekFrom::Start(start)).ok()?;
51 let mut buf = vec![0u8; (file_len - start) as usize + 1];
52 let n = file.read(&mut buf).ok()?;
53 buf.truncate(n);
54
55 let lines: Vec<&[u8]> = buf
57 .split(|&b| b == b'\n')
58 .filter(|l| !l.is_empty())
59 .collect();
60
61 for line in lines.iter().rev() {
62 if let Ok(line_str) = std::str::from_utf8(line) {
63 if let Ok(event) = serde_json::from_str::<WalEvent>(line_str.trim()) {
64 return Some(event.seq);
65 }
66 }
67 }
68 None
69}
70
/// One record of the write-ahead log, stored as a single JSON line.
///
/// Optional fields that are `None` are omitted from the serialized JSON.
/// Field order here determines key order in the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEvent {
    /// Sequence number. `WalWriter::append` writes it exactly as supplied;
    /// `WalWriter::create` derives its next counter value from the `seq` of
    /// events already in the file.
    pub seq: u64,
    /// Event timestamp. `WalWriter::cleanup` compares it against the current
    /// time expressed in whole seconds since the Unix epoch.
    pub ts: u64,
    /// Kind of event; serialized under the JSON key `"type"`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// Identifier of the job this event belongs to.
    pub job_id: String,
    /// Name of the capability involved, if any (the tests use `"FileRead"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Arbitrary JSON output attached to the event, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error message, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// NOTE(review): presumably a telemetry snapshot taken before the job ran —
    /// confirm with the producers of this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// NOTE(review): presumably a telemetry snapshot taken after the job ran —
    /// confirm with the producers of this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// NOTE(review): presumably a process summary captured before the job —
    /// confirm with the producers of this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// NOTE(review): presumably a process summary captured after the job —
    /// confirm with the producers of this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
    /// Command line that was run; the tests pair it with
    /// `WalEventType::CommandExecuted`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd: Option<String>,
    /// Captured standard output of `cmd`. NOTE(review): `truncate_to` looks
    /// intended for bounding this — confirm at the call sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stdout: Option<String>,
    /// Captured standard error of `cmd`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_stderr: Option<String>,
    /// Exit status of `cmd` (the tests use 127 for "command not found").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_exit_code: Option<i32>,
    /// NOTE(review): presumably a corrected form of `cmd` — confirm with the
    /// producers of this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cmd_corrected: Option<String>,
}
131
132#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
135#[serde(rename_all = "snake_case")]
136pub enum WalEventType {
137 JobSubmitted,
139 JobValidated,
141 JobStarted,
143 JobCompleted,
145 JobFailed,
147 JobRolledBack,
149 CommandExecuted,
151}
152
/// Appends [`WalEvent`]s to a JSON-lines write-ahead log file.
///
/// Only the file path and a local sequence counter are kept; the file itself
/// is reopened for every append.
#[derive(Debug)]
pub struct WalWriter {
    /// Location of the WAL file.
    path: std::path::PathBuf,
    /// Next sequence number: recovered from the file when the writer is
    /// created and incremented after every successful append.
    seq: u64,
}
177
178impl WalWriter {
179 pub fn create(path: &Path) -> Result<Self> {
188 if let Some(parent) = path.parent() {
190 if !parent.exists() {
191 std::fs::create_dir_all(parent).map_err(|e| {
192 crate::Error::WalError(format!(
193 "Failed to create WAL directory {}: {}",
194 parent.display(),
195 e
196 ))
197 })?;
198 }
199 }
200
201 if !path.exists() {
203 std::fs::File::create(path).map_err(|e| {
204 crate::Error::WalError(format!(
205 "Failed to create WAL file {}: {}",
206 path.display(),
207 e
208 ))
209 })?;
210 }
211
212 let seq = if path.exists() {
219 let lock_file = std::fs::File::open(path)
220 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
221 Self::lock_file(&lock_file)?;
222 let recovered = match read_last_seq(path, 8192) {
223 Some(last_seq) => last_seq + 1,
224 None => {
225 let content = std::fs::read_to_string(path)
227 .map_err(|e| crate::Error::WalError(format!("read WAL for seq recovery: {}", e)))?;
228 content
229 .lines()
230 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
231 .map(|e| e.seq)
232 .max()
233 .map(|max| max + 1)
234 .unwrap_or(0)
235 }
236 };
237 Self::unlock_file(&lock_file);
238 recovered
239 } else {
240 0
241 };
242
243 Ok(Self {
244 path: path.to_path_buf(),
245 seq,
246 })
247 }
248
249 #[cfg(unix)]
251 fn lock_file(file: &std::fs::File) -> Result<()> {
252 use std::os::unix::io::AsRawFd;
253 let fd = file.as_raw_fd();
254 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
255 if result != 0 {
256 return Err(crate::Error::WalError(format!(
257 "Failed to acquire WAL lock: {}",
258 std::io::Error::last_os_error()
259 )));
260 }
261 Ok(())
262 }
263
264 #[cfg(not(unix))]
266 fn lock_file(_file: &std::fs::File) -> Result<()> {
267 Ok(())
268 }
269
270 #[cfg(unix)]
272 fn unlock_file(file: &std::fs::File) {
273 use std::os::unix::io::AsRawFd;
274 let fd = file.as_raw_fd();
275 unsafe { libc::flock(fd, libc::LOCK_UN) };
276 }
277
278 #[cfg(not(unix))]
280 fn unlock_file(_file: &std::fs::File) {}
281
282 pub fn append(&mut self, event: WalEvent) -> Result<()> {
296 use std::io::Write;
297 let line =
298 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
299
300 let file = std::fs::OpenOptions::new()
302 .create(true)
303 .append(true)
304 .open(&self.path)
305 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
306
307 Self::lock_file(&file)?;
309 {
310 let mut buf = std::io::BufWriter::new(&file);
311 writeln!(buf, "{}", line)
312 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
313 buf.flush()
314 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
315 file.sync_all()
316 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
317 }
318 Self::unlock_file(&file);
319
320 self.seq += 1;
321 Ok(())
322 }
323
324 pub fn seq(&self) -> u64 {
326 self.seq
327 }
328}
329
330pub struct WalReader {
347 events: Vec<WalEvent>,
348}
349
350impl WalReader {
351 pub fn load(path: &Path) -> Result<Self> {
358 let content =
359 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
360
361 let events: Vec<WalEvent> = content
362 .lines()
363 .filter_map(|line| serde_json::from_str(line).ok())
364 .collect();
365
366 Ok(Self { events })
367 }
368
369 pub fn events(&self) -> &[WalEvent] {
371 &self.events
372 }
373
374 pub fn tail(path: &Path, n: usize) -> Result<Self> {
384 use std::collections::VecDeque;
385 use std::io::{BufRead, BufReader};
386 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
387 let reader = BufReader::new(file);
388
389 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
390 for line in reader.lines() {
391 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
392 if let Ok(event) = serde_json::from_str(&line) {
393 window.push_back(event);
394 if window.len() > n {
395 window.pop_front();
396 }
397 }
398 }
399
400 Ok(Self {
401 events: window.into(),
402 })
403 }
404}
405
406impl WalWriter {
408 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
413 let mut s = path.to_string_lossy().into_owned();
414 s.push('.');
415 s.push_str(&index.to_string());
416 std::path::PathBuf::from(s)
417 }
418
419 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
425 let metadata = match std::fs::metadata(path) {
426 Ok(m) => m,
427 Err(_) => return Ok(()), };
429
430 if metadata.len() < max_size_bytes {
431 return Ok(());
432 }
433
434 for i in (1..max_rotations).rev() {
436 let old = Self::rotation_path(path, i);
437 let new = Self::rotation_path(path, i + 1);
438 if old.exists() {
439 let _ = std::fs::rename(&old, &new);
440 }
441 }
442
443 let rotated = Self::rotation_path(path, 1);
445 std::fs::rename(path, &rotated)
446 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
447
448 std::fs::write(path, "")
450 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
451
452 let oldest = Self::rotation_path(path, max_rotations + 1);
454 if oldest.exists() {
455 let _ = std::fs::remove_file(&oldest);
456 }
457
458 Ok(())
459 }
460
461 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
475 use std::time::{SystemTime, UNIX_EPOCH};
476
477 let cutoff = SystemTime::now()
478 .duration_since(UNIX_EPOCH)
479 .map(|d| d.as_secs())
480 .unwrap_or(0)
481 .saturating_sub(max_age_secs);
482
483 let lock_file = std::fs::File::open(path)
485 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
486 Self::lock_file(&lock_file)?;
487 let content = std::fs::read_to_string(path)
488 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
489
490 let events: Vec<WalEvent> = content
491 .lines()
492 .filter_map(|line| serde_json::from_str(line).ok())
493 .collect();
494
495 let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
497
498 let total = content
499 .lines()
500 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
501 .count();
502 let removed = total - retained.len();
503
504 if removed > 0 {
505 let temp_path = path.with_extension("wal.tmp");
508 {
509 let mut new_wal = WalWriter::create(&temp_path)?;
510 for event in &retained {
511 new_wal.append(event.clone())?;
512 }
513
514 let last_seq = retained.last().map(|e| e.seq).unwrap_or(0);
517 let current_content = std::fs::read_to_string(path)
518 .map_err(|e| crate::Error::WalError(format!("re-read WAL during cleanup: {}", e)))?;
519 for line in current_content.lines() {
520 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
521 if event.seq > last_seq {
522 new_wal.append(event)?;
523 }
524 }
525 }
526 }
527 Self::unlock_file(&lock_file);
529 std::fs::rename(&temp_path, path).map_err(|e| {
531 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
532 })?;
533 } else {
534 Self::unlock_file(&lock_file);
535 }
536
537 Ok(removed)
538 }
539}
540
/// Returns `s` unchanged if it is at most `max_bytes` bytes long.
///
/// Otherwise it returns the longest prefix of at most `max_bytes` bytes that
/// ends on a UTF-8 character boundary, followed by the marker
/// `...[truncated]`. The marker is added after the cut, so a truncated result
/// is longer than `max_bytes` by the marker's length.
pub fn truncate_to(s: &str, max_bytes: usize) -> String {
    if s.len() > max_bytes {
        // Index 0 is always a char boundary, so the search never comes up
        // empty.
        let cut = (0..=max_bytes)
            .rev()
            .find(|&idx| s.is_char_boundary(idx))
            .unwrap_or(0);
        let marker = "...[truncated]";
        let mut out = String::with_capacity(cut + marker.len());
        out.push_str(&s[..cut]);
        out.push_str(marker);
        out
    } else {
        s.to_owned()
    }
}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a per-test WAL path in the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("runtimo_test_wal_{}.jsonl", name))
    }

    /// Builds an event with every optional field set to `None`.
    fn event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
            cmd: None,
            cmd_stdout: None,
            cmd_stderr: None,
            cmd_exit_code: None,
            cmd_corrected: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 1715800000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");
        assert_eq!(reader.events()[0].capability.as_deref(), Some("FileRead"));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A second writer over the same file continues after the stored event.
        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_create_keeps_existing_events_and_skips_malformed_lines() {
        let path = tmp_wal("malformed");
        let _ = std::fs::remove_file(&path);

        let first = serde_json::to_string(&event(0, 1, WalEventType::JobStarted, "a")).unwrap();
        let last = serde_json::to_string(&event(5, 2, WalEventType::JobCompleted, "a")).unwrap();
        std::fs::write(&path, format!("{}\nnot json\n{}\n", first, last)).unwrap();

        // Opening an existing WAL must not truncate it, and recovery resumes
        // after the last valid event.
        let wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 6);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 2);
        assert_eq!(reader.events()[1].seq, 5);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let remove_all = || {
            let _ = std::fs::remove_file(&path);
            for i in 1..=3 {
                let _ = std::fs::remove_file(WalWriter::rotation_path(&path, i));
            }
        };
        remove_all();

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let rotated = WalWriter::rotation_path(&path, 1);
        assert!(rotated.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");
        assert_eq!(WalReader::load(&rotated).unwrap().events().len(), 100);

        remove_all();
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        wal.append(event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_tail_keeps_last_n() {
        let path = tmp_wal("tail");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..5 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        let ids = |n: usize| -> Vec<String> {
            WalReader::tail(&path, n)
                .unwrap()
                .events()
                .iter()
                .map(|e| e.job_id.clone())
                .collect()
        };
        assert_eq!(ids(2), vec!["job-3", "job-4"]);
        assert_eq!(ids(10).len(), 5);
        assert!(ids(0).is_empty());

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let json =
            serde_json::to_string(&event(0, 1715800000, WalEventType::JobStarted, "test")).unwrap();
        assert!(!json.contains("capability"));
        assert!(!json.contains("telemetry_before"));
        assert!(!json.contains("process_before"));
        assert!(!json.contains("cmd"));
    }

    #[test]
    fn test_command_executed_event() {
        let path = tmp_wal("cmd_exec");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            cmd: Some("ls | hed -3".into()),
            cmd_stderr: Some("hed: command not found".into()),
            cmd_exit_code: Some(127),
            ..event(0, 1715800000, WalEventType::CommandExecuted, "job-cmd")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].event_type, WalEventType::CommandExecuted);
        assert_eq!(reader.events()[0].cmd.as_deref(), Some("ls | hed -3"));
        assert_eq!(reader.events()[0].cmd_exit_code, Some(127));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_truncate_to() {
        assert_eq!(truncate_to("hello", 1024), "hello");
        assert_eq!(truncate_to("hello", 3), "hel...[truncated]");
        // Never split a multi-byte character: 'é' occupies bytes 1..3.
        assert_eq!(truncate_to("héllo", 2), "h...[truncated]");
        let long = "a".repeat(2000);
        let truncated = truncate_to(&long, 1024);
        assert!(truncated.len() < 1100);
        assert!(truncated.ends_with("...[truncated]"));
    }
}