matrixcode_core/workflow/
persistence.rs1use anyhow::Result;
10use std::fs;
11use std::path::PathBuf;
12
13use super::context::WorkflowContext;
14
/// File-backed store for workflow contexts, one `<instance_id>.json` file per instance.
///
/// Lookups consult the project-level directory (when configured) before the
/// user-level directory; writes always go to `save_path`.
#[derive(Debug, Clone)]
pub struct WorkflowPersistence {
    /// User-level workflow directory; always consulted on lookup.
    user_path: PathBuf,
    /// Project-level workflow directory, if a project directory was supplied.
    /// When present it takes precedence over `user_path` on lookup.
    project_path: Option<PathBuf>,
    /// Directory that `save` writes to.
    save_path: PathBuf,
}
29
30impl WorkflowPersistence {
31 pub fn new(project_dir: Option<&PathBuf>) -> Self {
36 let user_path = dirs::home_dir()
38 .unwrap_or_else(|| PathBuf::from("."))
39 .join(".matrix")
40 .join("workflows");
41
42 let project_path = project_dir.map(|p| p.join(".matrix").join("workflows"));
44
45 let save_path = project_path.as_ref()
47 .filter(|_p| {
48 project_dir.as_ref().map(|pd| pd.exists()).unwrap_or(false)
50 })
51 .cloned()
52 .unwrap_or_else(|| user_path.clone());
53
54 if let Err(e) = fs::create_dir_all(&user_path) {
56 log::warn!("Failed to create user workflow directory: {}", e);
57 }
58 if let Some(ref proj) = project_path
59 && let Err(e) = fs::create_dir_all(proj) {
60 log::warn!("Failed to create project workflow directory: {}", e);
61 }
62
63 Self {
64 user_path,
65 project_path,
66 save_path,
67 }
68 }
69
70 pub fn new_global() -> Self {
72 Self::new(None)
73 }
74
75 pub fn with_base_path(base_path: PathBuf) -> Self {
79 if let Err(e) = fs::create_dir_all(&base_path) {
80 log::warn!("Failed to create workflow directory: {}", e);
81 }
82 Self {
83 user_path: base_path.clone(),
84 project_path: None,
85 save_path: base_path,
86 }
87 }
88
89 fn get_save_file_path(&self, instance_id: &str) -> PathBuf {
91 self.save_path.join(format!("{}.json", instance_id))
92 }
93
94 fn find_file_path(&self, instance_id: &str) -> Option<PathBuf> {
96 if let Some(ref proj) = self.project_path {
98 let proj_path = proj.join(format!("{}.json", instance_id));
99 if proj_path.exists() {
100 return Some(proj_path);
101 }
102 }
103
104 let user_path = self.user_path.join(format!("{}.json", instance_id));
105 if user_path.exists() {
106 return Some(user_path);
107 }
108
109 None
110 }
111
112 pub fn save(&self, context: &WorkflowContext) -> Result<()> {
124 let path = self.get_save_file_path(&context.instance_id);
125 let content = serde_json::to_string_pretty(context)?;
126
127 fs::write(&path, content)?;
128 log::info!("Saved workflow context to {:?}", path);
129
130 Ok(())
131 }
132
133 pub fn load(&self, instance_id: &str) -> Result<Option<WorkflowContext>> {
146 let path = self.find_file_path(instance_id);
147
148 if let Some(path) = path {
149 let content = fs::read_to_string(&path)?;
150 let context: WorkflowContext = serde_json::from_str(&content)?;
151 log::info!("Loaded workflow context from {:?}", path);
152 Ok(Some(context))
153 } else {
154 Ok(None)
155 }
156 }
157
158 pub fn list(&self) -> Result<Vec<WorkflowContext>> {
172 let mut contexts = Vec::new();
173
174 fn load_from_dir(dir: &PathBuf, contexts: &mut Vec<WorkflowContext>) -> Result<()> {
176 if !dir.exists() {
177 return Ok(());
178 }
179
180 for entry in fs::read_dir(dir)? {
181 let entry = entry?;
182 let path = entry.path();
183
184 if path.extension().is_some_and(|ext| ext == "json") {
185 match fs::read_to_string(&path) {
186 Ok(content) => match serde_json::from_str::<WorkflowContext>(&content) {
187 Ok(ctx) => contexts.push(ctx),
188 Err(e) => {
189 log::warn!("Failed to parse {:?}: {}", path, e);
190 }
191 },
192 Err(e) => {
193 log::warn!("Failed to read {:?}: {}", path, e);
194 }
195 }
196 }
197 }
198 Ok(())
199 }
200
201 if let Some(ref proj) = self.project_path {
203 load_from_dir(proj, &mut contexts)?;
204 }
205 load_from_dir(&self.user_path, &mut contexts)?;
206
207 contexts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
209
210 Ok(contexts)
211 }
212
213 pub fn delete(&self, instance_id: &str) -> Result<()> {
224 if let Some(path) = self.find_file_path(instance_id) {
225 fs::remove_file(&path)?;
226 log::info!("Deleted workflow context: {:?}", path);
227 }
228
229 Ok(())
230 }
231
232 pub fn list_by_status(
240 &self,
241 status: super::context::WorkflowStatus,
242 ) -> Result<Vec<WorkflowContext>> {
243 let all = self.list()?;
244 Ok(all.into_iter().filter(|ctx| ctx.status == status).collect())
245 }
246
247 pub fn save_path(&self) -> &PathBuf {
249 &self.save_path
250 }
251
252 pub fn user_path(&self) -> &PathBuf {
254 &self.user_path
255 }
256
257 pub fn project_path(&self) -> Option<&PathBuf> {
259 self.project_path.as_ref()
260 }
261}
262
263impl Default for WorkflowPersistence {
264 fn default() -> Self {
265 Self::new_global()
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::super::context::WorkflowStatus;
    use super::*;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Builds a store rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the store so the directory
    /// stays alive for the duration of the test.
    fn temp_store() -> (tempfile::TempDir, WorkflowPersistence) {
        let guard = tempdir().unwrap();
        let store = WorkflowPersistence::with_base_path(guard.path().to_path_buf());
        (guard, store)
    }

    /// Creates a context for `workflow_id` with no inputs.
    fn new_context(workflow_id: &str) -> WorkflowContext {
        WorkflowContext::new(workflow_id.to_string(), HashMap::new())
    }

    #[test]
    fn test_save_and_load() {
        let (_guard, store) = temp_store();

        let mut original = new_context("test_workflow");
        original.start();
        store.save(&original).unwrap();

        // A saved context must be found again under its instance id.
        let fetched = store.load(&original.instance_id).unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(fetched.instance_id, original.instance_id);
        assert_eq!(fetched.workflow_id, "test_workflow");
    }

    #[test]
    fn test_load_nonexistent() {
        let (_guard, store) = temp_store();

        // An unknown id is reported as absent, not as an error.
        assert!(store.load("nonexistent-id").unwrap().is_none());
    }

    #[test]
    fn test_list() {
        let (_guard, store) = temp_store();

        let mut first = new_context("workflow1");
        first.start();
        let mut second = new_context("workflow2");
        second.start();

        store.save(&first).unwrap();
        store.save(&second).unwrap();

        let listed = store.list().unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[test]
    fn test_delete() {
        let (_guard, store) = temp_store();

        let ctx = new_context("test");
        store.save(&ctx).unwrap();
        assert!(store.load(&ctx.instance_id).unwrap().is_some());

        // After deletion the instance must no longer be loadable.
        store.delete(&ctx.instance_id).unwrap();
        assert!(store.load(&ctx.instance_id).unwrap().is_none());
    }

    #[test]
    fn test_save_and_load_paused_workflow() {
        let (_guard, store) = temp_store();

        let mut ctx = new_context("paused_workflow");
        ctx.start();
        ctx.set_current_node("node1".to_string());
        ctx.pause();
        store.save(&ctx).unwrap();

        // The paused state and the current node survive a round trip.
        let mut restored = store.load(&ctx.instance_id).unwrap().unwrap();
        assert_eq!(restored.status, WorkflowStatus::Paused);
        assert_eq!(restored.current_node_id, Some("node1".to_string()));

        // Resuming and saving again overwrites the stored state.
        restored.resume();
        store.save(&restored).unwrap();

        let resumed = store.load(&restored.instance_id).unwrap().unwrap();
        assert_eq!(resumed.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_list_by_status() {
        let (_guard, store) = temp_store();

        let mut paused_ctx = new_context("workflow1");
        paused_ctx.start();
        paused_ctx.pause();

        let mut running_ctx = new_context("workflow2");
        running_ctx.start();

        store.save(&paused_ctx).unwrap();
        store.save(&running_ctx).unwrap();

        let paused = store.list_by_status(WorkflowStatus::Paused).unwrap();
        assert_eq!(paused.len(), 1);
        assert_eq!(paused[0].workflow_id, "workflow1");

        let running = store.list_by_status(WorkflowStatus::Running).unwrap();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].workflow_id, "workflow2");
    }
}