1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 sync::Arc,
4};
5
6use tokio::sync::mpsc;
7
8use crate::{
9 Result, RoutexError,
10 agent::{Agent, AgentMessage},
11 config::Config,
12 llm::{Adapter, anthropic::AnthropicAdapter, openai::OpenAIAdapter},
13 tools::Registry,
14};
15
16#[derive(Debug, Clone)]
20pub struct RunResult {
21 pub output: String,
23
24 pub agent_outputs: HashMap<String, String>,
26
27 pub total_input_tokens: u32,
29 pub total_output_tokens: u32,
30}
31
32pub struct Runtime {
50 config: Config,
51 registry: Arc<Registry>,
52 adapter: Option<Arc<dyn Adapter + Send + Sync>>,
53}
54
55impl Runtime {
56 pub fn from_file(path: impl AsRef<std::path::Path>) -> Result<Self> {
59 let config = Config::from_file(path)?;
60 Self::from_config(config)
61 }
62
63 pub fn from_config(config: Config) -> Result<Self> {
65 let mut registry = Registry::new();
66
67 for tool_cfg in &config.tools {
69 match tool_cfg.name.as_str() {
70 "web_search" => {
71 registry.register(crate::tools::web_search::WebSearchTool::new());
72 }
73 unknown => {
74 return Err(RoutexError::ToolNotFound {
75 name: unknown.to_string(),
76 });
77 }
78 }
79 }
80
81 Ok(Self {
82 config,
83 registry: Arc::new(registry),
84 adapter: None,
85 })
86 }
87
88 pub fn register_tool(&mut self, tool: impl crate::tools::Tool + 'static) {
91 if let Some(registry) = Arc::get_mut(&mut self.registry) {
92 registry.register(tool);
93 }
94 }
95
96 pub async fn run(&self) -> Result<RunResult> {
104 self.validate_tool_references()?;
106
107 let adapter = build_adapter(&self.config)?;
108
109 let agent_count = self.config.agents.len();
110
111 let (status_tx, mut status_rx) = mpsc::channel::<AgentMessage>(agent_count * 10);
112
113 let mut agent_outputs: HashMap<String, String> = HashMap::new();
115
116 let waves = build_execution_waves(&self.config)?;
117
118 for wave in waves {
121 let mut handles = Vec::new();
123
124 for agent_id in &wave {
125 let agent_config = self
126 .config
127 .agents
128 .iter()
129 .find(|a| &a.id == agent_id)
130 .expect("agent in wave must exist in config")
131 .clone();
132
133 let task = build_agent_task(
136 &self.config.task.input,
137 &agent_config.depends,
138 &agent_outputs,
139 );
140
141 let agent = Agent::new(
142 agent_config,
143 Arc::clone(&adapter),
144 Arc::clone(&self.registry),
145 );
146
147 let (inbox_tx, inbox_rx) = mpsc::channel::<String>(1);
149 let status_tx = status_tx.clone();
150
151 inbox_tx
153 .send(task)
154 .await
155 .map_err(|e| RoutexError::AgentFailed {
156 id: agent_id.clone(),
157 reason: format!("failed to send task: {}", e),
158 })?;
159
160 let handle = tokio::spawn(async move { agent.run(inbox_rx, status_tx).await });
162
163 handles.push((agent_id.clone(), handle));
164 }
165
166 for (agent_id, handle) in handles {
169 match handle.await {
170 Ok(Ok(output)) => {
171 agent_outputs.insert(agent_id, output);
172 }
173 Ok(Err(e)) => {
174 return Err(RoutexError::AgentFailed {
175 id: agent_id,
176 reason: e.to_string(),
177 });
178 }
179 Err(e) => {
180 return Err(RoutexError::AgentFailed {
182 id: agent_id,
183 reason: format!("task panicked: {}", e),
184 });
185 }
186 }
187 }
188 }
189
190 drop(status_tx);
192 while status_rx.try_recv().is_ok() {}
193
194 let final_output = find_final_output(&self.config, &agent_outputs)?;
197
198 Ok(RunResult {
199 output: final_output,
200 agent_outputs,
201 total_input_tokens: 0, total_output_tokens: 0,
203 })
204 }
205
206 fn validate_tool_references(&self) -> Result<()> {
209 for agent in &self.config.agents {
210 for tool_name in &agent.tools {
211 if !self.registry.has(tool_name) {
212 return Err(RoutexError::ToolNotFound {
213 name: tool_name.clone(),
214 });
215 }
216 }
217 }
218 Ok(())
219 }
220
221 pub fn list_tools(&self) -> Vec<crate::tools::ToolInfo> {
224 self.registry.list()
225 }
226}
227
228fn find_final_output(config: &Config, outputs: &HashMap<String, String>) -> Result<String> {
232 let all_deps: HashSet<String> = config
234 .agents
235 .iter()
236 .flat_map(|a| a.depends.iter().cloned())
237 .collect();
238
239 let final_agents: Vec<&str> = config
240 .agents
241 .iter()
242 .filter(|a| !all_deps.contains(&a.id))
243 .map(|a| a.id.as_str())
244 .collect();
245
246 match final_agents.len() {
247 0 => Err(RoutexError::Config(
248 "could not determine final agent".to_string(),
249 )),
250 1 => {
251 let id = final_agents[0];
252 outputs
253 .get(id)
254 .cloned()
255 .ok_or_else(|| RoutexError::AgentFailed {
256 id: id.to_string(),
257 reason: "no output recorded".to_string(),
258 })
259 }
260 _ => {
261 let combined = final_agents
263 .iter()
264 .filter_map(|id| outputs.get(*id))
265 .cloned()
266 .collect::<Vec<_>>()
267 .join("\n\n");
268 Ok(combined)
269 }
270 }
271}
272
273fn build_agent_task(
276 original_task: &str,
277 depends: &[String],
278 outputs: &HashMap<String, String>,
279) -> String {
280 if depends.is_empty() {
281 return original_task.to_string();
282 }
283
284 let mut context = format!("Task: {}\n\nContext from previous agents:\n", original_task);
286
287 for dep_id in depends {
288 if let Some(output) = outputs.get(dep_id) {
289 context.push_str(&format!("\n[{}]:\n{}\n", dep_id, output));
290 }
291 }
292
293 context
294}
295
296fn build_execution_waves(config: &Config) -> Result<Vec<Vec<String>>> {
308 let mut in_degree: HashMap<String, usize> = HashMap::new();
310
311 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
313
314 for agent in &config.agents {
315 in_degree.entry(agent.id.clone()).or_insert(0);
316 dependents.entry(agent.id.clone()).or_default();
317
318 for dep in &agent.depends {
319 *in_degree.entry(agent.id.clone()).or_insert(0) += 1;
320 dependents
321 .entry(dep.clone())
322 .or_default()
323 .push(agent.id.clone());
324 }
325 }
326
327 let mut queue: VecDeque<String> = in_degree
328 .iter()
329 .filter(|&(_, °ree)| degree == 0)
330 .map(|(id, _)| id.clone())
331 .collect();
332
333 if queue.is_empty() && !config.agents.is_empty() {
334 return Err(RoutexError::Config(
335 "all agents have dependencies — possible cycle".to_string(),
336 ));
337 }
338
339 let mut waves: Vec<Vec<String>> = Vec::new();
340 let mut scheduled: HashSet<String> = HashSet::new();
341
342 while !queue.is_empty() {
344 let wave: Vec<String> = queue.drain(..).collect();
346
347 for id in &wave {
348 scheduled.insert(id.clone());
349
350 if let Some(deps) = dependents.get(id) {
352 for dependent in deps {
353 let degree = in_degree.get_mut(dependent).unwrap();
354 *degree -= 1;
355 if *degree == 0 {
357 queue.push_back(dependent.clone());
358 }
359 }
360 }
361 }
362
363 waves.push(wave);
364 }
365
366 if scheduled.len() != config.agents.len() {
368 let unscheduled: Vec<String> = config
369 .agents
370 .iter()
371 .filter(|a| !scheduled.contains(&a.id))
372 .map(|a| a.id.clone())
373 .collect();
374
375 return Err(RoutexError::CyclicDependency {
376 id: unscheduled.first().cloned().unwrap_or_default(),
377 });
378 }
379
380 Ok(waves)
381}
382
383fn build_adapter(config: &Config) -> Result<Arc<dyn Adapter + Send + Sync>> {
386 match config.runtime.llm_provider.as_str() {
387 "anthropic" => {
388 if config.runtime.api_key.is_empty() {
389 return Err(RoutexError::Config(
390 "anthropic provider require an api_key".to_string(),
391 ));
392 }
393 Ok(Arc::new(AnthropicAdapter::new(
394 &config.runtime.api_key,
395 &config.runtime.model,
396 )))
397 }
398 "openai" => {
399 if config.runtime.api_key.is_empty() {
400 return Err(RoutexError::Config(
401 "openai provider require an api_key".to_string(),
402 ));
403 }
404 Ok(Arc::new(OpenAIAdapter::new(
405 &config.runtime.api_key,
406 &config.runtime.model,
407 )))
408 }
409 other => Err(RoutexError::Config(format!(
410 "unknown llm_provider '{}' - supported: anthropic",
411 other
412 ))),
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use crate::config::{AgentConfig, Config, RuntimeConfig, TaskConfig};
420
421 fn make_config(agents: Vec<AgentConfig>) -> Config {
422 Config {
423 runtime: RuntimeConfig {
424 name: "test".to_string(),
425 llm_provider: "anthropic".to_string(),
426 model: "claude-haiku-4-5-20251001".to_string(),
427 api_key: "test-key".to_string(),
428 base_url: None,
429 log_level: "info".to_string(),
430 max_tokens: 4096,
431 },
432 task: TaskConfig {
433 input: "Research Go frameworks".to_string(),
434 },
435 agents,
436 tools: vec![],
437 }
438 }
439
440 fn simple_agent(id: &str, depends: Vec<&str>) -> AgentConfig {
441 AgentConfig {
442 id: id.to_string(),
443 role: crate::config::Role::Researcher,
444 goal: "research".to_string(),
445 backstory: None,
446 tools: vec![],
447 depends: depends.iter().map(|s| s.to_string()).collect(),
448 restart: "one_for_one".to_string(),
449 llm: None,
450 max_tool_calls: 20,
451 }
452 }
453
454 #[test]
455 fn test_single_agent_wave() {
456 let config = make_config(vec![simple_agent("researcher", vec![])]);
457 let waves = build_execution_waves(&config).unwrap();
458 assert_eq!(waves.len(), 1);
459 assert_eq!(waves[0], vec!["researcher"]);
460 }
461
462 #[test]
463 fn test_sequential_agents_two_waves() {
464 let config = make_config(vec![
465 simple_agent("researcher", vec![]),
466 simple_agent("writer", vec!["researcher"]),
467 ]);
468 let waves = build_execution_waves(&config).unwrap();
469 assert_eq!(waves.len(), 2);
470 assert!(waves[0].contains(&"researcher".to_string()));
471 assert!(waves[1].contains(&"writer".to_string()));
472 }
473
474 #[test]
475 fn test_parallel_agents_one_wave() {
476 let config = make_config(vec![
477 simple_agent("researcher-1", vec![]),
478 simple_agent("researcher-2", vec![]),
479 simple_agent("researcher-3", vec![]),
480 ]);
481 let waves = build_execution_waves(&config).unwrap();
482 assert_eq!(waves.len(), 1);
483 assert_eq!(waves[0].len(), 3);
484 }
485
486 #[test]
487 fn test_fan_in_pattern() {
488 let config = make_config(vec![
490 simple_agent("researcher-1", vec![]),
491 simple_agent("researcher-2", vec![]),
492 simple_agent("writer", vec!["researcher-1", "researcher-2"]),
493 ]);
494 let waves = build_execution_waves(&config).unwrap();
495 assert_eq!(waves.len(), 2);
496 assert_eq!(waves[0].len(), 2); assert_eq!(waves[1].len(), 1); assert!(waves[1].contains(&"writer".to_string()));
499 }
500
501 #[test]
502 fn test_cyclic_dependency_detected() {
503 let agent_a = simple_agent("a", vec!["b"]);
505 let agent_b = simple_agent("b", vec!["a"]);
506 let config = make_config(vec![agent_a, agent_b]);
507 let result = build_execution_waves(&config);
508 assert!(result.is_err());
509 }
510
511 #[test]
512 fn test_cyclic_dependency_detected2() {
513 let agent_a = simple_agent("a", vec!["b"]);
515 let agent_b = simple_agent("b", vec!["c"]);
516 let agent_c = simple_agent("c", vec!["b"]);
517 let config = make_config(vec![agent_a, agent_b, agent_c]);
518 let result = build_execution_waves(&config);
519 assert!(result.is_err());
520 }
521
522 #[test]
523 fn test_build_agent_task_no_deps() {
524 let outputs = HashMap::new();
525 let task = build_agent_task("Research Go", &[], &outputs);
526 assert_eq!(task, "Research Go");
527 }
528
529 #[test]
530 fn test_build_agent_task_with_deps() {
531 let mut outputs = HashMap::new();
532 outputs.insert(
533 "researcher".to_string(),
534 "Go is fast and concurrent.".to_string(),
535 );
536 let task = build_agent_task("Write a report", &["researcher".to_string()], &outputs);
537 assert!(task.contains("Write a report"));
538 assert!(task.contains("Go is fast and concurrent."));
539 assert!(task.contains("[researcher]"));
540 }
541
542 #[test]
543 fn test_find_final_output_single() {
544 let config = make_config(vec![
545 simple_agent("researcher", vec![]),
546 simple_agent("writer", vec!["researcher"]),
547 ]);
548 let mut outputs = HashMap::new();
549 outputs.insert("researcher".to_string(), "research done".to_string());
550 outputs.insert("writer".to_string(), "report written".to_string());
551
552 let result = find_final_output(&config, &outputs).unwrap();
553 assert_eq!(result, "report written");
554 }
555
556 #[test]
557 fn test_find_final_output_multiple() {
558 let config = make_config(vec![
560 simple_agent("agent-a", vec![]),
561 simple_agent("agent-b", vec![]),
562 ]);
563 let mut outputs = HashMap::new();
564 outputs.insert("agent-a".to_string(), "output a".to_string());
565 outputs.insert("agent-b".to_string(), "output b".to_string());
566
567 let result = find_final_output(&config, &outputs).unwrap();
568 assert!(result.contains("output a") || result.contains("output b"));
569 }
570}