// simple_agents_workflow/checkpoint.rs

use std::fs;
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;

8/// Serializable checkpoint used to resume workflow execution.
9#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
10pub struct WorkflowCheckpoint {
11    /// Unique run id for this checkpoint.
12    pub run_id: String,
13    /// Workflow name associated with the run.
14    pub workflow_name: String,
15    /// Step index reached in this run.
16    pub step: usize,
17    /// Node id to execute next when resuming.
18    pub next_node_id: String,
19    /// Scoped input snapshot used to rebuild execution state.
20    pub scope_snapshot: Value,
21}
22
23/// Errors produced by checkpoint persistence.
24#[derive(Debug, Error)]
25pub enum CheckpointError {
26    /// IO operation failed.
27    #[error("checkpoint io failure: {0}")]
28    Io(#[from] std::io::Error),
29    /// Serialization failed.
30    #[error("checkpoint serialization failure: {0}")]
31    Serde(#[from] serde_json::Error),
32}
33
34/// Storage abstraction for workflow checkpoints.
35pub trait CheckpointStore: Send + Sync {
36    /// Persists checkpoint content for the run id.
37    fn save(&self, checkpoint: &WorkflowCheckpoint) -> Result<(), CheckpointError>;
38    /// Loads checkpoint for the provided run id.
39    fn load(&self, run_id: &str) -> Result<Option<WorkflowCheckpoint>, CheckpointError>;
40    /// Deletes checkpoint for the provided run id.
41    fn delete(&self, run_id: &str) -> Result<(), CheckpointError>;
42}
43
/// Filesystem-backed checkpoint storage.
///
/// Each run maps to one file named `<run_id>.json` directly under the root
/// directory.
#[derive(Debug, Clone)]
pub struct FilesystemCheckpointStore {
    /// Directory under which checkpoint files are placed.
    root: PathBuf,
}

impl FilesystemCheckpointStore {
    /// Creates a backend rooted at `root`.
    ///
    /// Only the path is recorded; the directory is not created here.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Returns the checkpoint file path for `run_id`: `<root>/<run_id>.json`.
    ///
    /// NOTE(review): `run_id` is joined onto the root without validation, so
    /// an id containing path separators, `..`, or an absolute path resolves
    /// outside `root`. Confirm callers only pass trusted ids.
    fn path_for(&self, run_id: &str) -> PathBuf {
        self.root.join(format!("{run_id}.json"))
    }

    /// Creates `root` and any missing parent directories.
    ///
    /// `fs::create_dir_all` already succeeds when the directory exists, so no
    /// prior existence check is needed (and none is made, which avoids a
    /// check-then-create race). If `root` exists but is not a directory the
    /// underlying error is returned.
    fn ensure_root(root: &Path) -> Result<(), std::io::Error> {
        fs::create_dir_all(root)
    }
}

68impl CheckpointStore for FilesystemCheckpointStore {
69    fn save(&self, checkpoint: &WorkflowCheckpoint) -> Result<(), CheckpointError> {
70        Self::ensure_root(&self.root)?;
71        let path = self.path_for(&checkpoint.run_id);
72        let bytes = serde_json::to_vec_pretty(checkpoint)?;
73        fs::write(path, bytes)?;
74        Ok(())
75    }
76
77    fn load(&self, run_id: &str) -> Result<Option<WorkflowCheckpoint>, CheckpointError> {
78        let path = self.path_for(run_id);
79        if !path.exists() {
80            return Ok(None);
81        }
82        let bytes = fs::read(path)?;
83        let checkpoint = serde_json::from_slice::<WorkflowCheckpoint>(&bytes)?;
84        Ok(Some(checkpoint))
85    }
86
87    fn delete(&self, run_id: &str) -> Result<(), CheckpointError> {
88        let path = self.path_for(run_id);
89        if path.exists() {
90            fs::remove_file(path)?;
91        }
92        Ok(())
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    use serde_json::json;

    use super::{CheckpointStore, FilesystemCheckpointStore, WorkflowCheckpoint};

    /// Returns a directory path under the system temp dir that is unique per
    /// call: the name combines the process id, a nanosecond timestamp and a
    /// per-process counter, so tests running in parallel do not collide.
    fn temp_dir() -> PathBuf {
        static NEXT: AtomicUsize = AtomicUsize::new(0);

        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should not be before the Unix epoch")
            .as_nanos();
        let pid = std::process::id();
        let seq = NEXT.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "simple-agents-workflow-checkpoints-{pid}-{nanos}-{seq}"
        ))
    }

    #[test]
    fn round_trips_checkpoint_on_filesystem() {
        let root = temp_dir();
        let store = FilesystemCheckpointStore::new(&root);
        let checkpoint = WorkflowCheckpoint {
            run_id: "run-1".to_string(),
            workflow_name: "wf".to_string(),
            step: 3,
            next_node_id: "tool".to_string(),
            scope_snapshot: json!({"input": {"k": "v"}}),
        };

        store.save(&checkpoint).expect("save should succeed");
        let loaded = store
            .load("run-1")
            .expect("load should succeed")
            .expect("checkpoint should exist");
        assert_eq!(loaded, checkpoint);

        store.delete("run-1").expect("delete should succeed");
        let missing = store.load("run-1").expect("load should succeed");
        assert!(missing.is_none());

        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn load_returns_none_when_nothing_was_saved() {
        // The root directory is never created in this test.
        let store = FilesystemCheckpointStore::new(temp_dir());
        let missing = store.load("absent").expect("load should succeed");
        assert!(missing.is_none());
    }

    #[test]
    fn delete_is_a_no_op_for_missing_checkpoint() {
        // The root directory is never created in this test.
        let store = FilesystemCheckpointStore::new(temp_dir());
        store.delete("absent").expect("delete should succeed");
    }
}
137}