1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::config::Config;
7use crate::engine::{Identity, Workspace};
8use crate::event::{AgentStatus, Block, Event, OutcomeSummary, RiskLevel, RunId, TokenUsage};
9use crate::memory::Memory;
10use crate::provider::{
11 Brain, BrainRequest, BrainStream, ContentBlock, ModelCaps, Msg, PromptCacheConfig, ToolSpec,
12};
13use crate::router::{BudgetState, Router, TaskTier};
14use crate::sandbox::LocalSandbox;
15use crate::tools::edit::{Edit, MultiEdit};
16use crate::tools::exec::Exec;
17use crate::tools::fs::{FsList, FsRead, FsWrite};
18use crate::tools::git::Git;
19use crate::tools::search_and_web::Search;
20use crate::tools::{ToolCtx, ToolRegistry};
21
22fn prompt_cache_key(scope: &str, workspace_root: &std::path::Path, tools: &[ToolSpec]) -> String {
23 use std::hash::{Hash, Hasher};
24
25 let mut hasher = std::collections::hash_map::DefaultHasher::new();
26 scope.hash(&mut hasher);
27 workspace_root.display().to_string().hash(&mut hasher);
28 for tool in tools {
29 tool.name.hash(&mut hasher);
30 tool.description.hash(&mut hasher);
31 tool.input_schema.to_string().hash(&mut hasher);
32 }
33 format!("sparrow-{}-{:016x}", scope, hasher.finish())
34}
35
/// A [`Brain`] that wraps an ordered chain of brains and tries them in turn.
struct FallbackBrain {
    /// Identifier reported by `id()`: the first chain entry's id, or `"none"`
    /// when the chain is empty (set in `FallbackBrain::new`).
    id: String,
    /// Capabilities reported by `caps()`: taken from the first chain entry,
    /// or the default value when the chain is empty.
    caps: ModelCaps,
    /// Brains to try, in order, when completing a request.
    chain: Vec<Arc<dyn Brain>>,
}
46
47impl FallbackBrain {
48 fn new(chain: Vec<Arc<dyn Brain>>) -> Self {
49 let id = chain
50 .first()
51 .map(|b| b.id().to_string())
52 .unwrap_or_else(|| "none".into());
53 let caps = chain.first().map(|b| b.caps()).unwrap_or_default();
54 Self { id, caps, chain }
55 }
56}
57
58#[async_trait::async_trait]
59impl Brain for FallbackBrain {
60 fn id(&self) -> &str {
61 &self.id
62 }
63 fn caps(&self) -> ModelCaps {
64 self.caps.clone()
65 }
66 async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
67 let mut last_err: Option<anyhow::Error> = None;
68 for brain in &self.chain {
69 match brain.complete(req.clone()).await {
70 Ok(stream) => return Ok(stream),
71 Err(e) => {
72 tracing::warn!("swarm brain {} failed, trying next: {}", brain.id(), e);
73 last_err = Some(e);
74 }
75 }
76 }
77 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no brains in fallback chain")))
78 }
79}
80
/// Input describing one swarm run.
#[derive(Debug, Clone)]
pub struct SwarmPlan {
    /// Natural-language description of the task to carry out.
    pub task: String,
    /// Root directory the swarm works in.
    pub workspace: PathBuf,
    /// Upper bound of the rework loop; the orchestrator iterates over
    /// `0..=max_reworks` coder attempts.
    pub max_reworks: u32,
}

impl Default for SwarmPlan {
    /// An empty task in the current directory (`.`) with three reworks allowed.
    fn default() -> Self {
        let workspace = PathBuf::from(".");
        Self {
            task: String::new(),
            workspace,
            max_reworks: 3,
        }
    }
}
99
/// Final result of a swarm run, as returned by `Orchestrator::run_swarm`.
#[derive(Debug, Clone)]
pub struct SwarmOutcome {
    /// Human-readable status; `run_swarm` sets it to `"PASS"` or
    /// `"FAILED after N reworks"`.
    pub status: String,
    /// The spec produced by the planner, when one is available.
    pub plan: Option<String>,
    /// Per-file change summaries from the most recent coder attempt.
    pub diffs: Vec<crate::event::FileDiff>,
    /// Number of PASS verdicts recorded.
    pub passes: u32,
    /// Number of rework rounds recorded.
    pub reworks: u32,
    /// Accumulated cost in USD across planner, coder and verifier calls.
    pub cost_usd: f64,
}
109
/// Outcome of a verifier review.
#[derive(Debug, Clone)]
pub enum Verdict {
    /// The implementation was accepted.
    Pass,
    /// The implementation needs another coder pass; `findings` lists the
    /// issues to hand back to the coder.
    Rework { findings: Vec<String> },
}
115
/// Stage of a swarm run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SwarmPhase {
    Planning,
    Coding,
    Verifying,
    Reworking,
    Done,
}

impl std::fmt::Display for SwarmPhase {
    /// Writes the lowercase name of the phase (e.g. `planning`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            SwarmPhase::Planning => "planning",
            SwarmPhase::Coding => "coding",
            SwarmPhase::Verifying => "verifying",
            SwarmPhase::Reworking => "reworking",
            SwarmPhase::Done => "done",
        };
        f.write_str(label)
    }
}
136
/// Registry of file names that are currently claimed.
pub struct FileLocks {
    /// Set of claimed file names, guarded by an async mutex.
    locked: Mutex<HashSet<String>>,
}
142
143impl FileLocks {
144 pub fn new() -> Self {
145 Self {
146 locked: Mutex::new(HashSet::new()),
147 }
148 }
149
150 pub async fn try_lock(&self, files: &[String]) -> Result<FileLockGuard, Vec<String>> {
151 let mut locked = self.locked.lock().await;
152 let mut conflicts = Vec::new();
153 for f in files {
154 if locked.contains(f) {
155 conflicts.push(f.clone());
156 }
157 }
158 if !conflicts.is_empty() {
159 return Err(conflicts);
160 }
161 for f in files {
162 locked.insert(f.clone());
163 }
164 Ok(FileLockGuard {
165 _files: files.to_vec(),
166 })
167 }
168
169 pub async fn release(&self, files: &[String]) {
170 let mut locked = self.locked.lock().await;
171 for f in files {
172 locked.remove(f);
173 }
174 }
175}
176
/// Token returned by a successful `FileLocks::try_lock`.
///
/// NOTE(review): no `Drop` implementation is visible in this file, so dropping
/// the guard does not appear to release anything — callers release via
/// `FileLocks::release`. Confirm before relying on guard semantics.
pub struct FileLockGuard {
    /// The file names that were claimed when this guard was created.
    _files: Vec<String>,
}
180
181fn swarm_tool_registry(workspace: &Workspace, write_enabled: bool) -> Arc<ToolRegistry> {
182 let mut registry = ToolRegistry::new();
183 registry.register(Arc::new(FsRead));
184 registry.register(Arc::new(FsList));
185 if write_enabled {
186 registry.register(Arc::new(FsWrite));
187 registry.register(Arc::new(Edit));
188 registry.register(Arc::new(MultiEdit));
189 registry.register(Arc::new(Search));
190 registry.register(Arc::new(Git));
191 registry.register(Arc::new(Exec::new(workspace.sandbox.clone())));
192 }
193 Arc::new(registry)
194}
195
196fn tool_blocks_text(blocks: &[Block]) -> String {
197 blocks
198 .iter()
199 .map(|block| match block {
200 Block::Text(text) => text.clone(),
201 Block::Json(value) => value.to_string(),
202 Block::Image { mime, data } => format!("[image: {}, {} bytes]", mime, data.len()),
203 Block::Diff { file, patch } => format!("diff for {}\n{}", file, patch),
204 })
205 .collect::<Vec<_>>()
206 .join("\n")
207}
208
209fn track_tool_diff(
210 diffs: &mut Vec<crate::event::FileDiff>,
211 tool_name: &str,
212 args: &serde_json::Value,
213 blocks: &[Block],
214) {
215 for block in blocks {
216 if let Block::Diff { file, patch } = block {
217 let plus = patch
218 .lines()
219 .filter(|line| line.starts_with('+') && !line.starts_with("+++"))
220 .count() as u32;
221 let minus = patch
222 .lines()
223 .filter(|line| line.starts_with('-') && !line.starts_with("---"))
224 .count() as u32;
225 if !diffs.iter().any(|diff| diff.file == *file) {
226 diffs.push(crate::event::FileDiff {
227 file: file.clone(),
228 plus,
229 minus,
230 });
231 }
232 }
233 }
234
235 if matches!(tool_name, "fs_write" | "edit" | "multi_edit") {
236 if let Some(path) = args.get("path").and_then(|value| value.as_str()) {
237 if !diffs.iter().any(|diff| diff.file == path) {
238 diffs.push(crate::event::FileDiff {
239 file: path.to_string(),
240 plus: 0,
241 minus: 0,
242 });
243 }
244 }
245 }
246}
247
/// Drives a complete swarm run for one task.
#[async_trait::async_trait]
pub trait Orchestrator: Send + Sync {
    /// Executes the swarm described by `plan`, sending progress events to
    /// `event_tx`, and returns the final outcome or the error that stopped
    /// the run.
    async fn run_swarm(
        &self,
        plan: SwarmPlan,
        event_tx: mpsc::UnboundedSender<Event>,
    ) -> anyhow::Result<SwarmOutcome>;
}
258
/// Orchestrator that runs a planner, a coder and a verifier agent in sequence,
/// looping coder and verifier until the verifier passes or attempts run out.
pub struct DefaultOrchestrator {
    /// Chooses the chain of brains for each agent role.
    router: Arc<dyn Router>,
    /// Application configuration; the budget limits are read from it.
    config: Config,
    /// Memory backend used for the repo map, working docs and agent signals.
    memory: Arc<dyn Memory>,
    /// Registry of files claimed by coder attempts.
    file_locks: Arc<FileLocks>,
}
267
268impl DefaultOrchestrator {
269 pub fn new(router: Arc<dyn Router>, config: Config, memory: Arc<dyn Memory>) -> Self {
270 Self {
271 router,
272 config,
273 memory,
274 file_locks: Arc::new(FileLocks::new()),
275 }
276 }
277
278 fn classify(&self, task: &str) -> TaskTier {
280 let lower = task.to_lowercase();
281 if lower.len() < 20 {
282 TaskTier::Trivial
283 } else if lower.contains("refactor") || lower.contains("architecture") {
284 TaskTier::Hard
285 } else if lower.contains("bug") || lower.contains("fix") {
286 TaskTier::Small
287 } else {
288 TaskTier::Medium
289 }
290 }
291
292 fn select_brain(&self, role: &str, tier: TaskTier) -> Option<Arc<dyn Brain>> {
294 let need = match role {
295 "planner" => crate::router::RoutingNeed {
296 tier: TaskTier::Hard, required_tools: false,
298 required_vision: false,
299 prefer_local: false,
300 },
301 "verifier" => crate::router::RoutingNeed {
302 tier: TaskTier::Medium, required_tools: true,
304 required_vision: false,
305 prefer_local: false,
306 },
307 _ => crate::router::RoutingNeed {
308 tier: match tier {
313 TaskTier::Trivial | TaskTier::Small => TaskTier::Medium,
314 other => other,
315 },
316 required_tools: true,
317 required_vision: false,
318 prefer_local: false,
319 },
320 };
321
322 let budget = BudgetState {
323 daily_limit_usd: self.config.budget.daily_usd,
324 daily_spent_usd: 0.0,
325 session_limit_usd: self.config.budget.session_usd,
326 session_spent_usd: 0.0,
327 };
328
329 let chain = self.router.select(&need, &budget);
332 if chain.is_empty() {
333 None
334 } else {
335 Some(Arc::new(FallbackBrain::new(chain)) as Arc<dyn Brain>)
336 }
337 }
338
339 async fn run_planner(
341 &self,
342 task: &str,
343 workspace: &Workspace,
344 brain: Arc<dyn Brain>,
345 event_tx: &mpsc::UnboundedSender<Event>,
346 parent_run: &RunId,
347 ) -> anyhow::Result<(String, f64, TokenUsage)> {
348 let planner_identity = Identity {
349 name: "planner".into(),
350 role: "technical architect and planner".into(),
351 personality: "analytical, thorough, produces clear structured plans with concrete steps and acceptance criteria.".into(),
352 };
353
354 let system = format!(
355 r#"You are the PLANNER agent in a swarm.
356
357{personality}
358
359Your job: take a task and produce a detailed implementation SPEC.
360- Break the task into clear, numbered steps.
361- For each step, specify what files to create/modify.
362- Include acceptance criteria for the verifier.
363- Output ONLY the spec. No code. No implementation.
364
365Output format:
366## SPEC: <title>
367
368### Step 1: <description>
369- Files: <list>
370- Changes: <what changes>
371- Acceptance: <verification criteria>
372
373### Step 2: ...
374"#,
375 personality = planner_identity.personality,
376 );
377
378 let messages = vec![Msg {
379 role: "user".into(),
380 content: vec![ContentBlock::Text {
381 text: format!("Task to plan:\n\n{}", task),
382 }],
383 }];
384
385 let tools = swarm_tool_registry(workspace, false);
386
387 let tool_specs = tools.to_specs();
388 let req = BrainRequest {
389 system: Some(system),
390 messages,
391 tools: tool_specs.clone(),
392 max_tokens: 4096,
393 temperature: 0.0,
394 stop: vec![],
395 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
396 "swarm-planner",
397 &workspace.root,
398 &tool_specs,
399 ))),
400 };
401
402 let _ = event_tx.send(Event::AgentStatus {
403 run: parent_run.clone(),
404 role: "planner".into(),
405 status: AgentStatus::Thinking,
406 note: format!("planning with {}", brain.id()),
407 });
408
409 let mut stream = brain.complete(req).await?;
410 let mut plan = String::new();
411 let caps = brain.caps();
412 let mut cost = 0.0_f64;
413 let mut tokens = TokenUsage {
414 input: 0,
415 output: 0,
416 };
417
418 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
419 match ev {
420 crate::provider::BrainEvent::TextDelta(text) => {
421 plan.push_str(&text);
422 let _ = event_tx.send(Event::ThinkingDelta {
423 run: parent_run.clone(),
424 text,
425 });
426 }
427 crate::provider::BrainEvent::Usage(usage) => {
428 tokens.input = tokens.input.saturating_add(usage.input);
429 tokens.output = tokens.output.saturating_add(usage.output);
430 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
431 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
432 }
433 crate::provider::BrainEvent::Done(_) => break,
434 crate::provider::BrainEvent::Error(e) => {
435 anyhow::bail!("Planner error: {}", e)
436 }
437 _ => {}
438 }
439 }
440
441 let _ = event_tx.send(Event::AgentStatus {
442 run: parent_run.clone(),
443 role: "planner".into(),
444 status: AgentStatus::Done,
445 note: "plan complete".into(),
446 });
447
448 Ok((plan, cost, tokens))
449 }
450
451 async fn run_coder(
453 &self,
454 spec: &str,
455 rework_notes: Option<&[String]>,
456 workspace: &Workspace,
457 brain: Arc<dyn Brain>,
458 event_tx: &mpsc::UnboundedSender<Event>,
459 parent_run: &RunId,
460 ) -> anyhow::Result<(Vec<crate::event::FileDiff>, f64, TokenUsage)> {
461 let coder_identity = Identity {
462 name: "coder".into(),
463 role: "implementation engineer".into(),
464 personality:
465 "precise, produces clean working code, uses exact file edits with the edit tool."
466 .into(),
467 };
468
469 let rework_section = if let Some(notes) = rework_notes {
470 if notes.is_empty() {
471 String::new()
472 } else {
473 format!(
474 "\n## REWORK NOTES (from verifier)\nThe previous implementation had issues. Fix these:\n{}",
475 notes
476 .iter()
477 .enumerate()
478 .map(|(i, n)| format!("{}. {}", i + 1, n))
479 .collect::<Vec<_>>()
480 .join("\n")
481 )
482 }
483 } else {
484 String::new()
485 };
486
487 let system = format!(
488 r#"You are the CODER agent in a swarm.
489
490{}
491
492Your job: implement the SPEC exactly. Use tools to read existing files and write edits.
493- Follow the spec steps in order.
494- Use the edit or fs_write tool to make changes.
495- After each file edit, note what you changed.
496- Produce working, compilable code.
497{}
498"#,
499 coder_identity.personality, rework_section,
500 );
501
502 let repo_map = self.memory.repo_map(&workspace.root);
504 let file_list: Vec<String> = repo_map
505 .files
506 .iter()
507 .map(|f| format!(" {}", f.path))
508 .collect();
509
510 let context_msg = format!(
511 "## SPEC TO IMPLEMENT\n\n{}\n\n## WORKSPACE FILES\n{}",
512 spec,
513 file_list.join("\n"),
514 );
515
516 let mut messages = vec![Msg {
517 role: "user".into(),
518 content: vec![ContentBlock::Text { text: context_msg }],
519 }];
520 let tools = swarm_tool_registry(workspace, true);
521 let tool_specs = tools.to_specs();
522
523 let _ = event_tx.send(Event::AgentStatus {
524 run: parent_run.clone(),
525 role: "coder".into(),
526 status: AgentStatus::Working,
527 note: format!("implementing with {}", brain.id()),
528 });
529
530 let mut output = String::new();
531 let mut diffs = Vec::new();
532 let caps = brain.caps();
533 let mut cost = 0.0_f64;
534 let mut tokens = TokenUsage {
535 input: 0,
536 output: 0,
537 };
538
539 for _turn in 0..8 {
540 let req = BrainRequest {
541 system: Some(system.clone()),
542 messages: messages.clone(),
543 tools: tool_specs.clone(),
544 max_tokens: 8192,
545 temperature: 0.0,
546 stop: vec![],
547 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
548 "swarm-coder",
549 &workspace.root,
550 &tool_specs,
551 ))),
552 };
553
554 let mut stream = brain.complete(req).await?;
555 let mut assistant_text = String::new();
556 let mut assistant_blocks = Vec::new();
557 let mut tool_result_blocks = Vec::new();
558 let mut current_tool_id = String::new();
559 let mut current_tool_name = String::new();
560 let mut current_tool_json = String::new();
561
562 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
563 match ev {
564 crate::provider::BrainEvent::TextDelta(text) => {
565 output.push_str(&text);
566 assistant_text.push_str(&text);
567 let _ = event_tx.send(Event::ThinkingDelta {
568 run: parent_run.clone(),
569 text,
570 });
571 }
572 crate::provider::BrainEvent::ReasoningDelta(_) => {}
576 crate::provider::BrainEvent::ToolUseStart { id, name } => {
577 current_tool_id = id.clone();
578 current_tool_name = name.clone();
579 current_tool_json.clear();
580 let risk = tools
581 .get(&name)
582 .map(|tool| tool.risk())
583 .unwrap_or(RiskLevel::ReadOnly);
584 let _ = event_tx.send(Event::ToolUseProposed {
585 run: parent_run.clone(),
586 id,
587 name,
588 args: serde_json::json!({}),
589 risk,
590 });
591 }
592 crate::provider::BrainEvent::ToolUseDelta { id: _, json } => {
593 current_tool_json.push_str(&json);
594 }
595 crate::provider::BrainEvent::ToolUseEnd { id } => {
596 let args = serde_json::from_str::<serde_json::Value>(¤t_tool_json)
597 .unwrap_or_else(|_| serde_json::json!({}));
598 let tool_name = if current_tool_name.is_empty() {
599 "unknown".to_string()
600 } else {
601 current_tool_name.clone()
602 };
603 assistant_blocks.push(ContentBlock::ToolUse {
604 id: id.clone(),
605 name: tool_name.clone(),
606 input: args.clone(),
607 });
608 let _ = event_tx.send(Event::ToolUseStarted {
609 run: parent_run.clone(),
610 id: id.clone(),
611 });
612 let result = if let Some(tool) = tools.get(&tool_name) {
613 let ctx = ToolCtx {
614 workspace_root: workspace.root.clone(),
615 run_id: parent_run.clone(),
616 };
617 match tool.call(args.clone(), &ctx).await {
618 Ok(result) => result,
619 Err(err) => crate::tools::ToolResult::error(format!(
620 "Tool {} failed: {}",
621 tool_name, err
622 )),
623 }
624 } else {
625 crate::tools::ToolResult::error(format!("Unknown tool: {}", tool_name))
626 };
627 track_tool_diff(&mut diffs, &tool_name, &args, &result.content);
628 for diff in &diffs {
629 let _ = event_tx.send(Event::DiffProposed {
630 run: parent_run.clone(),
631 file: diff.file.clone(),
632 patch: String::new(),
633 plus: diff.plus,
634 minus: diff.minus,
635 });
636 }
637 let blocks = result.content.clone();
638 let text = tool_blocks_text(&blocks);
639 let _ = event_tx.send(Event::ToolOutput {
640 run: parent_run.clone(),
641 id: id.clone(),
642 blocks,
643 is_error: result.is_error,
644 });
645 tool_result_blocks.push(ContentBlock::ToolResult {
646 tool_use_id: id,
647 content: vec![ContentBlock::Text { text }],
648 is_error: Some(result.is_error),
649 });
650 current_tool_id.clear();
651 current_tool_name.clear();
652 current_tool_json.clear();
653 }
654 crate::provider::BrainEvent::Done(_) => break,
655 crate::provider::BrainEvent::Error(e) => anyhow::bail!("Coder error: {}", e),
656 crate::provider::BrainEvent::Usage(usage) => {
657 tokens.input = tokens.input.saturating_add(usage.input);
658 tokens.output = tokens.output.saturating_add(usage.output);
659 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
660 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
661 }
662 }
663 }
664
665 if !assistant_text.is_empty() {
666 assistant_blocks.insert(
667 0,
668 ContentBlock::Text {
669 text: assistant_text,
670 },
671 );
672 }
673 if tool_result_blocks.is_empty() {
674 break;
675 }
676 messages.push(Msg {
677 role: "assistant".into(),
678 content: assistant_blocks,
679 });
680 messages.push(Msg {
681 role: "user".into(),
682 content: tool_result_blocks,
683 });
684 }
685
686 if diffs.is_empty() {
687 for line in output.lines() {
688 if let Some(file) = line.strip_prefix("Edited ") {
689 let file = file
690 .trim()
691 .split(':')
692 .next()
693 .unwrap_or("")
694 .trim()
695 .to_string();
696 if !file.is_empty() && !diffs.iter().any(|d| d.file == file) {
697 diffs.push(crate::event::FileDiff {
698 file,
699 plus: 1,
700 minus: 1,
701 });
702 }
703 }
704 }
705 }
706
707 for diff in &diffs {
709 let _ = event_tx.send(Event::DiffProposed {
710 run: parent_run.clone(),
711 file: diff.file.clone(),
712 patch: String::new(),
713 plus: diff.plus,
714 minus: diff.minus,
715 });
716 }
717
718 let _ = event_tx.send(Event::AgentStatus {
719 run: parent_run.clone(),
720 role: "coder".into(),
721 status: AgentStatus::Done,
722 note: format!("{} files changed", diffs.len()),
723 });
724
725 Ok((diffs, cost, tokens))
726 }
727
728 async fn run_verifier(
730 &self,
731 spec: &str,
732 diffs: &[crate::event::FileDiff],
733 workspace: &Workspace,
734 brain: Arc<dyn Brain>,
735 event_tx: &mpsc::UnboundedSender<Event>,
736 parent_run: &RunId,
737 ) -> anyhow::Result<(Verdict, f64, TokenUsage)> {
738 let verifier_identity = Identity {
739 name: "verifier".into(),
740 role: "code reviewer and quality assurance".into(),
741 personality: "adversarial, meticulous, catches issues the coder missed. Checks correctness, style, edge cases, and spec compliance.".into(),
742 };
743
744 let diff_summary: String = diffs
745 .iter()
746 .map(|d| format!(" {}: +{} -{}", d.file, d.plus, d.minus))
747 .collect::<Vec<_>>()
748 .join("\n");
749
750 let tools = swarm_tool_registry(workspace, false);
751 let read_tool = tools.get("fs_read");
752 let mut files_to_check = Vec::new();
753 for d in diffs {
754 let content = if let Some(tool) = &read_tool {
755 let ctx = ToolCtx {
756 workspace_root: workspace.root.clone(),
757 run_id: parent_run.clone(),
758 };
759 let args = serde_json::json!({ "path": d.file, "limit": 220 });
760 match tool.call(args, &ctx).await {
761 Ok(result) => tool_blocks_text(&result.content),
762 Err(_) => format!("[cannot read {}]", d.file),
763 }
764 } else {
765 format!("[cannot read {}]", d.file)
766 };
767 files_to_check.push(content);
768 }
769
770 let files_context: String = diffs
771 .iter()
772 .zip(files_to_check.iter())
773 .map(|(d, content)| {
774 format!(
775 "### {}\n```\n{}\n```",
776 d.file,
777 if content.len() > 3000 {
778 format!("{}... [truncated]", &content[..3000])
779 } else {
780 content.clone()
781 }
782 )
783 })
784 .collect::<Vec<_>>()
785 .join("\n\n");
786
787 let system = format!(
788 r#"You are the VERIFIER agent in a swarm.
789
790{personality}
791
792Your job: review the coder's implementation against the SPEC.
793- For each spec requirement, check if it's satisfied.
794- Find bugs, style issues, missing edge cases, spec violations.
795- Output EXACTLY one of:
796 ✓ PASS — if everything is correct and complete.
797 ✗ REWORK — followed by numbered concrete findings.
798
799Format:
800✓ PASS
801(no issues found)
802
803or:
804
805✗ REWORK
8061. <specific finding with file:line>
8072. <another finding>
808"#,
809 personality = verifier_identity.personality,
810 );
811
812 let context = format!(
813 "## SPEC\n{}\n\n## CHANGED FILES\n{}\n\n## FILE CONTENTS\n{}",
814 spec, diff_summary, files_context,
815 );
816
817 let messages = vec![Msg {
818 role: "user".into(),
819 content: vec![ContentBlock::Text { text: context }],
820 }];
821
822 let tool_specs = tools.to_specs();
823 let req = BrainRequest {
824 system: Some(system),
825 messages,
826 tools: tool_specs.clone(),
827 max_tokens: 4096,
828 temperature: 0.0,
829 stop: vec![],
830 cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
831 "swarm-verifier",
832 &workspace.root,
833 &tool_specs,
834 ))),
835 };
836
837 let _ = event_tx.send(Event::AgentStatus {
838 run: parent_run.clone(),
839 role: "verifier".into(),
840 status: AgentStatus::Working,
841 note: format!("reviewing with {}", brain.id()),
842 });
843
844 let mut stream = brain.complete(req).await?;
845 let mut verdict_text = String::new();
846 let caps = brain.caps();
847 let mut cost = 0.0_f64;
848 let mut tokens = TokenUsage {
849 input: 0,
850 output: 0,
851 };
852
853 while let Some(ev) = futures::StreamExt::next(&mut stream).await {
854 match ev {
855 crate::provider::BrainEvent::TextDelta(text) => {
856 verdict_text.push_str(&text);
857 }
858 crate::provider::BrainEvent::Usage(usage) => {
859 tokens.input = tokens.input.saturating_add(usage.input);
860 tokens.output = tokens.output.saturating_add(usage.output);
861 cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
862 + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
863 }
864 crate::provider::BrainEvent::Done(_) => break,
865 _ => {}
866 }
867 }
868
869 let upper = verdict_text.to_uppercase();
871 let has_rework = upper.contains("REWORK")
872 || upper.contains("NEEDS REWORK")
873 || verdict_text.contains("✗");
874 let has_pass = (upper.contains("PASS") || verdict_text.contains("✓")) && !has_rework;
875
876 let verdict = if has_rework {
877 let findings: Vec<String> = verdict_text
878 .lines()
879 .filter(|l| l.trim().starts_with(|c: char| c.is_ascii_digit()) && l.contains('.'))
880 .map(|l| l.trim().to_string())
881 .collect();
882
883 if findings.is_empty() {
884 Verdict::Rework {
885 findings: vec![verdict_text.clone()],
886 }
887 } else {
888 Verdict::Rework { findings }
889 }
890 } else if has_pass {
891 Verdict::Pass
892 } else {
893 Verdict::Rework {
895 findings: vec![format!("Verifier verdict unclear: {}", verdict_text)],
896 }
897 };
898
899 let _ = event_tx.send(Event::AgentStatus {
900 run: parent_run.clone(),
901 role: "verifier".into(),
902 status: AgentStatus::Done,
903 note: match &verdict {
904 Verdict::Pass => "PASS".into(),
905 Verdict::Rework { findings } => format!("REWORK ({} issues)", findings.len()),
906 },
907 });
908
909 Ok((verdict, cost, tokens))
910 }
911}
912
913#[async_trait::async_trait]
914impl Orchestrator for DefaultOrchestrator {
915 async fn run_swarm(
916 &self,
917 plan: SwarmPlan,
918 event_tx: mpsc::UnboundedSender<Event>,
919 ) -> anyhow::Result<SwarmOutcome> {
920 let run_id = RunId::new();
921 let task = plan.task.clone();
922
923 let _ = event_tx.send(Event::RunStarted {
924 run: run_id.clone(),
925 task: task.clone(),
926 agent: "swarm".into(),
927 });
928
929 let planner_brain = self
931 .select_brain("planner", self.classify(&task))
932 .ok_or_else(|| anyhow::anyhow!("No model available for planner"))?;
933
934 let coder_brain = self
935 .select_brain("coder", self.classify(&task))
936 .unwrap_or_else(|| planner_brain.clone());
937
938 let verifier_brain = self
939 .select_brain("verifier", self.classify(&task))
940 .unwrap_or_else(|| coder_brain.clone());
941
942 let sandbox = LocalSandbox::new(plan.workspace.clone());
944 let workspace = Workspace {
945 root: plan.workspace.clone(),
946 sandbox: Arc::new(sandbox),
947 };
948
949 let mut total_cost: f64 = 0.0;
950 let mut total_tokens = TokenUsage {
951 input: 0,
952 output: 0,
953 };
954
955 let _ = event_tx.send(Event::AgentSpawned {
957 run: run_id.clone(),
958 role: "planner".into(),
959 model: planner_brain.id().to_string(),
960 });
961
962 let (spec, planner_cost, planner_tokens) = self
963 .run_planner(&task, &workspace, planner_brain, &event_tx, &run_id)
964 .await?;
965 total_cost += planner_cost;
966 total_tokens.input = total_tokens.input.saturating_add(planner_tokens.input);
967 total_tokens.output = total_tokens.output.saturating_add(planner_tokens.output);
968 let _ = event_tx.send(Event::CostUpdate {
969 run: run_id.clone(),
970 usd: total_cost,
971 });
972
973 let _ = self.memory.upsert_doc(crate::memory::WorkingDoc {
975 id: format!("plan-{}", run_id.0),
976 title: format!("Plan: {}", &task[..task.len().min(60)]),
977 content: spec.clone(),
978 updated_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
979 });
980
981 let _ = event_tx.send(Event::AgentSpawned {
983 run: run_id.clone(),
984 role: "coder".into(),
985 model: coder_brain.id().to_string(),
986 });
987
988 let _ = event_tx.send(Event::AgentSpawned {
989 run: run_id.clone(),
990 role: "verifier".into(),
991 model: verifier_brain.id().to_string(),
992 });
993
994 let mut rework_notes: Option<Vec<String>> = None;
995 let mut all_diffs: Vec<crate::event::FileDiff> = Vec::new();
996 let mut reworks = 0u32;
997 let mut passes = 0u32;
998
999 for attempt in 0..=plan.max_reworks {
1000 let (diffs, coder_cost, coder_tokens) = self
1002 .run_coder(
1003 &spec,
1004 rework_notes.as_deref(),
1005 &workspace,
1006 coder_brain.clone(),
1007 &event_tx,
1008 &run_id,
1009 )
1010 .await?;
1011 total_cost += coder_cost;
1012 total_tokens.input = total_tokens.input.saturating_add(coder_tokens.input);
1013 total_tokens.output = total_tokens.output.saturating_add(coder_tokens.output);
1014 let _ = event_tx.send(Event::CostUpdate {
1015 run: run_id.clone(),
1016 usd: total_cost,
1017 });
1018
1019 if attempt > 0 {
1021 let prev_files: Vec<String> = all_diffs.iter().map(|d| d.file.clone()).collect();
1022 self.file_locks.release(&prev_files).await;
1023 }
1024
1025 let new_files: Vec<String> = diffs.iter().map(|d| d.file.clone()).collect();
1027 if let Err(conflicts) = self.file_locks.try_lock(&new_files).await {
1028 let _ = event_tx.send(Event::Error {
1029 run: run_id.clone(),
1030 message: format!("File collision detected: {:?}", conflicts),
1031 });
1032 }
1033
1034 all_diffs = diffs.clone();
1035
1036 if diffs.is_empty() && attempt < plan.max_reworks {
1041 reworks += 1;
1042 let note = "You made NO file changes. You MUST call the fs_write or edit \
1043 tool to create/modify the files in the spec — do not merely describe \
1044 the change in prose. Emit the tool call now."
1045 .to_string();
1046 rework_notes = Some(vec![note.clone()]);
1047 let _ = event_tx.send(Event::TestResult {
1048 run: run_id.clone(),
1049 passed: 0,
1050 failed: 1,
1051 detail: "no file changes — coder must use tools".into(),
1052 });
1053 let _ = event_tx.send(Event::AgentStatus {
1054 run: run_id.clone(),
1055 role: "coder".into(),
1056 status: AgentStatus::Working,
1057 note: format!("no diff — forcing tool use (attempt {})", attempt + 1),
1058 });
1059 continue;
1060 }
1061
1062 let (verdict, verifier_cost, verifier_tokens) = self
1064 .run_verifier(
1065 &spec,
1066 &diffs,
1067 &workspace,
1068 verifier_brain.clone(),
1069 &event_tx,
1070 &run_id,
1071 )
1072 .await?;
1073 total_cost += verifier_cost;
1074 total_tokens.input = total_tokens.input.saturating_add(verifier_tokens.input);
1075 total_tokens.output = total_tokens.output.saturating_add(verifier_tokens.output);
1076 let _ = event_tx.send(Event::CostUpdate {
1077 run: run_id.clone(),
1078 usd: total_cost,
1079 });
1080
1081 match verdict {
1082 Verdict::Pass => {
1083 passes += 1;
1084 let _ = self.memory.post_signal(crate::memory::SharedSignal {
1086 id: format!("pass-{}-{}", run_id.0, attempt),
1087 from_agent: "verifier".into(),
1088 to_agent: "coder".into(),
1089 content: "PASS — all checks satisfied".into(),
1090 timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1091 });
1092
1093 let _ = event_tx.send(Event::TestResult {
1094 run: run_id.clone(),
1095 passed: 1,
1096 failed: 0,
1097 detail: "Verifier PASS".into(),
1098 });
1099 for diff in &diffs {
1100 let _ = event_tx.send(Event::DiffApplied {
1101 run: run_id.clone(),
1102 file: diff.file.clone(),
1103 });
1104 }
1105 break; }
1107 Verdict::Rework { findings } => {
1108 reworks += 1;
1109 rework_notes = Some(findings.clone());
1110
1111 let _ = self.memory.post_signal(crate::memory::SharedSignal {
1113 id: format!("rework-{}-{}", run_id.0, attempt),
1114 from_agent: "verifier".into(),
1115 to_agent: "coder".into(),
1116 content: format!("REWORK: {}", findings.join("; ")),
1117 timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1118 });
1119
1120 let _ = event_tx.send(Event::TestResult {
1121 run: run_id.clone(),
1122 passed: 0,
1123 failed: findings.len() as u32,
1124 detail: findings.join("\n"),
1125 });
1126
1127 let _ = event_tx.send(Event::AgentStatus {
1128 run: run_id.clone(),
1129 role: "coder".into(),
1130 status: AgentStatus::Working,
1131 note: format!("rework attempt {}/{}", attempt + 1, plan.max_reworks),
1132 });
1133 }
1134 }
1135 }
1136
1137 let outcome = SwarmOutcome {
1138 status: if passes > 0 {
1139 "PASS".into()
1140 } else {
1141 format!("FAILED after {} reworks", reworks)
1142 },
1143 plan: Some(spec),
1144 diffs: all_diffs,
1145 passes,
1146 reworks,
1147 cost_usd: total_cost,
1148 };
1149
1150 let cost_comparison = crate::cost::format_comparison(outcome.cost_usd, &total_tokens);
1151 let outcome_summary = OutcomeSummary {
1152 status: outcome.status.clone(),
1153 diffs: outcome.diffs.clone(),
1154 cost_usd: outcome.cost_usd,
1155 tokens: total_tokens,
1156 cost_comparison,
1157 duration_ms: None,
1158 };
1159
1160 let _ = event_tx.send(Event::RunFinished {
1161 run: run_id,
1162 outcome: outcome_summary,
1163 });
1164
1165 Ok(outcome)
1166 }
1167}