1use crate::processes::ProcessSummary;
27use crate::telemetry::Telemetry;
28use crate::Result;
29use serde::{Deserialize, Serialize};
30use std::path::Path;
31
32
33
/// A single write-ahead-log record, stored as one JSON object per line of the WAL file.
///
/// Every `Option` field is omitted from the serialized JSON when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEvent {
    /// Sequence number of the event. `WalWriter::append` writes it exactly as given;
    /// `WalWriter::create` resumes counting at the highest value found in the file plus one.
    pub seq: u64,
    /// Event timestamp. `WalWriter::cleanup` compares it against the current time expressed
    /// in seconds since the Unix epoch.
    pub ts: u64,
    /// Kind of event; serialized under the JSON key `"type"`.
    #[serde(rename = "type")]
    pub event_type: WalEventType,
    /// Identifier of the job this event refers to.
    pub job_id: String,
    /// Capability name associated with the job (e.g. `"FileRead"`), if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capability: Option<String>,
    /// Arbitrary JSON output payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,
    /// Error message, if any (presumably set on failure events — confirm with producers).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Telemetry snapshot, presumably taken before the job ran — TODO confirm with producers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_before: Option<Telemetry>,
    /// Telemetry snapshot, presumably taken after the job ran — TODO confirm with producers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry_after: Option<Telemetry>,
    /// Process summary, presumably captured before the job ran — TODO confirm with producers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_before: Option<ProcessSummary>,
    /// Process summary, presumably captured after the job ran — TODO confirm with producers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_after: Option<ProcessSummary>,
}
72
/// Kind of event recorded in a [`WalEvent`].
///
/// Variants are serialized in `snake_case`, e.g. `JobSubmitted` becomes `"job_submitted"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalEventType {
    /// Serialized as `"job_submitted"`.
    JobSubmitted,
    /// Serialized as `"job_validated"`.
    JobValidated,
    /// Serialized as `"job_started"`.
    JobStarted,
    /// Serialized as `"job_completed"`.
    JobCompleted,
    /// Serialized as `"job_failed"`.
    JobFailed,
    /// Serialized as `"job_rolled_back"`.
    JobRolledBack,
}
90
/// Appender for a JSON-lines WAL file.
///
/// Only the file's location is stored, not an open handle; the file is reopened on every
/// append.
pub struct WalWriter {
    /// Sequence counter tracked by this writer: the value recovered at creation plus one per
    /// successful append.
    seq: u64,
    /// Location of the WAL file on disk.
    path: std::path::PathBuf,
}
115
116impl WalWriter {
117 pub fn create(path: &Path) -> Result<Self> {
126 if let Some(parent) = path.parent() {
128 if !parent.exists() {
129 std::fs::create_dir_all(parent).map_err(|e| {
130 crate::Error::WalError(format!(
131 "Failed to create WAL directory {}: {}",
132 parent.display(),
133 e
134 ))
135 })?;
136 }
137 }
138
139 if !path.exists() {
141 std::fs::File::create(path).map_err(|e| {
142 crate::Error::WalError(format!(
143 "Failed to create WAL file {}: {}",
144 path.display(),
145 e
146 ))
147 })?;
148 }
149
150 let seq = if path.exists() {
154 let lock_file = std::fs::File::open(path)
155 .map_err(|e| crate::Error::WalError(format!("open WAL for seq recovery: {}", e)))?;
156 Self::lock_file(&lock_file)?;
157 let content = std::fs::read_to_string(path)
158 .map_err(|e| crate::Error::WalError(format!("read WAL for seq recovery: {}", e)))?;
159 let recovered = content
160 .lines()
161 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
162 .map(|e| e.seq)
163 .max()
164 .map(|max| max + 1)
165 .unwrap_or(0);
166 Self::unlock_file(&lock_file);
167 recovered
168 } else {
169 0
170 };
171
172 Ok(Self {
173 path: path.to_path_buf(),
174 seq,
175 })
176 }
177
178 #[cfg(unix)]
180 fn lock_file(file: &std::fs::File) -> Result<()> {
181 use std::os::unix::io::AsRawFd;
182 let fd = file.as_raw_fd();
183 let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
184 if result != 0 {
185 return Err(crate::Error::WalError(format!(
186 "Failed to acquire WAL lock: {}",
187 std::io::Error::last_os_error()
188 )));
189 }
190 Ok(())
191 }
192
193 #[cfg(not(unix))]
195 fn lock_file(_file: &std::fs::File) -> Result<()> {
196 Ok(())
197 }
198
199 #[cfg(unix)]
201 fn unlock_file(file: &std::fs::File) {
202 use std::os::unix::io::AsRawFd;
203 let fd = file.as_raw_fd();
204 unsafe { libc::flock(fd, libc::LOCK_UN) };
205 }
206
207 #[cfg(not(unix))]
209 fn unlock_file(_file: &std::fs::File) {}
210
211 pub fn append(&mut self, event: WalEvent) -> Result<()> {
225 use std::io::Write;
226 let line =
227 serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
228
229 let file = std::fs::OpenOptions::new()
231 .create(true)
232 .append(true)
233 .open(&self.path)
234 .map_err(|e| crate::Error::WalError(format!("open WAL for append: {}", e)))?;
235
236 Self::lock_file(&file)?;
238 {
239 let mut buf = std::io::BufWriter::new(&file);
240 writeln!(buf, "{}", line)
241 .map_err(|e| crate::Error::WalError(format!("write WAL line: {}", e)))?;
242 buf.flush()
243 .map_err(|e| crate::Error::WalError(format!("flush WAL: {}", e)))?;
244 file.sync_all()
245 .map_err(|e| crate::Error::WalError(format!("fsync WAL: {}", e)))?;
246 }
247 Self::unlock_file(&file);
248
249 self.seq += 1;
250 Ok(())
251 }
252
253 pub fn seq(&self) -> u64 {
255 self.seq
256 }
257}
258
/// In-memory view of the events read from a WAL file.
pub struct WalReader {
    /// Events that parsed successfully, kept in the order their lines appear in the file.
    events: Vec<WalEvent>,
}
278
279impl WalReader {
280 pub fn load(path: &Path) -> Result<Self> {
287 let content =
288 std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
289
290 let events: Vec<WalEvent> = content
291 .lines()
292 .filter_map(|line| serde_json::from_str(line).ok())
293 .collect();
294
295 Ok(Self { events })
296 }
297
298 pub fn events(&self) -> &[WalEvent] {
300 &self.events
301 }
302
303 pub fn tail(path: &Path, n: usize) -> Result<Self> {
313 use std::collections::VecDeque;
314 use std::io::{BufRead, BufReader};
315 let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
316 let reader = BufReader::new(file);
317
318 let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n + 1);
319 for line in reader.lines() {
320 let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
321 if let Ok(event) = serde_json::from_str(&line) {
322 window.push_back(event);
323 if window.len() > n {
324 window.pop_front();
325 }
326 }
327 }
328
329 Ok(Self {
330 events: window.into(),
331 })
332 }
333}
334
335impl WalWriter {
337 fn rotation_path(path: &Path, index: usize) -> std::path::PathBuf {
342 let mut s = path.to_string_lossy().into_owned();
343 s.push('.');
344 s.push_str(&index.to_string());
345 std::path::PathBuf::from(s)
346 }
347
348 pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
354 let metadata = match std::fs::metadata(path) {
355 Ok(m) => m,
356 Err(_) => return Ok(()), };
358
359 if metadata.len() < max_size_bytes {
360 return Ok(());
361 }
362
363 for i in (1..max_rotations).rev() {
365 let old = Self::rotation_path(path, i);
366 let new = Self::rotation_path(path, i + 1);
367 if old.exists() {
368 let _ = std::fs::rename(&old, &new);
369 }
370 }
371
372 let rotated = Self::rotation_path(path, 1);
374 std::fs::rename(path, &rotated)
375 .map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
376
377 std::fs::write(path, "")
379 .map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
380
381 let oldest = Self::rotation_path(path, max_rotations + 1);
383 if oldest.exists() {
384 let _ = std::fs::remove_file(&oldest);
385 }
386
387 Ok(())
388 }
389
390 pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
404 use std::time::{SystemTime, UNIX_EPOCH};
405
406 let cutoff = SystemTime::now()
407 .duration_since(UNIX_EPOCH)
408 .map(|d| d.as_secs())
409 .unwrap_or(0)
410 .saturating_sub(max_age_secs);
411
412 let lock_file = std::fs::File::open(path)
414 .map_err(|e| crate::Error::WalError(format!("open WAL for cleanup: {}", e)))?;
415 Self::lock_file(&lock_file)?;
416 let content = std::fs::read_to_string(path)
417 .map_err(|e| crate::Error::WalError(format!("read WAL for cleanup: {}", e)))?;
418
419 let events: Vec<WalEvent> = content
420 .lines()
421 .filter_map(|line| serde_json::from_str(line).ok())
422 .collect();
423
424 let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
426
427 let total = content
428 .lines()
429 .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
430 .count();
431 let removed = total - retained.len();
432
433 if removed > 0 {
434 let temp_path = path.with_extension("wal.tmp");
437 {
438 let mut new_wal = WalWriter::create(&temp_path)?;
439 for event in &retained {
440 new_wal.append(event.clone())?;
441 }
442
443 let last_seq = retained.last().map(|e| e.seq).unwrap_or(0);
446 let current_content = std::fs::read_to_string(path)
447 .map_err(|e| crate::Error::WalError(format!("re-read WAL during cleanup: {}", e)))?;
448 for line in current_content.lines() {
449 if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
450 if event.seq > last_seq {
451 new_wal.append(event)?;
452 }
453 }
454 }
455 }
456 Self::unlock_file(&lock_file);
458 std::fs::rename(&temp_path, path).map_err(|e| {
460 crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
461 })?;
462 } else {
463 Self::unlock_file(&lock_file);
464 }
465
466 Ok(removed)
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Temp WAL path unique to this test name and process, so concurrently running test
    /// binaries never share files.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "runtimo_test_wal_{}_{}.jsonl",
            name,
            std::process::id()
        ))
    }

    /// Builds an event with the given identity and every optional field set to `None`.
    fn event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
        }
    }

    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        wal.append(WalEvent {
            capability: Some("FileRead".into()),
            ..event(0, 1715800000, WalEventType::JobStarted, "test-job")
        })
        .unwrap();

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "test-job");
        assert_eq!(reader.events()[0].capability.as_deref(), Some("FileRead"));

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        // A fresh writer resumes after the highest seq already on disk.
        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let rotated = WalWriter::rotation_path(&path, 1);
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(&rotated);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        // A threshold just below the current size forces a rotation.
        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        assert!(rotated.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");
        assert_eq!(WalReader::load(&rotated).unwrap().events().len(), 100);

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(&rotated);
    }

    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Older than the 500 s retention window used below.
        wal.append(event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        // Inside the retention window.
        wal.append(event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        assert_eq!(reader.events().len(), 1);
        assert_eq!(reader.events()[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_tail() {
        let path = tmp_wal("tail");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..5 {
            wal.append(event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &format!("job-{}", i),
            ))
            .unwrap();
        }

        let last_two: Vec<String> = WalReader::tail(&path, 2)
            .unwrap()
            .events()
            .iter()
            .map(|e| e.job_id.clone())
            .collect();
        assert_eq!(last_two, vec!["job-3", "job-4"]);
        assert!(WalReader::tail(&path, 0).unwrap().events().is_empty());
        assert_eq!(WalReader::tail(&path, 10).unwrap().events().len(), 5);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let json =
            serde_json::to_string(&event(0, 1715800000, WalEventType::JobStarted, "test"))
                .unwrap();
        assert!(!json.contains("capability"));
        assert!(!json.contains("telemetry_before"));
        assert!(!json.contains("process_before"));
        // The event kind is stored under `"type"` in snake_case.
        assert!(json.contains("\"type\":\"job_started\""));
    }
}