1use crate::error::Result;
4use crate::models::AgentTask;
5use crate::scheduler::ExecutionPhase;
6use std::time::Duration;
7use tokio::time::timeout;
8use tracing::{debug, info, warn};
9
/// Tunable settings for [`ParallelExecutor`].
#[derive(Clone, Debug)]
pub struct ExecutionConfig {
    /// Number of permits in the semaphore that gates concurrently running tasks.
    pub max_concurrency: usize,
    /// Time limit applied to each individual task, in milliseconds.
    pub timeout_ms: u64,
    /// When `true`, task start/finish messages are additionally printed to stderr.
    pub verbose: bool,
}
20
21impl Default for ExecutionConfig {
22 fn default() -> Self {
23 Self {
24 max_concurrency: 4,
25 timeout_ms: 30000, verbose: false,
27 }
28 }
29}
30
/// Outcome of running a single task.
#[derive(Clone, Debug)]
pub struct ExecutionResult {
    /// Identifier of the task this result belongs to.
    pub task_id: String,
    /// `true` when the task finished without error and within its time limit.
    pub success: bool,
    /// Human-readable failure description; `None` when the task succeeded.
    pub error: Option<String>,
    /// Measured run time of the task in milliseconds.
    pub duration_ms: u64,
}
43
44pub struct ParallelExecutor {
46 config: ExecutionConfig,
47}
48
49impl ParallelExecutor {
50 pub fn new() -> Self {
52 Self {
53 config: ExecutionConfig::default(),
54 }
55 }
56
57 pub fn with_config(config: ExecutionConfig) -> Self {
59 Self { config }
60 }
61
62 pub async fn execute_phase(&self, phase: &ExecutionPhase) -> Result<Vec<ExecutionResult>> {
64 info!(
65 task_count = phase.tasks.len(),
66 max_concurrency = self.config.max_concurrency,
67 "Starting parallel execution phase"
68 );
69
70 let mut results = Vec::new();
71
72 let semaphore =
74 std::sync::Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrency));
75
76 let mut handles = Vec::new();
77
78 for task in &phase.tasks {
79 let task_clone = task.clone();
80 let semaphore_clone = semaphore.clone();
81 let timeout_ms = self.config.timeout_ms;
82 let verbose = self.config.verbose;
83
84 let handle = tokio::spawn(async move {
85 let _permit = semaphore_clone.acquire().await;
86
87 debug!(task_id = %task_clone.id, "Task execution started");
88
89 let start = std::time::Instant::now();
90
91 let result = timeout(
93 Duration::from_millis(timeout_ms),
94 Self::execute_task_internal(&task_clone, verbose),
95 )
96 .await;
97
98 let duration_ms = start.elapsed().as_millis() as u64;
99
100 match result {
101 Ok(Ok(())) => {
102 debug!(
103 task_id = %task_clone.id,
104 duration_ms = duration_ms,
105 "Task execution completed successfully"
106 );
107 ExecutionResult {
108 task_id: task_clone.id.clone(),
109 success: true,
110 error: None,
111 duration_ms,
112 }
113 }
114 Ok(Err(e)) => {
115 warn!(
116 task_id = %task_clone.id,
117 error = %e,
118 duration_ms = duration_ms,
119 "Task execution failed"
120 );
121 ExecutionResult {
122 task_id: task_clone.id.clone(),
123 success: false,
124 error: Some(e),
125 duration_ms,
126 }
127 }
128 Err(_) => {
129 warn!(
130 task_id = %task_clone.id,
131 timeout_ms = timeout_ms,
132 "Task execution timeout"
133 );
134 ExecutionResult {
135 task_id: task_clone.id.clone(),
136 success: false,
137 error: Some(format!("Task timeout after {}ms", timeout_ms)),
138 duration_ms,
139 }
140 }
141 }
142 });
143
144 handles.push(handle);
145 }
146
147 for handle in handles {
149 match handle.await {
150 Ok(result) => results.push(result),
151 Err(e) => {
152 warn!(error = %e, "Task execution error");
154 results.push(ExecutionResult {
155 task_id: "unknown".to_string(),
156 success: false,
157 error: Some(format!("Task execution error: {}", e)),
158 duration_ms: 0,
159 });
160 }
161 }
162 }
163
164 info!(
165 completed_count = results.len(),
166 success_count = results.iter().filter(|r| r.success).count(),
167 "Parallel execution phase completed"
168 );
169
170 Ok(results)
171 }
172
173 async fn execute_task_internal(
175 task: &AgentTask,
176 verbose: bool,
177 ) -> std::result::Result<(), String> {
178 if verbose {
179 eprintln!("Executing task: {}", task.id);
180 }
181
182 tokio::time::sleep(Duration::from_millis(10)).await;
185
186 if verbose {
187 eprintln!("Task completed: {}", task.id);
188 }
189
190 Ok(())
191 }
192
193 pub fn config(&self) -> &ExecutionConfig {
195 &self.config
196 }
197
198 pub fn set_config(&mut self, config: ExecutionConfig) {
200 self.config = config;
201 }
202
203 pub fn set_max_concurrency(&mut self, max_concurrency: usize) {
205 self.config.max_concurrency = max_concurrency;
206 }
207
208 pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
210 self.config.timeout_ms = timeout_ms;
211 }
212
213 pub fn set_verbose(&mut self, verbose: bool) {
215 self.config.verbose = verbose;
216 }
217}
218
219impl Default for ParallelExecutor {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{TaskOptions, TaskScope, TaskTarget, TaskType};
    use std::path::PathBuf;

    /// Builds a minimal file-scoped code-review task with the given id.
    fn create_test_task(id: &str) -> AgentTask {
        AgentTask {
            id: id.to_string(),
            task_type: TaskType::CodeReview,
            target: TaskTarget {
                files: vec![PathBuf::from("test.rs")],
                scope: TaskScope::File,
            },
            options: TaskOptions::default(),
        }
    }

    /// Collects the task ids of `results` in the order they were returned.
    fn task_ids(results: &[ExecutionResult]) -> Vec<&str> {
        results.iter().map(|r| r.task_id.as_str()).collect()
    }

    #[test]
    fn test_execution_config_default() {
        let config = ExecutionConfig::default();
        assert_eq!(config.max_concurrency, 4);
        assert_eq!(config.timeout_ms, 30000);
        assert!(!config.verbose);
    }

    #[test]
    fn test_execution_config_custom() {
        let config = ExecutionConfig {
            max_concurrency: 8,
            timeout_ms: 60000,
            verbose: true,
        };

        assert_eq!(config.max_concurrency, 8);
        assert_eq!(config.timeout_ms, 60000);
        assert!(config.verbose);
    }

    #[test]
    fn test_parallel_executor_new() {
        let executor = ParallelExecutor::new();
        assert_eq!(executor.config().max_concurrency, 4);
        assert_eq!(executor.config().timeout_ms, 30000);
    }

    #[test]
    fn test_parallel_executor_with_config() {
        let config = ExecutionConfig {
            max_concurrency: 16,
            timeout_ms: 120000,
            verbose: true,
        };

        // `config` is not used afterwards, so it is moved rather than cloned.
        let executor = ParallelExecutor::with_config(config);
        assert_eq!(executor.config().max_concurrency, 16);
        assert_eq!(executor.config().timeout_ms, 120000);
        assert!(executor.config().verbose);
    }

    #[test]
    fn test_parallel_executor_set_max_concurrency() {
        let mut executor = ParallelExecutor::new();
        executor.set_max_concurrency(8);
        assert_eq!(executor.config().max_concurrency, 8);
    }

    #[test]
    fn test_parallel_executor_set_timeout() {
        let mut executor = ParallelExecutor::new();
        executor.set_timeout_ms(60000);
        assert_eq!(executor.config().timeout_ms, 60000);
    }

    #[test]
    fn test_parallel_executor_set_verbose() {
        let mut executor = ParallelExecutor::new();
        executor.set_verbose(true);
        assert!(executor.config().verbose);
    }

    #[test]
    fn test_execution_result_success() {
        let result = ExecutionResult {
            task_id: "task1".to_string(),
            success: true,
            error: None,
            duration_ms: 100,
        };

        assert_eq!(result.task_id, "task1");
        assert!(result.success);
        assert!(result.error.is_none());
        assert_eq!(result.duration_ms, 100);
    }

    #[test]
    fn test_execution_result_failure() {
        let result = ExecutionResult {
            task_id: "task1".to_string(),
            success: false,
            error: Some("Task failed".to_string()),
            duration_ms: 50,
        };

        assert_eq!(result.task_id, "task1");
        assert!(!result.success);
        assert_eq!(result.error.as_deref(), Some("Task failed"));
        assert_eq!(result.duration_ms, 50);
    }

    #[tokio::test]
    async fn test_execute_phase_single_task() {
        let executor = ParallelExecutor::new();
        let phase = ExecutionPhase {
            tasks: vec![create_test_task("task1")],
        };

        let results = executor.execute_phase(&phase).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].task_id, "task1");
        assert!(results[0].success);
        assert!(results[0].error.is_none());
    }

    #[tokio::test]
    async fn test_execute_phase_multiple_tasks() {
        let executor = ParallelExecutor::new();
        let phase = ExecutionPhase {
            tasks: vec![
                create_test_task("task1"),
                create_test_task("task2"),
                create_test_task("task3"),
            ],
        };

        let results = executor.execute_phase(&phase).await.unwrap();
        assert_eq!(results.len(), 3);

        // Results come back in the order the tasks were submitted.
        assert_eq!(task_ids(&results), ["task1", "task2", "task3"]);

        for result in &results {
            assert!(result.success);
            assert!(result.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_execute_phase_respects_concurrency() {
        let config = ExecutionConfig {
            max_concurrency: 2,
            timeout_ms: 30000,
            verbose: false,
        };

        let executor = ParallelExecutor::with_config(config);
        let phase = ExecutionPhase {
            tasks: vec![
                create_test_task("task1"),
                create_test_task("task2"),
                create_test_task("task3"),
                create_test_task("task4"),
            ],
        };

        // More tasks than permits: every task must still complete, in order.
        let results = executor.execute_phase(&phase).await.unwrap();
        assert_eq!(results.len(), 4);
        assert_eq!(task_ids(&results), ["task1", "task2", "task3", "task4"]);

        for result in &results {
            assert!(result.success);
        }
    }

    #[tokio::test]
    async fn test_execute_phase_empty() {
        let executor = ParallelExecutor::new();
        let phase = ExecutionPhase { tasks: vec![] };

        let results = executor.execute_phase(&phase).await.unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_parallel_executor_default() {
        // `Default` must yield the same configuration as `ExecutionConfig::default()`.
        let executor = ParallelExecutor::default();
        let expected = ExecutionConfig::default();

        assert_eq!(executor.config().max_concurrency, expected.max_concurrency);
        assert_eq!(executor.config().timeout_ms, expected.timeout_ms);
        assert_eq!(executor.config().verbose, expected.verbose);
    }
}