use crate::Result;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// A single record of the write-ahead log.
///
/// `WalWriter::append` serializes one event to JSON and writes it as one line of the log;
/// `WalReader` parses lines back into this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEvent {
/// Sequence number carried by this record.
pub seq: u64,
/// Timestamp of the record.
/// NOTE(review): `WalWriter::cleanup` compares this against whole seconds since the Unix
/// epoch, so it is presumably expressed in seconds — confirm against the producers.
pub ts: u64,
/// Kind of event; stored under the JSON key `type`.
#[serde(rename = "type")]
pub event_type: WalEventType,
/// Identifier of the job the event belongs to (presumably assigned by the submitter).
pub job_id: String,
/// Optional capability string; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub capability: Option<String>,
/// Optional arbitrary JSON payload; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<serde_json::Value>,
/// Optional error text; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Discriminates the kinds of [`WalEvent`].
///
/// Variant names are (de)serialized in `snake_case`, e.g. `JobSubmitted` is written as
/// `"job_submitted"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalEventType {
JobSubmitted,
JobValidated,
JobStarted,
JobCompleted,
JobFailed,
JobRolledBack,
}
/// Appends [`WalEvent`]s to a log file, one JSON document per line.
pub struct WalWriter {
// Handle that `create` opens in append mode; every `append` writes to it and syncs it.
file: std::fs::File,
// Counter initialised by `create` from the existing log and incremented by each `append`.
seq: u64,
}
impl WalWriter {
pub fn create(path: &Path) -> Result<Self> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| crate::Error::WalError(e.to_string()))?;
let seq = if path.exists() {
if let Ok(content) = std::fs::read_to_string(path) {
content
.lines()
.filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
.map(|e| e.seq)
.max()
.map(|max| max + 1)
.unwrap_or(0)
} else {
0
}
} else {
0
};
Ok(Self { file, seq })
}
pub fn append(&mut self, event: WalEvent) -> Result<()> {
use std::io::Write;
let line =
serde_json::to_string(&event).map_err(|e| crate::Error::WalError(e.to_string()))?;
writeln!(self.file, "{}", line).map_err(|e| crate::Error::WalError(e.to_string()))?;
self.file
.sync_all()
.map_err(|e| crate::Error::WalError(e.to_string()))?;
self.seq += 1;
Ok(())
}
pub fn seq(&self) -> u64 {
self.seq
}
}
/// In-memory view of the [`WalEvent`]s parsed from a log file.
pub struct WalReader {
// Events in file order, as collected by `load` (all of them) or `tail` (the last `n`).
events: Vec<WalEvent>,
}
impl WalReader {
    /// Reads the whole log at `path` and keeps every line that parses as a [`WalEvent`].
    ///
    /// Lines that do not parse are skipped.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::WalError` if the file cannot be read as a string.
    pub fn load(path: &Path) -> Result<Self> {
        let content =
            std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
        let events: Vec<WalEvent> = content
            .lines()
            .filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
            .collect();
        Ok(Self { events })
    }

    /// Returns the loaded events in file order.
    pub fn events(&self) -> &[WalEvent] {
        &self.events
    }

    /// Streams the log at `path` and keeps only the last `n` events that parse.
    ///
    /// Lines that do not parse are skipped and do not count towards `n`. With `n == 0` the
    /// file is still read, but the result holds no events.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::WalError` if the file cannot be opened or a line cannot be read.
    pub fn tail(path: &Path, n: usize) -> Result<Self> {
        use std::collections::VecDeque;
        use std::io::{BufRead, BufReader};

        // Upper bound on the eager allocation. The window still grows up to `n + 1` entries
        // on demand, but a huge `n` no longer reserves memory up front for a small log, and
        // `n == usize::MAX` can no longer overflow the capacity computation.
        const MAX_PREALLOC: usize = 1024;

        let file = std::fs::File::open(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
        let reader = BufReader::new(file);
        let mut window: VecDeque<WalEvent> = VecDeque::with_capacity(n.min(MAX_PREALLOC) + 1);

        for line in reader.lines() {
            let line = line.map_err(|e| crate::Error::WalError(e.to_string()))?;
            if let Ok(event) = serde_json::from_str::<WalEvent>(&line) {
                window.push_back(event);
                // Push first, then trim: this keeps at most `n` entries and also handles
                // `n == 0` without a special case.
                if window.len() > n {
                    window.pop_front();
                }
            }
        }

        Ok(Self {
            events: window.into(),
        })
    }
}
impl WalWriter {
pub fn cleanup(path: &Path, max_age_secs: u64) -> Result<usize> {
use std::time::{SystemTime, UNIX_EPOCH};
let cutoff = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
.saturating_sub(max_age_secs);
let content =
std::fs::read_to_string(path).map_err(|e| crate::Error::WalError(e.to_string()))?;
let events: Vec<WalEvent> = content
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
let retained: Vec<_> = events.into_iter().filter(|e| e.ts >= cutoff).collect();
let total = content
.lines()
.filter_map(|line| serde_json::from_str::<WalEvent>(line).ok())
.count();
let removed = total - retained.len();
if removed > 0 {
std::fs::write(path, "").map_err(|e| {
crate::Error::WalError(format!("truncate WAL before cleanup: {}", e))
})?;
let mut new_wal = WalWriter::create(path)?;
for event in retained {
new_wal.append(event)?;
}
}
Ok(removed)
}
}