ricecoder_agents/
executor.rs

1//! Parallel execution engine for agents
2
3use crate::error::Result;
4use crate::models::AgentTask;
5use crate::scheduler::ExecutionPhase;
6use std::time::Duration;
7use tokio::time::timeout;
8use tracing::{debug, info, warn};
9
10/// Configuration for the parallel execution engine
11#[derive(Debug, Clone)]
12pub struct ExecutionConfig {
13    /// Maximum number of concurrent tasks
14    pub max_concurrency: usize,
15    /// Timeout for each task in milliseconds
16    pub timeout_ms: u64,
17    /// Enable detailed logging
18    pub verbose: bool,
19}
20
21impl Default for ExecutionConfig {
22    fn default() -> Self {
23        Self {
24            max_concurrency: 4,
25            timeout_ms: 30000, // 30 seconds
26            verbose: false,
27        }
28    }
29}
30
31/// Result of executing a task
32#[derive(Debug, Clone)]
33pub struct ExecutionResult {
34    /// Task ID
35    pub task_id: String,
36    /// Whether the task succeeded
37    pub success: bool,
38    /// Error message if failed
39    pub error: Option<String>,
40    /// Execution time in milliseconds
41    pub duration_ms: u64,
42}
43
44/// Parallel execution engine for agents
45pub struct ParallelExecutor {
46    config: ExecutionConfig,
47}
48
49impl ParallelExecutor {
50    /// Create a new parallel executor with default configuration
51    pub fn new() -> Self {
52        Self {
53            config: ExecutionConfig::default(),
54        }
55    }
56
57    /// Create a new parallel executor with custom configuration
58    pub fn with_config(config: ExecutionConfig) -> Self {
59        Self { config }
60    }
61
62    /// Execute a phase of tasks in parallel
63    pub async fn execute_phase(&self, phase: &ExecutionPhase) -> Result<Vec<ExecutionResult>> {
64        info!(
65            task_count = phase.tasks.len(),
66            max_concurrency = self.config.max_concurrency,
67            "Starting parallel execution phase"
68        );
69
70        let mut results = Vec::new();
71
72        // Execute tasks with concurrency limit
73        let semaphore =
74            std::sync::Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrency));
75
76        let mut handles = Vec::new();
77
78        for task in &phase.tasks {
79            let task_clone = task.clone();
80            let semaphore_clone = semaphore.clone();
81            let timeout_ms = self.config.timeout_ms;
82            let verbose = self.config.verbose;
83
84            let handle = tokio::spawn(async move {
85                let _permit = semaphore_clone.acquire().await;
86
87                debug!(task_id = %task_clone.id, "Task execution started");
88
89                let start = std::time::Instant::now();
90
91                // Simulate task execution with timeout
92                let result = timeout(
93                    Duration::from_millis(timeout_ms),
94                    Self::execute_task_internal(&task_clone, verbose),
95                )
96                .await;
97
98                let duration_ms = start.elapsed().as_millis() as u64;
99
100                match result {
101                    Ok(Ok(())) => {
102                        debug!(
103                            task_id = %task_clone.id,
104                            duration_ms = duration_ms,
105                            "Task execution completed successfully"
106                        );
107                        ExecutionResult {
108                            task_id: task_clone.id.clone(),
109                            success: true,
110                            error: None,
111                            duration_ms,
112                        }
113                    }
114                    Ok(Err(e)) => {
115                        warn!(
116                            task_id = %task_clone.id,
117                            error = %e,
118                            duration_ms = duration_ms,
119                            "Task execution failed"
120                        );
121                        ExecutionResult {
122                            task_id: task_clone.id.clone(),
123                            success: false,
124                            error: Some(e),
125                            duration_ms,
126                        }
127                    }
128                    Err(_) => {
129                        warn!(
130                            task_id = %task_clone.id,
131                            timeout_ms = timeout_ms,
132                            "Task execution timeout"
133                        );
134                        ExecutionResult {
135                            task_id: task_clone.id.clone(),
136                            success: false,
137                            error: Some(format!("Task timeout after {}ms", timeout_ms)),
138                            duration_ms,
139                        }
140                    }
141                }
142            });
143
144            handles.push(handle);
145        }
146
147        // Collect results
148        for handle in handles {
149            match handle.await {
150                Ok(result) => results.push(result),
151                Err(e) => {
152                    // Task was cancelled or panicked
153                    warn!(error = %e, "Task execution error");
154                    results.push(ExecutionResult {
155                        task_id: "unknown".to_string(),
156                        success: false,
157                        error: Some(format!("Task execution error: {}", e)),
158                        duration_ms: 0,
159                    });
160                }
161            }
162        }
163
164        info!(
165            completed_count = results.len(),
166            success_count = results.iter().filter(|r| r.success).count(),
167            "Parallel execution phase completed"
168        );
169
170        Ok(results)
171    }
172
173    /// Internal task execution logic
174    async fn execute_task_internal(
175        task: &AgentTask,
176        verbose: bool,
177    ) -> std::result::Result<(), String> {
178        if verbose {
179            eprintln!("Executing task: {}", task.id);
180        }
181
182        // Simulate task execution
183        // In a real implementation, this would call the actual agent
184        tokio::time::sleep(Duration::from_millis(10)).await;
185
186        if verbose {
187            eprintln!("Task completed: {}", task.id);
188        }
189
190        Ok(())
191    }
192
193    /// Get the current configuration
194    pub fn config(&self) -> &ExecutionConfig {
195        &self.config
196    }
197
198    /// Update the configuration
199    pub fn set_config(&mut self, config: ExecutionConfig) {
200        self.config = config;
201    }
202
203    /// Set the maximum concurrency
204    pub fn set_max_concurrency(&mut self, max_concurrency: usize) {
205        self.config.max_concurrency = max_concurrency;
206    }
207
208    /// Set the timeout for tasks
209    pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
210        self.config.timeout_ms = timeout_ms;
211    }
212
213    /// Enable or disable verbose logging
214    pub fn set_verbose(&mut self, verbose: bool) {
215        self.config.verbose = verbose;
216    }
217}
218
219impl Default for ParallelExecutor {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225#[cfg(test)]
226mod tests {
227    use super::*;
228    use crate::models::{TaskOptions, TaskScope, TaskTarget, TaskType};
229    use std::path::PathBuf;
230
231    fn create_test_task(id: &str) -> AgentTask {
232        AgentTask {
233            id: id.to_string(),
234            task_type: TaskType::CodeReview,
235            target: TaskTarget {
236                files: vec![PathBuf::from("test.rs")],
237                scope: TaskScope::File,
238            },
239            options: TaskOptions::default(),
240        }
241    }
242
243    #[test]
244    fn test_execution_config_default() {
245        let config = ExecutionConfig::default();
246        assert_eq!(config.max_concurrency, 4);
247        assert_eq!(config.timeout_ms, 30000);
248        assert!(!config.verbose);
249    }
250
251    #[test]
252    fn test_execution_config_custom() {
253        let config = ExecutionConfig {
254            max_concurrency: 8,
255            timeout_ms: 60000,
256            verbose: true,
257        };
258
259        assert_eq!(config.max_concurrency, 8);
260        assert_eq!(config.timeout_ms, 60000);
261        assert!(config.verbose);
262    }
263
264    #[test]
265    fn test_parallel_executor_new() {
266        let executor = ParallelExecutor::new();
267        assert_eq!(executor.config().max_concurrency, 4);
268        assert_eq!(executor.config().timeout_ms, 30000);
269    }
270
271    #[test]
272    fn test_parallel_executor_with_config() {
273        let config = ExecutionConfig {
274            max_concurrency: 16,
275            timeout_ms: 120000,
276            verbose: true,
277        };
278
279        let executor = ParallelExecutor::with_config(config.clone());
280        assert_eq!(executor.config().max_concurrency, 16);
281        assert_eq!(executor.config().timeout_ms, 120000);
282        assert!(executor.config().verbose);
283    }
284
285    #[test]
286    fn test_parallel_executor_set_max_concurrency() {
287        let mut executor = ParallelExecutor::new();
288        executor.set_max_concurrency(8);
289        assert_eq!(executor.config().max_concurrency, 8);
290    }
291
292    #[test]
293    fn test_parallel_executor_set_timeout() {
294        let mut executor = ParallelExecutor::new();
295        executor.set_timeout_ms(60000);
296        assert_eq!(executor.config().timeout_ms, 60000);
297    }
298
299    #[test]
300    fn test_parallel_executor_set_verbose() {
301        let mut executor = ParallelExecutor::new();
302        executor.set_verbose(true);
303        assert!(executor.config().verbose);
304    }
305
306    #[test]
307    fn test_execution_result_success() {
308        let result = ExecutionResult {
309            task_id: "task1".to_string(),
310            success: true,
311            error: None,
312            duration_ms: 100,
313        };
314
315        assert_eq!(result.task_id, "task1");
316        assert!(result.success);
317        assert!(result.error.is_none());
318        assert_eq!(result.duration_ms, 100);
319    }
320
321    #[test]
322    fn test_execution_result_failure() {
323        let result = ExecutionResult {
324            task_id: "task1".to_string(),
325            success: false,
326            error: Some("Task failed".to_string()),
327            duration_ms: 50,
328        };
329
330        assert_eq!(result.task_id, "task1");
331        assert!(!result.success);
332        assert!(result.error.is_some());
333        assert_eq!(result.error.unwrap(), "Task failed");
334    }
335
336    #[tokio::test]
337    async fn test_execute_phase_single_task() {
338        let executor = ParallelExecutor::new();
339        let phase = ExecutionPhase {
340            tasks: vec![create_test_task("task1")],
341        };
342
343        let results = executor.execute_phase(&phase).await.unwrap();
344        assert_eq!(results.len(), 1);
345        assert_eq!(results[0].task_id, "task1");
346        assert!(results[0].success);
347    }
348
349    #[tokio::test]
350    async fn test_execute_phase_multiple_tasks() {
351        let executor = ParallelExecutor::new();
352        let phase = ExecutionPhase {
353            tasks: vec![
354                create_test_task("task1"),
355                create_test_task("task2"),
356                create_test_task("task3"),
357            ],
358        };
359
360        let results = executor.execute_phase(&phase).await.unwrap();
361        assert_eq!(results.len(), 3);
362
363        for result in &results {
364            assert!(result.success);
365            assert!(result.error.is_none());
366        }
367    }
368
369    #[tokio::test]
370    async fn test_execute_phase_respects_concurrency() {
371        let config = ExecutionConfig {
372            max_concurrency: 2,
373            timeout_ms: 30000,
374            verbose: false,
375        };
376
377        let executor = ParallelExecutor::with_config(config);
378        let phase = ExecutionPhase {
379            tasks: vec![
380                create_test_task("task1"),
381                create_test_task("task2"),
382                create_test_task("task3"),
383                create_test_task("task4"),
384            ],
385        };
386
387        let results = executor.execute_phase(&phase).await.unwrap();
388        assert_eq!(results.len(), 4);
389
390        // All tasks should complete successfully
391        for result in &results {
392            assert!(result.success);
393        }
394    }
395
396    #[tokio::test]
397    async fn test_execute_phase_empty() {
398        let executor = ParallelExecutor::new();
399        let phase = ExecutionPhase { tasks: vec![] };
400
401        let results = executor.execute_phase(&phase).await.unwrap();
402        assert_eq!(results.len(), 0);
403    }
404
405    #[test]
406    fn test_parallel_executor_default() {
407        let _executor = ParallelExecutor::default();
408        // Just verify it can be created with default
409    }
410}