1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::rc::Rc;
12
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
16struct CheckpointState {
17 data: BTreeMap<String, serde_json::Value>,
18 path: PathBuf,
19 loaded: bool,
20}
21
22impl CheckpointState {
23 fn new(base_dir: &Path, pipeline_name: &str) -> Self {
24 Self {
25 data: BTreeMap::new(),
26 path: crate::runtime_paths::checkpoint_dir(base_dir)
27 .join(format!("{pipeline_name}.json")),
28 loaded: false,
29 }
30 }
31
32 fn ensure_loaded(&mut self) {
33 if self.loaded {
34 return;
35 }
36 self.loaded = true;
37 if let Ok(contents) = std::fs::read_to_string(&self.path) {
38 if let Ok(serde_json::Value::Object(map)) =
39 serde_json::from_str::<serde_json::Value>(&contents)
40 {
41 for (k, v) in map {
42 self.data.insert(k, v);
43 }
44 }
45 }
46 }
47
48 fn save(&self) -> Result<(), String> {
49 let obj: serde_json::Map<String, serde_json::Value> = self
50 .data
51 .iter()
52 .map(|(k, v)| (k.clone(), v.clone()))
53 .collect();
54 let json = serde_json::to_string_pretty(&serde_json::Value::Object(obj))
55 .map_err(|e| format!("checkpoint save error: {e}"))?;
56 if let Some(parent) = self.path.parent() {
57 std::fs::create_dir_all(parent).map_err(|e| format!("checkpoint mkdir error: {e}"))?;
58 }
59 std::fs::write(&self.path, json).map_err(|e| format!("checkpoint write error: {e}"))?;
60 Ok(())
61 }
62
63 fn get(&mut self, key: &str) -> VmValue {
64 self.ensure_loaded();
65 match self.data.get(key) {
66 Some(v) => json_to_vm(v),
67 None => VmValue::Nil,
68 }
69 }
70
71 fn set(&mut self, key: String, value: serde_json::Value) -> Result<(), String> {
72 self.ensure_loaded();
73 self.data.insert(key, value);
74 self.save()
75 }
76
77 fn clear(&mut self) -> Result<(), String> {
78 self.data.clear();
79 if self.path.exists() {
80 std::fs::remove_file(&self.path).map_err(|e| format!("checkpoint clear error: {e}"))?;
81 }
82 Ok(())
83 }
84
85 fn list(&mut self) -> Vec<String> {
86 self.ensure_loaded();
87 self.data.keys().cloned().collect()
88 }
89
90 fn exists(&mut self, key: &str) -> bool {
91 self.ensure_loaded();
92 self.data.contains_key(key)
93 }
94
95 fn delete(&mut self, key: &str) -> Result<(), String> {
96 self.ensure_loaded();
97 self.data.remove(key);
98 self.save()
99 }
100}
101
102fn vm_to_json(val: &VmValue) -> serde_json::Value {
103 match val {
104 VmValue::String(s) => serde_json::Value::String(s.to_string()),
105 VmValue::Int(n) => serde_json::json!(*n),
106 VmValue::Float(n) => serde_json::json!(*n),
107 VmValue::Bool(b) => serde_json::Value::Bool(*b),
108 VmValue::Nil => serde_json::Value::Null,
109 VmValue::List(items) => serde_json::Value::Array(items.iter().map(vm_to_json).collect()),
110 VmValue::Dict(map) => {
111 let obj: serde_json::Map<String, serde_json::Value> = map
112 .iter()
113 .map(|(k, v)| (k.clone(), vm_to_json(v)))
114 .collect();
115 serde_json::Value::Object(obj)
116 }
117 _ => serde_json::Value::Null,
118 }
119}
120
121fn json_to_vm(jv: &serde_json::Value) -> VmValue {
122 match jv {
123 serde_json::Value::Null => VmValue::Nil,
124 serde_json::Value::Bool(b) => VmValue::Bool(*b),
125 serde_json::Value::Number(n) => {
126 if let Some(i) = n.as_i64() {
127 VmValue::Int(i)
128 } else {
129 VmValue::Float(n.as_f64().unwrap_or(0.0))
130 }
131 }
132 serde_json::Value::String(s) => VmValue::String(Rc::from(s.as_str())),
133 serde_json::Value::Array(arr) => {
134 VmValue::List(Rc::new(arr.iter().map(json_to_vm).collect()))
135 }
136 serde_json::Value::Object(map) => {
137 let mut m = BTreeMap::new();
138 for (k, v) in map {
139 m.insert(k.clone(), json_to_vm(v));
140 }
141 VmValue::Dict(Rc::new(m))
142 }
143 }
144}
145
/// Reduces `name` to a single file-name component usable as a checkpoint file stem.
///
/// Only the final path component is kept, so directory parts of the input are
/// discarded. When there is no usable final component (empty input, `.`, `..`,
/// or a path ending in `..`), the name `"default"` is returned instead.
fn sanitize_pipeline_name(name: &str) -> String {
    const FALLBACK: &str = "default";

    let leaf = match std::path::Path::new(name)
        .file_name()
        .and_then(|part| part.to_str())
    {
        Some(part) => part,
        None => FALLBACK,
    };

    match leaf {
        "" | "." | ".." => FALLBACK.to_string(),
        other => other.to_string(),
    }
}
158
159pub fn register_checkpoint_builtins(vm: &mut Vm, base_dir: &Path, pipeline_name: &str) {
164 let safe_name = sanitize_pipeline_name(pipeline_name);
165 let state = Rc::new(RefCell::new(CheckpointState::new(base_dir, &safe_name)));
166
167 let s = Rc::clone(&state);
169 vm.register_builtin("checkpoint", move |args, _out| {
170 let key = args.first().map(|a| a.display()).unwrap_or_default();
171 let value = args.get(1).unwrap_or(&VmValue::Nil);
172 let json_val = vm_to_json(value);
173 s.borrow_mut()
174 .set(key, json_val)
175 .map_err(VmError::Runtime)?;
176 Ok(VmValue::Nil)
177 });
178
179 let s = Rc::clone(&state);
181 vm.register_builtin("checkpoint_get", move |args, _out| {
182 let key = args.first().map(|a| a.display()).unwrap_or_default();
183 Ok(s.borrow_mut().get(&key))
184 });
185
186 let s = Rc::clone(&state);
188 vm.register_builtin("checkpoint_clear", move |_args, _out| {
189 s.borrow_mut().clear().map_err(VmError::Runtime)?;
190 Ok(VmValue::Nil)
191 });
192
193 let s = Rc::clone(&state);
195 vm.register_builtin("checkpoint_list", move |_args, _out| {
196 let keys = s.borrow_mut().list();
197 Ok(VmValue::List(Rc::new(
198 keys.into_iter()
199 .map(|k| VmValue::String(Rc::from(k)))
200 .collect(),
201 )))
202 });
203
204 let s = Rc::clone(&state);
206 vm.register_builtin("checkpoint_exists", move |args, _out| {
207 let key = args.first().map(|a| a.display()).unwrap_or_default();
208 Ok(VmValue::Bool(s.borrow_mut().exists(&key)))
209 });
210
211 let s = Rc::clone(&state);
213 vm.register_builtin("checkpoint_delete", move |args, _out| {
214 let key = args.first().map(|a| a.display()).unwrap_or_default();
215 s.borrow_mut().delete(&key).map_err(VmError::Runtime)?;
216 Ok(VmValue::Nil)
217 });
218}