Skip to main content

mur_core/workflow/
checkpoint.rs

1//! Checkpoint & Rollback — save/restore execution state between steps.
2
3use crate::types::StepResult;
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use uuid::Uuid;
10
11/// A checkpoint capturing execution state before a step.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Checkpoint {
14    pub execution_id: Uuid,
15    pub step_index: usize,
16    pub variables: HashMap<String, String>,
17    pub completed_steps: Vec<StepResult>,
18    pub created_at: DateTime<Utc>,
19}
20
21/// Get the checkpoints directory for a given execution.
22fn checkpoint_dir(execution_id: &Uuid) -> PathBuf {
23    directories::BaseDirs::new()
24        .map(|d| d.home_dir().join(".mur").join("commander").join("checkpoints"))
25        .unwrap_or_else(|| PathBuf::from(".mur/commander/checkpoints"))
26        .join(execution_id.to_string())
27}
28
29/// Save a checkpoint before executing a step.
30pub async fn save_checkpoint(checkpoint: &Checkpoint) -> Result<PathBuf> {
31    let dir = checkpoint_dir(&checkpoint.execution_id);
32    tokio::fs::create_dir_all(&dir)
33        .await
34        .with_context(|| format!("Creating checkpoint dir {:?}", dir))?;
35
36    let path = dir.join(format!("{}.json", checkpoint.step_index));
37    let json = serde_json::to_string_pretty(checkpoint).context("Serializing checkpoint")?;
38    tokio::fs::write(&path, json)
39        .await
40        .with_context(|| format!("Writing checkpoint {:?}", path))?;
41
42    tracing::debug!(
43        "Checkpoint saved: execution={}, step={}",
44        checkpoint.execution_id,
45        checkpoint.step_index
46    );
47    Ok(path)
48}
49
50/// Load a checkpoint for a given execution and step index.
51pub async fn load_checkpoint(execution_id: &Uuid, step_index: usize) -> Result<Checkpoint> {
52    let path = checkpoint_dir(execution_id).join(format!("{step_index}.json"));
53    let content = tokio::fs::read_to_string(&path)
54        .await
55        .with_context(|| format!("Reading checkpoint {:?}", path))?;
56    let checkpoint: Checkpoint =
57        serde_json::from_str(&content).context("Deserializing checkpoint")?;
58    Ok(checkpoint)
59}
60
61/// Load the latest checkpoint for an execution (highest step index).
62pub async fn load_latest_checkpoint(execution_id: &Uuid) -> Result<Option<Checkpoint>> {
63    let dir = checkpoint_dir(execution_id);
64    if !dir.exists() {
65        return Ok(None);
66    }
67
68    let mut entries = tokio::fs::read_dir(&dir)
69        .await
70        .context("Reading checkpoint dir")?;
71
72    let mut latest: Option<(usize, PathBuf)> = None;
73    while let Some(entry) = entries.next_entry().await? {
74        let path = entry.path();
75        if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
76            if let Ok(idx) = stem.parse::<usize>() {
77                if latest.as_ref().is_none_or(|(max, _)| idx > *max) {
78                    latest = Some((idx, path));
79                }
80            }
81        }
82    }
83
84    match latest {
85        Some((_, path)) => {
86            let content = tokio::fs::read_to_string(&path).await?;
87            let cp: Checkpoint = serde_json::from_str(&content)?;
88            Ok(Some(cp))
89        }
90        None => Ok(None),
91    }
92}
93
94/// Delete all checkpoints for an execution.
95pub async fn cleanup_checkpoints(execution_id: &Uuid) -> Result<()> {
96    let dir = checkpoint_dir(execution_id);
97    if dir.exists() {
98        tokio::fs::remove_dir_all(&dir)
99            .await
100            .with_context(|| format!("Removing checkpoint dir {:?}", dir))?;
101    }
102    Ok(())
103}
104
105#[cfg(test)]
106mod tests {
107    use super::*;
108
109    #[tokio::test]
110    async fn test_checkpoint_save_load() {
111        let execution_id = Uuid::new_v4();
112        let cp = Checkpoint {
113            execution_id,
114            step_index: 0,
115            variables: HashMap::from([("key".into(), "value".into())]),
116            completed_steps: vec![],
117            created_at: Utc::now(),
118        };
119
120        let path = save_checkpoint(&cp).await.unwrap();
121        assert!(path.exists());
122
123        let loaded = load_checkpoint(&execution_id, 0).await.unwrap();
124        assert_eq!(loaded.step_index, 0);
125        assert_eq!(loaded.variables.get("key").unwrap(), "value");
126
127        // Cleanup
128        cleanup_checkpoints(&execution_id).await.unwrap();
129        assert!(!checkpoint_dir(&execution_id).exists());
130    }
131
132    #[tokio::test]
133    async fn test_latest_checkpoint() {
134        let execution_id = Uuid::new_v4();
135
136        for i in 0..3 {
137            let cp = Checkpoint {
138                execution_id,
139                step_index: i,
140                variables: HashMap::new(),
141                completed_steps: vec![],
142                created_at: Utc::now(),
143            };
144            save_checkpoint(&cp).await.unwrap();
145        }
146
147        let latest = load_latest_checkpoint(&execution_id).await.unwrap().unwrap();
148        assert_eq!(latest.step_index, 2);
149
150        cleanup_checkpoints(&execution_id).await.unwrap();
151    }
152}