1use crate::coordinator::AgentCoordinator;
4use crate::error::Result;
5use crate::models::{AgentOutput, AgentTask};
6use crate::registry::AgentRegistry;
7use crate::scheduler::AgentScheduler;
8use std::sync::Arc;
9use std::time::Duration;
10use tracing::{debug, error, info, warn};
11
/// Retry policy used by [`AgentOrchestrator::execute_with_retry`].
///
/// The delay between attempts starts at `initial_backoff_ms` and is multiplied
/// by `backoff_multiplier` after every failed attempt, capped at
/// `max_backoff_ms`.
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfig {
    /// Number of retries allowed after the first attempt, so at most
    /// `max_retries + 1` attempts are made in total.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound applied to the delay as it grows between retries, in
    /// milliseconds.
    pub max_backoff_ms: u64,
    /// Factor the delay is multiplied by after each failed attempt.
    pub backoff_multiplier: f64,
}
24
25impl Default for RetryConfig {
26 fn default() -> Self {
27 Self {
28 max_retries: 3,
29 initial_backoff_ms: 100,
30 max_backoff_ms: 10000,
31 backoff_multiplier: 2.0,
32 }
33 }
34}
35
36pub struct AgentOrchestrator {
70 registry: Arc<AgentRegistry>,
71 scheduler: Arc<AgentScheduler>,
72 coordinator: Arc<AgentCoordinator>,
73 retry_config: RetryConfig,
74}
75
76impl AgentOrchestrator {
77 pub fn new(registry: Arc<AgentRegistry>) -> Self {
87 Self {
88 registry,
89 scheduler: Arc::new(AgentScheduler::new()),
90 coordinator: Arc::new(AgentCoordinator::new()),
91 retry_config: RetryConfig::default(),
92 }
93 }
94
95 pub fn with_retry_config(registry: Arc<AgentRegistry>, retry_config: RetryConfig) -> Self {
106 Self {
107 registry,
108 scheduler: Arc::new(AgentScheduler::new()),
109 coordinator: Arc::new(AgentCoordinator::new()),
110 retry_config,
111 }
112 }
113
114 pub fn set_retry_config(&mut self, retry_config: RetryConfig) {
116 self.retry_config = retry_config;
117 }
118
119 pub fn retry_config(&self) -> &RetryConfig {
121 &self.retry_config
122 }
123
124 pub async fn execute_with_retry(&self, tasks: Vec<AgentTask>) -> Result<Vec<AgentOutput>> {
137 let mut last_error = None;
138 let mut backoff_ms = self.retry_config.initial_backoff_ms;
139
140 for attempt in 0..=self.retry_config.max_retries {
141 match self.execute(tasks.clone()).await {
142 Ok(outputs) => {
143 if attempt > 0 {
144 info!("Orchestration succeeded on attempt {}", attempt + 1);
145 }
146 return Ok(outputs);
147 }
148 Err(e) => {
149 last_error = Some(e.clone());
150
151 if attempt < self.retry_config.max_retries {
152 warn!(
153 "Orchestration failed on attempt {}, retrying in {}ms: {}",
154 attempt + 1,
155 backoff_ms,
156 e
157 );
158
159 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
161
162 backoff_ms = std::cmp::min(
164 (backoff_ms as f64 * self.retry_config.backoff_multiplier) as u64,
165 self.retry_config.max_backoff_ms,
166 );
167 } else {
168 error!(
169 "Orchestration failed after {} attempts: {}",
170 self.retry_config.max_retries + 1,
171 e
172 );
173 }
174 }
175 }
176 }
177
178 Err(last_error.unwrap_or_else(|| {
179 crate::error::AgentError::execution_failed("Unknown error during orchestration")
180 }))
181 }
182
183 pub async fn execute(&self, tasks: Vec<AgentTask>) -> Result<Vec<AgentOutput>> {
197 info!("Starting orchestration of {} tasks", tasks.len());
198
199 let schedule = self.scheduler.schedule(&tasks)?;
201 debug!(
202 "Created execution schedule with {} phases",
203 schedule.phases.len()
204 );
205
206 let mut all_outputs = Vec::new();
207
208 for (phase_idx, phase) in schedule.phases.iter().enumerate() {
210 debug!("Executing phase {}", phase_idx);
211
212 let mut phase_futures = Vec::new();
214
215 for task in &phase.tasks {
216 let registry = self.registry.clone();
217 let task = task.clone();
218
219 let future = async move {
220 let agents = registry.find_agents_by_task_type(task.task_type);
222
223 if agents.is_empty() {
224 error!("No agent found for task type: {:?}", task.task_type);
225 return Err(crate::error::AgentError::not_found(format!(
226 "No agent for {:?}",
227 task.task_type
228 )));
229 }
230
231 let agent = &agents[0];
233 debug!("Executing agent {} for task {}", agent.id(), task.id);
234
235 let input = crate::models::AgentInput {
237 task,
238 context: crate::models::ProjectContext {
239 name: "ricecoder".to_string(),
240 root: std::path::PathBuf::from("."),
241 },
242 config: crate::models::AgentConfig::default(),
243 };
244
245 agent.execute(input).await
246 };
247
248 phase_futures.push(future);
249 }
250
251 let phase_results = futures::future::join_all(phase_futures).await;
253
254 for result in phase_results {
255 match result {
256 Ok(output) => {
257 debug!("Agent execution succeeded");
258 all_outputs.push(output);
259 }
260 Err(e) => {
261 error!("Agent execution failed: {}", e);
262 return Err(e);
263 }
264 }
265 }
266 }
267
268 info!("Orchestration completed with {} outputs", all_outputs.len());
269 Ok(all_outputs)
270 }
271
272 pub async fn execute_and_aggregate(&self, tasks: Vec<AgentTask>) -> Result<AgentOutput> {
285 let outputs = self.execute(tasks).await?;
286 self.coordinator.aggregate(outputs)
287 }
288
289 pub async fn execute_conditional<F>(
299 &self,
300 tasks: Vec<AgentTask>,
301 condition: F,
302 ) -> Result<Vec<AgentOutput>>
303 where
304 F: Fn(&[AgentOutput]) -> bool,
305 {
306 info!(
307 "Starting conditional orchestration of {} tasks",
308 tasks.len()
309 );
310
311 let schedule = self.scheduler.schedule(&tasks)?;
312 debug!(
313 "Created execution schedule with {} phases",
314 schedule.phases.len()
315 );
316
317 let mut all_outputs = Vec::new();
318
319 for (phase_idx, phase) in schedule.phases.iter().enumerate() {
321 debug!("Executing phase {}", phase_idx);
322
323 if !condition(&all_outputs) {
325 debug!("Condition not met, stopping execution");
326 break;
327 }
328
329 let mut phase_futures = Vec::new();
331
332 for task in &phase.tasks {
333 let registry = self.registry.clone();
334 let task = task.clone();
335
336 let future = async move {
337 let agents = registry.find_agents_by_task_type(task.task_type);
338
339 if agents.is_empty() {
340 error!("No agent found for task type: {:?}", task.task_type);
341 return Err(crate::error::AgentError::not_found(format!(
342 "No agent for {:?}",
343 task.task_type
344 )));
345 }
346
347 let agent = &agents[0];
348 debug!("Executing agent {} for task {}", agent.id(), task.id);
349
350 let input = crate::models::AgentInput {
351 task,
352 context: crate::models::ProjectContext {
353 name: "ricecoder".to_string(),
354 root: std::path::PathBuf::from("."),
355 },
356 config: crate::models::AgentConfig::default(),
357 };
358
359 agent.execute(input).await
360 };
361
362 phase_futures.push(future);
363 }
364
365 let phase_results = futures::future::join_all(phase_futures).await;
367
368 for result in phase_results {
369 match result {
370 Ok(output) => {
371 debug!("Agent execution succeeded");
372 all_outputs.push(output);
373 }
374 Err(e) => {
375 error!("Agent execution failed: {}", e);
376 return Err(e);
377 }
378 }
379 }
380 }
381
382 info!(
383 "Conditional orchestration completed with {} outputs",
384 all_outputs.len()
385 );
386 Ok(all_outputs)
387 }
388
389 pub fn registry(&self) -> &AgentRegistry {
391 &self.registry
392 }
393
394 pub fn scheduler(&self) -> &AgentScheduler {
396 &self.scheduler
397 }
398
399 pub fn coordinator(&self) -> &AgentCoordinator {
401 &self.coordinator
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agents::Agent;
    use crate::models::{TaskOptions, TaskScope, TaskTarget, TaskType};
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Agent that accepts every task type and returns a default output.
    struct TestAgent {
        id: String,
    }

    #[async_trait::async_trait]
    impl Agent for TestAgent {
        fn id(&self) -> &str {
            &self.id
        }

        fn name(&self) -> &str {
            "Test Agent"
        }

        fn description(&self) -> &str {
            "A test agent"
        }

        fn supports(&self, _task_type: TaskType) -> bool {
            true
        }

        async fn execute(&self, _input: crate::models::AgentInput) -> Result<AgentOutput> {
            Ok(AgentOutput::default())
        }
    }

    /// Agent that accepts every task type, counts its invocations, and always
    /// fails; used to exercise the retry path.
    struct FailingAgent {
        calls: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl Agent for FailingAgent {
        fn id(&self) -> &str {
            "failing-agent"
        }

        fn name(&self) -> &str {
            "Failing Agent"
        }

        fn description(&self) -> &str {
            "An agent that always fails"
        }

        fn supports(&self, _task_type: TaskType) -> bool {
            true
        }

        async fn execute(&self, _input: crate::models::AgentInput) -> Result<AgentOutput> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Err(crate::error::AgentError::execution_failed("boom"))
        }
    }

    /// Builds a code-review task over `test.rs` with the given id.
    fn review_task(id: &str) -> AgentTask {
        AgentTask {
            id: id.to_string(),
            task_type: TaskType::CodeReview,
            target: TaskTarget {
                files: vec![PathBuf::from("test.rs")],
                scope: TaskScope::File,
            },
            options: TaskOptions::default(),
        }
    }

    /// Builds an orchestrator whose registry holds a single `TestAgent`.
    fn orchestrator_with_test_agent() -> AgentOrchestrator {
        let mut registry = AgentRegistry::new();
        registry.register(Arc::new(TestAgent {
            id: "test-agent".to_string(),
        }));
        AgentOrchestrator::new(Arc::new(registry))
    }

    #[tokio::test]
    async fn test_execute_empty_tasks() {
        let orchestrator = AgentOrchestrator::new(Arc::new(AgentRegistry::new()));

        let results = orchestrator.execute(vec![]).await.unwrap();
        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn test_execute_with_agent() {
        let orchestrator = orchestrator_with_test_agent();

        let results = orchestrator
            .execute(vec![review_task("task1")])
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_execute_without_matching_agent_fails() {
        // An empty registry cannot serve any task, so execution must fail.
        let orchestrator = AgentOrchestrator::new(Arc::new(AgentRegistry::new()));

        let result = orchestrator.execute(vec![review_task("task1")]).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_execute_conditional_always_true() {
        let orchestrator = orchestrator_with_test_agent();

        let results = orchestrator
            .execute_conditional(vec![review_task("task1")], |_| true)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_execute_conditional_always_false() {
        let orchestrator = orchestrator_with_test_agent();

        let results = orchestrator
            .execute_conditional(vec![review_task("task1")], |_| false)
            .await
            .unwrap();
        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn test_execute_conditional_based_on_output_count() {
        let orchestrator = orchestrator_with_test_agent();
        let tasks = vec![review_task("task1"), review_task("task2")];

        let results = orchestrator
            .execute_conditional(tasks, |outputs| outputs.len() < 2)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_backoff_ms, 100);
        assert_eq!(config.max_backoff_ms, 10000);
        assert_eq!(config.backoff_multiplier, 2.0);
    }

    #[test]
    fn test_retry_config_custom() {
        let config = RetryConfig {
            max_retries: 5,
            initial_backoff_ms: 200,
            max_backoff_ms: 20000,
            backoff_multiplier: 1.5,
        };

        assert_eq!(config.max_retries, 5);
        assert_eq!(config.initial_backoff_ms, 200);
        assert_eq!(config.max_backoff_ms, 20000);
        assert_eq!(config.backoff_multiplier, 1.5);
    }

    #[test]
    fn test_orchestrator_with_retry_config() {
        let registry = Arc::new(AgentRegistry::new());
        let retry_config = RetryConfig {
            max_retries: 5,
            initial_backoff_ms: 200,
            max_backoff_ms: 20000,
            backoff_multiplier: 1.5,
        };

        let orchestrator = AgentOrchestrator::with_retry_config(registry, retry_config);
        assert_eq!(orchestrator.retry_config().max_retries, 5);
        assert_eq!(orchestrator.retry_config().initial_backoff_ms, 200);
    }

    #[test]
    fn test_orchestrator_set_retry_config() {
        let registry = Arc::new(AgentRegistry::new());
        let mut orchestrator = AgentOrchestrator::new(registry);

        let new_config = RetryConfig {
            max_retries: 10,
            initial_backoff_ms: 500,
            max_backoff_ms: 30000,
            backoff_multiplier: 2.5,
        };

        orchestrator.set_retry_config(new_config);
        assert_eq!(orchestrator.retry_config().max_retries, 10);
        assert_eq!(orchestrator.retry_config().initial_backoff_ms, 500);
    }

    #[tokio::test]
    async fn test_execute_with_retry_success_first_attempt() {
        let orchestrator = orchestrator_with_test_agent();

        let results = orchestrator
            .execute_with_retry(vec![review_task("task1")])
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_execute_with_retry_empty_tasks() {
        let orchestrator = AgentOrchestrator::new(Arc::new(AgentRegistry::new()));

        let results = orchestrator.execute_with_retry(vec![]).await.unwrap();
        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn test_execute_with_retry_exhausts_attempts() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut registry = AgentRegistry::new();
        registry.register(Arc::new(FailingAgent {
            calls: Arc::clone(&calls),
        }));

        // Millisecond-scale delays keep the test fast.
        let orchestrator = AgentOrchestrator::with_retry_config(
            Arc::new(registry),
            RetryConfig {
                max_retries: 2,
                initial_backoff_ms: 1,
                max_backoff_ms: 2,
                backoff_multiplier: 2.0,
            },
        );

        let result = orchestrator
            .execute_with_retry(vec![review_task("task1")])
            .await;

        assert!(result.is_err());
        // One initial attempt plus two retries.
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}