Skip to main content

matrixcode_core/workflow/
persistence.rs

1//! Workflow persistence module
2//!
3//! Provides save/load functionality for workflow contexts
4//!
5//! Supports two storage locations:
6//! - User directory: `~/.matrix/workflows/` (global workflows)
7//! - Project directory: `.matrix/workflows/` (project-specific workflows)
8
9use anyhow::Result;
10use std::fs;
11use std::path::PathBuf;
12
13use super::context::WorkflowContext;
14
/// Workflow persistence manager
///
/// Handles saving and loading workflow contexts to/from disk.
/// Files are stored in JSON format, one `<instance_id>.json` per run, under:
/// - `~/.matrix/workflows/` (user/global)
/// - `.matrix/workflows/` (project-specific)
///
/// Lookups check the project directory first and fall back to the user
/// directory; new files are written to `save_path`.
#[derive(Debug, Clone)]
pub struct WorkflowPersistence {
    /// User directory for global workflows
    user_path: PathBuf,
    /// Project directory for project-specific workflows (optional)
    project_path: Option<PathBuf>,
    /// Preferred save location (project if available, else user)
    save_path: PathBuf,
}
29
30impl WorkflowPersistence {
31    /// Create a new persistence manager with project context
32    ///
33    /// Searches both project and user directories when loading.
34    /// Saves to project directory if available, else to user directory.
35    pub fn new(project_dir: Option<&PathBuf>) -> Self {
36        // User directory: ~/.matrix/workflows/
37        let user_path = dirs::home_dir()
38            .unwrap_or_else(|| PathBuf::from("."))
39            .join(".matrix")
40            .join("workflows");
41
42        // Project directory: <project>/ .matrix/workflows/
43        let project_path = project_dir.map(|p| p.join(".matrix").join("workflows"));
44
45        // Save path: prefer project if available
46        let save_path = project_path
47            .as_ref()
48            .filter(|_p| {
49                // Only use project path if project directory exists
50                project_dir.as_ref().map(|pd| pd.exists()).unwrap_or(false)
51            })
52            .cloned()
53            .unwrap_or_else(|| user_path.clone());
54
55        // Ensure directories exist
56        if let Err(e) = fs::create_dir_all(&user_path) {
57            log::warn!("Failed to create user workflow directory: {}", e);
58        }
59        if let Some(ref proj) = project_path
60            && let Err(e) = fs::create_dir_all(proj)
61        {
62            log::warn!("Failed to create project workflow directory: {}", e);
63        }
64
65        Self {
66            user_path,
67            project_path,
68            save_path,
69        }
70    }
71
72    /// Create a persistence manager for user directory only (no project context)
73    pub fn new_global() -> Self {
74        Self::new(None)
75    }
76
77    /// Create a persistence manager with a custom base path
78    ///
79    /// Useful for testing or custom configurations.
80    pub fn with_base_path(base_path: PathBuf) -> Self {
81        if let Err(e) = fs::create_dir_all(&base_path) {
82            log::warn!("Failed to create workflow directory: {}", e);
83        }
84        Self {
85            user_path: base_path.clone(),
86            project_path: None,
87            save_path: base_path,
88        }
89    }
90
91    /// Get the file path for saving a workflow run
92    fn get_save_file_path(&self, instance_id: &str) -> PathBuf {
93        self.save_path.join(format!("{}.json", instance_id))
94    }
95
96    /// Search for a workflow file in both project and user directories
97    fn find_file_path(&self, instance_id: &str) -> Option<PathBuf> {
98        // Search project first, then user
99        if let Some(ref proj) = self.project_path {
100            let proj_path = proj.join(format!("{}.json", instance_id));
101            if proj_path.exists() {
102                return Some(proj_path);
103            }
104        }
105
106        let user_path = self.user_path.join(format!("{}.json", instance_id));
107        if user_path.exists() {
108            return Some(user_path);
109        }
110
111        None
112    }
113
114    /// Save a workflow context to disk
115    ///
116    /// The context is serialized to JSON and stored in the save directory
117    /// (project directory if available, else user directory).
118    ///
119    /// # Example
120    /// ```ignore
121    /// let persistence = WorkflowPersistence::new(Some(&project_path));
122    /// let ctx = WorkflowContext::new("my_workflow".to_string(), HashMap::new());
123    /// persistence.save(&ctx)?;
124    /// ```
125    pub fn save(&self, context: &WorkflowContext) -> Result<()> {
126        let path = self.get_save_file_path(&context.instance_id);
127        let content = serde_json::to_string_pretty(context)?;
128
129        fs::write(&path, content)?;
130        log::info!("Saved workflow context to {:?}", path);
131
132        Ok(())
133    }
134
135    /// Load a workflow context from disk
136    ///
137    /// Searches both project and user directories.
138    /// Returns `Ok(None)` if the instance_id does not exist.
139    ///
140    /// # Example
141    /// ```ignore
142    /// let persistence = WorkflowPersistence::new(Some(&project_path));
143    /// if let Some(ctx) = persistence.load("run-123")? {
144    ///     println!("Loaded workflow: {}", ctx.workflow_id);
145    /// }
146    /// ```
147    pub fn load(&self, instance_id: &str) -> Result<Option<WorkflowContext>> {
148        let path = self.find_file_path(instance_id);
149
150        if let Some(path) = path {
151            let content = fs::read_to_string(&path)?;
152            let context: WorkflowContext = serde_json::from_str(&content)?;
153            log::info!("Loaded workflow context from {:?}", path);
154            Ok(Some(context))
155        } else {
156            Ok(None)
157        }
158    }
159
160    /// List all saved workflow contexts from both directories
161    ///
162    /// Returns a list of all persisted workflow contexts.
163    /// Failed loads are logged and skipped.
164    ///
165    /// # Example
166    /// ```ignore
167    /// let persistence = WorkflowPersistence::new(Some(&project_path));
168    /// let workflows = persistence.list()?;
169    /// for ctx in workflows {
170    ///     println!("{}: {:?}", ctx.instance_id, ctx.status);
171    /// }
172    /// ```
173    pub fn list(&self) -> Result<Vec<WorkflowContext>> {
174        let mut contexts = Vec::new();
175
176        // Helper to load contexts from a directory
177        fn load_from_dir(dir: &PathBuf, contexts: &mut Vec<WorkflowContext>) -> Result<()> {
178            if !dir.exists() {
179                return Ok(());
180            }
181
182            for entry in fs::read_dir(dir)? {
183                let entry = entry?;
184                let path = entry.path();
185
186                if path.extension().is_some_and(|ext| ext == "json") {
187                    match fs::read_to_string(&path) {
188                        Ok(content) => match serde_json::from_str::<WorkflowContext>(&content) {
189                            Ok(ctx) => contexts.push(ctx),
190                            Err(e) => {
191                                log::warn!("Failed to parse {:?}: {}", path, e);
192                            }
193                        },
194                        Err(e) => {
195                            log::warn!("Failed to read {:?}: {}", path, e);
196                        }
197                    }
198                }
199            }
200            Ok(())
201        }
202
203        // Load from both directories
204        if let Some(ref proj) = self.project_path {
205            load_from_dir(proj, &mut contexts)?;
206        }
207        load_from_dir(&self.user_path, &mut contexts)?;
208
209        // Sort by created_at descending (newest first)
210        contexts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
211
212        Ok(contexts)
213    }
214
215    /// Delete a saved workflow context
216    ///
217    /// Searches both directories and deletes from where it exists.
218    /// Returns `Ok(())` even if the file does not exist.
219    ///
220    /// # Example
221    /// ```ignore
222    /// let persistence = WorkflowPersistence::new(Some(&project_path));
223    /// persistence.delete("run-123")?;
224    /// ```
225    pub fn delete(&self, instance_id: &str) -> Result<()> {
226        if let Some(path) = self.find_file_path(instance_id) {
227            fs::remove_file(&path)?;
228            log::info!("Deleted workflow context: {:?}", path);
229        }
230
231        Ok(())
232    }
233
234    /// List workflow contexts by status
235    ///
236    /// # Example
237    /// ```ignore
238    /// let persistence = WorkflowPersistence::new(Some(&project_path));
239    /// let paused = persistence.list_by_status(WorkflowStatus::Paused)?;
240    /// ```
241    pub fn list_by_status(
242        &self,
243        status: super::context::WorkflowStatus,
244    ) -> Result<Vec<WorkflowContext>> {
245        let all = self.list()?;
246        Ok(all.into_iter().filter(|ctx| ctx.status == status).collect())
247    }
248
249    /// Get the save path (where new workflows will be saved)
250    pub fn save_path(&self) -> &PathBuf {
251        &self.save_path
252    }
253
254    /// Get the user path
255    pub fn user_path(&self) -> &PathBuf {
256        &self.user_path
257    }
258
259    /// Get the project path (if available)
260    pub fn project_path(&self) -> Option<&PathBuf> {
261        self.project_path.as_ref()
262    }
263}
264
265impl Default for WorkflowPersistence {
266    fn default() -> Self {
267        Self::new_global()
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::super::context::WorkflowStatus;
    use super::*;
    use std::collections::HashMap;
    use tempfile::{TempDir, tempdir};

    /// Build a persistence manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside it so the directory stays
    /// alive for the duration of the test.
    fn temp_persistence() -> (TempDir, WorkflowPersistence) {
        let dir = tempdir().unwrap();
        let persistence = WorkflowPersistence::with_base_path(dir.path().to_path_buf());
        (dir, persistence)
    }

    /// Create a context for `workflow_id` with no inputs.
    fn new_context(workflow_id: &str) -> WorkflowContext {
        WorkflowContext::new(workflow_id.to_string(), HashMap::new())
    }

    /// Create a context for `workflow_id` that has already been started.
    fn started_context(workflow_id: &str) -> WorkflowContext {
        let mut ctx = new_context(workflow_id);
        ctx.start();
        ctx
    }

    /// A saved context can be read back with the same identifiers.
    #[test]
    fn test_save_and_load() {
        let (_dir, persistence) = temp_persistence();
        let ctx = started_context("test_workflow");

        persistence.save(&ctx).unwrap();

        let loaded = persistence.load(&ctx.instance_id).unwrap();
        assert!(loaded.is_some());

        let loaded_ctx = loaded.unwrap();
        assert_eq!(loaded_ctx.instance_id, ctx.instance_id);
        assert_eq!(loaded_ctx.workflow_id, "test_workflow");
    }

    /// Loading an unknown id yields `None` rather than an error.
    #[test]
    fn test_load_nonexistent() {
        let (_dir, persistence) = temp_persistence();

        let loaded = persistence.load("nonexistent-id").unwrap();
        assert!(loaded.is_none());
    }

    /// Every saved context shows up in the listing.
    #[test]
    fn test_list() {
        let (_dir, persistence) = temp_persistence();

        let ctx1 = started_context("workflow1");
        let ctx2 = started_context("workflow2");
        persistence.save(&ctx1).unwrap();
        persistence.save(&ctx2).unwrap();

        let list = persistence.list().unwrap();
        assert_eq!(list.len(), 2);
    }

    /// A deleted context can no longer be loaded.
    #[test]
    fn test_delete() {
        let (_dir, persistence) = temp_persistence();

        let ctx = new_context("test");
        persistence.save(&ctx).unwrap();
        assert!(persistence.load(&ctx.instance_id).unwrap().is_some());

        persistence.delete(&ctx.instance_id).unwrap();
        assert!(persistence.load(&ctx.instance_id).unwrap().is_none());
    }

    /// Paused state and current node survive a round trip, and a resumed
    /// context overwrites the stored one.
    #[test]
    fn test_save_and_load_paused_workflow() {
        let (_dir, persistence) = temp_persistence();

        let mut ctx = started_context("paused_workflow");
        ctx.set_current_node("node1".to_string());
        ctx.pause();
        persistence.save(&ctx).unwrap();

        let mut loaded = persistence.load(&ctx.instance_id).unwrap().unwrap();
        assert_eq!(loaded.status, WorkflowStatus::Paused);
        assert_eq!(loaded.current_node_id, Some("node1".to_string()));

        loaded.resume();
        persistence.save(&loaded).unwrap();

        let resumed = persistence.load(&loaded.instance_id).unwrap().unwrap();
        assert_eq!(resumed.status, WorkflowStatus::Running);
    }

    /// Filtering by status returns only the matching contexts.
    #[test]
    fn test_list_by_status() {
        let (_dir, persistence) = temp_persistence();

        let mut ctx1 = started_context("workflow1");
        ctx1.pause();
        let ctx2 = started_context("workflow2");

        persistence.save(&ctx1).unwrap();
        persistence.save(&ctx2).unwrap();

        let paused = persistence.list_by_status(WorkflowStatus::Paused).unwrap();
        assert_eq!(paused.len(), 1);
        assert_eq!(paused[0].workflow_id, "workflow1");

        let running = persistence.list_by_status(WorkflowStatus::Running).unwrap();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].workflow_id, "workflow2");
    }
}
397}