Skip to main content

codetether_agent/swarm/
orchestrator.rs

1//! Orchestrator for decomposing tasks and coordinating sub-agents
2//!
3//! The orchestrator analyzes complex tasks and decomposes them into
4//! parallelizable subtasks, then coordinates their execution.
5
6use super::{
7    DecompositionStrategy, SubAgent, SubTask, SubTaskResult, SubTaskStatus, SwarmConfig, SwarmStats,
8};
9use crate::provider::{CompletionRequest, ContentPart, Message, ProviderRegistry, Role};
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// The orchestrator manages task decomposition and sub-agent coordination
15pub struct Orchestrator {
16    /// Configuration
17    config: SwarmConfig,
18
19    /// Provider registry for AI calls
20    providers: ProviderRegistry,
21
22    /// All subtasks
23    subtasks: HashMap<String, SubTask>,
24
25    /// All sub-agents
26    subagents: HashMap<String, SubAgent>,
27
28    /// Completed subtask IDs
29    completed: Vec<String>,
30
31    /// Current model for orchestration
32    model: String,
33
34    /// Current provider
35    provider: String,
36
37    /// Stats
38    stats: SwarmStats,
39}
40
41impl Orchestrator {
42    /// Create a new orchestrator
43    pub async fn new(config: SwarmConfig) -> Result<Self> {
44        use crate::provider::parse_model_string;
45
46        let providers = ProviderRegistry::from_vault().await?;
47        let provider_list = providers.list();
48
49        if provider_list.is_empty() {
50            anyhow::bail!("No providers available for orchestration");
51        }
52
53        // Parse model from config, env var, or use default
54        let model_str = config
55            .model
56            .clone()
57            .or_else(|| std::env::var("CODETETHER_DEFAULT_MODEL").ok());
58
59        let (provider, model) = if let Some(ref model_str) = model_str {
60            let (prov, mod_id) = parse_model_string(model_str);
61            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
62            let provider = if let Some(explicit_provider) = prov {
63                if provider_list.contains(&explicit_provider) {
64                    explicit_provider.to_string()
65                } else {
66                    anyhow::bail!(
67                        "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
68                        explicit_provider,
69                        provider_list.join(", ")
70                    );
71                }
72            } else {
73                choose_default_provider(provider_list.as_slice())
74                    .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
75                    .to_string()
76            };
77            let model = if mod_id.trim().is_empty() {
78                default_model_for_provider(&provider)
79            } else {
80                mod_id.to_string()
81            };
82            (provider, model)
83        } else {
84            let provider = choose_default_provider(provider_list.as_slice())
85                .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
86                .to_string();
87            let model = default_model_for_provider(&provider);
88            (provider, model)
89        };
90
91        tracing::info!("Orchestrator using model {} via {}", model, provider);
92
93        Ok(Self {
94            config,
95            providers,
96            subtasks: HashMap::new(),
97            subagents: HashMap::new(),
98            completed: Vec::new(),
99            model,
100            provider,
101            stats: SwarmStats::default(),
102        })
103    }
104
105    fn prefers_temperature_one(model: &str) -> bool {
106        let normalized = model.to_ascii_lowercase();
107        normalized.contains("kimi-k2")
108            || normalized.contains("glm-")
109            || normalized.contains("minimax")
110    }
111
112    /// Decompose a complex task into subtasks
113    pub async fn decompose(
114        &mut self,
115        task: &str,
116        strategy: DecompositionStrategy,
117    ) -> Result<Vec<SubTask>> {
118        if strategy == DecompositionStrategy::None {
119            // Single task, no decomposition
120            let subtask = SubTask::new("Main Task", task);
121            self.subtasks.insert(subtask.id.clone(), subtask.clone());
122            return Ok(vec![subtask]);
123        }
124
125        // Use AI to decompose the task
126        let decomposition_prompt = self.build_decomposition_prompt(task, strategy);
127
128        let provider = self
129            .providers
130            .get(&self.provider)
131            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", self.provider))?;
132
133        let temperature = if Self::prefers_temperature_one(&self.model) {
134            1.0
135        } else {
136            0.7
137        };
138
139        let request = CompletionRequest {
140            messages: vec![Message {
141                role: Role::User,
142                content: vec![ContentPart::Text {
143                    text: decomposition_prompt,
144                }],
145            }],
146            tools: Vec::new(),
147            model: self.model.clone(),
148            temperature: Some(temperature),
149            top_p: None,
150            max_tokens: Some(8192),
151            stop: Vec::new(),
152        };
153
154        let response = provider.complete(request).await?;
155
156        // Parse the decomposition response
157        let text = response
158            .message
159            .content
160            .iter()
161            .filter_map(|p| match p {
162                ContentPart::Text { text } => Some(text.clone()),
163                _ => None,
164            })
165            .collect::<Vec<_>>()
166            .join("\n");
167
168        tracing::debug!("Decomposition response: {}", text);
169
170        if text.trim().is_empty() {
171            // Fallback to single task if decomposition fails
172            tracing::warn!("Empty decomposition response, falling back to single task");
173            let subtask = SubTask::new("Main Task", task);
174            self.subtasks.insert(subtask.id.clone(), subtask.clone());
175            return Ok(vec![subtask]);
176        }
177
178        let subtasks = self.parse_decomposition(&text)?;
179
180        // Store subtasks
181        for subtask in &subtasks {
182            self.subtasks.insert(subtask.id.clone(), subtask.clone());
183        }
184
185        // Assign stages based on dependencies
186        self.assign_stages();
187
188        tracing::info!(
189            "Decomposed task into {} subtasks across {} stages",
190            subtasks.len(),
191            self.max_stage() + 1
192        );
193
194        Ok(subtasks)
195    }
196
197    /// Build the decomposition prompt
198    fn build_decomposition_prompt(&self, task: &str, strategy: DecompositionStrategy) -> String {
199        let strategy_instruction = match strategy {
200            DecompositionStrategy::Automatic => {
201                "Analyze the task and determine the optimal way to decompose it into parallel subtasks."
202            }
203            DecompositionStrategy::ByDomain => {
204                "Decompose the task by domain expertise (e.g., research, coding, analysis, verification)."
205            }
206            DecompositionStrategy::ByData => {
207                "Decompose the task by data partition (e.g., different files, sections, or datasets)."
208            }
209            DecompositionStrategy::ByStage => {
210                "Decompose the task by workflow stages (e.g., gather, process, synthesize)."
211            }
212            DecompositionStrategy::None => unreachable!(),
213        };
214
215        format!(
216            r#"You are a task orchestrator. Your job is to decompose complex tasks into parallelizable subtasks.
217
218TASK: {task}
219
220STRATEGY: {strategy_instruction}
221
222CONSTRAINTS:
223- Maximum {max_subtasks} subtasks
224- Each subtask should be independently executable
225- Identify dependencies between subtasks (which must complete before others can start)
226- Assign a specialty/role to each subtask
227
228OUTPUT FORMAT (JSON):
229```json
230{{
231  "subtasks": [
232    {{
233      "name": "Subtask Name",
234      "instruction": "Detailed instruction for this subtask",
235      "specialty": "Role/specialty (e.g., Researcher, Coder, Analyst)",
236      "dependencies": ["id-of-dependency-1"],
237      "priority": 1,
238      "needs_worktree": false
239    }}
240  ]
241}}
242```
243
244Set `needs_worktree: true` only when the subtask will **edit or create \
245files** in the repository (implementation, refactor, patch). Set it to \
246`false` for read-only work (research, review, analysis, fact-check, \
247planning, summarisation). When in doubt, omit the field and the \
248executor will decide from heuristics.
249
250Decompose the task now:"#,
251            task = task,
252            strategy_instruction = strategy_instruction,
253            max_subtasks = self.config.max_subagents,
254        )
255    }
256
257    /// Parse the decomposition response
258    fn parse_decomposition(&self, response: &str) -> Result<Vec<SubTask>> {
259        // Try to extract JSON from the response
260        let json_str = if let Some(start) = response.find("```json") {
261            let start = start + 7;
262            if let Some(end) = response[start..].find("```") {
263                &response[start..start + end]
264            } else {
265                response
266            }
267        } else if let Some(start) = response.find('{') {
268            if let Some(end) = response.rfind('}') {
269                &response[start..=end]
270            } else {
271                response
272            }
273        } else {
274            response
275        };
276
277        #[derive(Deserialize)]
278        struct DecompositionResponse {
279            subtasks: Vec<SubTaskDef>,
280        }
281
282        #[derive(Deserialize)]
283        struct SubTaskDef {
284            name: String,
285            instruction: String,
286            specialty: Option<String>,
287            #[serde(default)]
288            dependencies: Vec<String>,
289            #[serde(default)]
290            priority: i32,
291            #[serde(default)]
292            needs_worktree: Option<bool>,
293        }
294
295        let parsed: DecompositionResponse = serde_json::from_str(json_str.trim())
296            .map_err(|e| anyhow::anyhow!("Failed to parse decomposition: {}", e))?;
297
298        // Create SubTask objects with proper IDs
299        let mut subtasks = Vec::new();
300        let mut name_to_id: HashMap<String, String> = HashMap::new();
301
302        // First pass: create subtasks and map names to IDs
303        for def in &parsed.subtasks {
304            let subtask = SubTask::new(&def.name, &def.instruction).with_priority(def.priority);
305
306            let subtask = if let Some(ref specialty) = def.specialty {
307                subtask.with_specialty(specialty)
308            } else {
309                subtask
310            };
311
312            let subtask = match def.needs_worktree {
313                Some(explicit) => subtask.with_needs_worktree(explicit),
314                None => subtask,
315            };
316
317            name_to_id.insert(def.name.clone(), subtask.id.clone());
318            subtasks.push((subtask, def.dependencies.clone()));
319        }
320
321        // Second pass: resolve dependencies
322        let result: Vec<SubTask> = subtasks
323            .into_iter()
324            .map(|(mut subtask, deps)| {
325                let resolved_deps: Vec<String> = deps
326                    .iter()
327                    .filter_map(|dep| name_to_id.get(dep).cloned())
328                    .collect();
329                subtask.dependencies = resolved_deps;
330                subtask
331            })
332            .collect();
333
334        Ok(result)
335    }
336
337    /// Assign stages to subtasks based on dependencies
338    fn assign_stages(&mut self) {
339        let mut changed = true;
340
341        while changed {
342            changed = false;
343
344            // First collect all updates needed
345            let updates: Vec<(String, usize)> = self
346                .subtasks
347                .iter()
348                .filter_map(|(id, subtask)| {
349                    if subtask.dependencies.is_empty() {
350                        if subtask.stage != 0 {
351                            Some((id.clone(), 0))
352                        } else {
353                            None
354                        }
355                    } else {
356                        let max_dep_stage = subtask
357                            .dependencies
358                            .iter()
359                            .filter_map(|dep_id| self.subtasks.get(dep_id))
360                            .map(|dep| dep.stage)
361                            .max()
362                            .unwrap_or(0);
363
364                        let new_stage = max_dep_stage + 1;
365                        if subtask.stage != new_stage {
366                            Some((id.clone(), new_stage))
367                        } else {
368                            None
369                        }
370                    }
371                })
372                .collect();
373
374            // Then apply updates
375            for (id, new_stage) in updates {
376                if let Some(subtask) = self.subtasks.get_mut(&id) {
377                    subtask.stage = new_stage;
378                    changed = true;
379                }
380            }
381        }
382    }
383
384    /// Get maximum stage number
385    fn max_stage(&self) -> usize {
386        self.subtasks.values().map(|s| s.stage).max().unwrap_or(0)
387    }
388
389    /// Get subtasks ready to execute (dependencies satisfied)
390    pub fn ready_subtasks(&self) -> Vec<&SubTask> {
391        self.subtasks
392            .values()
393            .filter(|s| s.status == SubTaskStatus::Pending && s.can_run(&self.completed))
394            .collect()
395    }
396
397    /// Get subtasks for a specific stage
398    pub fn subtasks_for_stage(&self, stage: usize) -> Vec<&SubTask> {
399        self.subtasks
400            .values()
401            .filter(|s| s.stage == stage)
402            .collect()
403    }
404
405    /// Create a sub-agent for a subtask
406    pub fn create_subagent(&mut self, subtask: &SubTask) -> SubAgent {
407        let specialty = subtask
408            .specialty
409            .clone()
410            .unwrap_or_else(|| "General".to_string());
411        let name = format!("{} Agent", specialty);
412
413        let subagent = SubAgent::new(name, specialty, &subtask.id, &self.model, &self.provider);
414
415        self.subagents.insert(subagent.id.clone(), subagent.clone());
416        self.stats.subagents_spawned += 1;
417
418        subagent
419    }
420
421    /// Mark a subtask as completed
422    pub fn complete_subtask(&mut self, subtask_id: &str, result: SubTaskResult) {
423        if let Some(subtask) = self.subtasks.get_mut(subtask_id) {
424            subtask.complete(result.success);
425
426            if result.success {
427                self.completed.push(subtask_id.to_string());
428                self.stats.subagents_completed += 1;
429            } else {
430                self.stats.subagents_failed += 1;
431            }
432
433            self.stats.total_tool_calls += result.tool_calls;
434        }
435    }
436
437    /// Get all subtasks
438    pub fn all_subtasks(&self) -> Vec<&SubTask> {
439        self.subtasks.values().collect()
440    }
441
442    /// Get statistics
443    pub fn stats(&self) -> &SwarmStats {
444        &self.stats
445    }
446
447    /// Get mutable statistics
448    pub fn stats_mut(&mut self) -> &mut SwarmStats {
449        &mut self.stats
450    }
451
452    /// Check if all subtasks are complete
453    pub fn is_complete(&self) -> bool {
454        self.subtasks.values().all(|s| {
455            matches!(
456                s.status,
457                SubTaskStatus::Completed | SubTaskStatus::Failed | SubTaskStatus::Cancelled
458            )
459        })
460    }
461
462    /// Get the provider registry
463    pub fn providers(&self) -> &ProviderRegistry {
464        &self.providers
465    }
466
467    /// Get current model
468    pub fn model(&self) -> &str {
469        &self.model
470    }
471
472    /// Get current provider
473    pub fn provider(&self) -> &str {
474        &self.provider
475    }
476}
477
/// Pick the provider to use when none was requested explicitly.
///
/// Walks a fixed preference list and returns the first preferred name that is
/// present in `providers`. If none of the preferred names is available, the
/// first entry of `providers` is returned; `None` only when `providers` is empty.
pub(crate) fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    // Ordered from most to least preferred.
    const PREFERRED: [&str; 12] = [
        "openai",
        "anthropic",
        "github-copilot",
        "github-copilot-enterprise",
        "openai-codex",
        "zai",
        "minimax",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
        "bedrock",
    ];

    PREFERRED
        .iter()
        .find_map(|wanted| {
            providers
                .iter()
                .copied()
                .find(|candidate| *candidate == *wanted)
        })
        .or_else(|| providers.first().copied())
}
500
/// Return the default model identifier for `provider`.
///
/// Providers without a dedicated entry fall back to "gpt-4o".
pub(crate) fn default_model_for_provider(provider: &str) -> String {
    let model_id = match provider {
        "moonshotai" => "kimi-k2.5",
        "anthropic" => "claude-sonnet-4-20250514",
        "bedrock" => "us.anthropic.claude-opus-4-6-v1",
        "openai" => "gpt-4o",
        "google" => "gemini-2.5-pro",
        "zhipuai" | "zai" => "glm-5",
        "openrouter" => "z-ai/glm-5",
        "novita" => "Qwen/Qwen3.5-35B-A3B",
        "github-copilot" | "github-copilot-enterprise" | "openai-codex" => "gpt-5-mini",
        _ => "gpt-4o",
    };
    model_id.to_string()
}
516
517/// Message from sub-agent to orchestrator
518#[derive(Debug, Clone, Serialize, Deserialize)]
519pub enum SubAgentMessage {
520    /// Progress update
521    Progress {
522        subagent_id: String,
523        subtask_id: String,
524        steps: usize,
525        status: String,
526    },
527
528    /// Tool call made
529    ToolCall {
530        subagent_id: String,
531        tool_name: String,
532        success: bool,
533    },
534
535    /// Subtask completed
536    Completed {
537        subagent_id: String,
538        result: SubTaskResult,
539    },
540
541    /// Request for resources
542    ResourceRequest {
543        subagent_id: String,
544        resource_type: String,
545        resource_id: String,
546    },
547}
548
549/// Message from orchestrator to sub-agent
550#[derive(Debug, Clone, Serialize, Deserialize)]
551pub enum OrchestratorMessage {
552    /// Start execution
553    Start { subtask: Box<SubTask> },
554
555    /// Provide resource
556    Resource {
557        resource_type: String,
558        resource_id: String,
559        content: String,
560    },
561
562    /// Terminate execution
563    Terminate { reason: String },
564
565    /// Context update (from completed dependency)
566    ContextUpdate {
567        dependency_id: String,
568        result: String,
569    },
570}