Skip to main content

raps_admin/bulk/
state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Operation state persistence
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::bulk::executor::{ItemResult, ProgressUpdate};
14use crate::error::AdminError;
15use crate::types::{OperationStatus, OperationType};
16
17/// Manages persistent state for bulk operations
18pub struct StateManager {
19    state_dir: PathBuf,
20}
21
22/// Persisted state of an operation
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct OperationState {
25    /// Unique operation identifier
26    pub operation_id: Uuid,
27    /// Type of operation
28    pub operation_type: OperationType,
29    /// Current status
30    pub status: OperationStatus,
31    /// Operation parameters (JSON value for flexibility)
32    pub parameters: serde_json::Value,
33    /// List of project IDs to process
34    pub project_ids: Vec<String>,
35    /// Per-project results
36    pub results: HashMap<String, ProjectResultState>,
37    /// When the operation was created
38    pub created_at: DateTime<Utc>,
39    /// Last update time
40    pub updated_at: DateTime<Utc>,
41}
42
43/// State of a single project's processing
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ProjectResultState {
46    /// Processing result
47    pub result: ItemResult,
48    /// Number of attempts made
49    pub attempts: u32,
50    /// When processing completed (if done)
51    pub completed_at: Option<DateTime<Utc>>,
52}
53
54/// Summary of an operation (for listing)
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct OperationSummary {
57    /// Operation ID
58    pub operation_id: Uuid,
59    /// Type of operation
60    pub operation_type: OperationType,
61    /// Current status
62    pub status: OperationStatus,
63    /// Total projects
64    pub total: usize,
65    /// Completed count
66    pub completed: usize,
67    /// Failed count
68    pub failed: usize,
69    /// Skipped count
70    pub skipped: usize,
71    /// Creation time
72    pub created_at: DateTime<Utc>,
73    /// Last update time
74    pub updated_at: DateTime<Utc>,
75}
76
77/// Types of state updates
78pub enum StateUpdate {
79    /// An item was processed
80    ItemCompleted {
81        project_id: String,
82        result: ItemResult,
83        attempts: u32,
84    },
85    /// Operation status changed
86    StatusChanged { status: OperationStatus },
87    /// Progress was updated (for partial saves)
88    ProgressUpdated { progress: ProgressUpdate },
89}
90
91impl StateManager {
92    /// Create a new state manager using the default state directory
93    pub fn new() -> Result<Self, AdminError> {
94        let state_dir = Self::default_state_dir()?;
95        std::fs::create_dir_all(&state_dir)?;
96        Ok(Self { state_dir })
97    }
98
99    /// Create a state manager with a custom directory (for testing)
100    pub fn with_dir(state_dir: PathBuf) -> Result<Self, AdminError> {
101        std::fs::create_dir_all(&state_dir)?;
102        Ok(Self { state_dir })
103    }
104
105    /// Get the state directory path
106    pub fn state_dir(&self) -> &PathBuf {
107        &self.state_dir
108    }
109
110    /// Get the default state directory based on platform
111    fn default_state_dir() -> Result<PathBuf, AdminError> {
112        let base_dirs =
113            directories::ProjectDirs::from("com", "autodesk", "raps").ok_or_else(|| {
114                AdminError::StateError(std::io::Error::new(
115                    std::io::ErrorKind::NotFound,
116                    "Could not determine user data directory",
117                ))
118            })?;
119
120        Ok(base_dirs.data_dir().join("operations"))
121    }
122
123    /// Get the file path for an operation's state
124    fn operation_path(&self, operation_id: Uuid) -> PathBuf {
125        self.state_dir.join(format!("{}.json", operation_id))
126    }
127
128    /// Create a new operation state
129    ///
130    /// # Arguments
131    /// * `operation_type` - Type of bulk operation
132    /// * `parameters` - Operation parameters (as JSON)
133    /// * `project_ids` - List of project IDs to process
134    ///
135    /// # Returns
136    /// The new operation's UUID
137    pub async fn create_operation(
138        &self,
139        operation_type: OperationType,
140        parameters: serde_json::Value,
141        project_ids: Vec<String>,
142    ) -> Result<Uuid, AdminError> {
143        let operation_id = Uuid::new_v4();
144        let now = Utc::now();
145
146        let state = OperationState {
147            operation_id,
148            operation_type,
149            status: OperationStatus::Pending,
150            parameters,
151            project_ids,
152            results: HashMap::new(),
153            created_at: now,
154            updated_at: now,
155        };
156
157        self.save_state(&state).await?;
158        Ok(operation_id)
159    }
160
161    /// Load an existing operation state
162    pub async fn load_operation(&self, operation_id: Uuid) -> Result<OperationState, AdminError> {
163        let path = self.operation_path(operation_id);
164
165        if !path.exists() {
166            return Err(AdminError::OperationNotFound { id: operation_id });
167        }
168
169        let content = tokio::fs::read_to_string(&path).await?;
170        let state: OperationState = serde_json::from_str(&content)?;
171
172        Ok(state)
173    }
174
175    /// Update operation state
176    pub async fn update_state(
177        &self,
178        operation_id: Uuid,
179        update: StateUpdate,
180    ) -> Result<(), AdminError> {
181        let mut state = self.load_operation(operation_id).await?;
182        state.updated_at = Utc::now();
183
184        match update {
185            StateUpdate::ItemCompleted {
186                project_id,
187                result,
188                attempts,
189            } => {
190                state.results.insert(
191                    project_id,
192                    ProjectResultState {
193                        result,
194                        attempts,
195                        completed_at: Some(Utc::now()),
196                    },
197                );
198            }
199            StateUpdate::StatusChanged { status } => {
200                state.status = status;
201            }
202            StateUpdate::ProgressUpdated { .. } => {
203                // Progress updates don't modify persisted state directly
204                // They're used for in-memory tracking
205            }
206        }
207
208        self.save_state(&state).await
209    }
210
211    /// Mark operation as complete with final result
212    pub async fn complete_operation(
213        &self,
214        operation_id: Uuid,
215        status: OperationStatus,
216    ) -> Result<(), AdminError> {
217        let mut state = self.load_operation(operation_id).await?;
218        state.status = status;
219        state.updated_at = Utc::now();
220        self.save_state(&state).await
221    }
222
223    /// List all operations, optionally filtered by status
224    pub async fn list_operations(
225        &self,
226        status_filter: Option<OperationStatus>,
227    ) -> Result<Vec<OperationSummary>, AdminError> {
228        let mut summaries = Vec::new();
229
230        let entries = std::fs::read_dir(&self.state_dir)?;
231
232        for entry in entries.flatten() {
233            let path = entry.path();
234            if path.extension().is_some_and(|ext| ext == "json")
235                && let Ok(content) = std::fs::read_to_string(&path)
236                && let Ok(state) = serde_json::from_str::<OperationState>(&content)
237            {
238                // Apply status filter if provided
239                if let Some(filter_status) = status_filter
240                    && state.status != filter_status
241                {
242                    continue;
243                }
244
245                let (completed, failed, skipped) = count_results(&state.results);
246
247                summaries.push(OperationSummary {
248                    operation_id: state.operation_id,
249                    operation_type: state.operation_type,
250                    status: state.status,
251                    total: state.project_ids.len(),
252                    completed,
253                    failed,
254                    skipped,
255                    created_at: state.created_at,
256                    updated_at: state.updated_at,
257                });
258            }
259        }
260
261        // Sort by updated_at descending (most recent first)
262        summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
263
264        Ok(summaries)
265    }
266
267    /// Get the most recent incomplete operation
268    pub async fn get_resumable_operation(&self) -> Result<Option<Uuid>, AdminError> {
269        let operations = self
270            .list_operations(Some(OperationStatus::InProgress))
271            .await?;
272
273        Ok(operations.first().map(|s| s.operation_id))
274    }
275
276    /// Get project IDs that haven't been processed yet
277    pub fn get_pending_projects(&self, state: &OperationState) -> Vec<String> {
278        state
279            .project_ids
280            .iter()
281            .filter(|id| !state.results.contains_key(*id))
282            .cloned()
283            .collect()
284    }
285
286    /// Cancel an operation (mark as cancelled)
287    pub async fn cancel_operation(&self, operation_id: Uuid) -> Result<(), AdminError> {
288        let mut state = self.load_operation(operation_id).await?;
289
290        // Only allow cancelling in-progress or pending operations
291        if state.status != OperationStatus::InProgress && state.status != OperationStatus::Pending {
292            return Err(AdminError::InvalidOperation {
293                message: format!("Cannot cancel operation with status {:?}", state.status),
294            });
295        }
296
297        state.status = OperationStatus::Cancelled;
298        state.updated_at = Utc::now();
299        self.save_state(&state).await
300    }
301
302    /// Delete an operation state file
303    pub async fn delete_operation(&self, operation_id: Uuid) -> Result<(), AdminError> {
304        let path = self.operation_path(operation_id);
305        if path.exists() {
306            tokio::fs::remove_file(&path).await?;
307        }
308        Ok(())
309    }
310
311    /// Save operation state to disk
312    async fn save_state(&self, state: &OperationState) -> Result<(), AdminError> {
313        let path = self.operation_path(state.operation_id);
314        let content = serde_json::to_string_pretty(state)?;
315        tokio::fs::write(&path, content).await?;
316        Ok(())
317    }
318}
319
320/// Count completed, failed, and skipped results
321fn count_results(results: &HashMap<String, ProjectResultState>) -> (usize, usize, usize) {
322    let mut completed = 0;
323    let mut failed = 0;
324    let mut skipped = 0;
325
326    for result in results.values() {
327        match &result.result {
328            ItemResult::Success => completed += 1,
329            ItemResult::Failed { .. } => failed += 1,
330            ItemResult::Skipped { .. } => skipped += 1,
331        }
332    }
333
334    (completed, failed, skipped)
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use tempfile::TempDir;
341
342    async fn create_test_manager() -> (StateManager, TempDir) {
343        let temp_dir = TempDir::new().unwrap();
344        let manager = StateManager::with_dir(temp_dir.path().to_path_buf()).unwrap();
345        (manager, temp_dir)
346    }
347
348    #[tokio::test]
349    async fn test_create_and_load_operation() {
350        let (manager, _temp_dir) = create_test_manager().await;
351
352        let project_ids = vec!["proj-1".to_string(), "proj-2".to_string()];
353        let params = serde_json::json!({"email": "user@example.com"});
354
355        let operation_id = manager
356            .create_operation(OperationType::AddUser, params.clone(), project_ids.clone())
357            .await
358            .unwrap();
359
360        let state = manager.load_operation(operation_id).await.unwrap();
361
362        assert_eq!(state.operation_id, operation_id);
363        assert_eq!(state.operation_type, OperationType::AddUser);
364        assert_eq!(state.status, OperationStatus::Pending);
365        assert_eq!(state.project_ids.len(), 2);
366    }
367
368    #[tokio::test]
369    async fn test_update_state() {
370        let (manager, _temp_dir) = create_test_manager().await;
371
372        let operation_id = manager
373            .create_operation(
374                OperationType::AddUser,
375                serde_json::json!({}),
376                vec!["proj-1".to_string()],
377            )
378            .await
379            .unwrap();
380
381        // Update with item completion
382        manager
383            .update_state(
384                operation_id,
385                StateUpdate::ItemCompleted {
386                    project_id: "proj-1".to_string(),
387                    result: ItemResult::Success,
388                    attempts: 1,
389                },
390            )
391            .await
392            .unwrap();
393
394        let state = manager.load_operation(operation_id).await.unwrap();
395        assert!(state.results.contains_key("proj-1"));
396    }
397
398    #[tokio::test]
399    async fn test_get_pending_projects() {
400        let (manager, _temp_dir) = create_test_manager().await;
401
402        let operation_id = manager
403            .create_operation(
404                OperationType::AddUser,
405                serde_json::json!({}),
406                vec![
407                    "proj-1".to_string(),
408                    "proj-2".to_string(),
409                    "proj-3".to_string(),
410                ],
411            )
412            .await
413            .unwrap();
414
415        // Complete one project
416        manager
417            .update_state(
418                operation_id,
419                StateUpdate::ItemCompleted {
420                    project_id: "proj-1".to_string(),
421                    result: ItemResult::Success,
422                    attempts: 1,
423                },
424            )
425            .await
426            .unwrap();
427
428        let state = manager.load_operation(operation_id).await.unwrap();
429        let pending = manager.get_pending_projects(&state);
430
431        assert_eq!(pending.len(), 2);
432        assert!(pending.contains(&"proj-2".to_string()));
433        assert!(pending.contains(&"proj-3".to_string()));
434    }
435}