1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::rc::Rc;
12
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
16struct CheckpointState {
17 data: BTreeMap<String, serde_json::Value>,
18 path: PathBuf,
19 loaded: bool,
20}
21
22impl CheckpointState {
23 fn new(base_dir: &Path, pipeline_name: &str) -> Self {
24 Self {
25 data: BTreeMap::new(),
26 path: base_dir
27 .join(".harn")
28 .join("checkpoints")
29 .join(format!("{pipeline_name}.json")),
30 loaded: false,
31 }
32 }
33
34 fn ensure_loaded(&mut self) {
35 if self.loaded {
36 return;
37 }
38 self.loaded = true;
39 if let Ok(contents) = std::fs::read_to_string(&self.path) {
40 if let Ok(serde_json::Value::Object(map)) =
41 serde_json::from_str::<serde_json::Value>(&contents)
42 {
43 for (k, v) in map {
44 self.data.insert(k, v);
45 }
46 }
47 }
48 }
49
50 fn save(&self) -> Result<(), String> {
51 let obj: serde_json::Map<String, serde_json::Value> = self
52 .data
53 .iter()
54 .map(|(k, v)| (k.clone(), v.clone()))
55 .collect();
56 let json = serde_json::to_string_pretty(&serde_json::Value::Object(obj))
57 .map_err(|e| format!("checkpoint save error: {e}"))?;
58 if let Some(parent) = self.path.parent() {
59 std::fs::create_dir_all(parent).map_err(|e| format!("checkpoint mkdir error: {e}"))?;
60 }
61 std::fs::write(&self.path, json).map_err(|e| format!("checkpoint write error: {e}"))?;
62 Ok(())
63 }
64
65 fn get(&mut self, key: &str) -> VmValue {
66 self.ensure_loaded();
67 match self.data.get(key) {
68 Some(v) => json_to_vm(v),
69 None => VmValue::Nil,
70 }
71 }
72
73 fn set(&mut self, key: String, value: serde_json::Value) -> Result<(), String> {
74 self.ensure_loaded();
75 self.data.insert(key, value);
76 self.save()
77 }
78
79 fn clear(&mut self) -> Result<(), String> {
80 self.data.clear();
81 if self.path.exists() {
82 std::fs::remove_file(&self.path).map_err(|e| format!("checkpoint clear error: {e}"))?;
83 }
84 Ok(())
85 }
86
87 fn list(&mut self) -> Vec<String> {
88 self.ensure_loaded();
89 self.data.keys().cloned().collect()
90 }
91
92 fn exists(&mut self, key: &str) -> bool {
93 self.ensure_loaded();
94 self.data.contains_key(key)
95 }
96
97 fn delete(&mut self, key: &str) -> Result<(), String> {
98 self.ensure_loaded();
99 self.data.remove(key);
100 self.save()
101 }
102}
103
104fn vm_to_json(val: &VmValue) -> serde_json::Value {
105 match val {
106 VmValue::String(s) => serde_json::Value::String(s.to_string()),
107 VmValue::Int(n) => serde_json::json!(*n),
108 VmValue::Float(n) => serde_json::json!(*n),
109 VmValue::Bool(b) => serde_json::Value::Bool(*b),
110 VmValue::Nil => serde_json::Value::Null,
111 VmValue::List(items) => serde_json::Value::Array(items.iter().map(vm_to_json).collect()),
112 VmValue::Dict(map) => {
113 let obj: serde_json::Map<String, serde_json::Value> = map
114 .iter()
115 .map(|(k, v)| (k.clone(), vm_to_json(v)))
116 .collect();
117 serde_json::Value::Object(obj)
118 }
119 _ => serde_json::Value::Null,
120 }
121}
122
123fn json_to_vm(jv: &serde_json::Value) -> VmValue {
124 match jv {
125 serde_json::Value::Null => VmValue::Nil,
126 serde_json::Value::Bool(b) => VmValue::Bool(*b),
127 serde_json::Value::Number(n) => {
128 if let Some(i) = n.as_i64() {
129 VmValue::Int(i)
130 } else {
131 VmValue::Float(n.as_f64().unwrap_or(0.0))
132 }
133 }
134 serde_json::Value::String(s) => VmValue::String(Rc::from(s.as_str())),
135 serde_json::Value::Array(arr) => {
136 VmValue::List(Rc::new(arr.iter().map(json_to_vm).collect()))
137 }
138 serde_json::Value::Object(map) => {
139 let mut m = BTreeMap::new();
140 for (k, v) in map {
141 m.insert(k.clone(), json_to_vm(v));
142 }
143 VmValue::Dict(Rc::new(m))
144 }
145 }
146}
147
/// Reduces `name` to its final path component, dropping any directory parts.
///
/// Returns `"default"` when there is no usable final component: the path has
/// no file name (for example it is empty or ends in `..`), the component is
/// not valid UTF-8, or it is empty, `.` or `..`.
fn sanitize_pipeline_name(name: &str) -> String {
    let leaf = match std::path::Path::new(name).file_name() {
        Some(os_name) => os_name.to_str(),
        None => None,
    };
    match leaf {
        None | Some("") | Some(".") | Some("..") => String::from("default"),
        Some(component) => component.to_owned(),
    }
}
162
163pub fn register_checkpoint_builtins(vm: &mut Vm, base_dir: &Path, pipeline_name: &str) {
168 let safe_name = sanitize_pipeline_name(pipeline_name);
169 let state = Rc::new(RefCell::new(CheckpointState::new(base_dir, &safe_name)));
170
171 let s = Rc::clone(&state);
173 vm.register_builtin("checkpoint", move |args, _out| {
174 let key = args.first().map(|a| a.display()).unwrap_or_default();
175 let value = args.get(1).unwrap_or(&VmValue::Nil);
176 let json_val = vm_to_json(value);
177 s.borrow_mut()
178 .set(key, json_val)
179 .map_err(VmError::Runtime)?;
180 Ok(VmValue::Nil)
181 });
182
183 let s = Rc::clone(&state);
185 vm.register_builtin("checkpoint_get", move |args, _out| {
186 let key = args.first().map(|a| a.display()).unwrap_or_default();
187 Ok(s.borrow_mut().get(&key))
188 });
189
190 let s = Rc::clone(&state);
192 vm.register_builtin("checkpoint_clear", move |_args, _out| {
193 s.borrow_mut().clear().map_err(VmError::Runtime)?;
194 Ok(VmValue::Nil)
195 });
196
197 let s = Rc::clone(&state);
199 vm.register_builtin("checkpoint_list", move |_args, _out| {
200 let keys = s.borrow_mut().list();
201 Ok(VmValue::List(Rc::new(
202 keys.into_iter()
203 .map(|k| VmValue::String(Rc::from(k)))
204 .collect(),
205 )))
206 });
207
208 let s = Rc::clone(&state);
211 vm.register_builtin("checkpoint_exists", move |args, _out| {
212 let key = args.first().map(|a| a.display()).unwrap_or_default();
213 Ok(VmValue::Bool(s.borrow_mut().exists(&key)))
214 });
215
216 let s = Rc::clone(&state);
218 vm.register_builtin("checkpoint_delete", move |args, _out| {
219 let key = args.first().map(|a| a.display()).unwrap_or_default();
220 s.borrow_mut().delete(&key).map_err(VmError::Runtime)?;
221 Ok(VmValue::Nil)
222 });
223}