simple_agents_workflow/
checkpoint.rs1use std::fs;
2use std::path::{Path, PathBuf};
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use thiserror::Error;
7
/// Serializable snapshot of a workflow run.
///
/// The struct derives `Serialize`/`Deserialize`, and `FilesystemCheckpointStore`
/// persists it as pretty-printed JSON keyed by [`run_id`](Self::run_id).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowCheckpoint {
    /// Identifier of the run; stores use it as the lookup key for `load`/`delete`.
    pub run_id: String,
    /// Name of the workflow (presumably the one this run executes; confirm with the producer).
    pub workflow_name: String,
    /// Step counter recorded with the checkpoint.
    // NOTE(review): whether this counts completed steps or is the index of the
    // next step is not visible in this file; confirm with the runtime.
    pub step: usize,
    /// Id of a node, presumably the one to execute when the run resumes (confirm).
    pub next_node_id: String,
    /// Arbitrary JSON value captured with the checkpoint; its schema is defined
    /// by the producer and is not constrained here.
    pub scope_snapshot: Value,
}
22
/// Errors returned by [`CheckpointStore`] operations.
///
/// Both variants wrap their source error and are created automatically by `?`
/// through the `#[from]` conversions.
#[derive(Debug, Error)]
pub enum CheckpointError {
    /// An underlying I/O operation failed.
    #[error("checkpoint io failure: {0}")]
    Io(#[from] std::io::Error),
    /// A `serde_json` operation (encoding or decoding a checkpoint) failed.
    #[error("checkpoint serialization failure: {0}")]
    Serde(#[from] serde_json::Error),
}
33
/// Persistence backend for [`WorkflowCheckpoint`]s, keyed by run id.
///
/// Implementors must be `Send + Sync`; every method takes `&self`.
pub trait CheckpointStore: Send + Sync {
    /// Persists `checkpoint` under its `run_id`.
    ///
    /// # Errors
    /// Returns a [`CheckpointError`] if the checkpoint cannot be stored.
    fn save(&self, checkpoint: &WorkflowCheckpoint) -> Result<(), CheckpointError>;

    /// Loads the checkpoint stored for `run_id`.
    ///
    /// The `Option` lets an implementation report "nothing stored" without an
    /// error; the filesystem store returns `Ok(None)` in that case.
    ///
    /// # Errors
    /// Returns a [`CheckpointError`] if the checkpoint cannot be read or decoded.
    fn load(&self, run_id: &str) -> Result<Option<WorkflowCheckpoint>, CheckpointError>;

    /// Removes the checkpoint stored for `run_id`.
    ///
    /// The filesystem store treats a missing checkpoint as success.
    ///
    /// # Errors
    /// Returns a [`CheckpointError`] if the checkpoint cannot be removed.
    fn delete(&self, run_id: &str) -> Result<(), CheckpointError>;
}
43
/// Checkpoint store that keeps one JSON file per run inside a single directory.
///
/// Each run maps to `<root>/<escaped run_id>.json`. Constructing the store
/// performs no I/O.
#[derive(Debug, Clone)]
pub struct FilesystemCheckpointStore {
    /// Directory that holds the checkpoint files.
    root: PathBuf,
}

impl FilesystemCheckpointStore {
    /// Creates a store that keeps its files under `root`.
    ///
    /// The directory does not have to exist yet.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Returns the file path used for `run_id`; always a direct child of `root`.
    fn path_for(&self, run_id: &str) -> PathBuf {
        self.root
            .join(format!("{}.json", Self::file_stem_for(run_id)))
    }

    /// Maps `run_id` to a file stem that is a single path component.
    ///
    /// Path separators, `:` (Windows drive / stream syntax) and control
    /// characters are percent-encoded byte by byte so a run id such as
    /// `../../etc/passwd` or `/abs` can never address a file outside the root.
    /// `%` is encoded as well so that distinct run ids always yield distinct
    /// stems. Every other character is kept, so ordinary ids are unchanged.
    fn file_stem_for(run_id: &str) -> String {
        use std::fmt::Write as _;

        let mut stem = String::with_capacity(run_id.len());
        for ch in run_id.chars() {
            let needs_escape = matches!(ch, '/' | '\\' | ':' | '%') || ch.is_control();
            if needs_escape {
                let mut utf8 = [0u8; 4];
                for byte in ch.encode_utf8(&mut utf8).bytes() {
                    write!(stem, "%{byte:02X}").expect("writing to a String cannot fail");
                }
            } else {
                stem.push(ch);
            }
        }
        stem
    }

    /// Makes sure `root` exists as a directory, creating missing parents.
    ///
    /// `create_dir_all` already succeeds when the directory is present, so no
    /// prior existence check is needed (and none is made, avoiding a race with
    /// concurrent creators).
    ///
    /// # Errors
    /// Returns the I/O error if the directory cannot be created.
    fn ensure_root(root: &Path) -> Result<(), std::io::Error> {
        fs::create_dir_all(root)
    }
}
67
68impl CheckpointStore for FilesystemCheckpointStore {
69 fn save(&self, checkpoint: &WorkflowCheckpoint) -> Result<(), CheckpointError> {
70 Self::ensure_root(&self.root)?;
71 let path = self.path_for(&checkpoint.run_id);
72 let bytes = serde_json::to_vec_pretty(checkpoint)?;
73 fs::write(path, bytes)?;
74 Ok(())
75 }
76
77 fn load(&self, run_id: &str) -> Result<Option<WorkflowCheckpoint>, CheckpointError> {
78 let path = self.path_for(run_id);
79 if !path.exists() {
80 return Ok(None);
81 }
82 let bytes = fs::read(path)?;
83 let checkpoint = serde_json::from_slice::<WorkflowCheckpoint>(&bytes)?;
84 Ok(Some(checkpoint))
85 }
86
87 fn delete(&self, run_id: &str) -> Result<(), CheckpointError> {
88 let path = self.path_for(run_id);
89 if path.exists() {
90 fs::remove_file(path)?;
91 }
92 Ok(())
93 }
94}
95
#[cfg(test)]
mod tests {
    use std::time::{SystemTime, UNIX_EPOCH};

    use serde_json::json;

    use super::{CheckpointStore, FilesystemCheckpointStore, WorkflowCheckpoint};

    /// Returns a scratch directory path under the system temp dir.
    ///
    /// The name combines the process id with a nanosecond timestamp so that
    /// concurrently running test processes do not share a directory. The
    /// directory itself is not created here.
    fn temp_dir() -> std::path::PathBuf {
        let pid = std::process::id();
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should not be before the unix epoch")
            .as_nanos();
        std::env::temp_dir().join(format!(
            "simple-agents-workflow-checkpoints-{pid}-{nanos}"
        ))
    }

    /// Saves a checkpoint, loads it back unchanged, deletes it, and verifies
    /// that a subsequent load reports it as missing.
    #[test]
    fn round_trips_checkpoint_on_filesystem() {
        let root = temp_dir();
        let store = FilesystemCheckpointStore::new(&root);
        let checkpoint = WorkflowCheckpoint {
            run_id: "run-1".to_string(),
            workflow_name: "wf".to_string(),
            step: 3,
            next_node_id: "tool".to_string(),
            scope_snapshot: json!({"input": {"k": "v"}}),
        };

        store.save(&checkpoint).expect("save should succeed");
        let loaded = store
            .load("run-1")
            .expect("load should succeed")
            .expect("checkpoint should exist");
        assert_eq!(loaded, checkpoint);

        store.delete("run-1").expect("delete should succeed");
        let missing = store.load("run-1").expect("load should succeed");
        assert!(missing.is_none());

        // Best-effort cleanup of the scratch directory.
        let _ = std::fs::remove_dir_all(root);
    }
}