1use super::{
7 DecompositionStrategy, SubAgent, SubTask, SubTaskResult, SubTaskStatus, SwarmConfig, SwarmStats,
8};
9use crate::provider::{CompletionRequest, ContentPart, Message, ProviderRegistry, Role};
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14pub struct Orchestrator {
16 config: SwarmConfig,
18
19 providers: ProviderRegistry,
21
22 subtasks: HashMap<String, SubTask>,
24
25 subagents: HashMap<String, SubAgent>,
27
28 completed: Vec<String>,
30
31 model: String,
33
34 provider: String,
36
37 stats: SwarmStats,
39}
40
41impl Orchestrator {
42 pub async fn new(config: SwarmConfig) -> Result<Self> {
44 use crate::provider::parse_model_string;
45
46 let providers = ProviderRegistry::from_vault().await?;
47 let provider_list = providers.list();
48
49 if provider_list.is_empty() {
50 anyhow::bail!("No providers available for orchestration");
51 }
52
53 let model_str = config
55 .model
56 .clone()
57 .or_else(|| std::env::var("CODETETHER_DEFAULT_MODEL").ok());
58
59 let (provider, model) = if let Some(ref model_str) = model_str {
60 let (prov, mod_id) = parse_model_string(model_str);
61 let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
62 let provider = if let Some(explicit_provider) = prov {
63 if provider_list.contains(&explicit_provider) {
64 explicit_provider.to_string()
65 } else {
66 anyhow::bail!(
67 "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
68 explicit_provider,
69 provider_list.join(", ")
70 );
71 }
72 } else {
73 choose_default_provider(provider_list.as_slice())
74 .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
75 .to_string()
76 };
77 let model = if mod_id.trim().is_empty() {
78 default_model_for_provider(&provider)
79 } else {
80 mod_id.to_string()
81 };
82 (provider, model)
83 } else {
84 let provider = choose_default_provider(provider_list.as_slice())
85 .ok_or_else(|| anyhow::anyhow!("No providers available for orchestration"))?
86 .to_string();
87 let model = default_model_for_provider(&provider);
88 (provider, model)
89 };
90
91 tracing::info!("Orchestrator using model {} via {}", model, provider);
92
93 Ok(Self {
94 config,
95 providers,
96 subtasks: HashMap::new(),
97 subagents: HashMap::new(),
98 completed: Vec::new(),
99 model,
100 provider,
101 stats: SwarmStats::default(),
102 })
103 }
104
105 fn prefers_temperature_one(model: &str) -> bool {
106 let normalized = model.to_ascii_lowercase();
107 normalized.contains("kimi-k2")
108 || normalized.contains("glm-")
109 || normalized.contains("minimax")
110 }
111
112 pub async fn decompose(
114 &mut self,
115 task: &str,
116 strategy: DecompositionStrategy,
117 ) -> Result<Vec<SubTask>> {
118 if strategy == DecompositionStrategy::None {
119 let subtask = SubTask::new("Main Task", task);
121 self.subtasks.insert(subtask.id.clone(), subtask.clone());
122 return Ok(vec![subtask]);
123 }
124
125 let decomposition_prompt = self.build_decomposition_prompt(task, strategy);
127
128 let provider = self
129 .providers
130 .get(&self.provider)
131 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", self.provider))?;
132
133 let temperature = if Self::prefers_temperature_one(&self.model) {
134 1.0
135 } else {
136 0.7
137 };
138
139 let request = CompletionRequest {
140 messages: vec![Message {
141 role: Role::User,
142 content: vec![ContentPart::Text {
143 text: decomposition_prompt,
144 }],
145 }],
146 tools: Vec::new(),
147 model: self.model.clone(),
148 temperature: Some(temperature),
149 top_p: None,
150 max_tokens: Some(8192),
151 stop: Vec::new(),
152 };
153
154 let response = provider.complete(request).await?;
155
156 let text = response
158 .message
159 .content
160 .iter()
161 .filter_map(|p| match p {
162 ContentPart::Text { text } => Some(text.clone()),
163 _ => None,
164 })
165 .collect::<Vec<_>>()
166 .join("\n");
167
168 tracing::debug!("Decomposition response: {}", text);
169
170 if text.trim().is_empty() {
171 tracing::warn!("Empty decomposition response, falling back to single task");
173 let subtask = SubTask::new("Main Task", task);
174 self.subtasks.insert(subtask.id.clone(), subtask.clone());
175 return Ok(vec![subtask]);
176 }
177
178 let subtasks = self.parse_decomposition(&text)?;
179
180 for subtask in &subtasks {
182 self.subtasks.insert(subtask.id.clone(), subtask.clone());
183 }
184
185 self.assign_stages();
187
188 tracing::info!(
189 "Decomposed task into {} subtasks across {} stages",
190 subtasks.len(),
191 self.max_stage() + 1
192 );
193
194 Ok(subtasks)
195 }
196
197 fn build_decomposition_prompt(&self, task: &str, strategy: DecompositionStrategy) -> String {
199 let strategy_instruction = match strategy {
200 DecompositionStrategy::Automatic => {
201 "Analyze the task and determine the optimal way to decompose it into parallel subtasks."
202 }
203 DecompositionStrategy::ByDomain => {
204 "Decompose the task by domain expertise (e.g., research, coding, analysis, verification)."
205 }
206 DecompositionStrategy::ByData => {
207 "Decompose the task by data partition (e.g., different files, sections, or datasets)."
208 }
209 DecompositionStrategy::ByStage => {
210 "Decompose the task by workflow stages (e.g., gather, process, synthesize)."
211 }
212 DecompositionStrategy::None => unreachable!(),
213 };
214
215 format!(
216 r#"You are a task orchestrator. Your job is to decompose complex tasks into parallelizable subtasks.
217
218TASK: {task}
219
220STRATEGY: {strategy_instruction}
221
222CONSTRAINTS:
223- Maximum {max_subtasks} subtasks
224- Each subtask should be independently executable
225- Identify dependencies between subtasks (which must complete before others can start)
226- Assign a specialty/role to each subtask
227
228OUTPUT FORMAT (JSON):
229```json
230{{
231 "subtasks": [
232 {{
233 "name": "Subtask Name",
234 "instruction": "Detailed instruction for this subtask",
235 "specialty": "Role/specialty (e.g., Researcher, Coder, Analyst)",
236 "dependencies": ["id-of-dependency-1"],
237 "priority": 1
238 }}
239 ]
240}}
241```
242
243Decompose the task now:"#,
244 task = task,
245 strategy_instruction = strategy_instruction,
246 max_subtasks = self.config.max_subagents,
247 )
248 }
249
250 fn parse_decomposition(&self, response: &str) -> Result<Vec<SubTask>> {
252 let json_str = if let Some(start) = response.find("```json") {
254 let start = start + 7;
255 if let Some(end) = response[start..].find("```") {
256 &response[start..start + end]
257 } else {
258 response
259 }
260 } else if let Some(start) = response.find('{') {
261 if let Some(end) = response.rfind('}') {
262 &response[start..=end]
263 } else {
264 response
265 }
266 } else {
267 response
268 };
269
270 #[derive(Deserialize)]
271 struct DecompositionResponse {
272 subtasks: Vec<SubTaskDef>,
273 }
274
275 #[derive(Deserialize)]
276 struct SubTaskDef {
277 name: String,
278 instruction: String,
279 specialty: Option<String>,
280 #[serde(default)]
281 dependencies: Vec<String>,
282 #[serde(default)]
283 priority: i32,
284 }
285
286 let parsed: DecompositionResponse = serde_json::from_str(json_str.trim())
287 .map_err(|e| anyhow::anyhow!("Failed to parse decomposition: {}", e))?;
288
289 let mut subtasks = Vec::new();
291 let mut name_to_id: HashMap<String, String> = HashMap::new();
292
293 for def in &parsed.subtasks {
295 let subtask = SubTask::new(&def.name, &def.instruction).with_priority(def.priority);
296
297 let subtask = if let Some(ref specialty) = def.specialty {
298 subtask.with_specialty(specialty)
299 } else {
300 subtask
301 };
302
303 name_to_id.insert(def.name.clone(), subtask.id.clone());
304 subtasks.push((subtask, def.dependencies.clone()));
305 }
306
307 let result: Vec<SubTask> = subtasks
309 .into_iter()
310 .map(|(mut subtask, deps)| {
311 let resolved_deps: Vec<String> = deps
312 .iter()
313 .filter_map(|dep| name_to_id.get(dep).cloned())
314 .collect();
315 subtask.dependencies = resolved_deps;
316 subtask
317 })
318 .collect();
319
320 Ok(result)
321 }
322
323 fn assign_stages(&mut self) {
325 let mut changed = true;
326
327 while changed {
328 changed = false;
329
330 let updates: Vec<(String, usize)> = self
332 .subtasks
333 .iter()
334 .filter_map(|(id, subtask)| {
335 if subtask.dependencies.is_empty() {
336 if subtask.stage != 0 {
337 Some((id.clone(), 0))
338 } else {
339 None
340 }
341 } else {
342 let max_dep_stage = subtask
343 .dependencies
344 .iter()
345 .filter_map(|dep_id| self.subtasks.get(dep_id))
346 .map(|dep| dep.stage)
347 .max()
348 .unwrap_or(0);
349
350 let new_stage = max_dep_stage + 1;
351 if subtask.stage != new_stage {
352 Some((id.clone(), new_stage))
353 } else {
354 None
355 }
356 }
357 })
358 .collect();
359
360 for (id, new_stage) in updates {
362 if let Some(subtask) = self.subtasks.get_mut(&id) {
363 subtask.stage = new_stage;
364 changed = true;
365 }
366 }
367 }
368 }
369
370 fn max_stage(&self) -> usize {
372 self.subtasks.values().map(|s| s.stage).max().unwrap_or(0)
373 }
374
375 pub fn ready_subtasks(&self) -> Vec<&SubTask> {
377 self.subtasks
378 .values()
379 .filter(|s| s.status == SubTaskStatus::Pending && s.can_run(&self.completed))
380 .collect()
381 }
382
383 pub fn subtasks_for_stage(&self, stage: usize) -> Vec<&SubTask> {
385 self.subtasks
386 .values()
387 .filter(|s| s.stage == stage)
388 .collect()
389 }
390
391 pub fn create_subagent(&mut self, subtask: &SubTask) -> SubAgent {
393 let specialty = subtask
394 .specialty
395 .clone()
396 .unwrap_or_else(|| "General".to_string());
397 let name = format!("{} Agent", specialty);
398
399 let subagent = SubAgent::new(name, specialty, &subtask.id, &self.model, &self.provider);
400
401 self.subagents.insert(subagent.id.clone(), subagent.clone());
402 self.stats.subagents_spawned += 1;
403
404 subagent
405 }
406
407 pub fn complete_subtask(&mut self, subtask_id: &str, result: SubTaskResult) {
409 if let Some(subtask) = self.subtasks.get_mut(subtask_id) {
410 subtask.complete(result.success);
411
412 if result.success {
413 self.completed.push(subtask_id.to_string());
414 self.stats.subagents_completed += 1;
415 } else {
416 self.stats.subagents_failed += 1;
417 }
418
419 self.stats.total_tool_calls += result.tool_calls;
420 }
421 }
422
423 pub fn all_subtasks(&self) -> Vec<&SubTask> {
425 self.subtasks.values().collect()
426 }
427
428 pub fn stats(&self) -> &SwarmStats {
430 &self.stats
431 }
432
433 pub fn stats_mut(&mut self) -> &mut SwarmStats {
435 &mut self.stats
436 }
437
438 pub fn is_complete(&self) -> bool {
440 self.subtasks.values().all(|s| {
441 matches!(
442 s.status,
443 SubTaskStatus::Completed | SubTaskStatus::Failed | SubTaskStatus::Cancelled
444 )
445 })
446 }
447
448 pub fn providers(&self) -> &ProviderRegistry {
450 &self.providers
451 }
452
453 pub fn model(&self) -> &str {
455 &self.model
456 }
457
458 pub fn provider(&self) -> &str {
460 &self.provider
461 }
462}
463
464pub(crate) fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
465 let preferred = [
466 "openai",
467 "anthropic",
468 "github-copilot",
469 "github-copilot-enterprise",
470 "openai-codex",
471 "zai",
472 "minimax",
473 "moonshotai",
474 "openrouter",
475 "novita",
476 "google",
477 "bedrock",
478 ];
479 for name in preferred {
480 if let Some(found) = providers.iter().copied().find(|p| *p == name) {
481 return Some(found);
482 }
483 }
484 providers.first().copied()
485}
486
487pub(crate) fn default_model_for_provider(provider: &str) -> String {
489 match provider {
490 "moonshotai" => "kimi-k2.5".to_string(),
491 "anthropic" => "claude-sonnet-4-20250514".to_string(),
492 "bedrock" => "us.anthropic.claude-opus-4-6-v1".to_string(),
493 "openai" => "gpt-4o".to_string(),
494 "google" => "gemini-2.5-pro".to_string(),
495 "zhipuai" | "zai" => "glm-5".to_string(),
496 "openrouter" => "z-ai/glm-5".to_string(),
497 "novita" => "Qwen/Qwen3.5-35B-A3B".to_string(),
498 "github-copilot" | "github-copilot-enterprise" | "openai-codex" => "gpt-5-mini".to_string(),
499 _ => "gpt-4o".to_string(),
500 }
501}
502
503#[derive(Debug, Clone, Serialize, Deserialize)]
505pub enum SubAgentMessage {
506 Progress {
508 subagent_id: String,
509 subtask_id: String,
510 steps: usize,
511 status: String,
512 },
513
514 ToolCall {
516 subagent_id: String,
517 tool_name: String,
518 success: bool,
519 },
520
521 Completed {
523 subagent_id: String,
524 result: SubTaskResult,
525 },
526
527 ResourceRequest {
529 subagent_id: String,
530 resource_type: String,
531 resource_id: String,
532 },
533}
534
535#[derive(Debug, Clone, Serialize, Deserialize)]
537pub enum OrchestratorMessage {
538 Start { subtask: Box<SubTask> },
540
541 Resource {
543 resource_type: String,
544 resource_id: String,
545 content: String,
546 },
547
548 Terminate { reason: String },
550
551 ContextUpdate {
553 dependency_id: String,
554 result: String,
555 },
556}