ricecoder_workflows/
storage_integration.rs1use crate::error::{WorkflowError, WorkflowResult};
4use crate::models::WorkflowState;
5use std::path::{Path, PathBuf};
6
/// Stateless namespace for persisting workflow state to, and loading it from,
/// the filesystem.
///
/// Every operation is an associated function; the type carries no data and
/// never needs to be instantiated.
#[derive(Debug, Clone, Copy, Default)]
pub struct StorageIntegration;
9
10impl StorageIntegration {
11 pub fn persist_state(state: &WorkflowState, storage_path: &Path) -> WorkflowResult<()> {
18 if let Some(parent) = storage_path.parent() {
20 std::fs::create_dir_all(parent).map_err(|e| {
21 WorkflowError::StateError(format!(
22 "Failed to create storage directory at {}: {}",
23 parent.display(),
24 e
25 ))
26 })?;
27 }
28
29 let json = serde_json::to_string_pretty(state).map_err(|e| {
31 WorkflowError::StateError(format!("Failed to serialize workflow state: {}", e))
32 })?;
33
34 std::fs::write(storage_path, json).map_err(|e| {
36 WorkflowError::StateError(format!(
37 "Failed to write workflow state to {}: {}",
38 storage_path.display(),
39 e
40 ))
41 })?;
42
43 Ok(())
44 }
45
46 pub fn load_state(storage_path: &Path) -> WorkflowResult<WorkflowState> {
53 if !storage_path.exists() {
54 return Err(WorkflowError::StateError(format!(
55 "Workflow state file not found at {}",
56 storage_path.display()
57 )));
58 }
59
60 let content = std::fs::read_to_string(storage_path).map_err(|e| {
62 WorkflowError::StateError(format!(
63 "Failed to read workflow state from {}: {}",
64 storage_path.display(),
65 e
66 ))
67 })?;
68
69 if let Ok(state) = serde_json::from_str::<WorkflowState>(&content) {
71 return Ok(state);
72 }
73
74 serde_yaml::from_str::<WorkflowState>(&content).map_err(|e| {
76 WorkflowError::StateError(format!(
77 "Failed to deserialize workflow state from {}: {}",
78 storage_path.display(),
79 e
80 ))
81 })
82 }
83
84 pub fn load_state_validated(storage_path: &Path) -> WorkflowResult<WorkflowState> {
91 let state = Self::load_state(storage_path)?;
92 Self::validate_state(&state)?;
93 Ok(state)
94 }
95
96 fn validate_state(state: &WorkflowState) -> WorkflowResult<()> {
98 if state.workflow_id.is_empty() {
100 return Err(WorkflowError::StateError(
101 "Workflow ID cannot be empty".to_string(),
102 ));
103 }
104
105 for step_id in &state.completed_steps {
107 if !state.step_results.contains_key(step_id) {
108 return Err(WorkflowError::StateError(format!(
109 "Completed step '{}' has no result",
110 step_id
111 )));
112 }
113 }
114
115 if let Some(current_step) = &state.current_step {
117 if !state.step_results.contains_key(current_step) {
118 return Err(WorkflowError::StateError(format!(
119 "Current step '{}' has no result",
120 current_step
121 )));
122 }
123 }
124
125 Ok(())
126 }
127
128 pub fn delete_state(storage_path: &Path) -> WorkflowResult<()> {
132 if storage_path.exists() {
133 std::fs::remove_file(storage_path).map_err(|e| {
134 WorkflowError::StateError(format!(
135 "Failed to delete workflow state at {}: {}",
136 storage_path.display(),
137 e
138 ))
139 })?;
140 }
141 Ok(())
142 }
143
144 pub fn state_exists(storage_path: &Path) -> bool {
146 storage_path.exists()
147 }
148
149 pub fn get_workflow_state_path(base_path: &Path, workflow_id: &str) -> PathBuf {
153 base_path
154 .join("workflows")
155 .join(workflow_id)
156 .join("state.json")
157 }
158
159 pub fn get_workflow_storage_dir(base_path: &Path, workflow_id: &str) -> PathBuf {
163 base_path.join("workflows").join(workflow_id)
164 }
165}
166
#[cfg(test)]
mod tests {
    //! Unit tests for `StorageIntegration`: persistence round trips,
    //! validation rules, deletion, and path construction.

    use super::*;
    use crate::models::{StepStatus, WorkflowStatus};
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds a consistent running workflow: `step1` is completed and `step2`
    /// is the current step, and both have an entry in `step_results`.
    fn create_test_state() -> WorkflowState {
        let mut step_results = HashMap::new();
        step_results.insert(
            "step1".to_string(),
            crate::models::StepResult {
                status: StepStatus::Completed,
                output: Some(serde_json::json!({"result": "success"})),
                error: None,
                duration_ms: 100,
            },
        );
        step_results.insert(
            "step2".to_string(),
            crate::models::StepResult {
                status: StepStatus::Running,
                output: None,
                error: None,
                duration_ms: 0,
            },
        );

        WorkflowState {
            workflow_id: "test-workflow".to_string(),
            status: WorkflowStatus::Running,
            current_step: Some("step2".to_string()),
            completed_steps: vec!["step1".to_string()],
            step_results,
            started_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn test_persist_and_load_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let original_state = create_test_state();

        let result = StorageIntegration::persist_state(&original_state, &state_path);
        assert!(result.is_ok());
        assert!(state_path.exists());

        let loaded_state = StorageIntegration::load_state(&state_path).unwrap();
        assert_eq!(loaded_state.workflow_id, original_state.workflow_id);
        assert_eq!(loaded_state.status, original_state.status);
        assert_eq!(loaded_state.completed_steps, original_state.completed_steps);
        assert_eq!(loaded_state.current_step, original_state.current_step);
        assert_eq!(
            loaded_state.step_results.len(),
            original_state.step_results.len()
        );
    }

    #[test]
    fn test_persist_state_creates_parent_directories() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir
            .path()
            .join("nested")
            .join("deeper")
            .join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        assert!(state_path.exists());
    }

    #[test]
    fn test_load_nonexistent_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("nonexistent.json");

        let result = StorageIntegration::load_state(&state_path);
        assert!(result.is_err());
    }

    #[test]
    fn test_load_malformed_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        // An unterminated mapping is invalid as both JSON and YAML.
        std::fs::write(&state_path, "{ not valid state").unwrap();

        let result = StorageIntegration::load_state(&state_path);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_success() {
        let state = create_test_state();
        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_ok());
    }

    #[test]
    fn test_validate_state_empty_workflow_id() {
        let mut state = create_test_state();
        state.workflow_id = String::new();

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_missing_result() {
        let mut state = create_test_state();
        // `step3` is marked completed but has no entry in `step_results`.
        state.completed_steps.push("step3".to_string());
        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_state_current_step_missing_result() {
        let mut state = create_test_state();
        // `step3` becomes the current step but has no entry in `step_results`.
        state.current_step = Some("step3".to_string());

        let result = StorageIntegration::validate_state(&state);
        assert!(result.is_err());
    }

    #[test]
    fn test_delete_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();
        assert!(state_path.exists());

        let result = StorageIntegration::delete_state(&state_path);
        assert!(result.is_ok());
        assert!(!state_path.exists());
    }

    #[test]
    fn test_delete_nonexistent_state_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("missing.json");

        let result = StorageIntegration::delete_state(&state_path);
        assert!(result.is_ok());
        assert!(!state_path.exists());
    }

    #[test]
    fn test_state_exists() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        assert!(!StorageIntegration::state_exists(&state_path));

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();
        assert!(StorageIntegration::state_exists(&state_path));
    }

    #[test]
    fn test_get_workflow_state_path() {
        let base_path = Path::new("/storage");
        let path = StorageIntegration::get_workflow_state_path(base_path, "my-workflow");

        assert_eq!(
            path,
            PathBuf::from("/storage/workflows/my-workflow/state.json")
        );
    }

    #[test]
    fn test_get_workflow_storage_dir() {
        let base_path = Path::new("/storage");
        let dir = StorageIntegration::get_workflow_storage_dir(base_path, "my-workflow");

        assert_eq!(dir, PathBuf::from("/storage/workflows/my-workflow"));
    }

    #[test]
    fn test_load_state_validated() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let state = create_test_state();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        let result = StorageIntegration::load_state_validated(&state_path);
        assert!(result.is_ok());
    }

    #[test]
    fn test_load_state_validated_with_invalid_state() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let mut state = create_test_state();
        state.workflow_id = String::new();
        StorageIntegration::persist_state(&state, &state_path).unwrap();

        let result = StorageIntegration::load_state_validated(&state_path);
        assert!(result.is_err());
    }
}