ricecoder_workflows/
storage_integration.rs

1//! Integration with ricecoder-storage for workflow state persistence
2
3use crate::error::{WorkflowError, WorkflowResult};
4use crate::models::WorkflowState;
5use std::path::{Path, PathBuf};
6
/// Stateless namespace for persisting, loading and deleting workflow state files.
///
/// The type carries no data; every operation is an associated function that
/// receives the storage path explicitly. The common marker traits are derived so
/// the type can be logged, copied and default-constructed like any other unit type.
#[derive(Debug, Clone, Copy, Default)]
pub struct StorageIntegration;
9
10impl StorageIntegration {
11    /// Persist workflow state using storage manager
12    ///
13    /// This function handles storage errors gracefully by:
14    /// - Creating necessary directories
15    /// - Serializing state to JSON format
16    /// - Handling IO errors with context
17    pub fn persist_state(state: &WorkflowState, storage_path: &Path) -> WorkflowResult<()> {
18        // Ensure parent directory exists
19        if let Some(parent) = storage_path.parent() {
20            std::fs::create_dir_all(parent).map_err(|e| {
21                WorkflowError::StateError(format!(
22                    "Failed to create storage directory at {}: {}",
23                    parent.display(),
24                    e
25                ))
26            })?;
27        }
28
29        // Serialize state to JSON
30        let json = serde_json::to_string_pretty(state).map_err(|e| {
31            WorkflowError::StateError(format!("Failed to serialize workflow state: {}", e))
32        })?;
33
34        // Write to file
35        std::fs::write(storage_path, json).map_err(|e| {
36            WorkflowError::StateError(format!(
37                "Failed to write workflow state to {}: {}",
38                storage_path.display(),
39                e
40            ))
41        })?;
42
43        Ok(())
44    }
45
46    /// Load workflow state from storage
47    ///
48    /// This function handles storage errors gracefully by:
49    /// - Checking if file exists
50    /// - Reading and deserializing JSON
51    /// - Validating state integrity
52    pub fn load_state(storage_path: &Path) -> WorkflowResult<WorkflowState> {
53        if !storage_path.exists() {
54            return Err(WorkflowError::StateError(format!(
55                "Workflow state file not found at {}",
56                storage_path.display()
57            )));
58        }
59
60        // Read file
61        let content = std::fs::read_to_string(storage_path).map_err(|e| {
62            WorkflowError::StateError(format!(
63                "Failed to read workflow state from {}: {}",
64                storage_path.display(),
65                e
66            ))
67        })?;
68
69        // Try JSON first (primary format)
70        if let Ok(state) = serde_json::from_str::<WorkflowState>(&content) {
71            return Ok(state);
72        }
73
74        // Fall back to YAML for backward compatibility
75        serde_yaml::from_str::<WorkflowState>(&content).map_err(|e| {
76            WorkflowError::StateError(format!(
77                "Failed to deserialize workflow state from {}: {}",
78                storage_path.display(),
79                e
80            ))
81        })
82    }
83
84    /// Load workflow state with validation
85    ///
86    /// Validates state integrity after loading to ensure:
87    /// - Workflow ID is not empty
88    /// - All completed steps have results
89    /// - Current step (if any) has a result
90    pub fn load_state_validated(storage_path: &Path) -> WorkflowResult<WorkflowState> {
91        let state = Self::load_state(storage_path)?;
92        Self::validate_state(&state)?;
93        Ok(state)
94    }
95
96    /// Validate workflow state integrity
97    fn validate_state(state: &WorkflowState) -> WorkflowResult<()> {
98        // Check that workflow_id is not empty
99        if state.workflow_id.is_empty() {
100            return Err(WorkflowError::StateError(
101                "Workflow ID cannot be empty".to_string(),
102            ));
103        }
104
105        // Check that all completed steps have results
106        for step_id in &state.completed_steps {
107            if !state.step_results.contains_key(step_id) {
108                return Err(WorkflowError::StateError(format!(
109                    "Completed step '{}' has no result",
110                    step_id
111                )));
112            }
113        }
114
115        // Check that current step (if any) has a result
116        if let Some(current_step) = &state.current_step {
117            if !state.step_results.contains_key(current_step) {
118                return Err(WorkflowError::StateError(format!(
119                    "Current step '{}' has no result",
120                    current_step
121                )));
122            }
123        }
124
125        Ok(())
126    }
127
128    /// Delete workflow state from storage
129    ///
130    /// Handles deletion errors gracefully
131    pub fn delete_state(storage_path: &Path) -> WorkflowResult<()> {
132        if storage_path.exists() {
133            std::fs::remove_file(storage_path).map_err(|e| {
134                WorkflowError::StateError(format!(
135                    "Failed to delete workflow state at {}: {}",
136                    storage_path.display(),
137                    e
138                ))
139            })?;
140        }
141        Ok(())
142    }
143
144    /// Check if workflow state exists in storage
145    pub fn state_exists(storage_path: &Path) -> bool {
146        storage_path.exists()
147    }
148
149    /// Get the storage path for a workflow
150    ///
151    /// Constructs a path like: `{base_path}/workflows/{workflow_id}/state.json`
152    pub fn get_workflow_state_path(base_path: &Path, workflow_id: &str) -> PathBuf {
153        base_path
154            .join("workflows")
155            .join(workflow_id)
156            .join("state.json")
157    }
158
159    /// Get the storage directory for a workflow
160    ///
161    /// Constructs a path like: `{base_path}/workflows/{workflow_id}/`
162    pub fn get_workflow_storage_dir(base_path: &Path, workflow_id: &str) -> PathBuf {
163        base_path.join("workflows").join(workflow_id)
164    }
165}
166
#[cfg(test)]
mod tests {
    //! Unit tests for `StorageIntegration`: round-tripping state through the file
    //! system, validation rules, deletion and path construction.

    use super::*;
    use crate::models::{StepStatus, WorkflowStatus};
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Build a valid running workflow: `step1` completed, `step2` current,
    /// both with recorded results.
    fn create_test_state() -> WorkflowState {
        let mut step_results = HashMap::new();
        step_results.insert(
            "step1".to_string(),
            crate::models::StepResult {
                status: StepStatus::Completed,
                output: Some(serde_json::json!({"result": "success"})),
                error: None,
                duration_ms: 100,
            },
        );
        step_results.insert(
            "step2".to_string(),
            crate::models::StepResult {
                status: StepStatus::Running,
                output: None,
                error: None,
                duration_ms: 0,
            },
        );

        WorkflowState {
            workflow_id: "test-workflow".to_string(),
            status: WorkflowStatus::Running,
            current_step: Some("step2".to_string()),
            completed_steps: vec!["step1".to_string()],
            step_results,
            started_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn test_persist_and_load_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let original_state = create_test_state();

        // Persist state
        let result = StorageIntegration::persist_state(&original_state, &state_path);
        assert!(result.is_ok());
        assert!(state_path.exists());

        // Load state
        let loaded_state = StorageIntegration::load_state(&state_path).unwrap();
        assert_eq!(loaded_state.workflow_id, original_state.workflow_id);
        assert_eq!(loaded_state.status, original_state.status);
        assert_eq!(loaded_state.current_step, original_state.current_step);
        assert_eq!(loaded_state.completed_steps, original_state.completed_steps);
        assert_eq!(
            loaded_state.step_results.len(),
            original_state.step_results.len()
        );
    }

    #[test]
    fn test_persist_creates_parent_directories() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir
            .path()
            .join("workflows")
            .join("nested")
            .join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        assert!(state_path.is_file());
    }

    #[test]
    fn test_persist_overwrites_existing_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let mut state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        state.workflow_id = "second-workflow".to_string();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        let loaded_state = StorageIntegration::load_state(&state_path).unwrap();
        assert_eq!(loaded_state.workflow_id, "second-workflow");
    }

    #[test]
    fn test_persist_leaves_only_state_file() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        // Nothing besides the state file may remain in the directory.
        let entries = std::fs::read_dir(temp_dir.path()).unwrap().count();
        assert_eq!(entries, 1);
    }

    #[test]
    fn test_load_nonexistent_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("nonexistent.json");

        let result = StorageIntegration::load_state(&state_path);
        assert!(result.is_err());
    }

    #[test]
    fn test_load_malformed_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        // Neither valid JSON nor a YAML document describing a workflow state.
        std::fs::write(&state_path, "this is not a workflow state").unwrap();

        let result = StorageIntegration::load_state(&state_path);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_success() {
        let state = create_test_state();
        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_ok());
    }

    #[test]
    fn test_validate_state_empty_workflow_id() {
        let mut state = create_test_state();
        state.workflow_id = String::new();

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_missing_result() {
        let mut state = create_test_state();
        state.completed_steps.push("step3".to_string());
        // Don't add result for step3

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_missing_current_step_result() {
        let mut state = create_test_state();
        // Point at a step that has no entry in `step_results`.
        state.current_step = Some("step3".to_string());

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_without_current_step() {
        let mut state = create_test_state();
        state.current_step = None;

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_ok());
    }

    #[test]
    fn test_delete_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();
        assert!(state_path.exists());

        let result = StorageIntegration::delete_state(&state_path);
        assert!(result.is_ok());
        assert!(!state_path.exists());
    }

    #[test]
    fn test_delete_nonexistent_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("nonexistent.json");

        // Deleting something that is not there is not an error.
        let result = StorageIntegration::delete_state(&state_path);
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_exists() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        assert!(!StorageIntegration::state_exists(&state_path));

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();
        assert!(StorageIntegration::state_exists(&state_path));
    }

    #[test]
    fn test_get_workflow_state_path() {
        let base_path = Path::new("/storage");
        let path = StorageIntegration::get_workflow_state_path(base_path, "my-workflow");

        assert_eq!(
            path,
            PathBuf::from("/storage/workflows/my-workflow/state.json")
        );
    }

    #[test]
    fn test_get_workflow_storage_dir() {
        let base_path = Path::new("/storage");
        let dir = StorageIntegration::get_workflow_storage_dir(base_path, "my-workflow");

        assert_eq!(dir, PathBuf::from("/storage/workflows/my-workflow"));
    }

    #[test]
    fn test_load_state_validated() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        let result = StorageIntegration::load_state_validated(&state_path);
        assert!(result.is_ok());
    }

    #[test]
    fn test_load_state_validated_with_invalid_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let mut state = create_test_state();
        state.workflow_id = String::new();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        let result = StorageIntegration::load_state_validated(&state_path);
        assert!(result.is_err());
    }
}
329}