1use super::{
7 DecompositionStrategy, SubAgent, SubTask, SubTaskResult, SubTaskStatus, SwarmConfig, SwarmStats,
8};
9use crate::provider::{CompletionRequest, ContentPart, Message, ProviderRegistry, Role};
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14pub struct Orchestrator {
16 config: SwarmConfig,
18
19 providers: ProviderRegistry,
21
22 subtasks: HashMap<String, SubTask>,
24
25 subagents: HashMap<String, SubAgent>,
27
28 completed: Vec<String>,
30
31 model: String,
33
34 provider: String,
36
37 stats: SwarmStats,
39}
40
41impl Orchestrator {
42 pub async fn new(config: SwarmConfig) -> Result<Self> {
44 use crate::provider::parse_model_string;
45
46 let providers = ProviderRegistry::from_vault().await?;
47 let provider_list = providers.list();
48
49 if provider_list.is_empty() {
50 anyhow::bail!("No providers available for orchestration");
51 }
52
53 let model_str = config
55 .model
56 .clone()
57 .or_else(|| std::env::var("CODETETHER_DEFAULT_MODEL").ok());
58
59 let (provider, model) = if let Some(ref model_str) = model_str {
60 let (prov, mod_id) = parse_model_string(model_str);
61 let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
62 let provider = if let Some(explicit_provider) = prov {
63 if provider_list.contains(&explicit_provider) {
64 explicit_provider.to_string()
65 } else {
66 anyhow::bail!(
67 "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
68 explicit_provider,
69 provider_list.join(", ")
70 );
71 }
72 } else {
73 choose_default_provider(provider_list.as_slice())
74 .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
75 .to_string()
76 };
77 let model = if mod_id.trim().is_empty() {
78 default_model_for_provider(&provider)
79 } else {
80 mod_id.to_string()
81 };
82 (provider, model)
83 } else {
84 let provider = choose_default_provider(provider_list.as_slice())
85 .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
86 .to_string();
87 let model = default_model_for_provider(&provider);
88 (provider, model)
89 };
90
91 tracing::info!("Orchestrator using model {} via {}", model, provider);
92
93 Ok(Self {
94 config,
95 providers,
96 subtasks: HashMap::new(),
97 subagents: HashMap::new(),
98 completed: Vec::new(),
99 model,
100 provider,
101 stats: SwarmStats::default(),
102 })
103 }
104
105 fn prefers_temperature_one(model: &str) -> bool {
106 crate::session::helper::provider::prefers_temperature_one(model)
107 }
108
109 pub async fn decompose(
111 &mut self,
112 task: &str,
113 strategy: DecompositionStrategy,
114 ) -> Result<Vec<SubTask>> {
115 if strategy == DecompositionStrategy::None {
116 let subtask = SubTask::new("Main Task", task);
118 self.subtasks.insert(subtask.id.clone(), subtask.clone());
119 return Ok(vec![subtask]);
120 }
121
122 let decomposition_prompt = self.build_decomposition_prompt(task, strategy);
124
125 let provider = self
126 .providers
127 .get(&self.provider)
128 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", self.provider))?;
129
130 let temperature = if Self::prefers_temperature_one(&self.model) {
131 1.0
132 } else {
133 0.7
134 };
135
136 let request = CompletionRequest {
137 messages: vec![Message {
138 role: Role::User,
139 content: vec![ContentPart::Text {
140 text: decomposition_prompt,
141 }],
142 }],
143 tools: Vec::new(),
144 model: self.model.clone(),
145 temperature: Some(temperature),
146 top_p: None,
147 max_tokens: Some(8192),
148 stop: Vec::new(),
149 };
150
151 let response = provider.complete(request).await?;
152
153 let text = response
155 .message
156 .content
157 .iter()
158 .filter_map(|p| match p {
159 ContentPart::Text { text } => Some(text.clone()),
160 _ => None,
161 })
162 .collect::<Vec<_>>()
163 .join("\n");
164
165 tracing::debug!("Decomposition response: {}", text);
166
167 if text.trim().is_empty() {
168 tracing::warn!("Empty decomposition response, falling back to single task");
170 let subtask = SubTask::new("Main Task", task);
171 self.subtasks.insert(subtask.id.clone(), subtask.clone());
172 return Ok(vec![subtask]);
173 }
174
175 let subtasks = self.parse_decomposition(&text)?;
176
177 for subtask in &subtasks {
179 self.subtasks.insert(subtask.id.clone(), subtask.clone());
180 }
181
182 self.assign_stages();
184
185 tracing::info!(
186 "Decomposed task into {} subtasks across {} stages",
187 subtasks.len(),
188 self.max_stage() + 1
189 );
190
191 Ok(subtasks)
192 }
193
194 fn build_decomposition_prompt(&self, task: &str, strategy: DecompositionStrategy) -> String {
196 let strategy_instruction = match strategy {
197 DecompositionStrategy::Automatic => {
198 "Analyze the task and determine the optimal way to decompose it into parallel subtasks."
199 }
200 DecompositionStrategy::ByDomain => {
201 "Decompose the task by domain expertise (e.g., research, coding, analysis, verification)."
202 }
203 DecompositionStrategy::ByData => {
204 "Decompose the task by data partition (e.g., different files, sections, or datasets)."
205 }
206 DecompositionStrategy::ByStage => {
207 "Decompose the task by workflow stages (e.g., gather, process, synthesize)."
208 }
209 DecompositionStrategy::None => unreachable!(),
210 };
211
212 format!(
213 r#"You are a task orchestrator. Your job is to decompose complex tasks into parallelizable subtasks.
214
215TASK: {task}
216
217STRATEGY: {strategy_instruction}
218
219CONSTRAINTS:
220- Maximum {max_subtasks} subtasks
221- Each subtask should be independently executable
222- Identify dependencies between subtasks (which must complete before others can start)
223- Assign a specialty/role to each subtask
224
225OUTPUT FORMAT (JSON):
226```json
227{{
228 "subtasks": [
229 {{
230 "name": "Subtask Name",
231 "instruction": "Detailed instruction for this subtask",
232 "specialty": "Role/specialty (e.g., Researcher, Coder, Analyst)",
233 "dependencies": ["id-of-dependency-1"],
234 "priority": 1,
235 "needs_worktree": false
236 }}
237 ]
238}}
239```
240
241Set `needs_worktree: true` only when the subtask will **edit or create \
242files** in the repository (implementation, refactor, patch). Set it to \
243`false` for read-only work (research, review, analysis, fact-check, \
244planning, summarisation). When in doubt, omit the field and the \
245executor will decide from heuristics.
246
247Decompose the task now:"#,
248 task = task,
249 strategy_instruction = strategy_instruction,
250 max_subtasks = self.config.max_subagents,
251 )
252 }
253
254 fn parse_decomposition(&self, response: &str) -> Result<Vec<SubTask>> {
256 let json_str = if let Some(start) = response.find("```json") {
258 let start = start + 7;
259 if let Some(end) = response[start..].find("```") {
260 &response[start..start + end]
261 } else {
262 response
263 }
264 } else if let Some(start) = response.find('{') {
265 if let Some(end) = response.rfind('}') {
266 &response[start..=end]
267 } else {
268 response
269 }
270 } else {
271 response
272 };
273
274 #[derive(Deserialize)]
275 struct DecompositionResponse {
276 subtasks: Vec<SubTaskDef>,
277 }
278
279 #[derive(Deserialize)]
280 struct SubTaskDef {
281 name: String,
282 instruction: String,
283 specialty: Option<String>,
284 #[serde(default)]
285 dependencies: Vec<String>,
286 #[serde(default)]
287 priority: i32,
288 #[serde(default)]
289 needs_worktree: Option<bool>,
290 }
291
292 let parsed: DecompositionResponse = serde_json::from_str(json_str.trim())
293 .map_err(|e| anyhow::anyhow!("Failed to parse decomposition: {}", e))?;
294
295 let mut subtasks = Vec::new();
297 let mut name_to_id: HashMap<String, String> = HashMap::new();
298
299 for def in &parsed.subtasks {
301 let subtask = SubTask::new(&def.name, &def.instruction).with_priority(def.priority);
302
303 let subtask = if let Some(ref specialty) = def.specialty {
304 subtask.with_specialty(specialty)
305 } else {
306 subtask
307 };
308
309 let subtask = match def.needs_worktree {
310 Some(explicit) => subtask.with_needs_worktree(explicit),
311 None => subtask,
312 };
313
314 name_to_id.insert(def.name.clone(), subtask.id.clone());
315 subtasks.push((subtask, def.dependencies.clone()));
316 }
317
318 let result: Vec<SubTask> = subtasks
320 .into_iter()
321 .map(|(mut subtask, deps)| {
322 let resolved_deps: Vec<String> = deps
323 .iter()
324 .filter_map(|dep| name_to_id.get(dep).cloned())
325 .collect();
326 subtask.dependencies = resolved_deps;
327 subtask
328 })
329 .collect();
330
331 Ok(result)
332 }
333
334 fn assign_stages(&mut self) {
336 let mut changed = true;
337
338 while changed {
339 changed = false;
340
341 let updates: Vec<(String, usize)> = self
343 .subtasks
344 .iter()
345 .filter_map(|(id, subtask)| {
346 if subtask.dependencies.is_empty() {
347 if subtask.stage != 0 {
348 Some((id.clone(), 0))
349 } else {
350 None
351 }
352 } else {
353 let max_dep_stage = subtask
354 .dependencies
355 .iter()
356 .filter_map(|dep_id| self.subtasks.get(dep_id))
357 .map(|dep| dep.stage)
358 .max()
359 .unwrap_or(0);
360
361 let new_stage = max_dep_stage + 1;
362 if subtask.stage != new_stage {
363 Some((id.clone(), new_stage))
364 } else {
365 None
366 }
367 }
368 })
369 .collect();
370
371 for (id, new_stage) in updates {
373 if let Some(subtask) = self.subtasks.get_mut(&id) {
374 subtask.stage = new_stage;
375 changed = true;
376 }
377 }
378 }
379 }
380
381 fn max_stage(&self) -> usize {
383 self.subtasks.values().map(|s| s.stage).max().unwrap_or(0)
384 }
385
386 pub fn ready_subtasks(&self) -> Vec<&SubTask> {
388 self.subtasks
389 .values()
390 .filter(|s| s.status == SubTaskStatus::Pending && s.can_run(&self.completed))
391 .collect()
392 }
393
394 pub fn subtasks_for_stage(&self, stage: usize) -> Vec<&SubTask> {
396 self.subtasks
397 .values()
398 .filter(|s| s.stage == stage)
399 .collect()
400 }
401
402 pub fn create_subagent(&mut self, subtask: &SubTask) -> SubAgent {
404 let specialty = subtask
405 .specialty
406 .clone()
407 .unwrap_or_else(|| "General".to_string());
408 let name = format!("{} Agent", specialty);
409
410 let subagent = SubAgent::new(name, specialty, &subtask.id, &self.model, &self.provider);
411
412 self.subagents.insert(subagent.id.clone(), subagent.clone());
413 self.stats.subagents_spawned += 1;
414
415 subagent
416 }
417
418 pub fn complete_subtask(&mut self, subtask_id: &str, result: SubTaskResult) {
420 if let Some(subtask) = self.subtasks.get_mut(subtask_id) {
421 subtask.complete(result.success);
422
423 if result.success {
424 self.completed.push(subtask_id.to_string());
425 self.stats.subagents_completed += 1;
426 } else {
427 self.stats.subagents_failed += 1;
428 }
429
430 self.stats.total_tool_calls += result.tool_calls;
431 }
432 }
433
434 pub fn all_subtasks(&self) -> Vec<&SubTask> {
436 self.subtasks.values().collect()
437 }
438
439 pub fn stats(&self) -> &SwarmStats {
441 &self.stats
442 }
443
444 pub fn stats_mut(&mut self) -> &mut SwarmStats {
446 &mut self.stats
447 }
448
449 pub fn is_complete(&self) -> bool {
451 self.subtasks.values().all(|s| {
452 matches!(
453 s.status,
454 SubTaskStatus::Completed | SubTaskStatus::Failed | SubTaskStatus::Cancelled
455 )
456 })
457 }
458
459 pub fn providers(&self) -> &ProviderRegistry {
461 &self.providers
462 }
463
464 pub fn model(&self) -> &str {
466 &self.model
467 }
468
469 pub fn provider(&self) -> &str {
471 &self.provider
472 }
473}
474
475pub(crate) fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
476 let preferred = [
477 "openai-codex",
478 "openai",
479 "anthropic",
480 "github-copilot",
481 "github-copilot-enterprise",
482 "zai",
483 "minimax",
484 "moonshotai",
485 "openrouter",
486 "novita",
487 "google",
488 "bedrock",
489 ];
490 for name in preferred {
491 if let Some(found) = providers.iter().copied().find(|p| *p == name) {
492 return Some(found);
493 }
494 }
495 providers.first().copied()
496}
497
498pub(crate) fn default_model_for_provider(provider: &str) -> String {
500 match provider {
501 "moonshotai" => "kimi-k2.5".to_string(),
502 "anthropic" => "claude-sonnet-4-20250514".to_string(),
503 "bedrock" => "us.anthropic.claude-opus-4-6-v1".to_string(),
504 "openai" => "gpt-4o".to_string(),
505 "google" => "gemini-2.5-pro".to_string(),
506 "zhipuai" | "zai" => "glm-5".to_string(),
507 "openrouter" => "z-ai/glm-5".to_string(),
508 "novita" => "Qwen/Qwen3.5-35B-A3B".to_string(),
509 "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
510 "openai-codex" => "gpt-5.5".to_string(),
511 _ => "gpt-4o".to_string(),
512 }
513}
514
515#[derive(Debug, Clone, Serialize, Deserialize)]
517pub enum SubAgentMessage {
518 Progress {
520 subagent_id: String,
521 subtask_id: String,
522 steps: usize,
523 status: String,
524 },
525
526 ToolCall {
528 subagent_id: String,
529 tool_name: String,
530 success: bool,
531 },
532
533 Completed {
535 subagent_id: String,
536 result: SubTaskResult,
537 },
538
539 ResourceRequest {
541 subagent_id: String,
542 resource_type: String,
543 resource_id: String,
544 },
545}
546
547#[derive(Debug, Clone, Serialize, Deserialize)]
549pub enum OrchestratorMessage {
550 Start { subtask: Box<SubTask> },
552
553 Resource {
555 resource_type: String,
556 resource_id: String,
557 content: String,
558 },
559
560 Terminate { reason: String },
562
563 ContextUpdate {
565 dependency_id: String,
566 result: String,
567 },
568}