ricecoder_agents/
orchestrator.rs

1//! Agent orchestrator for managing agent lifecycle and workflows
2
3use crate::coordinator::AgentCoordinator;
4use crate::error::Result;
5use crate::models::{AgentOutput, AgentTask};
6use crate::registry::AgentRegistry;
7use crate::scheduler::AgentScheduler;
8use std::sync::Arc;
9use std::time::Duration;
10use tracing::{debug, error, info, warn};
11
12/// Configuration for retry logic
13#[derive(Debug, Clone)]
14pub struct RetryConfig {
15    /// Maximum number of retry attempts
16    pub max_retries: u32,
17    /// Initial backoff duration in milliseconds
18    pub initial_backoff_ms: u64,
19    /// Maximum backoff duration in milliseconds
20    pub max_backoff_ms: u64,
21    /// Backoff multiplier for exponential backoff
22    pub backoff_multiplier: f64,
23}
24
25impl Default for RetryConfig {
26    fn default() -> Self {
27        Self {
28            max_retries: 3,
29            initial_backoff_ms: 100,
30            max_backoff_ms: 10000,
31            backoff_multiplier: 2.0,
32        }
33    }
34}
35
36/// Central orchestrator for agent lifecycle and workflows
37///
38/// The `AgentOrchestrator` manages the execution of agents, including:
39/// - Agent lifecycle (initialization, execution, cleanup)
40/// - Inter-agent communication and handoff
41/// - Sequential, parallel, and conditional workflows
42/// - Error handling and retry logic
43///
44/// # Examples
45///
46/// ```ignore
47/// use ricecoder_agents::{AgentOrchestrator, AgentRegistry, AgentTask, TaskType, TaskTarget, TaskScope};
48/// use std::sync::Arc;
49/// use std::path::PathBuf;
50///
51/// #[tokio::main]
52/// async fn main() {
53///     let registry = Arc::new(AgentRegistry::new());
54///     let orchestrator = AgentOrchestrator::new(registry);
55///
56///     let task = AgentTask {
57///         id: "task-1".to_string(),
58///         task_type: TaskType::CodeReview,
59///         target: TaskTarget {
60///             files: vec![PathBuf::from("src/main.rs")],
61///             scope: TaskScope::File,
62///         },
63///         options: Default::default(),
64///     };
65///
66///     let results = orchestrator.execute(vec![task]).await.unwrap();
67/// }
68/// ```
69pub struct AgentOrchestrator {
70    registry: Arc<AgentRegistry>,
71    scheduler: Arc<AgentScheduler>,
72    coordinator: Arc<AgentCoordinator>,
73    retry_config: RetryConfig,
74}
75
76impl AgentOrchestrator {
77    /// Create a new agent orchestrator
78    ///
79    /// # Arguments
80    ///
81    /// * `registry` - The agent registry containing registered agents
82    ///
83    /// # Returns
84    ///
85    /// A new `AgentOrchestrator` with default retry configuration
86    pub fn new(registry: Arc<AgentRegistry>) -> Self {
87        Self {
88            registry,
89            scheduler: Arc::new(AgentScheduler::new()),
90            coordinator: Arc::new(AgentCoordinator::new()),
91            retry_config: RetryConfig::default(),
92        }
93    }
94
95    /// Create a new agent orchestrator with custom retry configuration
96    ///
97    /// # Arguments
98    ///
99    /// * `registry` - The agent registry containing registered agents
100    /// * `retry_config` - Custom retry configuration
101    ///
102    /// # Returns
103    ///
104    /// A new `AgentOrchestrator` with the specified retry configuration
105    pub fn with_retry_config(registry: Arc<AgentRegistry>, retry_config: RetryConfig) -> Self {
106        Self {
107            registry,
108            scheduler: Arc::new(AgentScheduler::new()),
109            coordinator: Arc::new(AgentCoordinator::new()),
110            retry_config,
111        }
112    }
113
114    /// Set the retry configuration
115    pub fn set_retry_config(&mut self, retry_config: RetryConfig) {
116        self.retry_config = retry_config;
117    }
118
119    /// Get the retry configuration
120    pub fn retry_config(&self) -> &RetryConfig {
121        &self.retry_config
122    }
123
124    /// Execute agents for the given tasks with retry logic
125    ///
126    /// This method executes the given tasks with automatic retry on failure.
127    /// If execution fails, it will retry up to `max_retries` times with exponential backoff.
128    ///
129    /// # Arguments
130    ///
131    /// * `tasks` - Vector of tasks to execute
132    ///
133    /// # Returns
134    ///
135    /// A `Result` containing the agent outputs or an error
136    pub async fn execute_with_retry(&self, tasks: Vec<AgentTask>) -> Result<Vec<AgentOutput>> {
137        let mut last_error = None;
138        let mut backoff_ms = self.retry_config.initial_backoff_ms;
139
140        for attempt in 0..=self.retry_config.max_retries {
141            match self.execute(tasks.clone()).await {
142                Ok(outputs) => {
143                    if attempt > 0 {
144                        info!("Orchestration succeeded on attempt {}", attempt + 1);
145                    }
146                    return Ok(outputs);
147                }
148                Err(e) => {
149                    last_error = Some(e.clone());
150
151                    if attempt < self.retry_config.max_retries {
152                        warn!(
153                            "Orchestration failed on attempt {}, retrying in {}ms: {}",
154                            attempt + 1,
155                            backoff_ms,
156                            e
157                        );
158
159                        // Wait with exponential backoff
160                        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
161
162                        // Calculate next backoff
163                        backoff_ms = std::cmp::min(
164                            (backoff_ms as f64 * self.retry_config.backoff_multiplier) as u64,
165                            self.retry_config.max_backoff_ms,
166                        );
167                    } else {
168                        error!(
169                            "Orchestration failed after {} attempts: {}",
170                            self.retry_config.max_retries + 1,
171                            e
172                        );
173                    }
174                }
175            }
176        }
177
178        Err(last_error.unwrap_or_else(|| {
179            crate::error::AgentError::execution_failed("Unknown error during orchestration")
180        }))
181    }
182
183    /// Execute agents for the given tasks
184    ///
185    /// This method executes the given tasks using the orchestrator's scheduler
186    /// to determine execution order and parallelism. Tasks are executed according
187    /// to their dependencies, with independent tasks running in parallel.
188    ///
189    /// # Arguments
190    ///
191    /// * `tasks` - Vector of tasks to execute
192    ///
193    /// # Returns
194    ///
195    /// A `Result` containing the agent outputs or an error
196    pub async fn execute(&self, tasks: Vec<AgentTask>) -> Result<Vec<AgentOutput>> {
197        info!("Starting orchestration of {} tasks", tasks.len());
198
199        // Create execution schedule
200        let schedule = self.scheduler.schedule(&tasks)?;
201        debug!(
202            "Created execution schedule with {} phases",
203            schedule.phases.len()
204        );
205
206        let mut all_outputs = Vec::new();
207
208        // Execute each phase
209        for (phase_idx, phase) in schedule.phases.iter().enumerate() {
210            debug!("Executing phase {}", phase_idx);
211
212            // Execute all tasks in the phase in parallel
213            let mut phase_futures = Vec::new();
214
215            for task in &phase.tasks {
216                let registry = self.registry.clone();
217                let task = task.clone();
218
219                let future = async move {
220                    // Find agent for this task
221                    let agents = registry.find_agents_by_task_type(task.task_type);
222
223                    if agents.is_empty() {
224                        error!("No agent found for task type: {:?}", task.task_type);
225                        return Err(crate::error::AgentError::not_found(format!(
226                            "No agent for {:?}",
227                            task.task_type
228                        )));
229                    }
230
231                    // Execute the first agent that supports this task
232                    let agent = &agents[0];
233                    debug!("Executing agent {} for task {}", agent.id(), task.id);
234
235                    // Create agent input
236                    let input = crate::models::AgentInput {
237                        task,
238                        context: crate::models::ProjectContext {
239                            name: "ricecoder".to_string(),
240                            root: std::path::PathBuf::from("."),
241                        },
242                        config: crate::models::AgentConfig::default(),
243                    };
244
245                    agent.execute(input).await
246                };
247
248                phase_futures.push(future);
249            }
250
251            // Wait for all futures in the phase to complete
252            let phase_results = futures::future::join_all(phase_futures).await;
253
254            for result in phase_results {
255                match result {
256                    Ok(output) => {
257                        debug!("Agent execution succeeded");
258                        all_outputs.push(output);
259                    }
260                    Err(e) => {
261                        error!("Agent execution failed: {}", e);
262                        return Err(e);
263                    }
264                }
265            }
266        }
267
268        info!("Orchestration completed with {} outputs", all_outputs.len());
269        Ok(all_outputs)
270    }
271
272    /// Execute and aggregate results from multiple agents
273    ///
274    /// This method executes the given tasks and then aggregates all results
275    /// into a single `AgentOutput`, combining findings, suggestions, and generated content.
276    ///
277    /// # Arguments
278    ///
279    /// * `tasks` - Vector of tasks to execute
280    ///
281    /// # Returns
282    ///
283    /// A `Result` containing the aggregated agent output or an error
284    pub async fn execute_and_aggregate(&self, tasks: Vec<AgentTask>) -> Result<AgentOutput> {
285        let outputs = self.execute(tasks).await?;
286        self.coordinator.aggregate(outputs)
287    }
288
289    /// Execute tasks conditionally based on a predicate
290    ///
291    /// This method supports conditional workflows where later tasks only execute
292    /// if earlier tasks meet certain conditions.
293    ///
294    /// # Arguments
295    /// * `tasks` - Vector of tasks to execute
296    /// * `condition` - Closure that determines if execution should continue
297    ///   Takes the current outputs and returns true to continue, false to stop
298    pub async fn execute_conditional<F>(
299        &self,
300        tasks: Vec<AgentTask>,
301        condition: F,
302    ) -> Result<Vec<AgentOutput>>
303    where
304        F: Fn(&[AgentOutput]) -> bool,
305    {
306        info!(
307            "Starting conditional orchestration of {} tasks",
308            tasks.len()
309        );
310
311        let schedule = self.scheduler.schedule(&tasks)?;
312        debug!(
313            "Created execution schedule with {} phases",
314            schedule.phases.len()
315        );
316
317        let mut all_outputs = Vec::new();
318
319        // Execute each phase
320        for (phase_idx, phase) in schedule.phases.iter().enumerate() {
321            debug!("Executing phase {}", phase_idx);
322
323            // Check condition before executing phase
324            if !condition(&all_outputs) {
325                debug!("Condition not met, stopping execution");
326                break;
327            }
328
329            // Execute all tasks in the phase in parallel
330            let mut phase_futures = Vec::new();
331
332            for task in &phase.tasks {
333                let registry = self.registry.clone();
334                let task = task.clone();
335
336                let future = async move {
337                    let agents = registry.find_agents_by_task_type(task.task_type);
338
339                    if agents.is_empty() {
340                        error!("No agent found for task type: {:?}", task.task_type);
341                        return Err(crate::error::AgentError::not_found(format!(
342                            "No agent for {:?}",
343                            task.task_type
344                        )));
345                    }
346
347                    let agent = &agents[0];
348                    debug!("Executing agent {} for task {}", agent.id(), task.id);
349
350                    let input = crate::models::AgentInput {
351                        task,
352                        context: crate::models::ProjectContext {
353                            name: "ricecoder".to_string(),
354                            root: std::path::PathBuf::from("."),
355                        },
356                        config: crate::models::AgentConfig::default(),
357                    };
358
359                    agent.execute(input).await
360                };
361
362                phase_futures.push(future);
363            }
364
365            // Wait for all futures in the phase to complete
366            let phase_results = futures::future::join_all(phase_futures).await;
367
368            for result in phase_results {
369                match result {
370                    Ok(output) => {
371                        debug!("Agent execution succeeded");
372                        all_outputs.push(output);
373                    }
374                    Err(e) => {
375                        error!("Agent execution failed: {}", e);
376                        return Err(e);
377                    }
378                }
379            }
380        }
381
382        info!(
383            "Conditional orchestration completed with {} outputs",
384            all_outputs.len()
385        );
386        Ok(all_outputs)
387    }
388
389    /// Get the registry
390    pub fn registry(&self) -> &AgentRegistry {
391        &self.registry
392    }
393
394    /// Get the scheduler
395    pub fn scheduler(&self) -> &AgentScheduler {
396        &self.scheduler
397    }
398
399    /// Get the coordinator
400    pub fn coordinator(&self) -> &AgentCoordinator {
401        &self.coordinator
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use crate::agents::Agent;
409    use crate::models::{TaskOptions, TaskScope, TaskTarget, TaskType};
410    use std::path::PathBuf;
411
412    struct TestAgent {
413        id: String,
414    }
415
416    #[async_trait::async_trait]
417    impl Agent for TestAgent {
418        fn id(&self) -> &str {
419            &self.id
420        }
421
422        fn name(&self) -> &str {
423            "Test Agent"
424        }
425
426        fn description(&self) -> &str {
427            "A test agent"
428        }
429
430        fn supports(&self, _task_type: TaskType) -> bool {
431            true
432        }
433
434        async fn execute(&self, _input: crate::models::AgentInput) -> Result<AgentOutput> {
435            Ok(AgentOutput::default())
436        }
437    }
438
439    #[tokio::test]
440    async fn test_execute_empty_tasks() {
441        let registry = Arc::new(AgentRegistry::new());
442        let orchestrator = AgentOrchestrator::new(registry);
443
444        let results = orchestrator.execute(vec![]).await.unwrap();
445        assert_eq!(results.len(), 0);
446    }
447
448    #[tokio::test]
449    async fn test_execute_with_agent() {
450        let mut registry = AgentRegistry::new();
451        let agent = Arc::new(TestAgent {
452            id: "test-agent".to_string(),
453        });
454        registry.register(agent);
455
456        let orchestrator = AgentOrchestrator::new(Arc::new(registry));
457
458        let task = AgentTask {
459            id: "task1".to_string(),
460            task_type: TaskType::CodeReview,
461            target: TaskTarget {
462                files: vec![PathBuf::from("test.rs")],
463                scope: TaskScope::File,
464            },
465            options: TaskOptions::default(),
466        };
467
468        let results = orchestrator.execute(vec![task]).await.unwrap();
469        assert_eq!(results.len(), 1);
470    }
471
472    #[tokio::test]
473    async fn test_execute_conditional_always_true() {
474        let mut registry = AgentRegistry::new();
475        let agent = Arc::new(TestAgent {
476            id: "test-agent".to_string(),
477        });
478        registry.register(agent);
479
480        let orchestrator = AgentOrchestrator::new(Arc::new(registry));
481
482        let task = AgentTask {
483            id: "task1".to_string(),
484            task_type: TaskType::CodeReview,
485            target: TaskTarget {
486                files: vec![PathBuf::from("test.rs")],
487                scope: TaskScope::File,
488            },
489            options: TaskOptions::default(),
490        };
491
492        // Condition always returns true
493        let results = orchestrator
494            .execute_conditional(vec![task], |_| true)
495            .await
496            .unwrap();
497        assert_eq!(results.len(), 1);
498    }
499
500    #[tokio::test]
501    async fn test_execute_conditional_always_false() {
502        let mut registry = AgentRegistry::new();
503        let agent = Arc::new(TestAgent {
504            id: "test-agent".to_string(),
505        });
506        registry.register(agent);
507
508        let orchestrator = AgentOrchestrator::new(Arc::new(registry));
509
510        let task = AgentTask {
511            id: "task1".to_string(),
512            task_type: TaskType::CodeReview,
513            target: TaskTarget {
514                files: vec![PathBuf::from("test.rs")],
515                scope: TaskScope::File,
516            },
517            options: TaskOptions::default(),
518        };
519
520        // Condition always returns false
521        let results = orchestrator
522            .execute_conditional(vec![task], |_| false)
523            .await
524            .unwrap();
525        assert_eq!(results.len(), 0);
526    }
527
528    #[tokio::test]
529    async fn test_execute_conditional_based_on_output_count() {
530        let mut registry = AgentRegistry::new();
531        let agent = Arc::new(TestAgent {
532            id: "test-agent".to_string(),
533        });
534        registry.register(agent);
535
536        let orchestrator = AgentOrchestrator::new(Arc::new(registry));
537
538        let tasks = vec![
539            AgentTask {
540                id: "task1".to_string(),
541                task_type: TaskType::CodeReview,
542                target: TaskTarget {
543                    files: vec![PathBuf::from("test.rs")],
544                    scope: TaskScope::File,
545                },
546                options: TaskOptions::default(),
547            },
548            AgentTask {
549                id: "task2".to_string(),
550                task_type: TaskType::CodeReview,
551                target: TaskTarget {
552                    files: vec![PathBuf::from("test.rs")],
553                    scope: TaskScope::File,
554                },
555                options: TaskOptions::default(),
556            },
557        ];
558
559        // Condition: continue if we have less than 2 outputs
560        // Since both tasks are in the same phase, they execute together
561        // So we'll get 2 outputs in the first phase
562        let results = orchestrator
563            .execute_conditional(tasks, |outputs| outputs.len() < 2)
564            .await
565            .unwrap();
566        assert_eq!(results.len(), 2);
567    }
568
569    #[test]
570    fn test_retry_config_default() {
571        let config = RetryConfig::default();
572        assert_eq!(config.max_retries, 3);
573        assert_eq!(config.initial_backoff_ms, 100);
574        assert_eq!(config.max_backoff_ms, 10000);
575        assert_eq!(config.backoff_multiplier, 2.0);
576    }
577
578    #[test]
579    fn test_retry_config_custom() {
580        let config = RetryConfig {
581            max_retries: 5,
582            initial_backoff_ms: 200,
583            max_backoff_ms: 20000,
584            backoff_multiplier: 1.5,
585        };
586
587        assert_eq!(config.max_retries, 5);
588        assert_eq!(config.initial_backoff_ms, 200);
589        assert_eq!(config.max_backoff_ms, 20000);
590        assert_eq!(config.backoff_multiplier, 1.5);
591    }
592
593    #[test]
594    fn test_orchestrator_with_retry_config() {
595        let registry = Arc::new(AgentRegistry::new());
596        let retry_config = RetryConfig {
597            max_retries: 5,
598            initial_backoff_ms: 200,
599            max_backoff_ms: 20000,
600            backoff_multiplier: 1.5,
601        };
602
603        let orchestrator = AgentOrchestrator::with_retry_config(registry, retry_config.clone());
604        assert_eq!(orchestrator.retry_config().max_retries, 5);
605        assert_eq!(orchestrator.retry_config().initial_backoff_ms, 200);
606    }
607
608    #[test]
609    fn test_orchestrator_set_retry_config() {
610        let registry = Arc::new(AgentRegistry::new());
611        let mut orchestrator = AgentOrchestrator::new(registry);
612
613        let new_config = RetryConfig {
614            max_retries: 10,
615            initial_backoff_ms: 500,
616            max_backoff_ms: 30000,
617            backoff_multiplier: 2.5,
618        };
619
620        orchestrator.set_retry_config(new_config);
621        assert_eq!(orchestrator.retry_config().max_retries, 10);
622        assert_eq!(orchestrator.retry_config().initial_backoff_ms, 500);
623    }
624
625    #[tokio::test]
626    async fn test_execute_with_retry_success_first_attempt() {
627        let mut registry = AgentRegistry::new();
628        let agent = Arc::new(TestAgent {
629            id: "test-agent".to_string(),
630        });
631        registry.register(agent);
632
633        let orchestrator = AgentOrchestrator::new(Arc::new(registry));
634
635        let task = AgentTask {
636            id: "task1".to_string(),
637            task_type: TaskType::CodeReview,
638            target: TaskTarget {
639                files: vec![PathBuf::from("test.rs")],
640                scope: TaskScope::File,
641            },
642            options: TaskOptions::default(),
643        };
644
645        let results = orchestrator.execute_with_retry(vec![task]).await.unwrap();
646        assert_eq!(results.len(), 1);
647    }
648
649    #[tokio::test]
650    async fn test_execute_with_retry_empty_tasks() {
651        let registry = Arc::new(AgentRegistry::new());
652        let orchestrator = AgentOrchestrator::new(registry);
653
654        let results = orchestrator.execute_with_retry(vec![]).await.unwrap();
655        assert_eq!(results.len(), 0);
656    }
657}