matrixcode_core/workflow/
persistence.rs1use anyhow::Result;
10use std::fs;
11use std::path::PathBuf;
12
13use super::context::WorkflowContext;
14
/// File-backed store for [`WorkflowContext`] snapshots.
///
/// Contexts are kept as one `<instance_id>.json` file per workflow instance,
/// spread over a user-level directory and an optional project-level
/// directory.
#[derive(Debug, Clone)]
pub struct WorkflowPersistence {
    /// User-level workflow directory; always consulted when looking up or
    /// listing contexts.
    user_path: PathBuf,
    /// Project-level workflow directory, if a project was supplied. It is
    /// consulted before `user_path` when looking up a context.
    project_path: Option<PathBuf>,
    /// Directory that newly saved contexts are written to.
    save_path: PathBuf,
}
29
30impl WorkflowPersistence {
31 pub fn new(project_dir: Option<&PathBuf>) -> Self {
36 let user_path = dirs::home_dir()
38 .unwrap_or_else(|| PathBuf::from("."))
39 .join(".matrix")
40 .join("workflows");
41
42 let project_path = project_dir.map(|p| p.join(".matrix").join("workflows"));
44
45 let save_path = project_path
47 .as_ref()
48 .filter(|_p| {
49 project_dir.as_ref().map(|pd| pd.exists()).unwrap_or(false)
51 })
52 .cloned()
53 .unwrap_or_else(|| user_path.clone());
54
55 if let Err(e) = fs::create_dir_all(&user_path) {
57 log::warn!("Failed to create user workflow directory: {}", e);
58 }
59 if let Some(ref proj) = project_path
60 && let Err(e) = fs::create_dir_all(proj)
61 {
62 log::warn!("Failed to create project workflow directory: {}", e);
63 }
64
65 Self {
66 user_path,
67 project_path,
68 save_path,
69 }
70 }
71
72 pub fn new_global() -> Self {
74 Self::new(None)
75 }
76
77 pub fn with_base_path(base_path: PathBuf) -> Self {
81 if let Err(e) = fs::create_dir_all(&base_path) {
82 log::warn!("Failed to create workflow directory: {}", e);
83 }
84 Self {
85 user_path: base_path.clone(),
86 project_path: None,
87 save_path: base_path,
88 }
89 }
90
91 fn get_save_file_path(&self, instance_id: &str) -> PathBuf {
93 self.save_path.join(format!("{}.json", instance_id))
94 }
95
96 fn find_file_path(&self, instance_id: &str) -> Option<PathBuf> {
98 if let Some(ref proj) = self.project_path {
100 let proj_path = proj.join(format!("{}.json", instance_id));
101 if proj_path.exists() {
102 return Some(proj_path);
103 }
104 }
105
106 let user_path = self.user_path.join(format!("{}.json", instance_id));
107 if user_path.exists() {
108 return Some(user_path);
109 }
110
111 None
112 }
113
114 pub fn save(&self, context: &WorkflowContext) -> Result<()> {
126 let path = self.get_save_file_path(&context.instance_id);
127 let content = serde_json::to_string_pretty(context)?;
128
129 fs::write(&path, content)?;
130 log::info!("Saved workflow context to {:?}", path);
131
132 Ok(())
133 }
134
135 pub fn load(&self, instance_id: &str) -> Result<Option<WorkflowContext>> {
148 let path = self.find_file_path(instance_id);
149
150 if let Some(path) = path {
151 let content = fs::read_to_string(&path)?;
152 let context: WorkflowContext = serde_json::from_str(&content)?;
153 log::info!("Loaded workflow context from {:?}", path);
154 Ok(Some(context))
155 } else {
156 Ok(None)
157 }
158 }
159
160 pub fn list(&self) -> Result<Vec<WorkflowContext>> {
174 let mut contexts = Vec::new();
175
176 fn load_from_dir(dir: &PathBuf, contexts: &mut Vec<WorkflowContext>) -> Result<()> {
178 if !dir.exists() {
179 return Ok(());
180 }
181
182 for entry in fs::read_dir(dir)? {
183 let entry = entry?;
184 let path = entry.path();
185
186 if path.extension().is_some_and(|ext| ext == "json") {
187 match fs::read_to_string(&path) {
188 Ok(content) => match serde_json::from_str::<WorkflowContext>(&content) {
189 Ok(ctx) => contexts.push(ctx),
190 Err(e) => {
191 log::warn!("Failed to parse {:?}: {}", path, e);
192 }
193 },
194 Err(e) => {
195 log::warn!("Failed to read {:?}: {}", path, e);
196 }
197 }
198 }
199 }
200 Ok(())
201 }
202
203 if let Some(ref proj) = self.project_path {
205 load_from_dir(proj, &mut contexts)?;
206 }
207 load_from_dir(&self.user_path, &mut contexts)?;
208
209 contexts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
211
212 Ok(contexts)
213 }
214
215 pub fn delete(&self, instance_id: &str) -> Result<()> {
226 if let Some(path) = self.find_file_path(instance_id) {
227 fs::remove_file(&path)?;
228 log::info!("Deleted workflow context: {:?}", path);
229 }
230
231 Ok(())
232 }
233
234 pub fn list_by_status(
242 &self,
243 status: super::context::WorkflowStatus,
244 ) -> Result<Vec<WorkflowContext>> {
245 let all = self.list()?;
246 Ok(all.into_iter().filter(|ctx| ctx.status == status).collect())
247 }
248
249 pub fn save_path(&self) -> &PathBuf {
251 &self.save_path
252 }
253
254 pub fn user_path(&self) -> &PathBuf {
256 &self.user_path
257 }
258
259 pub fn project_path(&self) -> Option<&PathBuf> {
261 self.project_path.as_ref()
262 }
263}
264
265impl Default for WorkflowPersistence {
266 fn default() -> Self {
267 Self::new_global()
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::super::context::WorkflowStatus;
    use super::*;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Builds a store rooted in a fresh temporary directory.
    ///
    /// The returned guard must stay alive for the duration of the test;
    /// dropping it removes the directory.
    fn temp_store() -> (tempfile::TempDir, WorkflowPersistence) {
        let guard = tempdir().unwrap();
        let store = WorkflowPersistence::with_base_path(guard.path().to_path_buf());
        (guard, store)
    }

    /// Creates a context for `workflow_id` with no inputs and starts it.
    fn started(workflow_id: &str) -> WorkflowContext {
        let mut ctx = WorkflowContext::new(workflow_id.to_string(), HashMap::new());
        ctx.start();
        ctx
    }

    #[test]
    fn test_save_and_load() {
        let (_guard, store) = temp_store();
        let original = started("test_workflow");

        store.save(&original).unwrap();

        let fetched = store.load(&original.instance_id).unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(fetched.instance_id, original.instance_id);
        assert_eq!(fetched.workflow_id, "test_workflow");
    }

    #[test]
    fn test_load_nonexistent() {
        let (_guard, store) = temp_store();

        assert!(store.load("nonexistent-id").unwrap().is_none());
    }

    #[test]
    fn test_list() {
        let (_guard, store) = temp_store();

        let first = started("workflow1");
        let second = started("workflow2");
        for ctx in [&first, &second] {
            store.save(ctx).unwrap();
        }

        let listed = store.list().unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[test]
    fn test_delete() {
        let (_guard, store) = temp_store();

        // Deliberately not started: deletion must not depend on status.
        let ctx = WorkflowContext::new("test".to_string(), HashMap::new());
        store.save(&ctx).unwrap();
        assert!(store.load(&ctx.instance_id).unwrap().is_some());

        store.delete(&ctx.instance_id).unwrap();
        assert!(store.load(&ctx.instance_id).unwrap().is_none());
    }

    #[test]
    fn test_save_and_load_paused_workflow() {
        let (_guard, store) = temp_store();

        let mut ctx = started("paused_workflow");
        ctx.set_current_node("node1".to_string());
        ctx.pause();
        store.save(&ctx).unwrap();

        // The paused state and position survive a round trip.
        let mut reloaded = store.load(&ctx.instance_id).unwrap().unwrap();
        assert_eq!(reloaded.status, WorkflowStatus::Paused);
        assert_eq!(reloaded.current_node_id, Some("node1".to_string()));

        // Resuming and saving again overwrites the stored snapshot.
        reloaded.resume();
        store.save(&reloaded).unwrap();

        let resumed = store.load(&reloaded.instance_id).unwrap().unwrap();
        assert_eq!(resumed.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_list_by_status() {
        let (_guard, store) = temp_store();

        let mut paused_ctx = started("workflow1");
        paused_ctx.pause();
        let running_ctx = started("workflow2");

        store.save(&paused_ctx).unwrap();
        store.save(&running_ctx).unwrap();

        let paused = store.list_by_status(WorkflowStatus::Paused).unwrap();
        assert_eq!(paused.len(), 1);
        assert_eq!(paused[0].workflow_id, "workflow1");

        let running = store.list_by_status(WorkflowStatus::Running).unwrap();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].workflow_id, "workflow2");
    }
}