Skip to main content

harn_vm/
checkpoint.rs

//! Checkpoint system for resilient pipeline execution.
//!
//! Provides the `checkpoint`, `checkpoint_get`, `checkpoint_clear`,
//! `checkpoint_list`, `checkpoint_exists`, and `checkpoint_delete` builtins.
//! Checkpoints are persisted to `<base_dir>/.harn/checkpoints/<pipeline>.json`
//! and survive pipeline crashes/timeouts. On resume, a pipeline can skip
//! already-processed items by checking `checkpoint_get`.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::rc::Rc;
12
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
16struct CheckpointState {
17    data: BTreeMap<String, serde_json::Value>,
18    path: PathBuf,
19    loaded: bool,
20}
21
22impl CheckpointState {
23    fn new(base_dir: &Path, pipeline_name: &str) -> Self {
24        Self {
25            data: BTreeMap::new(),
26            path: base_dir
27                .join(".harn")
28                .join("checkpoints")
29                .join(format!("{pipeline_name}.json")),
30            loaded: false,
31        }
32    }
33
34    fn ensure_loaded(&mut self) {
35        if self.loaded {
36            return;
37        }
38        self.loaded = true;
39        if let Ok(contents) = std::fs::read_to_string(&self.path) {
40            if let Ok(serde_json::Value::Object(map)) =
41                serde_json::from_str::<serde_json::Value>(&contents)
42            {
43                for (k, v) in map {
44                    self.data.insert(k, v);
45                }
46            }
47        }
48    }
49
50    fn save(&self) -> Result<(), String> {
51        let obj: serde_json::Map<String, serde_json::Value> = self
52            .data
53            .iter()
54            .map(|(k, v)| (k.clone(), v.clone()))
55            .collect();
56        let json = serde_json::to_string_pretty(&serde_json::Value::Object(obj))
57            .map_err(|e| format!("checkpoint save error: {e}"))?;
58        if let Some(parent) = self.path.parent() {
59            std::fs::create_dir_all(parent).map_err(|e| format!("checkpoint mkdir error: {e}"))?;
60        }
61        std::fs::write(&self.path, json).map_err(|e| format!("checkpoint write error: {e}"))?;
62        Ok(())
63    }
64
65    fn get(&mut self, key: &str) -> VmValue {
66        self.ensure_loaded();
67        match self.data.get(key) {
68            Some(v) => json_to_vm(v),
69            None => VmValue::Nil,
70        }
71    }
72
73    fn set(&mut self, key: String, value: serde_json::Value) -> Result<(), String> {
74        self.ensure_loaded();
75        self.data.insert(key, value);
76        self.save()
77    }
78
79    fn clear(&mut self) -> Result<(), String> {
80        self.data.clear();
81        if self.path.exists() {
82            std::fs::remove_file(&self.path).map_err(|e| format!("checkpoint clear error: {e}"))?;
83        }
84        Ok(())
85    }
86
87    fn list(&mut self) -> Vec<String> {
88        self.ensure_loaded();
89        self.data.keys().cloned().collect()
90    }
91
92    fn exists(&mut self, key: &str) -> bool {
93        self.ensure_loaded();
94        self.data.contains_key(key)
95    }
96
97    fn delete(&mut self, key: &str) -> Result<(), String> {
98        self.ensure_loaded();
99        self.data.remove(key);
100        self.save()
101    }
102}
103
104fn vm_to_json(val: &VmValue) -> serde_json::Value {
105    match val {
106        VmValue::String(s) => serde_json::Value::String(s.to_string()),
107        VmValue::Int(n) => serde_json::json!(*n),
108        VmValue::Float(n) => serde_json::json!(*n),
109        VmValue::Bool(b) => serde_json::Value::Bool(*b),
110        VmValue::Nil => serde_json::Value::Null,
111        VmValue::List(items) => serde_json::Value::Array(items.iter().map(vm_to_json).collect()),
112        VmValue::Dict(map) => {
113            let obj: serde_json::Map<String, serde_json::Value> = map
114                .iter()
115                .map(|(k, v)| (k.clone(), vm_to_json(v)))
116                .collect();
117            serde_json::Value::Object(obj)
118        }
119        _ => serde_json::Value::Null,
120    }
121}
122
123fn json_to_vm(jv: &serde_json::Value) -> VmValue {
124    match jv {
125        serde_json::Value::Null => VmValue::Nil,
126        serde_json::Value::Bool(b) => VmValue::Bool(*b),
127        serde_json::Value::Number(n) => {
128            if let Some(i) = n.as_i64() {
129                VmValue::Int(i)
130            } else {
131                VmValue::Float(n.as_f64().unwrap_or(0.0))
132            }
133        }
134        serde_json::Value::String(s) => VmValue::String(Rc::from(s.as_str())),
135        serde_json::Value::Array(arr) => {
136            VmValue::List(Rc::new(arr.iter().map(json_to_vm).collect()))
137        }
138        serde_json::Value::Object(map) => {
139            let mut m = BTreeMap::new();
140            for (k, v) in map {
141                m.insert(k.clone(), json_to_vm(v));
142            }
143            VmValue::Dict(Rc::new(m))
144        }
145    }
146}
147
/// Reduce a pipeline name to something usable as a single file name.
///
/// Only the final path component of `name` is kept, so directory parts
/// (including `..` segments earlier in the path) are discarded. The result
/// is `"default"` when there is no usable final component: the name is
/// empty, ends in `..`, is not valid UTF-8, or is `.` / `..`.
///
/// Characters within the final component are not otherwise filtered.
fn sanitize_pipeline_name(name: &str) -> String {
    const FALLBACK: &str = "default";

    let last_component = std::path::Path::new(name)
        .file_name()
        .and_then(|os_name| os_name.to_str());

    match last_component {
        // No final component, or one that is empty / dot-only.
        None | Some("") | Some(".") | Some("..") => FALLBACK.to_string(),
        Some(base) => base.to_string(),
    }
}
162
163/// Register checkpoint builtins on a VM.
164///
165/// The pipeline name is used to namespace checkpoint files. If not provided,
166/// defaults to "default".
167pub fn register_checkpoint_builtins(vm: &mut Vm, base_dir: &Path, pipeline_name: &str) {
168    let safe_name = sanitize_pipeline_name(pipeline_name);
169    let state = Rc::new(RefCell::new(CheckpointState::new(base_dir, &safe_name)));
170
171    // checkpoint(key, value) — persist a checkpoint immediately
172    let s = Rc::clone(&state);
173    vm.register_builtin("checkpoint", move |args, _out| {
174        let key = args.first().map(|a| a.display()).unwrap_or_default();
175        let value = args.get(1).unwrap_or(&VmValue::Nil);
176        let json_val = vm_to_json(value);
177        s.borrow_mut()
178            .set(key, json_val)
179            .map_err(VmError::Runtime)?;
180        Ok(VmValue::Nil)
181    });
182
183    // checkpoint_get(key) -> value | nil
184    let s = Rc::clone(&state);
185    vm.register_builtin("checkpoint_get", move |args, _out| {
186        let key = args.first().map(|a| a.display()).unwrap_or_default();
187        Ok(s.borrow_mut().get(&key))
188    });
189
190    // checkpoint_clear() — clear all checkpoints for this pipeline
191    let s = Rc::clone(&state);
192    vm.register_builtin("checkpoint_clear", move |_args, _out| {
193        s.borrow_mut().clear().map_err(VmError::Runtime)?;
194        Ok(VmValue::Nil)
195    });
196
197    // checkpoint_list() -> [key1, key2, ...]
198    let s = Rc::clone(&state);
199    vm.register_builtin("checkpoint_list", move |_args, _out| {
200        let keys = s.borrow_mut().list();
201        Ok(VmValue::List(Rc::new(
202            keys.into_iter()
203                .map(|k| VmValue::String(Rc::from(k)))
204                .collect(),
205        )))
206    });
207
208    // checkpoint_exists(key) -> bool
209    // Returns true if the key is present, even if its value is nil.
210    let s = Rc::clone(&state);
211    vm.register_builtin("checkpoint_exists", move |args, _out| {
212        let key = args.first().map(|a| a.display()).unwrap_or_default();
213        Ok(VmValue::Bool(s.borrow_mut().exists(&key)))
214    });
215
216    // checkpoint_delete(key) — remove a single key from the checkpoint store
217    let s = Rc::clone(&state);
218    vm.register_builtin("checkpoint_delete", move |args, _out| {
219        let key = args.first().map(|a| a.display()).unwrap_or_default();
220        s.borrow_mut().delete(&key).map_err(VmError::Runtime)?;
221        Ok(VmValue::Nil)
222    });
223}