1use std::collections::HashMap;
7use std::path::PathBuf;
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::bulk::executor::{ItemResult, ProgressUpdate};
14use crate::error::AdminError;
15use crate::types::{OperationStatus, OperationType};
16
17pub struct StateManager {
19 state_dir: PathBuf,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct OperationState {
25 pub operation_id: Uuid,
27 pub operation_type: OperationType,
29 pub status: OperationStatus,
31 pub parameters: serde_json::Value,
33 pub project_ids: Vec<String>,
35 pub results: HashMap<String, ProjectResultState>,
37 pub created_at: DateTime<Utc>,
39 pub updated_at: DateTime<Utc>,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ProjectResultState {
46 pub result: ItemResult,
48 pub attempts: u32,
50 pub completed_at: Option<DateTime<Utc>>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct OperationSummary {
57 pub operation_id: Uuid,
59 pub operation_type: OperationType,
61 pub status: OperationStatus,
63 pub total: usize,
65 pub completed: usize,
67 pub failed: usize,
69 pub skipped: usize,
71 pub created_at: DateTime<Utc>,
73 pub updated_at: DateTime<Utc>,
75}
76
77pub enum StateUpdate {
79 ItemCompleted {
81 project_id: String,
82 result: ItemResult,
83 attempts: u32,
84 },
85 StatusChanged { status: OperationStatus },
87 ProgressUpdated { progress: ProgressUpdate },
89}
90
91impl StateManager {
92 pub fn new() -> Result<Self, AdminError> {
94 let state_dir = Self::default_state_dir()?;
95 std::fs::create_dir_all(&state_dir)?;
96 Ok(Self { state_dir })
97 }
98
99 pub fn with_dir(state_dir: PathBuf) -> Result<Self, AdminError> {
101 std::fs::create_dir_all(&state_dir)?;
102 Ok(Self { state_dir })
103 }
104
105 pub fn state_dir(&self) -> &PathBuf {
107 &self.state_dir
108 }
109
110 fn default_state_dir() -> Result<PathBuf, AdminError> {
112 let base_dirs =
113 directories::ProjectDirs::from("com", "autodesk", "raps").ok_or_else(|| {
114 AdminError::StateError(std::io::Error::new(
115 std::io::ErrorKind::NotFound,
116 "Could not determine user data directory",
117 ))
118 })?;
119
120 Ok(base_dirs.data_dir().join("operations"))
121 }
122
123 fn operation_path(&self, operation_id: Uuid) -> PathBuf {
125 self.state_dir.join(format!("{}.json", operation_id))
126 }
127
128 pub async fn create_operation(
138 &self,
139 operation_type: OperationType,
140 parameters: serde_json::Value,
141 project_ids: Vec<String>,
142 ) -> Result<Uuid, AdminError> {
143 let operation_id = Uuid::new_v4();
144 let now = Utc::now();
145
146 let state = OperationState {
147 operation_id,
148 operation_type,
149 status: OperationStatus::Pending,
150 parameters,
151 project_ids,
152 results: HashMap::new(),
153 created_at: now,
154 updated_at: now,
155 };
156
157 self.save_state(&state).await?;
158 Ok(operation_id)
159 }
160
161 pub async fn load_operation(&self, operation_id: Uuid) -> Result<OperationState, AdminError> {
163 let path = self.operation_path(operation_id);
164
165 if !path.exists() {
166 return Err(AdminError::OperationNotFound { id: operation_id });
167 }
168
169 let content = tokio::fs::read_to_string(&path).await?;
170 let state: OperationState = serde_json::from_str(&content)?;
171
172 Ok(state)
173 }
174
175 pub async fn update_state(
177 &self,
178 operation_id: Uuid,
179 update: StateUpdate,
180 ) -> Result<(), AdminError> {
181 let mut state = self.load_operation(operation_id).await?;
182 state.updated_at = Utc::now();
183
184 match update {
185 StateUpdate::ItemCompleted {
186 project_id,
187 result,
188 attempts,
189 } => {
190 state.results.insert(
191 project_id,
192 ProjectResultState {
193 result,
194 attempts,
195 completed_at: Some(Utc::now()),
196 },
197 );
198 }
199 StateUpdate::StatusChanged { status } => {
200 state.status = status;
201 }
202 StateUpdate::ProgressUpdated { .. } => {
203 }
206 }
207
208 self.save_state(&state).await
209 }
210
211 pub async fn complete_operation(
213 &self,
214 operation_id: Uuid,
215 status: OperationStatus,
216 ) -> Result<(), AdminError> {
217 let mut state = self.load_operation(operation_id).await?;
218 state.status = status;
219 state.updated_at = Utc::now();
220 self.save_state(&state).await
221 }
222
223 pub async fn list_operations(
225 &self,
226 status_filter: Option<OperationStatus>,
227 ) -> Result<Vec<OperationSummary>, AdminError> {
228 let mut summaries = Vec::new();
229
230 let entries = std::fs::read_dir(&self.state_dir)?;
231
232 for entry in entries.flatten() {
233 let path = entry.path();
234 if path.extension().is_some_and(|ext| ext == "json")
235 && let Ok(content) = std::fs::read_to_string(&path)
236 && let Ok(state) = serde_json::from_str::<OperationState>(&content)
237 {
238 if let Some(filter_status) = status_filter
240 && state.status != filter_status
241 {
242 continue;
243 }
244
245 let (completed, failed, skipped) = count_results(&state.results);
246
247 summaries.push(OperationSummary {
248 operation_id: state.operation_id,
249 operation_type: state.operation_type,
250 status: state.status,
251 total: state.project_ids.len(),
252 completed,
253 failed,
254 skipped,
255 created_at: state.created_at,
256 updated_at: state.updated_at,
257 });
258 }
259 }
260
261 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
263
264 Ok(summaries)
265 }
266
267 pub async fn get_resumable_operation(&self) -> Result<Option<Uuid>, AdminError> {
269 let operations = self
270 .list_operations(Some(OperationStatus::InProgress))
271 .await?;
272
273 Ok(operations.first().map(|s| s.operation_id))
274 }
275
276 pub fn get_pending_projects(&self, state: &OperationState) -> Vec<String> {
278 state
279 .project_ids
280 .iter()
281 .filter(|id| !state.results.contains_key(*id))
282 .cloned()
283 .collect()
284 }
285
286 pub async fn cancel_operation(&self, operation_id: Uuid) -> Result<(), AdminError> {
288 let mut state = self.load_operation(operation_id).await?;
289
290 if state.status != OperationStatus::InProgress && state.status != OperationStatus::Pending {
292 return Err(AdminError::InvalidOperation {
293 message: format!("Cannot cancel operation with status {:?}", state.status),
294 });
295 }
296
297 state.status = OperationStatus::Cancelled;
298 state.updated_at = Utc::now();
299 self.save_state(&state).await
300 }
301
302 pub async fn delete_operation(&self, operation_id: Uuid) -> Result<(), AdminError> {
304 let path = self.operation_path(operation_id);
305 if path.exists() {
306 tokio::fs::remove_file(&path).await?;
307 }
308 Ok(())
309 }
310
311 async fn save_state(&self, state: &OperationState) -> Result<(), AdminError> {
313 let path = self.operation_path(state.operation_id);
314 let content = serde_json::to_string_pretty(state)?;
315 tokio::fs::write(&path, content).await?;
316 Ok(())
317 }
318}
319
320fn count_results(results: &HashMap<String, ProjectResultState>) -> (usize, usize, usize) {
322 let mut completed = 0;
323 let mut failed = 0;
324 let mut skipped = 0;
325
326 for result in results.values() {
327 match &result.result {
328 ItemResult::Success => completed += 1,
329 ItemResult::Failed { .. } => failed += 1,
330 ItemResult::Skipped { .. } => skipped += 1,
331 }
332 }
333
334 (completed, failed, skipped)
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use tempfile::TempDir;
341
342 async fn create_test_manager() -> (StateManager, TempDir) {
343 let temp_dir = TempDir::new().unwrap();
344 let manager = StateManager::with_dir(temp_dir.path().to_path_buf()).unwrap();
345 (manager, temp_dir)
346 }
347
348 #[tokio::test]
349 async fn test_create_and_load_operation() {
350 let (manager, _temp_dir) = create_test_manager().await;
351
352 let project_ids = vec!["proj-1".to_string(), "proj-2".to_string()];
353 let params = serde_json::json!({"email": "user@example.com"});
354
355 let operation_id = manager
356 .create_operation(OperationType::AddUser, params.clone(), project_ids.clone())
357 .await
358 .unwrap();
359
360 let state = manager.load_operation(operation_id).await.unwrap();
361
362 assert_eq!(state.operation_id, operation_id);
363 assert_eq!(state.operation_type, OperationType::AddUser);
364 assert_eq!(state.status, OperationStatus::Pending);
365 assert_eq!(state.project_ids.len(), 2);
366 }
367
368 #[tokio::test]
369 async fn test_update_state() {
370 let (manager, _temp_dir) = create_test_manager().await;
371
372 let operation_id = manager
373 .create_operation(
374 OperationType::AddUser,
375 serde_json::json!({}),
376 vec!["proj-1".to_string()],
377 )
378 .await
379 .unwrap();
380
381 manager
383 .update_state(
384 operation_id,
385 StateUpdate::ItemCompleted {
386 project_id: "proj-1".to_string(),
387 result: ItemResult::Success,
388 attempts: 1,
389 },
390 )
391 .await
392 .unwrap();
393
394 let state = manager.load_operation(operation_id).await.unwrap();
395 assert!(state.results.contains_key("proj-1"));
396 }
397
398 #[tokio::test]
399 async fn test_get_pending_projects() {
400 let (manager, _temp_dir) = create_test_manager().await;
401
402 let operation_id = manager
403 .create_operation(
404 OperationType::AddUser,
405 serde_json::json!({}),
406 vec![
407 "proj-1".to_string(),
408 "proj-2".to_string(),
409 "proj-3".to_string(),
410 ],
411 )
412 .await
413 .unwrap();
414
415 manager
417 .update_state(
418 operation_id,
419 StateUpdate::ItemCompleted {
420 project_id: "proj-1".to_string(),
421 result: ItemResult::Success,
422 attempts: 1,
423 },
424 )
425 .await
426 .unwrap();
427
428 let state = manager.load_operation(operation_id).await.unwrap();
429 let pending = manager.get_pending_projects(&state);
430
431 assert_eq!(pending.len(), 2);
432 assert!(pending.contains(&"proj-2".to_string()));
433 assert!(pending.contains(&"proj-3".to_string()));
434 }
435}