Skip to main content

codetether_agent/swarm/
orchestrator.rs

1//! Orchestrator for decomposing tasks and coordinating sub-agents
2//!
3//! The orchestrator analyzes complex tasks and decomposes them into
4//! parallelizable subtasks, then coordinates their execution.
5
6use super::{
7    DecompositionStrategy, SubAgent, SubTask, SubTaskResult, SubTaskStatus, SwarmConfig, SwarmStats,
8};
9use crate::provider::{CompletionRequest, ContentPart, Message, ProviderRegistry, Role};
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// The orchestrator manages task decomposition and sub-agent coordination
15pub struct Orchestrator {
16    /// Configuration
17    config: SwarmConfig,
18
19    /// Provider registry for AI calls
20    providers: ProviderRegistry,
21
22    /// All subtasks
23    subtasks: HashMap<String, SubTask>,
24
25    /// All sub-agents
26    subagents: HashMap<String, SubAgent>,
27
28    /// Completed subtask IDs
29    completed: Vec<String>,
30
31    /// Current model for orchestration
32    model: String,
33
34    /// Current provider
35    provider: String,
36
37    /// Stats
38    stats: SwarmStats,
39}
40
41impl Orchestrator {
42    /// Create a new orchestrator
43    pub async fn new(config: SwarmConfig) -> Result<Self> {
44        use crate::provider::parse_model_string;
45
46        let providers = ProviderRegistry::from_vault().await?;
47        let provider_list = providers.list();
48
49        if provider_list.is_empty() {
50            anyhow::bail!("No providers available for orchestration");
51        }
52
53        // Parse model from config, env var, or use default
54        let model_str = config
55            .model
56            .clone()
57            .or_else(|| std::env::var("CODETETHER_DEFAULT_MODEL").ok());
58
59        let (provider, model) = if let Some(ref model_str) = model_str {
60            let (prov, mod_id) = parse_model_string(model_str);
61            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
62            let provider = if let Some(explicit_provider) = prov {
63                if provider_list.contains(&explicit_provider) {
64                    explicit_provider.to_string()
65                } else {
66                    anyhow::bail!(
67                        "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
68                        explicit_provider,
69                        provider_list.join(", ")
70                    );
71                }
72            } else {
73                choose_default_provider(provider_list.as_slice())
74                    .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
75                    .to_string()
76            };
77            let model = if mod_id.trim().is_empty() {
78                default_model_for_provider(&provider)
79            } else {
80                mod_id.to_string()
81            };
82            (provider, model)
83        } else {
84            let provider = choose_default_provider(provider_list.as_slice())
85                .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
86                .to_string();
87            let model = default_model_for_provider(&provider);
88            (provider, model)
89        };
90
91        tracing::info!("Orchestrator using model {} via {}", model, provider);
92
93        Ok(Self {
94            config,
95            providers,
96            subtasks: HashMap::new(),
97            subagents: HashMap::new(),
98            completed: Vec::new(),
99            model,
100            provider,
101            stats: SwarmStats::default(),
102        })
103    }
104
105    fn prefers_temperature_one(model: &str) -> bool {
106        crate::session::helper::provider::prefers_temperature_one(model)
107    }
108
109    /// Decompose a complex task into subtasks
110    pub async fn decompose(
111        &mut self,
112        task: &str,
113        strategy: DecompositionStrategy,
114    ) -> Result<Vec<SubTask>> {
115        if strategy == DecompositionStrategy::None {
116            // Single task, no decomposition
117            let subtask = SubTask::new("Main Task", task);
118            self.subtasks.insert(subtask.id.clone(), subtask.clone());
119            return Ok(vec![subtask]);
120        }
121
122        // Use AI to decompose the task
123        let decomposition_prompt = self.build_decomposition_prompt(task, strategy);
124
125        let provider = self
126            .providers
127            .get(&self.provider)
128            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", self.provider))?;
129
130        let temperature = if Self::prefers_temperature_one(&self.model) {
131            1.0
132        } else {
133            0.7
134        };
135
136        let request = CompletionRequest {
137            messages: vec![Message {
138                role: Role::User,
139                content: vec![ContentPart::Text {
140                    text: decomposition_prompt,
141                }],
142            }],
143            tools: Vec::new(),
144            model: self.model.clone(),
145            temperature: Some(temperature),
146            top_p: None,
147            max_tokens: Some(8192),
148            stop: Vec::new(),
149        };
150
151        let response = provider.complete(request).await?;
152
153        // Parse the decomposition response
154        let text = response
155            .message
156            .content
157            .iter()
158            .filter_map(|p| match p {
159                ContentPart::Text { text } => Some(text.clone()),
160                _ => None,
161            })
162            .collect::<Vec<_>>()
163            .join("\n");
164
165        tracing::debug!("Decomposition response: {}", text);
166
167        if text.trim().is_empty() {
168            // Fallback to single task if decomposition fails
169            tracing::warn!("Empty decomposition response, falling back to single task");
170            let subtask = SubTask::new("Main Task", task);
171            self.subtasks.insert(subtask.id.clone(), subtask.clone());
172            return Ok(vec![subtask]);
173        }
174
175        let subtasks = self.parse_decomposition(&text)?;
176
177        // Store subtasks
178        for subtask in &subtasks {
179            self.subtasks.insert(subtask.id.clone(), subtask.clone());
180        }
181
182        // Assign stages based on dependencies
183        self.assign_stages();
184
185        tracing::info!(
186            "Decomposed task into {} subtasks across {} stages",
187            subtasks.len(),
188            self.max_stage() + 1
189        );
190
191        Ok(subtasks)
192    }
193
194    /// Build the decomposition prompt
195    fn build_decomposition_prompt(&self, task: &str, strategy: DecompositionStrategy) -> String {
196        let strategy_instruction = match strategy {
197            DecompositionStrategy::Automatic => {
198                "Analyze the task and determine the optimal way to decompose it into parallel subtasks."
199            }
200            DecompositionStrategy::ByDomain => {
201                "Decompose the task by domain expertise (e.g., research, coding, analysis, verification)."
202            }
203            DecompositionStrategy::ByData => {
204                "Decompose the task by data partition (e.g., different files, sections, or datasets)."
205            }
206            DecompositionStrategy::ByStage => {
207                "Decompose the task by workflow stages (e.g., gather, process, synthesize)."
208            }
209            DecompositionStrategy::None => unreachable!(),
210        };
211
212        format!(
213            r#"You are a task orchestrator. Your job is to decompose complex tasks into parallelizable subtasks.
214
215TASK: {task}
216
217STRATEGY: {strategy_instruction}
218
219CONSTRAINTS:
220- Maximum {max_subtasks} subtasks
221- Each subtask should be independently executable
222- Identify dependencies between subtasks (which must complete before others can start)
223- Assign a specialty/role to each subtask
224
225OUTPUT FORMAT (JSON):
226```json
227{{
228  "subtasks": [
229    {{
230      "name": "Subtask Name",
231      "instruction": "Detailed instruction for this subtask",
232      "specialty": "Role/specialty (e.g., Researcher, Coder, Analyst)",
233      "dependencies": ["id-of-dependency-1"],
234      "priority": 1,
235      "needs_worktree": false
236    }}
237  ]
238}}
239```
240
241Set `needs_worktree: true` only when the subtask will **edit or create \
242files** in the repository (implementation, refactor, patch). Set it to \
243`false` for read-only work (research, review, analysis, fact-check, \
244planning, summarisation). When in doubt, omit the field and the \
245executor will decide from heuristics.
246
247Decompose the task now:"#,
248            task = task,
249            strategy_instruction = strategy_instruction,
250            max_subtasks = self.config.max_subagents,
251        )
252    }
253
254    /// Parse the decomposition response
255    fn parse_decomposition(&self, response: &str) -> Result<Vec<SubTask>> {
256        // Try to extract JSON from the response
257        let json_str = if let Some(start) = response.find("```json") {
258            let start = start + 7;
259            if let Some(end) = response[start..].find("```") {
260                &response[start..start + end]
261            } else {
262                response
263            }
264        } else if let Some(start) = response.find('{') {
265            if let Some(end) = response.rfind('}') {
266                &response[start..=end]
267            } else {
268                response
269            }
270        } else {
271            response
272        };
273
274        #[derive(Deserialize)]
275        struct DecompositionResponse {
276            subtasks: Vec<SubTaskDef>,
277        }
278
279        #[derive(Deserialize)]
280        struct SubTaskDef {
281            name: String,
282            instruction: String,
283            specialty: Option<String>,
284            #[serde(default)]
285            dependencies: Vec<String>,
286            #[serde(default)]
287            priority: i32,
288            #[serde(default)]
289            needs_worktree: Option<bool>,
290        }
291
292        let parsed: DecompositionResponse = serde_json::from_str(json_str.trim())
293            .map_err(|e| anyhow::anyhow!("Failed to parse decomposition: {}", e))?;
294
295        // Create SubTask objects with proper IDs
296        let mut subtasks = Vec::new();
297        let mut name_to_id: HashMap<String, String> = HashMap::new();
298
299        // First pass: create subtasks and map names to IDs
300        for def in &parsed.subtasks {
301            let subtask = SubTask::new(&def.name, &def.instruction).with_priority(def.priority);
302
303            let subtask = if let Some(ref specialty) = def.specialty {
304                subtask.with_specialty(specialty)
305            } else {
306                subtask
307            };
308
309            let subtask = match def.needs_worktree {
310                Some(explicit) => subtask.with_needs_worktree(explicit),
311                None => subtask,
312            };
313
314            name_to_id.insert(def.name.clone(), subtask.id.clone());
315            subtasks.push((subtask, def.dependencies.clone()));
316        }
317
318        // Second pass: resolve dependencies
319        let result: Vec<SubTask> = subtasks
320            .into_iter()
321            .map(|(mut subtask, deps)| {
322                let resolved_deps: Vec<String> = deps
323                    .iter()
324                    .filter_map(|dep| name_to_id.get(dep).cloned())
325                    .collect();
326                subtask.dependencies = resolved_deps;
327                subtask
328            })
329            .collect();
330
331        Ok(result)
332    }
333
334    /// Assign stages to subtasks based on dependencies
335    fn assign_stages(&mut self) {
336        let mut changed = true;
337
338        while changed {
339            changed = false;
340
341            // First collect all updates needed
342            let updates: Vec<(String, usize)> = self
343                .subtasks
344                .iter()
345                .filter_map(|(id, subtask)| {
346                    if subtask.dependencies.is_empty() {
347                        if subtask.stage != 0 {
348                            Some((id.clone(), 0))
349                        } else {
350                            None
351                        }
352                    } else {
353                        let max_dep_stage = subtask
354                            .dependencies
355                            .iter()
356                            .filter_map(|dep_id| self.subtasks.get(dep_id))
357                            .map(|dep| dep.stage)
358                            .max()
359                            .unwrap_or(0);
360
361                        let new_stage = max_dep_stage + 1;
362                        if subtask.stage != new_stage {
363                            Some((id.clone(), new_stage))
364                        } else {
365                            None
366                        }
367                    }
368                })
369                .collect();
370
371            // Then apply updates
372            for (id, new_stage) in updates {
373                if let Some(subtask) = self.subtasks.get_mut(&id) {
374                    subtask.stage = new_stage;
375                    changed = true;
376                }
377            }
378        }
379    }
380
381    /// Get maximum stage number
382    fn max_stage(&self) -> usize {
383        self.subtasks.values().map(|s| s.stage).max().unwrap_or(0)
384    }
385
386    /// Get subtasks ready to execute (dependencies satisfied)
387    pub fn ready_subtasks(&self) -> Vec<&SubTask> {
388        self.subtasks
389            .values()
390            .filter(|s| s.status == SubTaskStatus::Pending && s.can_run(&self.completed))
391            .collect()
392    }
393
394    /// Get subtasks for a specific stage
395    pub fn subtasks_for_stage(&self, stage: usize) -> Vec<&SubTask> {
396        self.subtasks
397            .values()
398            .filter(|s| s.stage == stage)
399            .collect()
400    }
401
402    /// Create a sub-agent for a subtask
403    pub fn create_subagent(&mut self, subtask: &SubTask) -> SubAgent {
404        let specialty = subtask
405            .specialty
406            .clone()
407            .unwrap_or_else(|| "General".to_string());
408        let name = format!("{} Agent", specialty);
409
410        let subagent = SubAgent::new(name, specialty, &subtask.id, &self.model, &self.provider);
411
412        self.subagents.insert(subagent.id.clone(), subagent.clone());
413        self.stats.subagents_spawned += 1;
414
415        subagent
416    }
417
418    /// Mark a subtask as completed
419    pub fn complete_subtask(&mut self, subtask_id: &str, result: SubTaskResult) {
420        if let Some(subtask) = self.subtasks.get_mut(subtask_id) {
421            subtask.complete(result.success);
422
423            if result.success {
424                self.completed.push(subtask_id.to_string());
425                self.stats.subagents_completed += 1;
426            } else {
427                self.stats.subagents_failed += 1;
428            }
429
430            self.stats.total_tool_calls += result.tool_calls;
431        }
432    }
433
434    /// Get all subtasks
435    pub fn all_subtasks(&self) -> Vec<&SubTask> {
436        self.subtasks.values().collect()
437    }
438
439    /// Get statistics
440    pub fn stats(&self) -> &SwarmStats {
441        &self.stats
442    }
443
444    /// Get mutable statistics
445    pub fn stats_mut(&mut self) -> &mut SwarmStats {
446        &mut self.stats
447    }
448
449    /// Check if all subtasks are complete
450    pub fn is_complete(&self) -> bool {
451        self.subtasks.values().all(|s| {
452            matches!(
453                s.status,
454                SubTaskStatus::Completed | SubTaskStatus::Failed | SubTaskStatus::Cancelled
455            )
456        })
457    }
458
459    /// Get the provider registry
460    pub fn providers(&self) -> &ProviderRegistry {
461        &self.providers
462    }
463
464    /// Get current model
465    pub fn model(&self) -> &str {
466        &self.model
467    }
468
469    /// Get current provider
470    pub fn provider(&self) -> &str {
471        &self.provider
472    }
473}
474
475pub(crate) fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
476    let preferred = [
477        "openai-codex",
478        "openai",
479        "anthropic",
480        "github-copilot",
481        "github-copilot-enterprise",
482        "zai",
483        "minimax",
484        "moonshotai",
485        "openrouter",
486        "novita",
487        "google",
488        "bedrock",
489    ];
490    for name in preferred {
491        if let Some(found) = providers.iter().copied().find(|p| *p == name) {
492            return Some(found);
493        }
494    }
495    providers.first().copied()
496}
497
498/// Get default model for a provider.
499pub(crate) fn default_model_for_provider(provider: &str) -> String {
500    match provider {
501        "moonshotai" => "kimi-k2.5".to_string(),
502        "anthropic" => "claude-sonnet-4-20250514".to_string(),
503        "bedrock" => "us.anthropic.claude-opus-4-6-v1".to_string(),
504        "openai" => "gpt-4o".to_string(),
505        "google" => "gemini-2.5-pro".to_string(),
506        "zhipuai" | "zai" => "glm-5".to_string(),
507        "openrouter" => "z-ai/glm-5".to_string(),
508        "novita" => "Qwen/Qwen3.5-35B-A3B".to_string(),
509        "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
510        "openai-codex" => "gpt-5.5".to_string(),
511        _ => "gpt-4o".to_string(),
512    }
513}
514
515/// Message from sub-agent to orchestrator
516#[derive(Debug, Clone, Serialize, Deserialize)]
517pub enum SubAgentMessage {
518    /// Progress update
519    Progress {
520        subagent_id: String,
521        subtask_id: String,
522        steps: usize,
523        status: String,
524    },
525
526    /// Tool call made
527    ToolCall {
528        subagent_id: String,
529        tool_name: String,
530        success: bool,
531    },
532
533    /// Subtask completed
534    Completed {
535        subagent_id: String,
536        result: SubTaskResult,
537    },
538
539    /// Request for resources
540    ResourceRequest {
541        subagent_id: String,
542        resource_type: String,
543        resource_id: String,
544    },
545}
546
547/// Message from orchestrator to sub-agent
548#[derive(Debug, Clone, Serialize, Deserialize)]
549pub enum OrchestratorMessage {
550    /// Start execution
551    Start { subtask: Box<SubTask> },
552
553    /// Provide resource
554    Resource {
555        resource_type: String,
556        resource_id: String,
557        content: String,
558    },
559
560    /// Terminate execution
561    Terminate { reason: String },
562
563    /// Context update (from completed dependency)
564    ContextUpdate {
565        dependency_id: String,
566        result: String,
567    },
568}