mur_core/workflow/
checkpoint.rs1use crate::types::StepResult;
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Checkpoint {
14 pub execution_id: Uuid,
15 pub step_index: usize,
16 pub variables: HashMap<String, String>,
17 pub completed_steps: Vec<StepResult>,
18 pub created_at: DateTime<Utc>,
19}
20
21fn checkpoint_dir(execution_id: &Uuid) -> PathBuf {
23 directories::BaseDirs::new()
24 .map(|d| d.home_dir().join(".mur").join("commander").join("checkpoints"))
25 .unwrap_or_else(|| PathBuf::from(".mur/commander/checkpoints"))
26 .join(execution_id.to_string())
27}
28
29pub async fn save_checkpoint(checkpoint: &Checkpoint) -> Result<PathBuf> {
31 let dir = checkpoint_dir(&checkpoint.execution_id);
32 tokio::fs::create_dir_all(&dir)
33 .await
34 .with_context(|| format!("Creating checkpoint dir {:?}", dir))?;
35
36 let path = dir.join(format!("{}.json", checkpoint.step_index));
37 let json = serde_json::to_string_pretty(checkpoint).context("Serializing checkpoint")?;
38 tokio::fs::write(&path, json)
39 .await
40 .with_context(|| format!("Writing checkpoint {:?}", path))?;
41
42 tracing::debug!(
43 "Checkpoint saved: execution={}, step={}",
44 checkpoint.execution_id,
45 checkpoint.step_index
46 );
47 Ok(path)
48}
49
50pub async fn load_checkpoint(execution_id: &Uuid, step_index: usize) -> Result<Checkpoint> {
52 let path = checkpoint_dir(execution_id).join(format!("{step_index}.json"));
53 let content = tokio::fs::read_to_string(&path)
54 .await
55 .with_context(|| format!("Reading checkpoint {:?}", path))?;
56 let checkpoint: Checkpoint =
57 serde_json::from_str(&content).context("Deserializing checkpoint")?;
58 Ok(checkpoint)
59}
60
61pub async fn load_latest_checkpoint(execution_id: &Uuid) -> Result<Option<Checkpoint>> {
63 let dir = checkpoint_dir(execution_id);
64 if !dir.exists() {
65 return Ok(None);
66 }
67
68 let mut entries = tokio::fs::read_dir(&dir)
69 .await
70 .context("Reading checkpoint dir")?;
71
72 let mut latest: Option<(usize, PathBuf)> = None;
73 while let Some(entry) = entries.next_entry().await? {
74 let path = entry.path();
75 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
76 if let Ok(idx) = stem.parse::<usize>() {
77 if latest.as_ref().is_none_or(|(max, _)| idx > *max) {
78 latest = Some((idx, path));
79 }
80 }
81 }
82 }
83
84 match latest {
85 Some((_, path)) => {
86 let content = tokio::fs::read_to_string(&path).await?;
87 let cp: Checkpoint = serde_json::from_str(&content)?;
88 Ok(Some(cp))
89 }
90 None => Ok(None),
91 }
92}
93
94pub async fn cleanup_checkpoints(execution_id: &Uuid) -> Result<()> {
96 let dir = checkpoint_dir(execution_id);
97 if dir.exists() {
98 tokio::fs::remove_dir_all(&dir)
99 .await
100 .with_context(|| format!("Removing checkpoint dir {:?}", dir))?;
101 }
102 Ok(())
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[tokio::test]
110 async fn test_checkpoint_save_load() {
111 let execution_id = Uuid::new_v4();
112 let cp = Checkpoint {
113 execution_id,
114 step_index: 0,
115 variables: HashMap::from([("key".into(), "value".into())]),
116 completed_steps: vec![],
117 created_at: Utc::now(),
118 };
119
120 let path = save_checkpoint(&cp).await.unwrap();
121 assert!(path.exists());
122
123 let loaded = load_checkpoint(&execution_id, 0).await.unwrap();
124 assert_eq!(loaded.step_index, 0);
125 assert_eq!(loaded.variables.get("key").unwrap(), "value");
126
127 cleanup_checkpoints(&execution_id).await.unwrap();
129 assert!(!checkpoint_dir(&execution_id).exists());
130 }
131
132 #[tokio::test]
133 async fn test_latest_checkpoint() {
134 let execution_id = Uuid::new_v4();
135
136 for i in 0..3 {
137 let cp = Checkpoint {
138 execution_id,
139 step_index: i,
140 variables: HashMap::new(),
141 completed_steps: vec![],
142 created_at: Utc::now(),
143 };
144 save_checkpoint(&cp).await.unwrap();
145 }
146
147 let latest = load_latest_checkpoint(&execution_id).await.unwrap().unwrap();
148 assert_eq!(latest.step_index, 2);
149
150 cleanup_checkpoints(&execution_id).await.unwrap();
151 }
152}