use std::fs;
use std::path::{Path, PathBuf};
use crate::result::PausedWorkflow;
/// Directory-backed store for paused-workflow checkpoints.
///
/// Each checkpoint lives in `dir` as `<run_id>.json`; while a checkpoint is
/// claimed it is held under the name `<run_id>.inflight` instead.
#[derive(Clone, Debug)]
pub struct CheckpointStore {
    /// Directory that holds every checkpoint file managed by this store.
    dir: PathBuf,
}
impl CheckpointStore {
    /// Opens a store rooted at `dir`, creating the directory (and any missing
    /// parents) first.
    ///
    /// # Errors
    /// Returns the I/O error from creating the directory.
    pub fn open(dir: impl AsRef<Path>) -> std::io::Result<Self> {
        let dir = dir.as_ref().to_path_buf();
        fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }

    /// Path of the claimable checkpoint file for `run_id`.
    ///
    /// NOTE(review): `run_id` is interpolated into the file name unchanged, so
    /// an id containing path separators would resolve outside `dir`; callers
    /// are presumably expected to pass plain identifiers — confirm.
    fn path_for(&self, run_id: &str) -> PathBuf {
        self.dir.join(format!("{run_id}.json"))
    }

    /// Path of the in-flight (claimed) file for `run_id`.
    fn inflight_path_for(&self, run_id: &str) -> PathBuf {
        self.dir.join(format!("{run_id}.inflight"))
    }

    /// Reads and deserializes the checkpoint at `path`.
    ///
    /// Returns `Ok(None)` when the file does not exist. A file that exists but
    /// does not parse is reported as `ErrorKind::InvalidData`.
    fn read_paused(path: &Path) -> std::io::Result<Option<PausedWorkflow>> {
        match fs::read_to_string(path) {
            Ok(json) => {
                let paused = serde_json::from_str(&json)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
                Ok(Some(paused))
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Creates (or truncates) `path`, writes `bytes`, and flushes the file to
    /// disk before returning.
    fn write_synced(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
        let mut file = fs::File::create(path)?;
        // Fully-qualified call so the `Write` trait need not be imported.
        std::io::Write::write_all(&mut file, bytes)?;
        file.sync_all()
    }

    /// Persists `paused` as `<run_id>.json`, replacing any existing checkpoint
    /// for the same run.
    ///
    /// The JSON is written to a uniquely named hidden temp file in the same
    /// directory and then renamed over the final path, so readers never see a
    /// partially written checkpoint.
    ///
    /// # Errors
    /// Serialization failures are reported as `ErrorKind::InvalidData`; write,
    /// sync and rename failures are returned as-is.
    pub fn save(&self, paused: &PausedWorkflow) -> std::io::Result<()> {
        let json = serde_json::to_string_pretty(paused)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        let final_path = self.path_for(&paused.run_id);
        let tmp_path = self.dir.join(format!(
            ".{}.{}.tmp",
            paused.run_id,
            uuid::Uuid::new_v4().simple()
        ));

        // Sync the data before the rename publishes it: otherwise a crash
        // shortly after the rename could expose an empty or truncated file.
        let published = Self::write_synced(&tmp_path, json.as_bytes())
            .and_then(|()| fs::rename(&tmp_path, &final_path));
        if published.is_err() {
            // Best-effort cleanup so failed saves do not accumulate temp
            // files; the original error is what the caller needs to see.
            let _ = fs::remove_file(&tmp_path);
        }
        published
    }

    /// Loads the checkpoint for `run_id` without claiming it.
    ///
    /// Returns `Ok(None)` when no `<run_id>.json` exists.
    pub fn load(&self, run_id: &str) -> std::io::Result<Option<PausedWorkflow>> {
        Self::read_paused(&self.path_for(run_id))
    }

    /// Claims the checkpoint for `run_id` by renaming `<run_id>.json` to
    /// `<run_id>.inflight`, then returns its contents.
    ///
    /// Returns `Ok(None)` when there is no `.json` file to claim (never saved,
    /// or already claimed).
    ///
    /// # Errors
    /// Any rename error other than "not found", and any read or parse error
    /// on the claimed file.
    pub fn claim(&self, run_id: &str) -> std::io::Result<Option<PausedWorkflow>> {
        let src = self.path_for(run_id);
        let dst = self.inflight_path_for(run_id);
        // The rename is the single step that decides who wins. Do NOT delete a
        // stale `.inflight` file beforehand: `fs::rename` already replaces an
        // existing destination, and a separate "exists, then remove" step lets
        // a losing claimer delete the file the winner has just claimed, which
        // loses the checkpoint for both of them.
        match fs::rename(&src, &dst) {
            Ok(()) => Self::read_paused(&dst),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Re-arms checkpoints that were claimed but never completed.
    ///
    /// For every `*.inflight` file in the directory: if the matching `.json`
    /// file exists, the in-flight file is deleted; otherwise it is renamed
    /// back to `.json` and counted. File names that are not valid UTF-8 are
    /// skipped.
    ///
    /// Returns the number of checkpoints renamed back to `.json`.
    ///
    /// NOTE(review): presumably meant to run while no claimers are active
    /// (e.g. at startup) — confirm with callers.
    pub fn recover_orphaned(&self) -> std::io::Result<usize> {
        let mut rearmed = 0;
        for entry in fs::read_dir(&self.dir)? {
            let path = entry?.path();
            if path.extension().and_then(|e| e.to_str()) != Some("inflight") {
                continue;
            }
            let Some(run_id) = path.file_stem().and_then(|s| s.to_str()) else {
                continue;
            };
            let json_path = self.path_for(run_id);
            if json_path.exists() {
                // A claimable checkpoint already exists; the in-flight file
                // is dropped rather than overwriting it.
                remove_if_present(&path)?;
            } else {
                fs::rename(&path, &json_path)?;
                rearmed += 1;
            }
        }
        Ok(rearmed)
    }

    /// Marks a claimed checkpoint as finished by deleting `<run_id>.inflight`.
    ///
    /// Succeeds even if the file is already gone.
    pub fn complete(&self, run_id: &str) -> std::io::Result<()> {
        remove_if_present(&self.inflight_path_for(run_id))
    }

    /// Deletes both the claimable and the in-flight file for `run_id`.
    ///
    /// Succeeds even if neither file exists.
    pub fn remove(&self, run_id: &str) -> std::io::Result<()> {
        remove_if_present(&self.path_for(run_id))?;
        remove_if_present(&self.inflight_path_for(run_id))
    }

    /// Returns the run ids of all `*.json` entries in the directory, sorted.
    ///
    /// In-flight and temp files are not listed; file names that are not valid
    /// UTF-8 are skipped.
    pub fn list(&self) -> std::io::Result<Vec<String>> {
        let mut ids = Vec::new();
        for entry in fs::read_dir(&self.dir)? {
            let path = entry?.path();
            if path.extension().and_then(|e| e.to_str()) != Some("json") {
                continue;
            }
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                ids.push(stem.to_string());
            }
        }
        ids.sort();
        Ok(ids)
    }
}
/// Deletes the file at `path`, treating "file does not exist" as success.
///
/// Any other I/O error is returned to the caller unchanged.
fn remove_if_present(path: &Path) -> std::io::Result<()> {
    if let Err(err) = fs::remove_file(path) {
        // Only a missing file is tolerated; everything else is a real failure.
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err);
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Workflow;
    use std::collections::HashMap;

    /// Eight hex characters from a fresh UUID, used to give each test its own
    /// scratch directory name.
    fn new_id() -> String {
        let hex = uuid::Uuid::new_v4().simple().to_string();
        hex[..8].to_string()
    }

    /// Builds a minimal paused workflow for `run_id`, paused at stage `gate`.
    fn sample(run_id: &str) -> PausedWorkflow {
        let workflow = Workflow {
            id: "wf".into(),
            name: "WF".into(),
            start: "gate".into(),
            goal: None,
            stages: vec![],
            edges: vec![],
            max_iterations: 100,
            metadata: HashMap::new(),
        };
        PausedWorkflow {
            run_id: run_id.into(),
            workflow,
            paused_stage_id: "gate".into(),
            prompt: "approve?".into(),
            fields: vec![],
            output_key: "approval".into(),
            wf_state: HashMap::new(),
            stage_results: vec![],
            completed_stage_ids: vec![],
            iterations: 1,
            prior_duration_ms: 0.0,
            created_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn save_load_remove_roundtrip() {
        let root = std::env::temp_dir().join(format!("car-ckpt-{}", new_id()));
        let ckpts = CheckpointStore::open(&root).unwrap();

        // Nothing saved yet.
        assert!(ckpts.load("missing").unwrap().is_none());

        ckpts.save(&sample("run-1")).unwrap();
        let restored = ckpts.load("run-1").unwrap().unwrap();
        assert_eq!(restored.run_id, "run-1");
        assert_eq!(restored.output_key, "approval");
        assert_eq!(restored.prompt, "approve?");
        assert_eq!(ckpts.list().unwrap(), vec!["run-1".to_string()]);

        ckpts.remove("run-1").unwrap();
        assert!(ckpts.load("run-1").unwrap().is_none());
        // Removing an already-removed checkpoint must still succeed.
        ckpts.remove("run-1").unwrap();

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn claim_is_exactly_once() {
        let root = std::env::temp_dir().join(format!("car-ckpt-claim-{}", new_id()));
        let ckpts = CheckpointStore::open(&root).unwrap();
        ckpts.save(&sample("run-x")).unwrap();

        // The first claimer wins and the checkpoint is no longer listed.
        let winner = ckpts.claim("run-x").unwrap();
        assert!(winner.is_some());
        assert!(ckpts.list().unwrap().is_empty());

        // A second claim finds nothing.
        let loser = ckpts.claim("run-x").unwrap();
        assert!(loser.is_none());

        // Completing twice is fine, and the run stays unclaimable afterwards.
        ckpts.complete("run-x").unwrap();
        ckpts.complete("run-x").unwrap();
        assert!(ckpts.claim("run-x").unwrap().is_none());

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn recover_rearms_orphaned_inflight() {
        let root = std::env::temp_dir().join(format!("car-ckpt-recover-{}", new_id()));
        let ckpts = CheckpointStore::open(&root).unwrap();

        // A claim that is never completed leaves only the in-flight file.
        ckpts.save(&sample("crashed")).unwrap();
        let _ = ckpts.claim("crashed").unwrap();
        assert!(ckpts.claim("crashed").unwrap().is_none(), "claimed, no .json");
        assert_eq!(ckpts.recover_orphaned().unwrap(), 1);
        assert!(ckpts.claim("crashed").unwrap().is_some(), "re-armed, claimable");
        ckpts.complete("crashed").unwrap();

        // With both files present, the in-flight one is dropped, not re-armed.
        ckpts.save(&sample("both")).unwrap();
        let stale_marker = root.join("both.inflight");
        fs::write(&stale_marker, "stale").unwrap();
        assert_eq!(ckpts.recover_orphaned().unwrap(), 0, "json present → not re-armed");
        assert!(!stale_marker.exists(), "stale marker removed");
        assert!(ckpts.claim("both").unwrap().is_some());

        let _ = fs::remove_dir_all(&root);
    }
}