Skip to main content

harn_vm/
checkpoint.rs

//! Checkpoint system for resilient pipeline execution.
//!
//! Provides the `checkpoint`, `checkpoint_get`, `checkpoint_clear`, `checkpoint_list`,
//! `checkpoint_exists`, and `checkpoint_delete` builtins.
//! Checkpoints are persisted to `<state-root>/checkpoints/<pipeline>.json`
//! and survive pipeline crashes/timeouts. On resume, a pipeline can skip
//! already-processed items by checking `checkpoint_get`.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::rc::Rc;
12
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
16struct CheckpointState {
17    data: BTreeMap<String, serde_json::Value>,
18    path: PathBuf,
19    loaded: bool,
20}
21
22impl CheckpointState {
23    fn new(base_dir: &Path, pipeline_name: &str) -> Self {
24        Self {
25            data: BTreeMap::new(),
26            path: crate::runtime_paths::checkpoint_dir(base_dir)
27                .join(format!("{pipeline_name}.json")),
28            loaded: false,
29        }
30    }
31
32    fn ensure_loaded(&mut self) {
33        if self.loaded {
34            return;
35        }
36        self.loaded = true;
37        if let Ok(contents) = std::fs::read_to_string(&self.path) {
38            if let Ok(serde_json::Value::Object(map)) =
39                serde_json::from_str::<serde_json::Value>(&contents)
40            {
41                for (k, v) in map {
42                    self.data.insert(k, v);
43                }
44            }
45        }
46    }
47
48    fn save(&self) -> Result<(), String> {
49        let obj: serde_json::Map<String, serde_json::Value> = self
50            .data
51            .iter()
52            .map(|(k, v)| (k.clone(), v.clone()))
53            .collect();
54        let json = serde_json::to_string_pretty(&serde_json::Value::Object(obj))
55            .map_err(|e| format!("checkpoint save error: {e}"))?;
56        if let Some(parent) = self.path.parent() {
57            std::fs::create_dir_all(parent).map_err(|e| format!("checkpoint mkdir error: {e}"))?;
58        }
59        std::fs::write(&self.path, json).map_err(|e| format!("checkpoint write error: {e}"))?;
60        Ok(())
61    }
62
63    fn get(&mut self, key: &str) -> VmValue {
64        self.ensure_loaded();
65        match self.data.get(key) {
66            Some(v) => json_to_vm(v),
67            None => VmValue::Nil,
68        }
69    }
70
71    fn set(&mut self, key: String, value: serde_json::Value) -> Result<(), String> {
72        self.ensure_loaded();
73        self.data.insert(key, value);
74        self.save()
75    }
76
77    fn clear(&mut self) -> Result<(), String> {
78        self.data.clear();
79        if self.path.exists() {
80            std::fs::remove_file(&self.path).map_err(|e| format!("checkpoint clear error: {e}"))?;
81        }
82        Ok(())
83    }
84
85    fn list(&mut self) -> Vec<String> {
86        self.ensure_loaded();
87        self.data.keys().cloned().collect()
88    }
89
90    fn exists(&mut self, key: &str) -> bool {
91        self.ensure_loaded();
92        self.data.contains_key(key)
93    }
94
95    fn delete(&mut self, key: &str) -> Result<(), String> {
96        self.ensure_loaded();
97        self.data.remove(key);
98        self.save()
99    }
100}
101
102fn vm_to_json(val: &VmValue) -> serde_json::Value {
103    match val {
104        VmValue::String(s) => serde_json::Value::String(s.to_string()),
105        VmValue::Int(n) => serde_json::json!(*n),
106        VmValue::Float(n) => serde_json::json!(*n),
107        VmValue::Bool(b) => serde_json::Value::Bool(*b),
108        VmValue::Nil => serde_json::Value::Null,
109        VmValue::List(items) => serde_json::Value::Array(items.iter().map(vm_to_json).collect()),
110        VmValue::Dict(map) => {
111            let obj: serde_json::Map<String, serde_json::Value> = map
112                .iter()
113                .map(|(k, v)| (k.clone(), vm_to_json(v)))
114                .collect();
115            serde_json::Value::Object(obj)
116        }
117        _ => serde_json::Value::Null,
118    }
119}
120
121fn json_to_vm(jv: &serde_json::Value) -> VmValue {
122    match jv {
123        serde_json::Value::Null => VmValue::Nil,
124        serde_json::Value::Bool(b) => VmValue::Bool(*b),
125        serde_json::Value::Number(n) => {
126            if let Some(i) = n.as_i64() {
127                VmValue::Int(i)
128            } else {
129                VmValue::Float(n.as_f64().unwrap_or(0.0))
130            }
131        }
132        serde_json::Value::String(s) => VmValue::String(Rc::from(s.as_str())),
133        serde_json::Value::Array(arr) => {
134            VmValue::List(Rc::new(arr.iter().map(json_to_vm).collect()))
135        }
136        serde_json::Value::Object(map) => {
137            let mut m = BTreeMap::new();
138            for (k, v) in map {
139                m.insert(k.clone(), json_to_vm(v));
140            }
141            VmValue::Dict(Rc::new(m))
142        }
143    }
144}
145
/// Reduce a pipeline name to a single file-name component for its checkpoint file.
///
/// Only the final path component of `name` is kept, so directory prefixes such as
/// `../../` cannot escape the checkpoint directory (`"a/b"` becomes `"b"`). Names with
/// no usable final component fall back to `"default"`: `""`, `"."`, `".."`, `"/"`, or
/// any path ending in `..`.
///
/// NOTE: no character filtering is performed beyond this. Distinct names that share a
/// final component (e.g. `"x/run"` and `"y/run"`) map to the same checkpoint file.
fn sanitize_pipeline_name(name: &str) -> String {
    // `file_name()` already returns `None` for the degenerate cases above; the explicit
    // guard is kept as defence in depth against platform-specific path parsing.
    match Path::new(name).file_name().and_then(|f| f.to_str()) {
        Some(base) if !matches!(base, "" | "." | "..") => base.to_string(),
        _ => "default".to_string(),
    }
}
158
159/// Register checkpoint builtins on a VM.
160///
161/// The pipeline name is used to namespace checkpoint files. If not provided,
162/// defaults to "default".
163pub fn register_checkpoint_builtins(vm: &mut Vm, base_dir: &Path, pipeline_name: &str) {
164    let safe_name = sanitize_pipeline_name(pipeline_name);
165    let state = Rc::new(RefCell::new(CheckpointState::new(base_dir, &safe_name)));
166
167    // checkpoint(key, value) — persist a checkpoint immediately
168    let s = Rc::clone(&state);
169    vm.register_builtin("checkpoint", move |args, _out| {
170        let key = args.first().map(|a| a.display()).unwrap_or_default();
171        let value = args.get(1).unwrap_or(&VmValue::Nil);
172        let json_val = vm_to_json(value);
173        s.borrow_mut()
174            .set(key, json_val)
175            .map_err(VmError::Runtime)?;
176        Ok(VmValue::Nil)
177    });
178
179    // checkpoint_get(key) -> value | nil
180    let s = Rc::clone(&state);
181    vm.register_builtin("checkpoint_get", move |args, _out| {
182        let key = args.first().map(|a| a.display()).unwrap_or_default();
183        Ok(s.borrow_mut().get(&key))
184    });
185
186    // checkpoint_clear() — clear all checkpoints for this pipeline
187    let s = Rc::clone(&state);
188    vm.register_builtin("checkpoint_clear", move |_args, _out| {
189        s.borrow_mut().clear().map_err(VmError::Runtime)?;
190        Ok(VmValue::Nil)
191    });
192
193    // checkpoint_list() -> [key1, key2, ...]
194    let s = Rc::clone(&state);
195    vm.register_builtin("checkpoint_list", move |_args, _out| {
196        let keys = s.borrow_mut().list();
197        Ok(VmValue::List(Rc::new(
198            keys.into_iter()
199                .map(|k| VmValue::String(Rc::from(k)))
200                .collect(),
201        )))
202    });
203
204    // checkpoint_exists(key): true if key is present, even if its value is nil.
205    let s = Rc::clone(&state);
206    vm.register_builtin("checkpoint_exists", move |args, _out| {
207        let key = args.first().map(|a| a.display()).unwrap_or_default();
208        Ok(VmValue::Bool(s.borrow_mut().exists(&key)))
209    });
210
211    // checkpoint_delete(key) — remove a single key from the checkpoint store
212    let s = Rc::clone(&state);
213    vm.register_builtin("checkpoint_delete", move |args, _out| {
214        let key = args.first().map(|a| a.display()).unwrap_or_default();
215        s.borrow_mut().delete(&key).map_err(VmError::Runtime)?;
216        Ok(VmValue::Nil)
217    });
218}