Skip to main content

matrixcode_core/workflow/
persistence.rs

1//! Workflow persistence module
2//!
3//! Provides save/load functionality for workflow contexts
4//!
5//! Supports two storage locations:
6//! - User directory: `~/.matrix/workflows/` (global workflows)
7//! - Project directory: `.matrix/workflows/` (project-specific workflows)
8
9use anyhow::Result;
10use std::fs;
11use std::path::PathBuf;
12
13use super::context::WorkflowContext;
14
/// Workflow persistence manager
///
/// Handles saving and loading workflow contexts to/from disk.
/// Files are stored in JSON format, one `<instance_id>.json` file per
/// workflow run, under:
/// - `~/.matrix/workflows/` (user/global)
/// - `.matrix/workflows/` (project-specific)
///
/// The type only holds directory paths, so it is cheap to clone and safe to
/// print in diagnostics.
#[derive(Debug, Clone)]
pub struct WorkflowPersistence {
    /// User directory for global workflows
    user_path: PathBuf,
    /// Project directory for project-specific workflows (optional)
    project_path: Option<PathBuf>,
    /// Preferred save location (project if available, else user)
    save_path: PathBuf,
}
29
30impl WorkflowPersistence {
31    /// Create a new persistence manager with project context
32    ///
33    /// Searches both project and user directories when loading.
34    /// Saves to project directory if available, else to user directory.
35    pub fn new(project_dir: Option<&PathBuf>) -> Self {
36        // User directory: ~/.matrix/workflows/
37        let user_path = dirs::home_dir()
38            .unwrap_or_else(|| PathBuf::from("."))
39            .join(".matrix")
40            .join("workflows");
41
42        // Project directory: <project>/ .matrix/workflows/
43        let project_path = project_dir.map(|p| p.join(".matrix").join("workflows"));
44
45        // Save path: prefer project if available
46        let save_path = project_path.as_ref()
47            .filter(|_p| {
48                // Only use project path if project directory exists
49                project_dir.as_ref().map(|pd| pd.exists()).unwrap_or(false)
50            })
51            .cloned()
52            .unwrap_or_else(|| user_path.clone());
53
54        // Ensure directories exist
55        if let Err(e) = fs::create_dir_all(&user_path) {
56            log::warn!("Failed to create user workflow directory: {}", e);
57        }
58        if let Some(ref proj) = project_path
59            && let Err(e) = fs::create_dir_all(proj) {
60                log::warn!("Failed to create project workflow directory: {}", e);
61            }
62
63        Self {
64            user_path,
65            project_path,
66            save_path,
67        }
68    }
69
70    /// Create a persistence manager for user directory only (no project context)
71    pub fn new_global() -> Self {
72        Self::new(None)
73    }
74
75    /// Create a persistence manager with a custom base path
76    ///
77    /// Useful for testing or custom configurations.
78    pub fn with_base_path(base_path: PathBuf) -> Self {
79        if let Err(e) = fs::create_dir_all(&base_path) {
80            log::warn!("Failed to create workflow directory: {}", e);
81        }
82        Self {
83            user_path: base_path.clone(),
84            project_path: None,
85            save_path: base_path,
86        }
87    }
88
89    /// Get the file path for saving a workflow run
90    fn get_save_file_path(&self, instance_id: &str) -> PathBuf {
91        self.save_path.join(format!("{}.json", instance_id))
92    }
93
94    /// Search for a workflow file in both project and user directories
95    fn find_file_path(&self, instance_id: &str) -> Option<PathBuf> {
96        // Search project first, then user
97        if let Some(ref proj) = self.project_path {
98            let proj_path = proj.join(format!("{}.json", instance_id));
99            if proj_path.exists() {
100                return Some(proj_path);
101            }
102        }
103
104        let user_path = self.user_path.join(format!("{}.json", instance_id));
105        if user_path.exists() {
106            return Some(user_path);
107        }
108
109        None
110    }
111
112    /// Save a workflow context to disk
113    ///
114    /// The context is serialized to JSON and stored in the save directory
115    /// (project directory if available, else user directory).
116    ///
117    /// # Example
118    /// ```ignore
119    /// let persistence = WorkflowPersistence::new(Some(&project_path));
120    /// let ctx = WorkflowContext::new("my_workflow".to_string(), HashMap::new());
121    /// persistence.save(&ctx)?;
122    /// ```
123    pub fn save(&self, context: &WorkflowContext) -> Result<()> {
124        let path = self.get_save_file_path(&context.instance_id);
125        let content = serde_json::to_string_pretty(context)?;
126
127        fs::write(&path, content)?;
128        log::info!("Saved workflow context to {:?}", path);
129
130        Ok(())
131    }
132
133    /// Load a workflow context from disk
134    ///
135    /// Searches both project and user directories.
136    /// Returns `Ok(None)` if the instance_id does not exist.
137    ///
138    /// # Example
139    /// ```ignore
140    /// let persistence = WorkflowPersistence::new(Some(&project_path));
141    /// if let Some(ctx) = persistence.load("run-123")? {
142    ///     println!("Loaded workflow: {}", ctx.workflow_id);
143    /// }
144    /// ```
145    pub fn load(&self, instance_id: &str) -> Result<Option<WorkflowContext>> {
146        let path = self.find_file_path(instance_id);
147
148        if let Some(path) = path {
149            let content = fs::read_to_string(&path)?;
150            let context: WorkflowContext = serde_json::from_str(&content)?;
151            log::info!("Loaded workflow context from {:?}", path);
152            Ok(Some(context))
153        } else {
154            Ok(None)
155        }
156    }
157
158    /// List all saved workflow contexts from both directories
159    ///
160    /// Returns a list of all persisted workflow contexts.
161    /// Failed loads are logged and skipped.
162    ///
163    /// # Example
164    /// ```ignore
165    /// let persistence = WorkflowPersistence::new(Some(&project_path));
166    /// let workflows = persistence.list()?;
167    /// for ctx in workflows {
168    ///     println!("{}: {:?}", ctx.instance_id, ctx.status);
169    /// }
170    /// ```
171    pub fn list(&self) -> Result<Vec<WorkflowContext>> {
172        let mut contexts = Vec::new();
173
174        // Helper to load contexts from a directory
175        fn load_from_dir(dir: &PathBuf, contexts: &mut Vec<WorkflowContext>) -> Result<()> {
176            if !dir.exists() {
177                return Ok(());
178            }
179
180            for entry in fs::read_dir(dir)? {
181                let entry = entry?;
182                let path = entry.path();
183
184                if path.extension().is_some_and(|ext| ext == "json") {
185                    match fs::read_to_string(&path) {
186                        Ok(content) => match serde_json::from_str::<WorkflowContext>(&content) {
187                            Ok(ctx) => contexts.push(ctx),
188                            Err(e) => {
189                                log::warn!("Failed to parse {:?}: {}", path, e);
190                            }
191                        },
192                        Err(e) => {
193                            log::warn!("Failed to read {:?}: {}", path, e);
194                        }
195                    }
196                }
197            }
198            Ok(())
199        }
200
201        // Load from both directories
202        if let Some(ref proj) = self.project_path {
203            load_from_dir(proj, &mut contexts)?;
204        }
205        load_from_dir(&self.user_path, &mut contexts)?;
206
207        // Sort by created_at descending (newest first)
208        contexts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
209
210        Ok(contexts)
211    }
212
213    /// Delete a saved workflow context
214    ///
215    /// Searches both directories and deletes from where it exists.
216    /// Returns `Ok(())` even if the file does not exist.
217    ///
218    /// # Example
219    /// ```ignore
220    /// let persistence = WorkflowPersistence::new(Some(&project_path));
221    /// persistence.delete("run-123")?;
222    /// ```
223    pub fn delete(&self, instance_id: &str) -> Result<()> {
224        if let Some(path) = self.find_file_path(instance_id) {
225            fs::remove_file(&path)?;
226            log::info!("Deleted workflow context: {:?}", path);
227        }
228
229        Ok(())
230    }
231
232    /// List workflow contexts by status
233    ///
234    /// # Example
235    /// ```ignore
236    /// let persistence = WorkflowPersistence::new(Some(&project_path));
237    /// let paused = persistence.list_by_status(WorkflowStatus::Paused)?;
238    /// ```
239    pub fn list_by_status(
240        &self,
241        status: super::context::WorkflowStatus,
242    ) -> Result<Vec<WorkflowContext>> {
243        let all = self.list()?;
244        Ok(all.into_iter().filter(|ctx| ctx.status == status).collect())
245    }
246
247    /// Get the save path (where new workflows will be saved)
248    pub fn save_path(&self) -> &PathBuf {
249        &self.save_path
250    }
251
252    /// Get the user path
253    pub fn user_path(&self) -> &PathBuf {
254        &self.user_path
255    }
256
257    /// Get the project path (if available)
258    pub fn project_path(&self) -> Option<&PathBuf> {
259        self.project_path.as_ref()
260    }
261}
262
263impl Default for WorkflowPersistence {
264    fn default() -> Self {
265        Self::new_global()
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::context::WorkflowStatus;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Create a persistence manager rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the manager so the
    /// directory stays alive for the duration of the test.
    fn scratch_store() -> (tempfile::TempDir, WorkflowPersistence) {
        let guard = tempdir().unwrap();
        let store = WorkflowPersistence::with_base_path(guard.path().to_path_buf());
        (guard, store)
    }

    /// Create a context for `workflow_id` with no inputs.
    fn fresh_context(workflow_id: &str) -> WorkflowContext {
        WorkflowContext::new(workflow_id.to_string(), HashMap::new())
    }

    /// A saved context can be read back with the same identifiers.
    #[test]
    fn test_save_and_load() {
        let (_guard, store) = scratch_store();

        let mut original = fresh_context("test_workflow");
        original.start();
        store.save(&original).unwrap();

        let restored = store.load(&original.instance_id).unwrap();
        assert!(restored.is_some());

        let restored = restored.unwrap();
        assert_eq!(restored.instance_id, original.instance_id);
        assert_eq!(restored.workflow_id, "test_workflow");
    }

    /// Loading an unknown id yields `None` rather than an error.
    #[test]
    fn test_load_nonexistent() {
        let (_guard, store) = scratch_store();

        let missing = store.load("nonexistent-id").unwrap();
        assert!(missing.is_none());
    }

    /// Every saved context shows up in the listing.
    #[test]
    fn test_list() {
        let (_guard, store) = scratch_store();

        for workflow_id in ["workflow1", "workflow2"] {
            let mut ctx = fresh_context(workflow_id);
            ctx.start();
            store.save(&ctx).unwrap();
        }

        let listed = store.list().unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// A deleted context can no longer be loaded.
    #[test]
    fn test_delete() {
        let (_guard, store) = scratch_store();

        let ctx = fresh_context("test");
        store.save(&ctx).unwrap();

        // Present before deletion
        assert!(store.load(&ctx.instance_id).unwrap().is_some());

        store.delete(&ctx.instance_id).unwrap();

        // Gone afterwards
        assert!(store.load(&ctx.instance_id).unwrap().is_none());
    }

    /// Paused state survives a round trip, and a resumed context can be
    /// saved over the paused one.
    #[test]
    fn test_save_and_load_paused_workflow() {
        let (_guard, store) = scratch_store();

        let mut ctx = fresh_context("paused_workflow");
        ctx.start();
        ctx.set_current_node("node1".to_string());
        ctx.pause();
        store.save(&ctx).unwrap();

        // The paused snapshot keeps its status and position
        let mut reloaded = store.load(&ctx.instance_id).unwrap().unwrap();
        assert_eq!(reloaded.status, WorkflowStatus::Paused);
        assert_eq!(reloaded.current_node_id, Some("node1".to_string()));

        // Resume, overwrite, and check the new status is what gets loaded
        reloaded.resume();
        store.save(&reloaded).unwrap();

        let resumed = store.load(&reloaded.instance_id).unwrap().unwrap();
        assert_eq!(resumed.status, WorkflowStatus::Running);
    }

    /// Filtering by status returns only the matching contexts.
    #[test]
    fn test_list_by_status() {
        let (_guard, store) = scratch_store();

        // One paused workflow ...
        let mut paused_ctx = fresh_context("workflow1");
        paused_ctx.start();
        paused_ctx.pause();

        // ... and one that is still running
        let mut running_ctx = fresh_context("workflow2");
        running_ctx.start();

        store.save(&paused_ctx).unwrap();
        store.save(&running_ctx).unwrap();

        let paused = store.list_by_status(WorkflowStatus::Paused).unwrap();
        assert_eq!(paused.len(), 1);
        assert_eq!(paused[0].workflow_id, "workflow1");

        let running = store.list_by_status(WorkflowStatus::Running).unwrap();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].workflow_id, "workflow2");
    }
}