ricecoder_orchestration/managers/
batch_executor.rs1use crate::analyzers::DependencyGraph;
4#[allow(unused_imports)]
5use crate::error::{OrchestrationError, Result};
6use crate::models::{Operation, Project, Transaction, TransactionState};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11pub trait ProjectOperation: Send + Sync {
13 fn execute(&self, project: &Project) -> Result<()>;
15
16 fn rollback(&self, project: &Project) -> Result<()>;
18
19 fn description(&self) -> String;
21}
22
23#[derive(Debug, Clone)]
25pub struct BatchExecutionConfig {
26 pub parallel: bool,
28
29 pub max_concurrent: usize,
31
32 pub fail_fast: bool,
34
35 pub auto_rollback: bool,
37}
38
39impl Default for BatchExecutionConfig {
40 fn default() -> Self {
41 Self {
42 parallel: false,
43 max_concurrent: 4,
44 fail_fast: true,
45 auto_rollback: true,
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct BatchExecutionResult {
53 pub transaction_id: String,
55
56 pub successful_projects: Vec<String>,
58
59 pub failed_projects: Vec<(String, String)>,
61
62 pub final_state: TransactionState,
64
65 pub execution_order: Vec<String>,
67}
68
69pub struct BatchExecutor {
71 graph: Arc<Mutex<DependencyGraph>>,
73
74 config: BatchExecutionConfig,
76
77 transactions: Arc<Mutex<HashMap<String, Transaction>>>,
79}
80
81impl BatchExecutor {
82 pub fn new(graph: DependencyGraph, config: BatchExecutionConfig) -> Self {
84 Self {
85 graph: Arc::new(Mutex::new(graph)),
86 config,
87 transactions: Arc::new(Mutex::new(HashMap::new())),
88 }
89 }
90
91 pub fn with_graph(graph: DependencyGraph) -> Self {
93 Self::new(graph, BatchExecutionConfig::default())
94 }
95
96 pub async fn execute_batch(
98 &self,
99 projects: Vec<Project>,
100 operation: Arc<dyn ProjectOperation>,
101 ) -> Result<BatchExecutionResult> {
102 let transaction_id = uuid::Uuid::new_v4().to_string();
103
104 let graph = self.graph.lock().await;
106
107 let execution_order = self.determine_execution_order(&graph, &projects)?;
109
110 let projects_map: HashMap<String, Project> = projects
112 .into_iter()
113 .map(|p| (p.name.clone(), p))
114 .collect();
115
116 let mut successful_projects = Vec::new();
117 let mut failed_projects = Vec::new();
118 let mut executed_operations = Vec::new();
119
120 for project_name in &execution_order {
122 if let Some(project) = projects_map.get(project_name) {
123 match operation.execute(project) {
124 Ok(()) => {
125 successful_projects.push(project_name.clone());
126 executed_operations.push(Operation {
127 id: uuid::Uuid::new_v4().to_string(),
128 project: project_name.clone(),
129 operation_type: "batch_operation".to_string(),
130 data: serde_json::json!({}),
131 });
132 }
133 Err(e) => {
134 failed_projects.push((project_name.clone(), e.to_string()));
135
136 if self.config.fail_fast {
137 if self.config.auto_rollback {
139 self.rollback_operations(&projects_map, &executed_operations, &operation)
140 .await?;
141 }
142
143 return Ok(BatchExecutionResult {
144 transaction_id,
145 successful_projects,
146 failed_projects,
147 final_state: TransactionState::RolledBack,
148 execution_order,
149 });
150 }
151 }
152 }
153 }
154 }
155
156 let final_state = if failed_projects.is_empty() {
158 TransactionState::Committed
159 } else {
160 TransactionState::RolledBack
161 };
162
163 let transaction = Transaction {
165 id: transaction_id.clone(),
166 operations: executed_operations,
167 state: final_state,
168 };
169
170 self.transactions
171 .lock()
172 .await
173 .insert(transaction_id.clone(), transaction);
174
175 Ok(BatchExecutionResult {
176 transaction_id,
177 successful_projects,
178 failed_projects,
179 final_state,
180 execution_order,
181 })
182 }
183
184 fn determine_execution_order(
186 &self,
187 graph: &DependencyGraph,
188 projects: &[Project],
189 ) -> Result<Vec<String>> {
190 let orderer = crate::managers::ExecutionOrderer::new(graph.clone());
192 orderer.determine_order(projects)
193 }
194
195 async fn rollback_operations(
197 &self,
198 projects_map: &HashMap<String, Project>,
199 executed_operations: &[Operation],
200 operation: &Arc<dyn ProjectOperation>,
201 ) -> Result<()> {
202 for op in executed_operations.iter().rev() {
204 if let Some(project) = projects_map.get(&op.project) {
205 operation.rollback(project)?;
206 }
207 }
208
209 Ok(())
210 }
211
212 pub async fn get_transaction(&self, transaction_id: &str) -> Result<Option<Transaction>> {
214 Ok(self.transactions.lock().await.get(transaction_id).cloned())
215 }
216
217 pub async fn get_all_transactions(&self) -> Result<Vec<Transaction>> {
219 Ok(self
220 .transactions
221 .lock()
222 .await
223 .values()
224 .cloned()
225 .collect())
226 }
227
228 pub async fn clear_transactions(&self) -> Result<()> {
230 self.transactions.lock().await.clear();
231 Ok(())
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use crate::models::{DependencyType, ProjectStatus};
239 use std::path::PathBuf;
240 use std::sync::atomic::{AtomicUsize, Ordering};
241
242 fn create_test_project(name: &str) -> Project {
243 Project {
244 path: PathBuf::from(format!("/path/to/{}", name)),
245 name: name.to_string(),
246 project_type: "rust".to_string(),
247 version: "0.1.0".to_string(),
248 status: ProjectStatus::Healthy,
249 }
250 }
251
252 struct TestOperation {
253 executed: Arc<AtomicUsize>,
254 should_fail: bool,
255 }
256
257 impl ProjectOperation for TestOperation {
258 fn execute(&self, _project: &Project) -> Result<()> {
259 self.executed.fetch_add(1, Ordering::SeqCst);
260 if self.should_fail {
261 Err(OrchestrationError::BatchExecutionFailed(
262 "Test failure".to_string(),
263 ))
264 } else {
265 Ok(())
266 }
267 }
268
269 fn rollback(&self, _project: &Project) -> Result<()> {
270 Ok(())
271 }
272
273 fn description(&self) -> String {
274 "Test operation".to_string()
275 }
276 }
277
278 #[tokio::test]
279 async fn test_batch_executor_creation() {
280 let graph = DependencyGraph::new(false);
281 let executor = BatchExecutor::with_graph(graph);
282
283 assert_eq!(executor.config.parallel, false);
284 assert_eq!(executor.config.max_concurrent, 4);
285 }
286
287 #[tokio::test]
288 async fn test_execute_single_project() {
289 let mut graph = DependencyGraph::new(false);
290 let project = create_test_project("project-a");
291 graph.add_project(project.clone()).unwrap();
292
293 let executor = BatchExecutor::with_graph(graph);
294 let operation = Arc::new(TestOperation {
295 executed: Arc::new(AtomicUsize::new(0)),
296 should_fail: false,
297 });
298
299 let result = executor
300 .execute_batch(vec![project], operation.clone())
301 .await
302 .unwrap();
303
304 assert_eq!(result.successful_projects.len(), 1);
305 assert_eq!(result.failed_projects.len(), 0);
306 assert_eq!(result.final_state, TransactionState::Committed);
307 assert_eq!(operation.executed.load(Ordering::SeqCst), 1);
308 }
309
310 #[tokio::test]
311 async fn test_execute_multiple_projects_in_order() {
312 let mut graph = DependencyGraph::new(false);
313 let project_a = create_test_project("project-a");
314 let project_b = create_test_project("project-b");
315 let project_c = create_test_project("project-c");
316
317 graph.add_project(project_a.clone()).unwrap();
318 graph.add_project(project_b.clone()).unwrap();
319 graph.add_project(project_c.clone()).unwrap();
320
321 graph
323 .add_dependency(crate::models::ProjectDependency {
324 from: "project-b".to_string(),
325 to: "project-a".to_string(),
326 dependency_type: DependencyType::Direct,
327 version_constraint: "^0.1.0".to_string(),
328 })
329 .unwrap();
330
331 graph
332 .add_dependency(crate::models::ProjectDependency {
333 from: "project-c".to_string(),
334 to: "project-b".to_string(),
335 dependency_type: DependencyType::Direct,
336 version_constraint: "^0.1.0".to_string(),
337 })
338 .unwrap();
339
340 let executor = BatchExecutor::with_graph(graph);
341 let operation = Arc::new(TestOperation {
342 executed: Arc::new(AtomicUsize::new(0)),
343 should_fail: false,
344 });
345
346 let result = executor
347 .execute_batch(
348 vec![project_a, project_b, project_c],
349 operation.clone(),
350 )
351 .await
352 .unwrap();
353
354 assert_eq!(result.successful_projects.len(), 3);
355 assert_eq!(result.failed_projects.len(), 0);
356 assert_eq!(result.execution_order.len(), 3);
357
358 let a_idx = result.execution_order.iter().position(|x| x == "project-a").unwrap();
360 let b_idx = result.execution_order.iter().position(|x| x == "project-b").unwrap();
361 let c_idx = result.execution_order.iter().position(|x| x == "project-c").unwrap();
362
363 assert!(a_idx < b_idx);
364 assert!(b_idx < c_idx);
365 }
366
367 #[tokio::test]
368 async fn test_execute_with_failure_fail_fast() {
369 let mut graph = DependencyGraph::new(false);
370 let project_a = create_test_project("project-a");
371 let project_b = create_test_project("project-b");
372
373 graph.add_project(project_a.clone()).unwrap();
374 graph.add_project(project_b.clone()).unwrap();
375
376 let executor = BatchExecutor::new(
377 graph,
378 BatchExecutionConfig {
379 parallel: false,
380 max_concurrent: 4,
381 fail_fast: true,
382 auto_rollback: true,
383 },
384 );
385
386 let operation = Arc::new(TestOperation {
387 executed: Arc::new(AtomicUsize::new(0)),
388 should_fail: true,
389 });
390
391 let result = executor
392 .execute_batch(vec![project_a, project_b], operation.clone())
393 .await
394 .unwrap();
395
396 assert_eq!(result.successful_projects.len(), 0);
397 assert_eq!(result.failed_projects.len(), 1);
398 assert_eq!(result.final_state, TransactionState::RolledBack);
399 }
400
401 #[tokio::test]
402 async fn test_transaction_storage() {
403 let graph = DependencyGraph::new(false);
404 let project = create_test_project("project-a");
405 let mut graph = graph;
406 graph.add_project(project.clone()).unwrap();
407
408 let executor = BatchExecutor::with_graph(graph);
409 let operation = Arc::new(TestOperation {
410 executed: Arc::new(AtomicUsize::new(0)),
411 should_fail: false,
412 });
413
414 let result = executor
415 .execute_batch(vec![project], operation)
416 .await
417 .unwrap();
418
419 let transaction = executor
420 .get_transaction(&result.transaction_id)
421 .await
422 .unwrap();
423
424 assert!(transaction.is_some());
425 let txn = transaction.unwrap();
426 assert_eq!(txn.id, result.transaction_id);
427 assert_eq!(txn.state, TransactionState::Committed);
428 }
429
430 #[tokio::test]
431 async fn test_get_all_transactions() {
432 let graph = DependencyGraph::new(false);
433 let project = create_test_project("project-a");
434 let mut graph = graph;
435 graph.add_project(project.clone()).unwrap();
436
437 let executor = BatchExecutor::with_graph(graph);
438 let operation = Arc::new(TestOperation {
439 executed: Arc::new(AtomicUsize::new(0)),
440 should_fail: false,
441 });
442
443 executor
444 .execute_batch(vec![project.clone()], operation.clone())
445 .await
446 .unwrap();
447
448 executor
449 .execute_batch(vec![project], operation)
450 .await
451 .unwrap();
452
453 let transactions = executor.get_all_transactions().await.unwrap();
454 assert_eq!(transactions.len(), 2);
455 }
456
457 #[tokio::test]
458 async fn test_clear_transactions() {
459 let graph = DependencyGraph::new(false);
460 let project = create_test_project("project-a");
461 let mut graph = graph;
462 graph.add_project(project.clone()).unwrap();
463
464 let executor = BatchExecutor::with_graph(graph);
465 let operation = Arc::new(TestOperation {
466 executed: Arc::new(AtomicUsize::new(0)),
467 should_fail: false,
468 });
469
470 executor
471 .execute_batch(vec![project], operation)
472 .await
473 .unwrap();
474
475 let transactions = executor.get_all_transactions().await.unwrap();
476 assert_eq!(transactions.len(), 1);
477
478 executor.clear_transactions().await.unwrap();
479
480 let transactions = executor.get_all_transactions().await.unwrap();
481 assert_eq!(transactions.len(), 0);
482 }
483}