Skip to main content

codetether_agent/swarm/
orchestrator.rs

1//! Orchestrator for decomposing tasks and coordinating sub-agents
2//!
3//! The orchestrator analyzes complex tasks and decomposes them into
4//! parallelizable subtasks, then coordinates their execution.
5
6use super::{
7    DecompositionStrategy, SubAgent, SubTask, SubTaskResult, SubTaskStatus, SwarmConfig, SwarmStats,
8};
9use crate::provider::{CompletionRequest, ContentPart, Message, ProviderRegistry, Role};
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// The orchestrator manages task decomposition and sub-agent coordination
15pub struct Orchestrator {
16    /// Configuration
17    config: SwarmConfig,
18
19    /// Provider registry for AI calls
20    providers: ProviderRegistry,
21
22    /// All subtasks
23    subtasks: HashMap<String, SubTask>,
24
25    /// All sub-agents
26    subagents: HashMap<String, SubAgent>,
27
28    /// Completed subtask IDs
29    completed: Vec<String>,
30
31    /// Current model for orchestration
32    model: String,
33
34    /// Current provider
35    provider: String,
36
37    /// Stats
38    stats: SwarmStats,
39}
40
41impl Orchestrator {
42    /// Create a new orchestrator
43    pub async fn new(config: SwarmConfig) -> Result<Self> {
44        use crate::provider::parse_model_string;
45
46        let providers = ProviderRegistry::from_vault().await?;
47        let provider_list = providers.list();
48
49        if provider_list.is_empty() {
50            anyhow::bail!("No providers available for orchestration");
51        }
52
53        // Parse model from config, env var, or use default
54        let model_str = config
55            .model
56            .clone()
57            .or_else(|| std::env::var("CODETETHER_DEFAULT_MODEL").ok());
58
59        let (provider, model) = if let Some(ref model_str) = model_str {
60            let (prov, mod_id) = parse_model_string(model_str);
61            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
62            let provider = if let Some(explicit_provider) = prov {
63                if provider_list.contains(&explicit_provider) {
64                    explicit_provider.to_string()
65                } else {
66                    anyhow::bail!(
67                        "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
68                        explicit_provider,
69                        provider_list.join(", ")
70                    );
71                }
72            } else {
73                choose_default_provider(provider_list.as_slice())
74                    .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
75                    .to_string()
76            };
77            let model = if mod_id.trim().is_empty() {
78                default_model_for_provider(&provider)
79            } else {
80                mod_id.to_string()
81            };
82            (provider, model)
83        } else {
84            let provider = choose_default_provider(provider_list.as_slice())
85                .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
86                .to_string();
87            let model = default_model_for_provider(&provider);
88            (provider, model)
89        };
90
91        tracing::info!("Orchestrator using model {} via {}", model, provider);
92
93        Ok(Self {
94            config,
95            providers,
96            subtasks: HashMap::new(),
97            subagents: HashMap::new(),
98            completed: Vec::new(),
99            model,
100            provider,
101            stats: SwarmStats::default(),
102        })
103    }
104
105    fn prefers_temperature_one(model: &str) -> bool {
106        let normalized = model.to_ascii_lowercase();
107        normalized.contains("kimi-k2")
108            || normalized.contains("glm-")
109            || normalized.contains("minimax")
110    }
111
112    /// Decompose a complex task into subtasks
113    pub async fn decompose(
114        &mut self,
115        task: &str,
116        strategy: DecompositionStrategy,
117    ) -> Result<Vec<SubTask>> {
118        if strategy == DecompositionStrategy::None {
119            // Single task, no decomposition
120            let subtask = SubTask::new("Main Task", task);
121            self.subtasks.insert(subtask.id.clone(), subtask.clone());
122            return Ok(vec![subtask]);
123        }
124
125        // Use AI to decompose the task
126        let decomposition_prompt = self.build_decomposition_prompt(task, strategy);
127
128        let provider = self
129            .providers
130            .get(&self.provider)
131            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", self.provider))?;
132
133        let temperature = if Self::prefers_temperature_one(&self.model) {
134            1.0
135        } else {
136            0.7
137        };
138
139        let request = CompletionRequest {
140            messages: vec![Message {
141                role: Role::User,
142                content: vec![ContentPart::Text {
143                    text: decomposition_prompt,
144                }],
145            }],
146            tools: Vec::new(),
147            model: self.model.clone(),
148            temperature: Some(temperature),
149            top_p: None,
150            max_tokens: Some(8192),
151            stop: Vec::new(),
152        };
153
154        let response = provider.complete(request).await?;
155
156        // Parse the decomposition response
157        let text = response
158            .message
159            .content
160            .iter()
161            .filter_map(|p| match p {
162                ContentPart::Text { text } => Some(text.clone()),
163                _ => None,
164            })
165            .collect::<Vec<_>>()
166            .join("\n");
167
168        tracing::debug!("Decomposition response: {}", text);
169
170        if text.trim().is_empty() {
171            // Fallback to single task if decomposition fails
172            tracing::warn!("Empty decomposition response, falling back to single task");
173            let subtask = SubTask::new("Main Task", task);
174            self.subtasks.insert(subtask.id.clone(), subtask.clone());
175            return Ok(vec![subtask]);
176        }
177
178        let subtasks = self.parse_decomposition(&text)?;
179
180        // Store subtasks
181        for subtask in &subtasks {
182            self.subtasks.insert(subtask.id.clone(), subtask.clone());
183        }
184
185        // Assign stages based on dependencies
186        self.assign_stages();
187
188        tracing::info!(
189            "Decomposed task into {} subtasks across {} stages",
190            subtasks.len(),
191            self.max_stage() + 1
192        );
193
194        Ok(subtasks)
195    }
196
197    /// Build the decomposition prompt
198    fn build_decomposition_prompt(&self, task: &str, strategy: DecompositionStrategy) -> String {
199        let strategy_instruction = match strategy {
200            DecompositionStrategy::Automatic => {
201                "Analyze the task and determine the optimal way to decompose it into parallel subtasks."
202            }
203            DecompositionStrategy::ByDomain => {
204                "Decompose the task by domain expertise (e.g., research, coding, analysis, verification)."
205            }
206            DecompositionStrategy::ByData => {
207                "Decompose the task by data partition (e.g., different files, sections, or datasets)."
208            }
209            DecompositionStrategy::ByStage => {
210                "Decompose the task by workflow stages (e.g., gather, process, synthesize)."
211            }
212            DecompositionStrategy::None => unreachable!(),
213        };
214
215        format!(
216            r#"You are a task orchestrator. Your job is to decompose complex tasks into parallelizable subtasks.
217
218TASK: {task}
219
220STRATEGY: {strategy_instruction}
221
222CONSTRAINTS:
223- Maximum {max_subtasks} subtasks
224- Each subtask should be independently executable
225- Identify dependencies between subtasks (which must complete before others can start)
226- Assign a specialty/role to each subtask
227
228OUTPUT FORMAT (JSON):
229```json
230{{
231  "subtasks": [
232    {{
233      "name": "Subtask Name",
234      "instruction": "Detailed instruction for this subtask",
235      "specialty": "Role/specialty (e.g., Researcher, Coder, Analyst)",
236      "dependencies": ["id-of-dependency-1"],
237      "priority": 1
238    }}
239  ]
240}}
241```
242
243Decompose the task now:"#,
244            task = task,
245            strategy_instruction = strategy_instruction,
246            max_subtasks = self.config.max_subagents,
247        )
248    }
249
250    /// Parse the decomposition response
251    fn parse_decomposition(&self, response: &str) -> Result<Vec<SubTask>> {
252        // Try to extract JSON from the response
253        let json_str = if let Some(start) = response.find("```json") {
254            let start = start + 7;
255            if let Some(end) = response[start..].find("```") {
256                &response[start..start + end]
257            } else {
258                response
259            }
260        } else if let Some(start) = response.find('{') {
261            if let Some(end) = response.rfind('}') {
262                &response[start..=end]
263            } else {
264                response
265            }
266        } else {
267            response
268        };
269
270        #[derive(Deserialize)]
271        struct DecompositionResponse {
272            subtasks: Vec<SubTaskDef>,
273        }
274
275        #[derive(Deserialize)]
276        struct SubTaskDef {
277            name: String,
278            instruction: String,
279            specialty: Option<String>,
280            #[serde(default)]
281            dependencies: Vec<String>,
282            #[serde(default)]
283            priority: i32,
284        }
285
286        let parsed: DecompositionResponse = serde_json::from_str(json_str.trim())
287            .map_err(|e| anyhow::anyhow!("Failed to parse decomposition: {}", e))?;
288
289        // Create SubTask objects with proper IDs
290        let mut subtasks = Vec::new();
291        let mut name_to_id: HashMap<String, String> = HashMap::new();
292
293        // First pass: create subtasks and map names to IDs
294        for def in &parsed.subtasks {
295            let subtask = SubTask::new(&def.name, &def.instruction).with_priority(def.priority);
296
297            let subtask = if let Some(ref specialty) = def.specialty {
298                subtask.with_specialty(specialty)
299            } else {
300                subtask
301            };
302
303            name_to_id.insert(def.name.clone(), subtask.id.clone());
304            subtasks.push((subtask, def.dependencies.clone()));
305        }
306
307        // Second pass: resolve dependencies
308        let result: Vec<SubTask> = subtasks
309            .into_iter()
310            .map(|(mut subtask, deps)| {
311                let resolved_deps: Vec<String> = deps
312                    .iter()
313                    .filter_map(|dep| name_to_id.get(dep).cloned())
314                    .collect();
315                subtask.dependencies = resolved_deps;
316                subtask
317            })
318            .collect();
319
320        Ok(result)
321    }
322
323    /// Assign stages to subtasks based on dependencies
324    fn assign_stages(&mut self) {
325        let mut changed = true;
326
327        while changed {
328            changed = false;
329
330            // First collect all updates needed
331            let updates: Vec<(String, usize)> = self
332                .subtasks
333                .iter()
334                .filter_map(|(id, subtask)| {
335                    if subtask.dependencies.is_empty() {
336                        if subtask.stage != 0 {
337                            Some((id.clone(), 0))
338                        } else {
339                            None
340                        }
341                    } else {
342                        let max_dep_stage = subtask
343                            .dependencies
344                            .iter()
345                            .filter_map(|dep_id| self.subtasks.get(dep_id))
346                            .map(|dep| dep.stage)
347                            .max()
348                            .unwrap_or(0);
349
350                        let new_stage = max_dep_stage + 1;
351                        if subtask.stage != new_stage {
352                            Some((id.clone(), new_stage))
353                        } else {
354                            None
355                        }
356                    }
357                })
358                .collect();
359
360            // Then apply updates
361            for (id, new_stage) in updates {
362                if let Some(subtask) = self.subtasks.get_mut(&id) {
363                    subtask.stage = new_stage;
364                    changed = true;
365                }
366            }
367        }
368    }
369
370    /// Get maximum stage number
371    fn max_stage(&self) -> usize {
372        self.subtasks.values().map(|s| s.stage).max().unwrap_or(0)
373    }
374
375    /// Get subtasks ready to execute (dependencies satisfied)
376    pub fn ready_subtasks(&self) -> Vec<&SubTask> {
377        self.subtasks
378            .values()
379            .filter(|s| s.status == SubTaskStatus::Pending && s.can_run(&self.completed))
380            .collect()
381    }
382
383    /// Get subtasks for a specific stage
384    pub fn subtasks_for_stage(&self, stage: usize) -> Vec<&SubTask> {
385        self.subtasks
386            .values()
387            .filter(|s| s.stage == stage)
388            .collect()
389    }
390
391    /// Create a sub-agent for a subtask
392    pub fn create_subagent(&mut self, subtask: &SubTask) -> SubAgent {
393        let specialty = subtask
394            .specialty
395            .clone()
396            .unwrap_or_else(|| "General".to_string());
397        let name = format!("{} Agent", specialty);
398
399        let subagent = SubAgent::new(name, specialty, &subtask.id, &self.model, &self.provider);
400
401        self.subagents.insert(subagent.id.clone(), subagent.clone());
402        self.stats.subagents_spawned += 1;
403
404        subagent
405    }
406
407    /// Mark a subtask as completed
408    pub fn complete_subtask(&mut self, subtask_id: &str, result: SubTaskResult) {
409        if let Some(subtask) = self.subtasks.get_mut(subtask_id) {
410            subtask.complete(result.success);
411
412            if result.success {
413                self.completed.push(subtask_id.to_string());
414                self.stats.subagents_completed += 1;
415            } else {
416                self.stats.subagents_failed += 1;
417            }
418
419            self.stats.total_tool_calls += result.tool_calls;
420        }
421    }
422
423    /// Get all subtasks
424    pub fn all_subtasks(&self) -> Vec<&SubTask> {
425        self.subtasks.values().collect()
426    }
427
428    /// Get statistics
429    pub fn stats(&self) -> &SwarmStats {
430        &self.stats
431    }
432
433    /// Get mutable statistics
434    pub fn stats_mut(&mut self) -> &mut SwarmStats {
435        &mut self.stats
436    }
437
438    /// Check if all subtasks are complete
439    pub fn is_complete(&self) -> bool {
440        self.subtasks.values().all(|s| {
441            matches!(
442                s.status,
443                SubTaskStatus::Completed | SubTaskStatus::Failed | SubTaskStatus::Cancelled
444            )
445        })
446    }
447
448    /// Get the provider registry
449    pub fn providers(&self) -> &ProviderRegistry {
450        &self.providers
451    }
452
453    /// Get current model
454    pub fn model(&self) -> &str {
455        &self.model
456    }
457
458    /// Get current provider
459    pub fn provider(&self) -> &str {
460        &self.provider
461    }
462}
463
464pub(crate) fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
465    let preferred = [
466        "openai",
467        "anthropic",
468        "github-copilot",
469        "github-copilot-enterprise",
470        "openai-codex",
471        "zai",
472        "minimax",
473        "moonshotai",
474        "openrouter",
475        "novita",
476        "google",
477        "bedrock",
478    ];
479    for name in preferred {
480        if let Some(found) = providers.iter().copied().find(|p| *p == name) {
481            return Some(found);
482        }
483    }
484    providers.first().copied()
485}
486
487/// Get default model for a provider.
488pub(crate) fn default_model_for_provider(provider: &str) -> String {
489    match provider {
490        "moonshotai" => "kimi-k2.5".to_string(),
491        "anthropic" => "claude-sonnet-4-20250514".to_string(),
492        "bedrock" => "us.anthropic.claude-opus-4-6-v1".to_string(),
493        "openai" => "gpt-4o".to_string(),
494        "google" => "gemini-2.5-pro".to_string(),
495        "zhipuai" | "zai" => "glm-5".to_string(),
496        "openrouter" => "z-ai/glm-5".to_string(),
497        "novita" => "Qwen/Qwen3.5-35B-A3B".to_string(),
498        "github-copilot" | "github-copilot-enterprise" | "openai-codex" => "gpt-5-mini".to_string(),
499        _ => "gpt-4o".to_string(),
500    }
501}
502
503/// Message from sub-agent to orchestrator
504#[derive(Debug, Clone, Serialize, Deserialize)]
505pub enum SubAgentMessage {
506    /// Progress update
507    Progress {
508        subagent_id: String,
509        subtask_id: String,
510        steps: usize,
511        status: String,
512    },
513
514    /// Tool call made
515    ToolCall {
516        subagent_id: String,
517        tool_name: String,
518        success: bool,
519    },
520
521    /// Subtask completed
522    Completed {
523        subagent_id: String,
524        result: SubTaskResult,
525    },
526
527    /// Request for resources
528    ResourceRequest {
529        subagent_id: String,
530        resource_type: String,
531        resource_id: String,
532    },
533}
534
535/// Message from orchestrator to sub-agent
536#[derive(Debug, Clone, Serialize, Deserialize)]
537pub enum OrchestratorMessage {
538    /// Start execution
539    Start { subtask: Box<SubTask> },
540
541    /// Provide resource
542    Resource {
543        resource_type: String,
544        resource_id: String,
545        content: String,
546    },
547
548    /// Terminate execution
549    Terminate { reason: String },
550
551    /// Context update (from completed dependency)
552    ContextUpdate {
553        dependency_id: String,
554        result: String,
555    },
556}