use crate::processes::ProcessSummary;
use crate::telemetry::Telemetry;
use crate::Result;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// A single write-ahead-log (WAL) record.
///
/// `WalWriter::append` serializes one event as one JSON object per line, and
/// the readers in this file parse the log back line by line. Every `Option`
/// field is left out of the JSON entirely when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEvent {
/// Sequence number of the record. `WalWriter::create` resumes its counter
/// from the highest value found in the file plus one.
pub seq: u64,
/// Timestamp of the record. `WalWriter::cleanup` compares it against a
/// cutoff expressed in seconds since the Unix epoch.
pub ts: u64,
/// Kind of record; stored under the JSON key `type`.
#[serde(rename = "type")]
pub event_type: WalEventType,
/// Identifier of the job this record refers to.
pub job_id: String,
/// Optional capability name (the tests in this file use `"FileRead"`).
#[serde(skip_serializing_if = "Option::is_none")]
pub capability: Option<String>,
/// Optional arbitrary JSON payload — presumably the job's result; confirm
/// against the code that produces events.
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<serde_json::Value>,
/// Optional error text — presumably set for failed jobs; confirm against
/// the code that produces events.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Optional `Telemetry` value; going by the name, taken before the job ran
/// — TODO confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub telemetry_before: Option<Telemetry>,
/// Optional `Telemetry` value; going by the name, taken after the job ran
/// — TODO confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub telemetry_after: Option<Telemetry>,
/// Optional `ProcessSummary`; going by the name, taken before the job ran
/// — TODO confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub process_before: Option<ProcessSummary>,
/// Optional `ProcessSummary`; going by the name, taken after the job ran
/// — TODO confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub process_after: Option<ProcessSummary>,
}
/// Kind of a [`WalEvent`], serialized in snake_case (for example
/// `JobStarted` becomes `"job_started"`).
///
/// The code in this file only stores and reloads these values; the meaning of
/// each variant is implied by its name and is decided by the callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalEventType {
JobSubmitted,
JobValidated,
JobStarted,
JobCompleted,
JobFailed,
JobRolledBack,
}
/// Writer handle for a JSON-lines WAL file.
pub struct WalWriter {
// Location of the WAL file that `append` rewrites.
path: std::path::PathBuf,
// Counter initialised by `create` from the file contents and incremented by
// one on every successful `append`. It is not written into the events.
seq: u64,
}
impl WalWriter {
pub fn create(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
if !parent.exists() {
std::fs::create_dir_all(parent).map_err(|e| {
crate::Error::WalError(format!(
"Failed to create WAL directory {}: {}",
parent.display(),
e
))
})?;
}
}
if !path.exists() {
std::fs::File::create(path).map_err(|e| {
crate::Error::WalError(format!(
"Failed to create WAL file {}: {}",
path.display(),
e
))
})?;
}
let seq = if path.exists() {
if let Ok(content) = std::fs::read_to_string(path) {
content
.lines()
.filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
.map(|e| e.seq)
.max()
.map(|max| max + 1)
.unwrap_or(0)
} else {
0
}
} else {
0
};
Ok(Self {
path: path.to_path_buf(),
seq,
})
}
#[cfg(unix)]
fn lock_file(file: &std::fs::File) -> Result<()> {
use std::os::unix::io::AsRawFd;
let fd = file.as_raw_fd();
let result = unsafe { libc::flock(fd, libc::LOCK_EX) };
if result != 0 {
return Err(crate::Error::WalError(format!(
"Failed to acquire WAL lock: {}",
std::io::Error::last_os_error()
)));
}
Ok(())
}
#[cfg(not(unix))]
fn lock_file(_file: &std::fs::File) -> Result<()> {
Ok(())
}
/// Releases the advisory `flock` held on `file`. The result of the call is
/// ignored.
#[cfg(unix)]
fn unlock_file(file: &std::fs::File) {
    use std::os::unix::io::AsRawFd;
    let raw = file.as_raw_fd();
    // SAFETY: `raw` belongs to `file`, which is still open, and `flock` does not
    // access any Rust-managed memory.
    let _ = unsafe { libc::flock(raw, libc::LOCK_UN) };
}
/// Stand-in for non-Unix targets: there is no lock to release.
#[cfg(not(unix))]
fn unlock_file(_file: &std::fs::File) {}
pub fn append(&mut self, event: WalEvent) -> Result<()> {
use std::io::Write;
let line =
serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
let temp_path = self.path.with_extension("wal.tmp");
let existing_content = if self.path.exists() {
let lock_file = std::fs::File::open(&self.path)
.map_err(|e| crate::Error::WalError(e.to_string()))?;
Self::lock_file(&lock_file)?;
let content = std::fs::read_to_string(&self.path)
.map_err(|e| crate::Error::WalError(e.to_string()))?;
Self::unlock_file(&lock_file);
content
} else {
String::new()
};
{
let mut temp_file = std::fs::File::create(&temp_path)
.map_err(|e| crate::Error::WalError(e.to_string()))?;
writeln!(temp_file, "{}{}", existing_content, line)
.map_err(|e| crate::Error::WalError(e.to_string()))?;
temp_file
.sync_all()
.map_err(|e| crate::Error::WalError(e.to_string()))?;
}
std::fs::rename(&temp_path, &self.path)
.map_err(|e| crate::Error::WalError(format!("atomic rename: {}", e)))?;
if let Ok(dir) = std::fs::File::open(self.path.parent().unwrap_or(Path::new("."))) {
let _ = dir.sync_all();
}
self.seq += 1;
Ok(())
}
pub fn seq(&self) -> u64 {
self.seq
}
}
/// In-memory view of the events parsed from a WAL file.
pub struct WalReader {
// Parsed events in the order their lines appear in the file.
events: Vec<WalEvent>,
}
impl WalReader {
pub fn load(path: &Path) -> Result<Self> {
let content =
std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
let events: Vec<WalEvent> = content
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
Ok(Self { events })
}
/// Borrows the loaded events in file order.
pub fn events(&self) -> &[WalEvent] {
    self.events.as_slice()
}
/// Streams the WAL at `path` and keeps only the last `n` events that parse.
///
/// Lines that do not parse as a [`WalEvent`] are skipped and do not count
/// towards `n`. With `n == 0` the file is still read, and the result is empty.
///
/// # Errors
/// Returns `Error::WalError` when the file cannot be opened or a line cannot
/// be read.
pub fn tail(path: &Path, n: usize) -> Result<Self> {
    use std::collections::VecDeque;
    use std::io::{BufRead, BufReader};

    // Upper bound on the up-front allocation. `n` comes from the caller, so
    // reserving `n + 1` slots directly would overflow for `usize::MAX` and could
    // request far more memory than the file holds. The deque still grows on
    // demand up to `n + 1` entries.
    const MAX_PREALLOC: usize = 1024;

    let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
    let reader = BufReader::new(file);
    let mut window: VecDeque<WalEvent> =
        VecDeque::with_capacity(n.saturating_add(1).min(MAX_PREALLOC));

    for line in reader.lines() {
        let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
        if let Ok(event) = serde_json::from_str(&line) {
            window.push_back(event);
            // Drop the oldest entry once the window holds more than `n`.
            if window.len() > n {
                window.pop_front();
            }
        }
    }

    Ok(Self {
        events: window.into(),
    })
}
}
impl WalWriter {
pub fn rotate(path: &Path, max_size_bytes: u64, max_rotations: usize) -> Result<()> {
let metadata = match std::fs::metadata(path) {
Ok(m) => m,
Err(_) => return Ok(()), };
if metadata.len() < max_size_bytes {
return Ok(());
}
for i in (1..max_rotations).rev() {
let old = path.with_extension(format!("wal.{}", i));
let new = path.with_extension(format!("wal.{}", i + 1));
if old.exists() {
let _ = std::fs::rename(&old, &new);
}
}
let rotated = path.with_extension("wal.1");
std::fs::rename(path, &rotated)
.map_err(|e| crate::Error::WalError(format!("WAL rotation rename: {}", e)))?;
std::fs::write(path, "")
.map_err(|e| crate::Error::WalError(format!("WAL rotation create: {}", e)))?;
let oldest = path.with_extension(format!("wal.{}", max_rotations + 1));
if oldest.exists() {
let _ = std::fs::remove_file(&oldest);
}
Ok(())
}
pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
use std::time::{SystemTime, UNIX_EPOCH};
let cutoff = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
.saturating_sub(max_age_secs);
let content =
std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
let events: Vec<WalEvent> = content
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
let total = content
.lines()
.filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
.count();
let removed = total - retained.len();
if removed > 0 {
let temp_path = path.with_extension("wal.tmp");
{
let mut new_wal = WalWriter::create(&temp_path)?;
for event in &retained {
new_wal.append(event.clone())?;
}
let last_seq = retained.last().map(|e| e.seq).unwrap_or(0);
let current_content = std::fs::read_to_string(path)
.map_err(|e| crate::Error::WalError(format!("re-read WAL during cleanup: {}", e)))?;
for line in current_content.lines() {
if let Ok(event) = serde_json::from_str::<WalEvent>(line) {
if event.seq > last_seq {
new_wal.append(event)?;
}
}
}
}
std::fs::rename(&temp_path, path).map_err(|e| {
crate::Error::WalError(format!("atomic rename during cleanup: {}", e))
})?;
}
Ok(removed)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Path of a per-test WAL file inside the system temp directory.
    fn tmp_wal(name: &str) -> std::path::PathBuf {
        let file_name = format!("runtimo_test_wal_{}.jsonl", name);
        std::env::temp_dir().join(file_name)
    }

    /// Builds an event with the given core fields and every optional field unset.
    fn bare_event(seq: u64, ts: u64, event_type: WalEventType, job_id: &str) -> WalEvent {
        WalEvent {
            seq,
            ts,
            event_type,
            job_id: job_id.into(),
            capability: None,
            output: None,
            error: None,
            telemetry_before: None,
            telemetry_after: None,
            process_before: None,
            process_after: None,
        }
    }

    // One appended event must come back from a full load.
    #[test]
    fn test_wal_write_and_read() {
        let path = tmp_wal("write_read");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let started = WalEvent {
            capability: Some("FileRead".into()),
            ..bare_event(0, 1715800000, WalEventType::JobStarted, "test-job")
        };
        wal.append(started).unwrap();

        let reader = WalReader::load(&path).unwrap();
        let loaded = reader.events();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].job_id, "test-job");

        let _ = std::fs::remove_file(&path);
    }

    // Reopening an existing WAL must resume the counter after the last event.
    #[test]
    fn test_wal_seq_recovery() {
        let path = tmp_wal("seq_recovery");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        assert_eq!(wal.seq(), 0);
        wal.append(bare_event(0, 1715800000, WalEventType::JobStarted, "job1"))
            .unwrap();
        assert_eq!(wal.seq(), 1);

        let wal2 = WalWriter::create(&path).unwrap();
        assert_eq!(wal2.seq(), 1);

        let _ = std::fs::remove_file(&path);
    }

    // Crossing the size threshold must move the log to `.wal.1` and leave an
    // empty live file behind.
    #[test]
    fn test_wal_rotation() {
        let path = tmp_wal("rotation");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        for i in 0..100u64 {
            let job_id = format!("job-{}", i);
            wal.append(bare_event(
                i,
                1715800000 + i,
                WalEventType::JobStarted,
                &job_id,
            ))
            .unwrap();
        }

        let size = std::fs::metadata(&path).unwrap().len();
        WalWriter::rotate(&path, size - 1, 3).unwrap();

        let rotated = path.with_extension("wal.1");
        assert!(rotated.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(rotated);
    }

    // Only the event older than the age limit must be dropped.
    #[test]
    fn test_wal_cleanup() {
        let path = tmp_wal("cleanup");
        let _ = std::fs::remove_file(&path);

        let mut wal = WalWriter::create(&path).unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        wal.append(bare_event(0, now - 1000, WalEventType::JobStarted, "old-job"))
            .unwrap();
        wal.append(bare_event(1, now, WalEventType::JobCompleted, "new-job"))
            .unwrap();

        let removed = WalWriter::cleanup(&path, 500).unwrap();
        assert_eq!(removed, 1);

        let reader = WalReader::load(&path).unwrap();
        let remaining = reader.events();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].job_id, "new-job");

        let _ = std::fs::remove_file(&path);
    }

    // Unset optional fields must not appear in the serialized JSON at all.
    #[test]
    fn test_wal_skip_serializing_optional_fields() {
        let event = bare_event(0, 1715800000, WalEventType::JobStarted, "test");
        let json = serde_json::to_string(&event).unwrap();
        for absent in ["capability", "telemetry_before", "process_before"] {
            assert!(!json.contains(absent));
        }
    }
}