1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::config::Config;
7use crate::engine::{Identity, Workspace};
8use crate::event::{AgentStatus, Block, Event, OutcomeSummary, RiskLevel, RunId, TokenUsage};
9use crate::memory::Memory;
10use crate::provider::{
11 Brain, BrainRequest, BrainStream, ContentBlock, ModelCaps, Msg, PromptCacheConfig, ToolSpec,
12};
13use crate::router::{BudgetState, Router, TaskTier};
14use crate::sandbox::LocalSandbox;
15use crate::tools::edit::{Edit, MultiEdit};
16use crate::tools::exec::Exec;
17use crate::tools::fs::{FsList, FsRead, FsWrite};
18use crate::tools::git::Git;
19use crate::tools::search_and_web::Search;
20use crate::tools::{ToolCtx, ToolRegistry};
21
22fn prompt_cache_key(scope: &str, workspace_root: &std::path::Path, tools: &[ToolSpec]) -> String {
23 use std::hash::{Hash, Hasher};
24
25 let mut hasher = std::collections::hash_map::DefaultHasher::new();
26 scope.hash(&mut hasher);
27 workspace_root.display().to_string().hash(&mut hasher);
28 for tool in tools {
29 tool.name.hash(&mut hasher);
30 tool.description.hash(&mut hasher);
31 tool.input_schema.to_string().hash(&mut hasher);
32 }
33 format!("sparrow-{}-{:016x}", scope, hasher.finish())
34}
35
36struct FallbackBrain {
42 id: String,
43 caps: ModelCaps,
44 chain: Vec<Arc<dyn Brain>>,
45}
46
47impl FallbackBrain {
48 fn new(chain: Vec<Arc<dyn Brain>>) -> Self {
49 let id = chain
50 .first()
51 .map(|b| b.id().to_string())
52 .unwrap_or_else(|| "none".into());
53 let caps = chain.first().map(|b| b.caps()).unwrap_or_default();
54 Self { id, caps, chain }
55 }
56}
57
58#[async_trait::async_trait]
59impl Brain for FallbackBrain {
60 fn id(&self) -> &str {
61 &self.id
62 }
63 fn caps(&self) -> ModelCaps {
64 self.caps.clone()
65 }
66 async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
67 let mut last_err: Option<anyhow::Error> = None;
68 for brain in &self.chain {
69 match brain.complete(req.clone()).await {
70 Ok(stream) => return Ok(stream),
71 Err(e) => {
72 tracing::warn!("swarm brain {} failed, trying next: {}", brain.id(), e);
73 last_err = Some(e);
74 }
75 }
76 }
77 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no brains in fallback chain")))
78 }
79}
80
/// Input to a swarm run: what to do, where to do it, and how many
/// verifier-driven rework rounds to allow after the first coding attempt.
#[derive(Debug, Clone)]
pub struct SwarmPlan {
    /// Natural-language task handed to the planner.
    pub task: String,
    /// Root directory the agents operate in.
    pub workspace: PathBuf,
    /// Extra coder attempts allowed after the first one.
    pub max_reworks: u32,
}

impl Default for SwarmPlan {
    /// An empty task in the current directory, with up to three reworks.
    fn default() -> Self {
        let workspace = PathBuf::from(".");
        Self {
            max_reworks: 3,
            task: String::default(),
            workspace,
        }
    }
}
99
100#[derive(Debug, Clone)]
101pub struct SwarmOutcome {
102 pub status: String,
103 pub plan: Option<String>,
104 pub diffs: Vec<crate::event::FileDiff>,
105 pub passes: u32,
106 pub reworks: u32,
107 pub cost_usd: f64,
108}
109
/// Outcome of one verifier review.
#[derive(Debug, Clone)]
pub enum Verdict {
    /// The implementation satisfies the spec.
    Pass,
    /// Another coder pass is needed; `findings` lists the reported issues.
    Rework {
        findings: Vec<String>,
    },
}
115
/// Coarse stage of a swarm run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SwarmPhase {
    Planning,
    Coding,
    Verifying,
    Reworking,
    Done,
}

impl std::fmt::Display for SwarmPhase {
    /// Renders the phase as a lower-case label (e.g. `"coding"`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Planning => "planning",
            Self::Coding => "coding",
            Self::Verifying => "verifying",
            Self::Reworking => "reworking",
            Self::Done => "done",
        };
        f.write_str(label)
    }
}
136
137pub struct FileLocks {
140 locked: Mutex<HashSet<String>>,
141}
142
143impl FileLocks {
144 pub fn new() -> Self {
145 Self {
146 locked: Mutex::new(HashSet::new()),
147 }
148 }
149
150 pub async fn try_lock(&self, files: &[String]) -> Result<FileLockGuard, Vec<String>> {
151 let mut locked = self.locked.lock().await;
152 let mut conflicts = Vec::new();
153 for f in files {
154 if locked.contains(f) {
155 conflicts.push(f.clone());
156 }
157 }
158 if !conflicts.is_empty() {
159 return Err(conflicts);
160 }
161 for f in files {
162 locked.insert(f.clone());
163 }
164 Ok(FileLockGuard {
165 _files: files.to_vec(),
166 })
167 }
168
169 pub async fn release(&self, files: &[String]) {
170 let mut locked = self.locked.lock().await;
171 for f in files {
172 locked.remove(f);
173 }
174 }
175}
176
177pub struct FileLockGuard {
178 _files: Vec<String>,
179}
180
181fn swarm_tool_registry(workspace: &Workspace, write_enabled: bool) -> Arc<ToolRegistry> {
182 let mut registry = ToolRegistry::new();
183 registry.register(Arc::new(FsRead));
184 registry.register(Arc::new(FsList));
185 if write_enabled {
186 registry.register(Arc::new(FsWrite));
187 registry.register(Arc::new(Edit));
188 registry.register(Arc::new(MultiEdit));
189 registry.register(Arc::new(Search));
190 registry.register(Arc::new(Git));
191 registry.register(Arc::new(Exec::new(workspace.sandbox.clone())));
192 }
193 Arc::new(registry)
194}
195
196fn tool_blocks_text(blocks: &[Block]) -> String {
197 blocks
198 .iter()
199 .map(|block| match block {
200 Block::Text(text) => text.clone(),
201 Block::Json(value) => value.to_string(),
202 Block::Image { mime, data } => format!("[image: {}, {} bytes]", mime, data.len()),
203 Block::Diff { file, patch } => format!("diff for {}\n{}", file, patch),
204 })
205 .collect::<Vec<_>>()
206 .join("\n")
207}
208
209fn track_tool_diff(
210 diffs: &mut Vec<crate::event::FileDiff>,
211 tool_name: &str,
212 args: &serde_json::Value,
213 blocks: &[Block],
214) {
215 for block in blocks {
216 if let Block::Diff { file, patch } = block {
217 let plus = patch
218 .lines()
219 .filter(|line| line.starts_with('+') && !line.starts_with("+++"))
220 .count() as u32;
221 let minus = patch
222 .lines()
223 .filter(|line| line.starts_with('-') && !line.starts_with("---"))
224 .count() as u32;
225 if !diffs.iter().any(|diff| diff.file == *file) {
226 diffs.push(crate::event::FileDiff {
227 file: file.clone(),
228 plus,
229 minus,
230 });
231 }
232 }
233 }
234
235 if matches!(tool_name, "fs_write" | "edit" | "multi_edit") {
236 if let Some(path) = args.get("path").and_then(|value| value.as_str()) {
237 if !diffs.iter().any(|diff| diff.file == path) {
238 diffs.push(crate::event::FileDiff {
239 file: path.to_string(),
240 plus: 0,
241 minus: 0,
242 });
243 }
244 }
245 }
246}
247
248#[async_trait::async_trait]
251pub trait Orchestrator: Send + Sync {
252 async fn run_swarm(
253 &self,
254 plan: SwarmPlan,
255 event_tx: mpsc::UnboundedSender<Event>,
256 ) -> anyhow::Result<SwarmOutcome>;
257}
258
259pub struct DefaultOrchestrator {
262 router: Arc<dyn Router>,
263 config: Config,
264 memory: Arc<dyn Memory>,
265 file_locks: Arc<FileLocks>,
266}
267
268impl DefaultOrchestrator {
269 pub fn new(router: Arc<dyn Router>, config: Config, memory: Arc<dyn Memory>) -> Self {
270 Self {
271 router,
272 config,
273 memory,
274 file_locks: Arc::new(FileLocks::new()),
275 }
276 }
277
278 fn classify(&self, task: &str) -> TaskTier {
280 let lower = task.to_lowercase();
281 if lower.len() < 20 {
282 TaskTier::Trivial
283 } else if lower.contains("refactor") || lower.contains("architecture") {
284 TaskTier::Hard
285 } else if lower.contains("bug") || lower.contains("fix") {
286 TaskTier::Small
287 } else {
288 TaskTier::Medium
289 }
290 }
291
292 fn select_brain(&self, role: &str, tier: TaskTier) -> Option<Arc<dyn Brain>> {
294 let need = match role {
295 "planner" => crate::router::RoutingNeed {
296 tier: TaskTier::Hard, required_tools: false,
298 required_vision: false,
299 prefer_local: false,
300 },
301 "verifier" => crate::router::RoutingNeed {
302 tier: TaskTier::Medium, required_tools: true,
304 required_vision: false,
305 prefer_local: false,
306 },
307 _ => crate::router::RoutingNeed {
308 tier: match tier {
313 TaskTier::Trivial | TaskTier::Small => TaskTier::Medium,
314 other => other,
315 },
316 required_tools: true,
317 required_vision: false,
318 prefer_local: false,
319 },
320 };
321
322 let budget = BudgetState {
323 daily_limit_usd: self.config.budget.daily_usd,
324 daily_spent_usd: 0.0,
325 session_limit_usd: self.config.budget.session_usd,
326 session_spent_usd: 0.0,
327 };
328
329 let chain = self.router.select(&need, &budget);
332 if chain.is_empty() {
333 None
334 } else {
335 Some(Arc::new(FallbackBrain::new(chain)) as Arc<dyn Brain>)
336 }
337 }
338
339 async fn run_planner(
341 &self,
342 task: &str,
343 workspace: &Workspace,
344 brain: Arc<dyn Brain>,
345 event_tx: &mpsc::UnboundedSender<Event>,
346 parent_run: &RunId,
347 ) -> anyhow::Result<(String, f64, TokenUsage)> {
348 let planner_identity = Identity {
349 name: "planner".into(),
350 role: "technical architect and planner".into(),
351 personality: "analytical, thorough, produces clear structured plans with concrete steps and acceptance criteria.".into(),
352 };
353
354 let system = format!(
355 r#"You are the PLANNER agent in a swarm.
356
357{personality}
358
359Your job: take a task and produce a detailed implementation SPEC.
360- Break the task into clear, numbered steps.
361- For each step, specify what files to create/modify.
362- Include acceptance criteria for the verifier.
363- Output ONLY the spec. No code. No implementation.
364
365Output format:
366## SPEC: <title>
367
368### Step 1: <description>
369- Files: <list>
370- Changes: <what changes>
371- Acceptance: <verification criteria>
372
373### Step 2: ...
374"#,
375 personality = planner_identity.personality,
376 );
377
378 let messages = vec![Msg {
379 role: "user".into(),
380 content: vec![ContentBlock::Text {
381 text: format!("Task to plan:\n\n{}", task),
382 }],
383 }];
384
385 let tools = swarm_tool_registry(workspace, false);
386
387 let tool_specs = tools.to_specs();
388 let req = BrainRequest {
389 system: Some(system),
390 messages,
391 tools: tool_specs.clone(),
392 max_tokens: 4096,
393 temperature: 0.0,
394 stop: vec![],
395 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
396 "swarm-planner",
397 &workspace.root,
398 &tool_specs,
399 ))),
400 };
401
402 let _ = event_tx.send(Event::AgentStatus {
403 run: parent_run.clone(),
404 role: "planner".into(),
405 status: AgentStatus::Thinking,
406 note: format!("planning with {}", brain.id()),
407 });
408
409 let mut stream = brain.complete(req).await?;
410 let mut plan = String::new();
411 let caps = brain.caps();
412 let mut cost = 0.0_f64;
413 let mut tokens = TokenUsage {
414 input: 0,
415 output: 0,
416 };
417
418 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
419 match ev {
420 crate::provider::BrainEvent::TextDelta(text) => {
421 plan.push_str(&text);
422 let _ = event_tx.send(Event::ThinkingDelta {
423 run: parent_run.clone(),
424 text,
425 });
426 }
427 crate::provider::BrainEvent::Usage(usage) => {
428 tokens.input = tokens.input.saturating_add(usage.input);
429 tokens.output = tokens.output.saturating_add(usage.output);
430 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
431 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
432 }
433 crate::provider::BrainEvent::Done(_) => break,
434 crate::provider::BrainEvent::Error(e) => {
435 anyhow::bail!("Planner error: {}", e)
436 }
437 _ => {}
438 }
439 }
440
441 let _ = event_tx.send(Event::AgentStatus {
442 run: parent_run.clone(),
443 role: "planner".into(),
444 status: AgentStatus::Done,
445 note: "plan complete".into(),
446 });
447
448 Ok((plan, cost, tokens))
449 }
450
451 async fn run_coder(
453 &self,
454 spec: &str,
455 rework_notes: Option<&[String]>,
456 workspace: &Workspace,
457 brain: Arc<dyn Brain>,
458 event_tx: &mpsc::UnboundedSender<Event>,
459 parent_run: &RunId,
460 ) -> anyhow::Result<(Vec<crate::event::FileDiff>, f64, TokenUsage)> {
461 let coder_identity = Identity {
462 name: "coder".into(),
463 role: "implementation engineer".into(),
464 personality:
465 "precise, produces clean working code, uses exact file edits with the edit tool."
466 .into(),
467 };
468
469 let rework_section = if let Some(notes) = rework_notes {
470 if notes.is_empty() {
471 String::new()
472 } else {
473 format!(
474 "\n## REWORK NOTES (from verifier)\nThe previous implementation had issues. Fix these:\n{}",
475 notes
476 .iter()
477 .enumerate()
478 .map(|(i, n)| format!("{}. {}", i + 1, n))
479 .collect::<Vec<_>>()
480 .join("\n")
481 )
482 }
483 } else {
484 String::new()
485 };
486
487 let system = format!(
488 r#"You are the CODER agent in a swarm.
489
490{}
491
492Your job: implement the SPEC exactly. Use tools to read existing files and write edits.
493- Follow the spec steps in order.
494- Use the edit or fs_write tool to make changes.
495- After each file edit, note what you changed.
496- Produce working, compilable code.
497{}
498"#,
499 coder_identity.personality, rework_section,
500 );
501
502 let repo_map = self.memory.repo_map(&workspace.root);
504 let file_list: Vec<String> = repo_map
505 .files
506 .iter()
507 .map(|f| format!(" {}", f.path))
508 .collect();
509
510 let context_msg = format!(
511 "## SPEC TO IMPLEMENT\n\n{}\n\n## WORKSPACE FILES\n{}",
512 spec,
513 file_list.join("\n"),
514 );
515
516 let mut messages = vec![Msg {
517 role: "user".into(),
518 content: vec![ContentBlock::Text { text: context_msg }],
519 }];
520 let tools = swarm_tool_registry(workspace, true);
521 let tool_specs = tools.to_specs();
522
523 let _ = event_tx.send(Event::AgentStatus {
524 run: parent_run.clone(),
525 role: "coder".into(),
526 status: AgentStatus::Working,
527 note: format!("implementing with {}", brain.id()),
528 });
529
530 let mut output = String::new();
531 let mut diffs = Vec::new();
532 let caps = brain.caps();
533 let mut cost = 0.0_f64;
534 let mut tokens = TokenUsage {
535 input: 0,
536 output: 0,
537 };
538
539 for _turn in 0..8 {
540 let req = BrainRequest {
541 system: Some(system.clone()),
542 messages: messages.clone(),
543 tools: tool_specs.clone(),
544 max_tokens: 8192,
545 temperature: 0.0,
546 stop: vec![],
547 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
548 "swarm-coder",
549 &workspace.root,
550 &tool_specs,
551 ))),
552 };
553
554 let mut stream = brain.complete(req).await?;
555 let mut assistant_text = String::new();
556 let mut assistant_blocks = Vec::new();
557 let mut tool_result_blocks = Vec::new();
558 let mut current_tool_id = String::new();
559 let mut current_tool_name = String::new();
560 let mut current_tool_json = String::new();
561
562 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
563 match ev {
564 crate::provider::BrainEvent::TextDelta(text) => {
565 output.push_str(&text);
566 assistant_text.push_str(&text);
567 let _ = event_tx.send(Event::ThinkingDelta {
568 run: parent_run.clone(),
569 text,
570 });
571 }
572 crate::provider::BrainEvent::ReasoningDelta(_) => {}
576 crate::provider::BrainEvent::ToolUseStart { id, name } => {
577 current_tool_id = id.clone();
578 current_tool_name = name.clone();
579 current_tool_json.clear();
580 let risk = tools
581 .get(&name)
582 .map(|tool| tool.risk())
583 .unwrap_or(RiskLevel::ReadOnly);
584 let _ = event_tx.send(Event::ToolUseProposed {
585 run: parent_run.clone(),
586 id,
587 name,
588 args: serde_json::json!({}),
589 risk,
590 });
591 }
592 crate::provider::BrainEvent::ToolUseDelta { id: _, json } => {
593 current_tool_json.push_str(&json);
594 }
595 crate::provider::BrainEvent::ToolUseEnd { id } => {
596 let args = serde_json::from_str::<serde_json::Value>(¤t_tool_json)
597 .unwrap_or_else(|_| serde_json::json!({}));
598 let tool_name = if current_tool_name.is_empty() {
599 "unknown".to_string()
600 } else {
601 current_tool_name.clone()
602 };
603 assistant_blocks.push(ContentBlock::ToolUse {
604 id: id.clone(),
605 name: tool_name.clone(),
606 input: args.clone(),
607 });
608 let _ = event_tx.send(Event::ToolUseStarted {
609 run: parent_run.clone(),
610 id: id.clone(),
611 });
612 let result = if let Some(tool) = tools.get(&tool_name) {
613 let ctx = ToolCtx {
614 workspace_root: workspace.root.clone(),
615 run_id: parent_run.clone(),
616 };
617 match tool.call(args.clone(), &ctx).await {
618 Ok(result) => result,
619 Err(err) => crate::tools::ToolResult::error(format!(
620 "Tool {} failed: {}",
621 tool_name, err
622 )),
623 }
624 } else {
625 crate::tools::ToolResult::error(format!("Unknown tool: {}", tool_name))
626 };
627 track_tool_diff(&mut diffs, &tool_name, &args, &result.content);
628 for diff in &diffs {
629 let _ = event_tx.send(Event::DiffProposed {
630 run: parent_run.clone(),
631 file: diff.file.clone(),
632 patch: String::new(),
633 plus: diff.plus,
634 minus: diff.minus,
635 });
636 }
637 let blocks = result.content.clone();
638 let text = tool_blocks_text(&blocks);
639 let _ = event_tx.send(Event::ToolOutput {
640 run: parent_run.clone(),
641 id: id.clone(),
642 blocks,
643 });
644 tool_result_blocks.push(ContentBlock::ToolResult {
645 tool_use_id: id,
646 content: vec![ContentBlock::Text { text }],
647 is_error: Some(result.is_error),
648 });
649 current_tool_id.clear();
650 current_tool_name.clear();
651 current_tool_json.clear();
652 }
653 crate::provider::BrainEvent::Done(_) => break,
654 crate::provider::BrainEvent::Error(e) => anyhow::bail!("Coder error: {}", e),
655 crate::provider::BrainEvent::Usage(usage) => {
656 tokens.input = tokens.input.saturating_add(usage.input);
657 tokens.output = tokens.output.saturating_add(usage.output);
658 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
659 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
660 }
661 }
662 }
663
664 if !assistant_text.is_empty() {
665 assistant_blocks.insert(
666 0,
667 ContentBlock::Text {
668 text: assistant_text,
669 },
670 );
671 }
672 if tool_result_blocks.is_empty() {
673 break;
674 }
675 messages.push(Msg {
676 role: "assistant".into(),
677 content: assistant_blocks,
678 });
679 messages.push(Msg {
680 role: "user".into(),
681 content: tool_result_blocks,
682 });
683 }
684
685 if diffs.is_empty() {
686 for line in output.lines() {
687 if let Some(file) = line.strip_prefix("Edited ") {
688 let file = file
689 .trim()
690 .split(':')
691 .next()
692 .unwrap_or("")
693 .trim()
694 .to_string();
695 if !file.is_empty() && !diffs.iter().any(|d| d.file == file) {
696 diffs.push(crate::event::FileDiff {
697 file,
698 plus: 1,
699 minus: 1,
700 });
701 }
702 }
703 }
704 }
705
706 for diff in &diffs {
708 let _ = event_tx.send(Event::DiffProposed {
709 run: parent_run.clone(),
710 file: diff.file.clone(),
711 patch: String::new(),
712 plus: diff.plus,
713 minus: diff.minus,
714 });
715 }
716
717 let _ = event_tx.send(Event::AgentStatus {
718 run: parent_run.clone(),
719 role: "coder".into(),
720 status: AgentStatus::Done,
721 note: format!("{} files changed", diffs.len()),
722 });
723
724 Ok((diffs, cost, tokens))
725 }
726
727 async fn run_verifier(
729 &self,
730 spec: &str,
731 diffs: &[crate::event::FileDiff],
732 workspace: &Workspace,
733 brain: Arc<dyn Brain>,
734 event_tx: &mpsc::UnboundedSender<Event>,
735 parent_run: &RunId,
736 ) -> anyhow::Result<(Verdict, f64, TokenUsage)> {
737 let verifier_identity = Identity {
738 name: "verifier".into(),
739 role: "code reviewer and quality assurance".into(),
740 personality: "adversarial, meticulous, catches issues the coder missed. Checks correctness, style, edge cases, and spec compliance.".into(),
741 };
742
743 let diff_summary: String = diffs
744 .iter()
745 .map(|d| format!(" {}: +{} -{}", d.file, d.plus, d.minus))
746 .collect::<Vec<_>>()
747 .join("\n");
748
749 let tools = swarm_tool_registry(workspace, false);
750 let read_tool = tools.get("fs_read");
751 let mut files_to_check = Vec::new();
752 for d in diffs {
753 let content = if let Some(tool) = &read_tool {
754 let ctx = ToolCtx {
755 workspace_root: workspace.root.clone(),
756 run_id: parent_run.clone(),
757 };
758 let args = serde_json::json!({ "path": d.file, "limit": 220 });
759 match tool.call(args, &ctx).await {
760 Ok(result) => tool_blocks_text(&result.content),
761 Err(_) => format!("[cannot read {}]", d.file),
762 }
763 } else {
764 format!("[cannot read {}]", d.file)
765 };
766 files_to_check.push(content);
767 }
768
769 let files_context: String = diffs
770 .iter()
771 .zip(files_to_check.iter())
772 .map(|(d, content)| {
773 format!(
774 "### {}\n```\n{}\n```",
775 d.file,
776 if content.len() > 3000 {
777 format!("{}... [truncated]", &content[..3000])
778 } else {
779 content.clone()
780 }
781 )
782 })
783 .collect::<Vec<_>>()
784 .join("\n\n");
785
786 let system = format!(
787 r#"You are the VERIFIER agent in a swarm.
788
789{personality}
790
791Your job: review the coder's implementation against the SPEC.
792- For each spec requirement, check if it's satisfied.
793- Find bugs, style issues, missing edge cases, spec violations.
794- Output EXACTLY one of:
795 ✓ PASS — if everything is correct and complete.
796 ✗ REWORK — followed by numbered concrete findings.
797
798Format:
799✓ PASS
800(no issues found)
801
802or:
803
804✗ REWORK
8051. <specific finding with file:line>
8062. <another finding>
807"#,
808 personality = verifier_identity.personality,
809 );
810
811 let context = format!(
812 "## SPEC\n{}\n\n## CHANGED FILES\n{}\n\n## FILE CONTENTS\n{}",
813 spec, diff_summary, files_context,
814 );
815
816 let messages = vec![Msg {
817 role: "user".into(),
818 content: vec![ContentBlock::Text { text: context }],
819 }];
820
821 let tool_specs = tools.to_specs();
822 let req = BrainRequest {
823 system: Some(system),
824 messages,
825 tools: tool_specs.clone(),
826 max_tokens: 4096,
827 temperature: 0.0,
828 stop: vec![],
829 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
830 "swarm-verifier",
831 &workspace.root,
832 &tool_specs,
833 ))),
834 };
835
836 let _ = event_tx.send(Event::AgentStatus {
837 run: parent_run.clone(),
838 role: "verifier".into(),
839 status: AgentStatus::Working,
840 note: format!("reviewing with {}", brain.id()),
841 });
842
843 let mut stream = brain.complete(req).await?;
844 let mut verdict_text = String::new();
845 let caps = brain.caps();
846 let mut cost = 0.0_f64;
847 let mut tokens = TokenUsage {
848 input: 0,
849 output: 0,
850 };
851
852 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
853 match ev {
854 crate::provider::BrainEvent::TextDelta(text) => {
855 verdict_text.push_str(&text);
856 }
857 crate::provider::BrainEvent::Usage(usage) => {
858 tokens.input = tokens.input.saturating_add(usage.input);
859 tokens.output = tokens.output.saturating_add(usage.output);
860 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
861 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
862 }
863 crate::provider::BrainEvent::Done(_) => break,
864 _ => {}
865 }
866 }
867
868 let upper = verdict_text.to_uppercase();
870 let has_rework = upper.contains("REWORK")
871 || upper.contains("NEEDS REWORK")
872 || verdict_text.contains("✗");
873 let has_pass = (upper.contains("PASS") || verdict_text.contains("✓")) && !has_rework;
874
875 let verdict = if has_rework {
876 let findings: Vec<String> = verdict_text
877 .lines()
878 .filter(|l| l.trim().starts_with(|c: char| c.is_ascii_digit()) && l.contains('.'))
879 .map(|l| l.trim().to_string())
880 .collect();
881
882 if findings.is_empty() {
883 Verdict::Rework {
884 findings: vec![verdict_text.clone()],
885 }
886 } else {
887 Verdict::Rework { findings }
888 }
889 } else if has_pass {
890 Verdict::Pass
891 } else {
892 Verdict::Rework {
894 findings: vec![format!("Verifier verdict unclear: {}", verdict_text)],
895 }
896 };
897
898 let _ = event_tx.send(Event::AgentStatus {
899 run: parent_run.clone(),
900 role: "verifier".into(),
901 status: AgentStatus::Done,
902 note: match &verdict {
903 Verdict::Pass => "PASS".into(),
904 Verdict::Rework { findings } => format!("REWORK ({} issues)", findings.len()),
905 },
906 });
907
908 Ok((verdict, cost, tokens))
909 }
910}
911
912#[async_trait::async_trait]
913impl Orchestrator for DefaultOrchestrator {
914 async fn run_swarm(
915 &self,
916 plan: SwarmPlan,
917 event_tx: mpsc::UnboundedSender<Event>,
918 ) -> anyhow::Result<SwarmOutcome> {
919 let run_id = RunId::new();
920 let task = plan.task.clone();
921
922 let _ = event_tx.send(Event::RunStarted {
923 run: run_id.clone(),
924 task: task.clone(),
925 agent: "swarm".into(),
926 });
927
928 let planner_brain = self
930 .select_brain("planner", self.classify(&task))
931 .ok_or_else(|| anyhow::anyhow!("No model available for planner"))?;
932
933 let coder_brain = self
934 .select_brain("coder", self.classify(&task))
935 .unwrap_or_else(|| planner_brain.clone());
936
937 let verifier_brain = self
938 .select_brain("verifier", self.classify(&task))
939 .unwrap_or_else(|| coder_brain.clone());
940
941 let sandbox = LocalSandbox::new(plan.workspace.clone());
943 let workspace = Workspace {
944 root: plan.workspace.clone(),
945 sandbox: Arc::new(sandbox),
946 };
947
948 let mut total_cost: f64 = 0.0;
949 let mut total_tokens = TokenUsage {
950 input: 0,
951 output: 0,
952 };
953
954 let _ = event_tx.send(Event::AgentSpawned {
956 run: run_id.clone(),
957 role: "planner".into(),
958 model: planner_brain.id().to_string(),
959 });
960
961 let (spec, planner_cost, planner_tokens) = self
962 .run_planner(&task, &workspace, planner_brain, &event_tx, &run_id)
963 .await?;
964 total_cost += planner_cost;
965 total_tokens.input = total_tokens.input.saturating_add(planner_tokens.input);
966 total_tokens.output = total_tokens.output.saturating_add(planner_tokens.output);
967 let _ = event_tx.send(Event::CostUpdate {
968 run: run_id.clone(),
969 usd: total_cost,
970 });
971
972 let _ = self.memory.upsert_doc(crate::memory::WorkingDoc {
974 id: format!("plan-{}", run_id.0),
975 title: format!("Plan: {}", &task[..task.len().min(60)]),
976 content: spec.clone(),
977 updated_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
978 });
979
980 let _ = event_tx.send(Event::AgentSpawned {
982 run: run_id.clone(),
983 role: "coder".into(),
984 model: coder_brain.id().to_string(),
985 });
986
987 let _ = event_tx.send(Event::AgentSpawned {
988 run: run_id.clone(),
989 role: "verifier".into(),
990 model: verifier_brain.id().to_string(),
991 });
992
993 let mut rework_notes: Option<Vec<String>> = None;
994 let mut all_diffs: Vec<crate::event::FileDiff> = Vec::new();
995 let mut reworks = 0u32;
996 let mut passes = 0u32;
997
998 for attempt in 0..=plan.max_reworks {
999 let (diffs, coder_cost, coder_tokens) = self
1001 .run_coder(
1002 &spec,
1003 rework_notes.as_deref(),
1004 &workspace,
1005 coder_brain.clone(),
1006 &event_tx,
1007 &run_id,
1008 )
1009 .await?;
1010 total_cost += coder_cost;
1011 total_tokens.input = total_tokens.input.saturating_add(coder_tokens.input);
1012 total_tokens.output = total_tokens.output.saturating_add(coder_tokens.output);
1013 let _ = event_tx.send(Event::CostUpdate {
1014 run: run_id.clone(),
1015 usd: total_cost,
1016 });
1017
1018 if attempt > 0 {
1020 let prev_files: Vec<String> = all_diffs.iter().map(|d| d.file.clone()).collect();
1021 self.file_locks.release(&prev_files).await;
1022 }
1023
1024 let new_files: Vec<String> = diffs.iter().map(|d| d.file.clone()).collect();
1026 if let Err(conflicts) = self.file_locks.try_lock(&new_files).await {
1027 let _ = event_tx.send(Event::Error {
1028 run: run_id.clone(),
1029 message: format!("File collision detected: {:?}", conflicts),
1030 });
1031 }
1032
1033 all_diffs = diffs.clone();
1034
1035 if diffs.is_empty() && attempt < plan.max_reworks {
1040 reworks += 1;
1041 let note = "You made NO file changes. You MUST call the fs_write or edit \
1042 tool to create/modify the files in the spec — do not merely describe \
1043 the change in prose. Emit the tool call now."
1044 .to_string();
1045 rework_notes = Some(vec![note.clone()]);
1046 let _ = event_tx.send(Event::TestResult {
1047 run: run_id.clone(),
1048 passed: 0,
1049 failed: 1,
1050 detail: "no file changes — coder must use tools".into(),
1051 });
1052 let _ = event_tx.send(Event::AgentStatus {
1053 run: run_id.clone(),
1054 role: "coder".into(),
1055 status: AgentStatus::Working,
1056 note: format!("no diff — forcing tool use (attempt {})", attempt + 1),
1057 });
1058 continue;
1059 }
1060
1061 let (verdict, verifier_cost, verifier_tokens) = self
1063 .run_verifier(
1064 &spec,
1065 &diffs,
1066 &workspace,
1067 verifier_brain.clone(),
1068 &event_tx,
1069 &run_id,
1070 )
1071 .await?;
1072 total_cost += verifier_cost;
1073 total_tokens.input = total_tokens.input.saturating_add(verifier_tokens.input);
1074 total_tokens.output = total_tokens.output.saturating_add(verifier_tokens.output);
1075 let _ = event_tx.send(Event::CostUpdate {
1076 run: run_id.clone(),
1077 usd: total_cost,
1078 });
1079
1080 match verdict {
1081 Verdict::Pass => {
1082 passes += 1;
1083 let _ = self.memory.post_signal(crate::memory::SharedSignal {
1085 id: format!("pass-{}-{}", run_id.0, attempt),
1086 from_agent: "verifier".into(),
1087 to_agent: "coder".into(),
1088 content: "PASS — all checks satisfied".into(),
1089 timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1090 });
1091
1092 let _ = event_tx.send(Event::TestResult {
1093 run: run_id.clone(),
1094 passed: 1,
1095 failed: 0,
1096 detail: "Verifier PASS".into(),
1097 });
1098 for diff in &diffs {
1099 let _ = event_tx.send(Event::DiffApplied {
1100 run: run_id.clone(),
1101 file: diff.file.clone(),
1102 });
1103 }
1104 break; }
1106 Verdict::Rework { findings } => {
1107 reworks += 1;
1108 rework_notes = Some(findings.clone());
1109
1110 let _ = self.memory.post_signal(crate::memory::SharedSignal {
1112 id: format!("rework-{}-{}", run_id.0, attempt),
1113 from_agent: "verifier".into(),
1114 to_agent: "coder".into(),
1115 content: format!("REWORK: {}", findings.join("; ")),
1116 timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1117 });
1118
1119 let _ = event_tx.send(Event::TestResult {
1120 run: run_id.clone(),
1121 passed: 0,
1122 failed: findings.len() as u32,
1123 detail: findings.join("\n"),
1124 });
1125
1126 let _ = event_tx.send(Event::AgentStatus {
1127 run: run_id.clone(),
1128 role: "coder".into(),
1129 status: AgentStatus::Working,
1130 note: format!("rework attempt {}/{}", attempt + 1, plan.max_reworks),
1131 });
1132 }
1133 }
1134 }
1135
1136 let outcome = SwarmOutcome {
1137 status: if passes > 0 {
1138 "PASS".into()
1139 } else {
1140 format!("FAILED after {} reworks", reworks)
1141 },
1142 plan: Some(spec),
1143 diffs: all_diffs,
1144 passes,
1145 reworks,
1146 cost_usd: total_cost,
1147 };
1148
1149 let outcome_summary = OutcomeSummary {
1150 status: outcome.status.clone(),
1151 diffs: outcome.diffs.clone(),
1152 cost_usd: outcome.cost_usd,
1153 tokens: total_tokens,
1154 };
1155
1156 let _ = event_tx.send(Event::RunFinished {
1157 run: run_id,
1158 outcome: outcome_summary,
1159 });
1160
1161 Ok(outcome)
1162 }
1163}