Skip to main content

routex/
runtime.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    sync::Arc,
4};
5
6use tokio::sync::mpsc;
7
8use crate::{
9    Result, RoutexError,
10    agent::{Agent, AgentMessage},
11    config::Config,
12    llm::{Adapter, anthropic::AnthropicAdapter, openai::OpenAIAdapter},
13    tools::Registry,
14};
15
/// The result handed back once a crew has finished running.
///
/// Carries the final output together with every individual agent output
/// so callers can inspect intermediate results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RunResult {
    /// The final output — from the last agent in the dependency graph.
    pub output: String,

    /// Individual outputs from every agent, keyed by agent ID.
    pub agent_outputs: HashMap<String, String>,

    /// Total input tokens consumed across all agents.
    pub total_input_tokens: u32,

    /// Total output tokens produced across all agents.
    pub total_output_tokens: u32,
}
31
32/// Runtime is the top-level entry point for routex-rs.
33///
34/// It owns the tool registry, the LLM adapter, and the agent configs.
35/// It is responsible for scheduling agents, running them in the correct
36/// order, and collecting results.
37///
38/// Library usage:
39///
40///   let runtime = Runtime::from_config("agents.yaml")?;
41///   let result = runtime.run().await?;
42///   println!("{}", result.output);
43///
44/// Programmatic usage:
45///
46///   let mut runtime = Runtime::new(config);
47///   runtime.register_tool(MyTool::new());
48///   let result = runtime.run().await?;
49pub struct Runtime {
50    config: Config,
51    registry: Arc<Registry>,
52    adapter: Option<Arc<dyn Adapter + Send + Sync>>,
53}
54
55impl Runtime {
56    /// Load a config file and create a Runtime.
57    /// This is the primary entry point for most users.
58    pub fn from_file(path: impl AsRef<std::path::Path>) -> Result<Self> {
59        let config = Config::from_file(path)?;
60        Self::from_config(config)
61    }
62
63    /// Create a Runtime from an already-parsed Config.
64    pub fn from_config(config: Config) -> Result<Self> {
65        let mut registry = Registry::new();
66
67        // Auto-register built-in tools declared in config
68        for tool_cfg in &config.tools {
69            match tool_cfg.name.as_str() {
70                "web_search" => {
71                    registry.register(crate::tools::web_search::WebSearchTool::new());
72                }
73                unknown => {
74                    return Err(RoutexError::ToolNotFound {
75                        name: unknown.to_string(),
76                    });
77                }
78            }
79        }
80
81        Ok(Self {
82            config,
83            registry: Arc::new(registry),
84            adapter: None,
85        })
86    }
87
88    /// Register a tool with the runtime.
89    /// Must be called before run().
90    pub fn register_tool(&mut self, tool: impl crate::tools::Tool + 'static) {
91        if let Some(registry) = Arc::get_mut(&mut self.registry) {
92            registry.register(tool);
93        }
94    }
95
96    /// Run the crew and return when all agents complete.
97    ///
98    /// It:
99    ///   1. Builds the dependency graph from config
100    ///   2. Runs agents in topological order — independent agents in parallel
101    ///   3. Passes results from upstream agents to downstream agents
102    ///   4. Returns the final output when all agents complete
103    pub async fn run(&self) -> Result<RunResult> {
104        // Validate tool references before starting
105        self.validate_tool_references()?;
106
107        let adapter = build_adapter(&self.config)?;
108
109        let agent_count = self.config.agents.len();
110
111        let (status_tx, mut status_rx) = mpsc::channel::<AgentMessage>(agent_count * 10);
112
113        // Track outputs from completed agents
114        let mut agent_outputs: HashMap<String, String> = HashMap::new();
115
116        let waves = build_execution_waves(&self.config)?;
117
118        // Execute wave by wave
119        // Each wave contains agents that can run in parallel
120        for wave in waves {
121            // Spawn all agents in this wave concurrently
122            let mut handles = Vec::new();
123
124            for agent_id in &wave {
125                let agent_config = self
126                    .config
127                    .agents
128                    .iter()
129                    .find(|a| &a.id == agent_id)
130                    .expect("agent in wave must exist in config")
131                    .clone();
132
133                // Build the task input for this agent
134                // Include the original task + outputs from dependencies
135                let task = build_agent_task(
136                    &self.config.task.input,
137                    &agent_config.depends,
138                    &agent_outputs,
139                );
140
141                let agent = Agent::new(
142                    agent_config,
143                    Arc::clone(&adapter),
144                    Arc::clone(&self.registry),
145                );
146
147                // Create channels for this agent
148                let (inbox_tx, inbox_rx) = mpsc::channel::<String>(1);
149                let status_tx = status_tx.clone();
150
151                // Send the task to the agent's inbox
152                inbox_tx
153                    .send(task)
154                    .await
155                    .map_err(|e| RoutexError::AgentFailed {
156                        id: agent_id.clone(),
157                        reason: format!("failed to send task: {}", e),
158                    })?;
159
160                // Spawn the agent as an independent Tokio task
161                let handle = tokio::spawn(async move { agent.run(inbox_rx, status_tx).await });
162
163                handles.push((agent_id.clone(), handle));
164            }
165
166            // Wait for all agents in this wave to complete
167            // Same as wg.Wait() in Go — blocks until the wave is done
168            for (agent_id, handle) in handles {
169                match handle.await {
170                    Ok(Ok(output)) => {
171                        agent_outputs.insert(agent_id, output);
172                    }
173                    Ok(Err(e)) => {
174                        return Err(RoutexError::AgentFailed {
175                            id: agent_id,
176                            reason: e.to_string(),
177                        });
178                    }
179                    Err(e) => {
180                        // JoinError — the task panicked
181                        return Err(RoutexError::AgentFailed {
182                            id: agent_id,
183                            reason: format!("task panicked: {}", e),
184                        });
185                    }
186                }
187            }
188        }
189
190        // Drain remaining status messages
191        drop(status_tx);
192        while status_rx.try_recv().is_ok() {}
193
194        // The final output is from the last agent in the dependency graph
195        // — the agent with no dependents
196        let final_output = find_final_output(&self.config, &agent_outputs)?;
197
198        Ok(RunResult {
199            output: final_output,
200            agent_outputs,
201            total_input_tokens: 0, // TODO: collect from agent status
202            total_output_tokens: 0,
203        })
204    }
205
206    /// Validate that all tool references in agent configs exist
207    /// in the registry. Catches configuration mistakes early.
208    fn validate_tool_references(&self) -> Result<()> {
209        for agent in &self.config.agents {
210            for tool_name in &agent.tools {
211                if !self.registry.has(tool_name) {
212                    return Err(RoutexError::ToolNotFound {
213                        name: tool_name.clone(),
214                    });
215                }
216            }
217        }
218        Ok(())
219    }
220
221    /// List all registered tools.
222    /// Used by the CLI's `routex tools list` command.
223    pub fn list_tools(&self) -> Vec<crate::tools::ToolInfo> {
224        self.registry.list()
225    }
226}
227
228/// Find the final output — from the agent with no dependents.
229/// In a linear crew (A → B → C), C is the final agent.
230/// In a fan-in crew (A, B → C), C is the final agent.
231fn find_final_output(config: &Config, outputs: &HashMap<String, String>) -> Result<String> {
232    // Find agent IDs that no other agent depends on
233    let all_deps: HashSet<String> = config
234        .agents
235        .iter()
236        .flat_map(|a| a.depends.iter().cloned())
237        .collect();
238
239    let final_agents: Vec<&str> = config
240        .agents
241        .iter()
242        .filter(|a| !all_deps.contains(&a.id))
243        .map(|a| a.id.as_str())
244        .collect();
245
246    match final_agents.len() {
247        0 => Err(RoutexError::Config(
248            "could not determine final agent".to_string(),
249        )),
250        1 => {
251            let id = final_agents[0];
252            outputs
253                .get(id)
254                .cloned()
255                .ok_or_else(|| RoutexError::AgentFailed {
256                    id: id.to_string(),
257                    reason: "no output recorded".to_string(),
258                })
259        }
260        _ => {
261            // Multiple final agents — concatenate their outputs
262            let combined = final_agents
263                .iter()
264                .filter_map(|id| outputs.get(*id))
265                .cloned()
266                .collect::<Vec<_>>()
267                .join("\n\n");
268            Ok(combined)
269        }
270    }
271}
272
/// Compose the prompt handed to a single agent.
///
/// An agent without dependencies receives `original_task` verbatim.
/// Otherwise the task is wrapped in a `Task: …` header followed by one
/// `[dep_id]:` section per dependency, in the order given by `depends`.
/// Dependencies that have no entry in `outputs` are left out.
fn build_agent_task(
    original_task: &str,
    depends: &[String],
    outputs: &HashMap<String, String>,
) -> String {
    if depends.is_empty() {
        return original_task.to_owned();
    }

    let header = format!("Task: {}\n\nContext from previous agents:\n", original_task);

    // Append one section per dependency that actually produced output.
    depends
        .iter()
        .filter_map(|dep_id| outputs.get(dep_id).map(|produced| (dep_id, produced)))
        .fold(header, |mut prompt, (dep_id, produced)| {
            prompt.push_str(&format!("\n[{}]:\n{}\n", dep_id, produced));
            prompt
        })
}
295
296/// Build the execution waves using Kahn's algorithm.
297///
298/// Returns a Vec of waves where each wave contains agent IDs
299/// that can run in parallel. Wave N only starts after wave N-1 completes.
300///
301/// The algorithm:
302///   1. Count in-degrees (how many dependencies each agent has)
303///   2. Start with agents that have zero dependencies (wave 1)
304///   3. When an agent completes, decrement in-degrees of its dependents
305///   4. Add newly zero-degree agents to the next wave
306///   5. Repeat until all agents are scheduled
307fn build_execution_waves(config: &Config) -> Result<Vec<Vec<String>>> {
308    // Build how many dependencies each agent has
309    let mut in_degree: HashMap<String, usize> = HashMap::new();
310
311    // Build dependencies map - which agents depend on each agent
312    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
313
314    for agent in &config.agents {
315        in_degree.entry(agent.id.clone()).or_insert(0);
316        dependents.entry(agent.id.clone()).or_default();
317
318        for dep in &agent.depends {
319            *in_degree.entry(agent.id.clone()).or_insert(0) += 1;
320            dependents
321                .entry(dep.clone())
322                .or_default()
323                .push(agent.id.clone());
324        }
325    }
326
327    let mut queue: VecDeque<String> = in_degree
328        .iter()
329        .filter(|&(_, &degree)| degree == 0)
330        .map(|(id, _)| id.clone())
331        .collect();
332
333    if queue.is_empty() && !config.agents.is_empty() {
334        return Err(RoutexError::Config(
335            "all agents have dependencies — possible cycle".to_string(),
336        ));
337    }
338
339    let mut waves: Vec<Vec<String>> = Vec::new();
340    let mut scheduled: HashSet<String> = HashSet::new();
341
342    // Process wave by wave
343    while !queue.is_empty() {
344        // Current wave = everything in the queue right now
345        let wave: Vec<String> = queue.drain(..).collect();
346
347        for id in &wave {
348            scheduled.insert(id.clone());
349
350            // Decrement in-degree of dependents
351            if let Some(deps) = dependents.get(id) {
352                for dependent in deps {
353                    let degree = in_degree.get_mut(dependent).unwrap();
354                    *degree -= 1;
355                    // If all dependencies are satisfied, add to next wave
356                    if *degree == 0 {
357                        queue.push_back(dependent.clone());
358                    }
359                }
360            }
361        }
362
363        waves.push(wave);
364    }
365
366    // If not all agents were scheduled, there's a cycle
367    if scheduled.len() != config.agents.len() {
368        let unscheduled: Vec<String> = config
369            .agents
370            .iter()
371            .filter(|a| !scheduled.contains(&a.id))
372            .map(|a| a.id.clone())
373            .collect();
374
375        return Err(RoutexError::CyclicDependency {
376            id: unscheduled.first().cloned().unwrap_or_default(),
377        });
378    }
379
380    Ok(waves)
381}
382
383/// Build an LLM adapter from the runtime config.
384/// Currently supports Anthropic — OpenAI and Ollama come next.
385fn build_adapter(config: &Config) -> Result<Arc<dyn Adapter + Send + Sync>> {
386    match config.runtime.llm_provider.as_str() {
387        "anthropic" => {
388            if config.runtime.api_key.is_empty() {
389                return Err(RoutexError::Config(
390                    "anthropic provider require an api_key".to_string(),
391                ));
392            }
393            Ok(Arc::new(AnthropicAdapter::new(
394                &config.runtime.api_key,
395                &config.runtime.model,
396            )))
397        }
398        "openai" => {
399            if config.runtime.api_key.is_empty() {
400                return Err(RoutexError::Config(
401                    "openai provider require an api_key".to_string(),
402                ));
403            }
404            Ok(Arc::new(OpenAIAdapter::new(
405                &config.runtime.api_key,
406                &config.runtime.model,
407            )))
408        }
409        other => Err(RoutexError::Config(format!(
410            "unknown llm_provider '{}' - supported: anthropic",
411            other
412        ))),
413    }
414}
415
/// Unit tests for the scheduling and prompt-building helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{AgentConfig, Config, RuntimeConfig, TaskConfig};

    /// Wrap a list of agents in an otherwise fixed test config.
    fn make_config(agents: Vec<AgentConfig>) -> Config {
        Config {
            runtime: RuntimeConfig {
                name: "test".to_string(),
                llm_provider: "anthropic".to_string(),
                model: "claude-haiku-4-5-20251001".to_string(),
                api_key: "test-key".to_string(),
                base_url: None,
                log_level: "info".to_string(),
                max_tokens: 4096,
            },
            task: TaskConfig {
                input: "Research Go frameworks".to_string(),
            },
            agents,
            tools: vec![],
        }
    }

    /// Build a minimal agent with the given ID and dependency list.
    fn simple_agent(id: &str, depends: Vec<&str>) -> AgentConfig {
        AgentConfig {
            id: id.to_string(),
            role: crate::config::Role::Researcher,
            goal: "research".to_string(),
            backstory: None,
            tools: vec![],
            depends: depends.iter().map(|s| s.to_string()).collect(),
            restart: "one_for_one".to_string(),
            llm: None,
            max_tool_calls: 20,
        }
    }

    #[test]
    fn test_single_agent_wave() {
        let config = make_config(vec![simple_agent("researcher", vec![])]);
        let waves = build_execution_waves(&config).unwrap();
        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0], vec!["researcher"]);
    }

    #[test]
    fn test_sequential_agents_two_waves() {
        let config = make_config(vec![
            simple_agent("researcher", vec![]),
            simple_agent("writer", vec!["researcher"]),
        ]);
        let waves = build_execution_waves(&config).unwrap();
        assert_eq!(waves.len(), 2);
        assert!(waves[0].contains(&"researcher".to_string()));
        assert!(waves[1].contains(&"writer".to_string()));
    }

    #[test]
    fn test_parallel_agents_one_wave() {
        let config = make_config(vec![
            simple_agent("researcher-1", vec![]),
            simple_agent("researcher-2", vec![]),
            simple_agent("researcher-3", vec![]),
        ]);
        let waves = build_execution_waves(&config).unwrap();
        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0].len(), 3);
    }

    #[test]
    fn test_fan_in_pattern() {
        // researcher-1, researcher-2 → writer
        let config = make_config(vec![
            simple_agent("researcher-1", vec![]),
            simple_agent("researcher-2", vec![]),
            simple_agent("writer", vec!["researcher-1", "researcher-2"]),
        ]);
        let waves = build_execution_waves(&config).unwrap();
        assert_eq!(waves.len(), 2);
        assert_eq!(waves[0].len(), 2); // two parallel researchers
        assert_eq!(waves[1].len(), 1); // one writer
        assert!(waves[1].contains(&"writer".to_string()));
    }

    #[test]
    fn test_cyclic_dependency_detected() {
        // A depends on B, B depends on A — cycle
        let agent_a = simple_agent("a", vec!["b"]);
        let agent_b = simple_agent("b", vec!["a"]);
        let config = make_config(vec![agent_a, agent_b]);
        let result = build_execution_waves(&config);
        assert!(result.is_err());
    }

    #[test]
    fn test_cyclic_dependency_detected2() {
        // A depends on B, while B and C depend on each other — the cycle is
        // between B and C, and A can never run because it waits on B.
        let agent_a = simple_agent("a", vec!["b"]);
        let agent_b = simple_agent("b", vec!["c"]);
        let agent_c = simple_agent("c", vec!["b"]);
        let config = make_config(vec![agent_a, agent_b, agent_c]);
        let result = build_execution_waves(&config);
        assert!(result.is_err());
    }

    #[test]
    fn test_unknown_dependency_is_rejected() {
        // "writer" waits on an agent that is not declared anywhere, so it can
        // never be scheduled.
        let config = make_config(vec![
            simple_agent("researcher", vec![]),
            simple_agent("writer", vec!["ghost"]),
        ]);
        let result = build_execution_waves(&config);
        assert!(result.is_err());
    }

    #[test]
    fn test_build_agent_task_no_deps() {
        let outputs = HashMap::new();
        let task = build_agent_task("Research Go", &[], &outputs);
        assert_eq!(task, "Research Go");
    }

    #[test]
    fn test_build_agent_task_with_deps() {
        let mut outputs = HashMap::new();
        outputs.insert(
            "researcher".to_string(),
            "Go is fast and concurrent.".to_string(),
        );
        let task = build_agent_task("Write a report", &["researcher".to_string()], &outputs);
        assert!(task.contains("Write a report"));
        assert!(task.contains("Go is fast and concurrent."));
        assert!(task.contains("[researcher]"));
    }

    #[test]
    fn test_find_final_output_single() {
        let config = make_config(vec![
            simple_agent("researcher", vec![]),
            simple_agent("writer", vec!["researcher"]),
        ]);
        let mut outputs = HashMap::new();
        outputs.insert("researcher".to_string(), "research done".to_string());
        outputs.insert("writer".to_string(), "report written".to_string());

        let result = find_final_output(&config, &outputs).unwrap();
        assert_eq!(result, "report written");
    }

    #[test]
    fn test_find_final_output_multiple() {
        // Two agents with no dependents — both are final, and their outputs
        // are joined with a blank line in config order.
        let config = make_config(vec![
            simple_agent("agent-a", vec![]),
            simple_agent("agent-b", vec![]),
        ]);
        let mut outputs = HashMap::new();
        outputs.insert("agent-a".to_string(), "output a".to_string());
        outputs.insert("agent-b".to_string(), "output b".to_string());

        let result = find_final_output(&config, &outputs).unwrap();
        assert_eq!(result, "output a\n\noutput b");
    }
}
570}