ricecoder_orchestration/managers/
batch_executor.rs

1//! Batch execution of operations across multiple projects
2
3use crate::analyzers::DependencyGraph;
4#[allow(unused_imports)]
5use crate::error::{OrchestrationError, Result};
6use crate::models::{Operation, Project, Transaction, TransactionState};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11/// Trait for operations that can be executed on projects
12pub trait ProjectOperation: Send + Sync {
13    /// Execute the operation on a project
14    fn execute(&self, project: &Project) -> Result<()>;
15
16    /// Rollback the operation on a project
17    fn rollback(&self, project: &Project) -> Result<()>;
18
19    /// Get a description of the operation
20    fn description(&self) -> String;
21}
22
23/// Configuration for batch execution
24#[derive(Debug, Clone)]
25pub struct BatchExecutionConfig {
26    /// Whether to execute projects in parallel where safe
27    pub parallel: bool,
28
29    /// Maximum number of concurrent operations
30    pub max_concurrent: usize,
31
32    /// Whether to stop on first error
33    pub fail_fast: bool,
34
35    /// Whether to automatically rollback on failure
36    pub auto_rollback: bool,
37}
38
39impl Default for BatchExecutionConfig {
40    fn default() -> Self {
41        Self {
42            parallel: false,
43            max_concurrent: 4,
44            fail_fast: true,
45            auto_rollback: true,
46        }
47    }
48}
49
50/// Result of a batch execution
51#[derive(Debug, Clone)]
52pub struct BatchExecutionResult {
53    /// Transaction ID
54    pub transaction_id: String,
55
56    /// Projects that were executed successfully
57    pub successful_projects: Vec<String>,
58
59    /// Projects that failed
60    pub failed_projects: Vec<(String, String)>,
61
62    /// Final transaction state
63    pub final_state: TransactionState,
64
65    /// Execution order
66    pub execution_order: Vec<String>,
67}
68
69/// Executes operations across multiple projects in dependency order
70pub struct BatchExecutor {
71    /// Dependency graph for ordering
72    graph: Arc<Mutex<DependencyGraph>>,
73
74    /// Configuration for execution
75    config: BatchExecutionConfig,
76
77    /// Transaction history
78    transactions: Arc<Mutex<HashMap<String, Transaction>>>,
79}
80
81impl BatchExecutor {
82    /// Creates a new batch executor
83    pub fn new(graph: DependencyGraph, config: BatchExecutionConfig) -> Self {
84        Self {
85            graph: Arc::new(Mutex::new(graph)),
86            config,
87            transactions: Arc::new(Mutex::new(HashMap::new())),
88        }
89    }
90
91    /// Creates a new batch executor with default configuration
92    pub fn with_graph(graph: DependencyGraph) -> Self {
93        Self::new(graph, BatchExecutionConfig::default())
94    }
95
96    /// Executes an operation on all specified projects
97    pub async fn execute_batch(
98        &self,
99        projects: Vec<Project>,
100        operation: Arc<dyn ProjectOperation>,
101    ) -> Result<BatchExecutionResult> {
102        let transaction_id = uuid::Uuid::new_v4().to_string();
103
104        // Get the dependency graph
105        let graph = self.graph.lock().await;
106
107        // Determine execution order
108        let execution_order = self.determine_execution_order(&graph, &projects)?;
109
110        // Filter to only projects that are in the execution order
111        let projects_map: HashMap<String, Project> = projects
112            .into_iter()
113            .map(|p| (p.name.clone(), p))
114            .collect();
115
116        let mut successful_projects = Vec::new();
117        let mut failed_projects = Vec::new();
118        let mut executed_operations = Vec::new();
119
120        // Execute projects in order
121        for project_name in &execution_order {
122            if let Some(project) = projects_map.get(project_name) {
123                match operation.execute(project) {
124                    Ok(()) => {
125                        successful_projects.push(project_name.clone());
126                        executed_operations.push(Operation {
127                            id: uuid::Uuid::new_v4().to_string(),
128                            project: project_name.clone(),
129                            operation_type: "batch_operation".to_string(),
130                            data: serde_json::json!({}),
131                        });
132                    }
133                    Err(e) => {
134                        failed_projects.push((project_name.clone(), e.to_string()));
135
136                        if self.config.fail_fast {
137                            // Rollback if configured
138                            if self.config.auto_rollback {
139                                self.rollback_operations(&projects_map, &executed_operations, &operation)
140                                    .await?;
141                            }
142
143                            return Ok(BatchExecutionResult {
144                                transaction_id,
145                                successful_projects,
146                                failed_projects,
147                                final_state: TransactionState::RolledBack,
148                                execution_order,
149                            });
150                        }
151                    }
152                }
153            }
154        }
155
156        // Determine final state
157        let final_state = if failed_projects.is_empty() {
158            TransactionState::Committed
159        } else {
160            TransactionState::RolledBack
161        };
162
163        // Store transaction
164        let transaction = Transaction {
165            id: transaction_id.clone(),
166            operations: executed_operations,
167            state: final_state,
168        };
169
170        self.transactions
171            .lock()
172            .await
173            .insert(transaction_id.clone(), transaction);
174
175        Ok(BatchExecutionResult {
176            transaction_id,
177            successful_projects,
178            failed_projects,
179            final_state,
180            execution_order,
181        })
182    }
183
184    /// Determines the execution order based on dependencies
185    fn determine_execution_order(
186        &self,
187        graph: &DependencyGraph,
188        projects: &[Project],
189    ) -> Result<Vec<String>> {
190        // Use ExecutionOrderer to determine the order
191        let orderer = crate::managers::ExecutionOrderer::new(graph.clone());
192        orderer.determine_order(projects)
193    }
194
195    /// Rolls back executed operations
196    async fn rollback_operations(
197        &self,
198        projects_map: &HashMap<String, Project>,
199        executed_operations: &[Operation],
200        operation: &Arc<dyn ProjectOperation>,
201    ) -> Result<()> {
202        // Rollback in reverse order
203        for op in executed_operations.iter().rev() {
204            if let Some(project) = projects_map.get(&op.project) {
205                operation.rollback(project)?;
206            }
207        }
208
209        Ok(())
210    }
211
212    /// Gets a transaction by ID
213    pub async fn get_transaction(&self, transaction_id: &str) -> Result<Option<Transaction>> {
214        Ok(self.transactions.lock().await.get(transaction_id).cloned())
215    }
216
217    /// Gets all transactions
218    pub async fn get_all_transactions(&self) -> Result<Vec<Transaction>> {
219        Ok(self
220            .transactions
221            .lock()
222            .await
223            .values()
224            .cloned()
225            .collect())
226    }
227
228    /// Clears all transactions
229    pub async fn clear_transactions(&self) -> Result<()> {
230        self.transactions.lock().await.clear();
231        Ok(())
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use crate::models::{DependencyType, ProjectStatus};
239    use std::path::PathBuf;
240    use std::sync::atomic::{AtomicUsize, Ordering};
241
242    fn create_test_project(name: &str) -> Project {
243        Project {
244            path: PathBuf::from(format!("/path/to/{}", name)),
245            name: name.to_string(),
246            project_type: "rust".to_string(),
247            version: "0.1.0".to_string(),
248            status: ProjectStatus::Healthy,
249        }
250    }
251
252    struct TestOperation {
253        executed: Arc<AtomicUsize>,
254        should_fail: bool,
255    }
256
257    impl ProjectOperation for TestOperation {
258        fn execute(&self, _project: &Project) -> Result<()> {
259            self.executed.fetch_add(1, Ordering::SeqCst);
260            if self.should_fail {
261                Err(OrchestrationError::BatchExecutionFailed(
262                    "Test failure".to_string(),
263                ))
264            } else {
265                Ok(())
266            }
267        }
268
269        fn rollback(&self, _project: &Project) -> Result<()> {
270            Ok(())
271        }
272
273        fn description(&self) -> String {
274            "Test operation".to_string()
275        }
276    }
277
278    #[tokio::test]
279    async fn test_batch_executor_creation() {
280        let graph = DependencyGraph::new(false);
281        let executor = BatchExecutor::with_graph(graph);
282
283        assert_eq!(executor.config.parallel, false);
284        assert_eq!(executor.config.max_concurrent, 4);
285    }
286
287    #[tokio::test]
288    async fn test_execute_single_project() {
289        let mut graph = DependencyGraph::new(false);
290        let project = create_test_project("project-a");
291        graph.add_project(project.clone()).unwrap();
292
293        let executor = BatchExecutor::with_graph(graph);
294        let operation = Arc::new(TestOperation {
295            executed: Arc::new(AtomicUsize::new(0)),
296            should_fail: false,
297        });
298
299        let result = executor
300            .execute_batch(vec![project], operation.clone())
301            .await
302            .unwrap();
303
304        assert_eq!(result.successful_projects.len(), 1);
305        assert_eq!(result.failed_projects.len(), 0);
306        assert_eq!(result.final_state, TransactionState::Committed);
307        assert_eq!(operation.executed.load(Ordering::SeqCst), 1);
308    }
309
310    #[tokio::test]
311    async fn test_execute_multiple_projects_in_order() {
312        let mut graph = DependencyGraph::new(false);
313        let project_a = create_test_project("project-a");
314        let project_b = create_test_project("project-b");
315        let project_c = create_test_project("project-c");
316
317        graph.add_project(project_a.clone()).unwrap();
318        graph.add_project(project_b.clone()).unwrap();
319        graph.add_project(project_c.clone()).unwrap();
320
321        // B -> A, C -> B (C depends on B, B depends on A)
322        graph
323            .add_dependency(crate::models::ProjectDependency {
324                from: "project-b".to_string(),
325                to: "project-a".to_string(),
326                dependency_type: DependencyType::Direct,
327                version_constraint: "^0.1.0".to_string(),
328            })
329            .unwrap();
330
331        graph
332            .add_dependency(crate::models::ProjectDependency {
333                from: "project-c".to_string(),
334                to: "project-b".to_string(),
335                dependency_type: DependencyType::Direct,
336                version_constraint: "^0.1.0".to_string(),
337            })
338            .unwrap();
339
340        let executor = BatchExecutor::with_graph(graph);
341        let operation = Arc::new(TestOperation {
342            executed: Arc::new(AtomicUsize::new(0)),
343            should_fail: false,
344        });
345
346        let result = executor
347            .execute_batch(
348                vec![project_a, project_b, project_c],
349                operation.clone(),
350            )
351            .await
352            .unwrap();
353
354        assert_eq!(result.successful_projects.len(), 3);
355        assert_eq!(result.failed_projects.len(), 0);
356        assert_eq!(result.execution_order.len(), 3);
357
358        // Verify order: A should come before B, B before C
359        let a_idx = result.execution_order.iter().position(|x| x == "project-a").unwrap();
360        let b_idx = result.execution_order.iter().position(|x| x == "project-b").unwrap();
361        let c_idx = result.execution_order.iter().position(|x| x == "project-c").unwrap();
362
363        assert!(a_idx < b_idx);
364        assert!(b_idx < c_idx);
365    }
366
367    #[tokio::test]
368    async fn test_execute_with_failure_fail_fast() {
369        let mut graph = DependencyGraph::new(false);
370        let project_a = create_test_project("project-a");
371        let project_b = create_test_project("project-b");
372
373        graph.add_project(project_a.clone()).unwrap();
374        graph.add_project(project_b.clone()).unwrap();
375
376        let executor = BatchExecutor::new(
377            graph,
378            BatchExecutionConfig {
379                parallel: false,
380                max_concurrent: 4,
381                fail_fast: true,
382                auto_rollback: true,
383            },
384        );
385
386        let operation = Arc::new(TestOperation {
387            executed: Arc::new(AtomicUsize::new(0)),
388            should_fail: true,
389        });
390
391        let result = executor
392            .execute_batch(vec![project_a, project_b], operation.clone())
393            .await
394            .unwrap();
395
396        assert_eq!(result.successful_projects.len(), 0);
397        assert_eq!(result.failed_projects.len(), 1);
398        assert_eq!(result.final_state, TransactionState::RolledBack);
399    }
400
401    #[tokio::test]
402    async fn test_transaction_storage() {
403        let graph = DependencyGraph::new(false);
404        let project = create_test_project("project-a");
405        let mut graph = graph;
406        graph.add_project(project.clone()).unwrap();
407
408        let executor = BatchExecutor::with_graph(graph);
409        let operation = Arc::new(TestOperation {
410            executed: Arc::new(AtomicUsize::new(0)),
411            should_fail: false,
412        });
413
414        let result = executor
415            .execute_batch(vec![project], operation)
416            .await
417            .unwrap();
418
419        let transaction = executor
420            .get_transaction(&result.transaction_id)
421            .await
422            .unwrap();
423
424        assert!(transaction.is_some());
425        let txn = transaction.unwrap();
426        assert_eq!(txn.id, result.transaction_id);
427        assert_eq!(txn.state, TransactionState::Committed);
428    }
429
430    #[tokio::test]
431    async fn test_get_all_transactions() {
432        let graph = DependencyGraph::new(false);
433        let project = create_test_project("project-a");
434        let mut graph = graph;
435        graph.add_project(project.clone()).unwrap();
436
437        let executor = BatchExecutor::with_graph(graph);
438        let operation = Arc::new(TestOperation {
439            executed: Arc::new(AtomicUsize::new(0)),
440            should_fail: false,
441        });
442
443        executor
444            .execute_batch(vec![project.clone()], operation.clone())
445            .await
446            .unwrap();
447
448        executor
449            .execute_batch(vec![project], operation)
450            .await
451            .unwrap();
452
453        let transactions = executor.get_all_transactions().await.unwrap();
454        assert_eq!(transactions.len(), 2);
455    }
456
457    #[tokio::test]
458    async fn test_clear_transactions() {
459        let graph = DependencyGraph::new(false);
460        let project = create_test_project("project-a");
461        let mut graph = graph;
462        graph.add_project(project.clone()).unwrap();
463
464        let executor = BatchExecutor::with_graph(graph);
465        let operation = Arc::new(TestOperation {
466            executed: Arc::new(AtomicUsize::new(0)),
467            should_fail: false,
468        });
469
470        executor
471            .execute_batch(vec![project], operation)
472            .await
473            .unwrap();
474
475        let transactions = executor.get_all_transactions().await.unwrap();
476        assert_eq!(transactions.len(), 1);
477
478        executor.clear_transactions().await.unwrap();
479
480        let transactions = executor.get_all_transactions().await.unwrap();
481        assert_eq!(transactions.len(), 0);
482    }
483}