Skip to main content

sparrow/orchestrator/
mod.rs

1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::config::Config;
7use crate::engine::{Identity, Workspace};
8use crate::event::{AgentStatus, Block, Event, OutcomeSummary, RiskLevel, RunId, TokenUsage};
9use crate::memory::Memory;
10use crate::provider::{
11    Brain, BrainRequest, BrainStream, ContentBlock, ModelCaps, Msg, PromptCacheConfig, ToolSpec,
12};
13use crate::router::{BudgetState, Router, TaskTier};
14use crate::sandbox::LocalSandbox;
15use crate::tools::edit::{Edit, MultiEdit};
16use crate::tools::exec::Exec;
17use crate::tools::fs::{FsList, FsRead, FsWrite};
18use crate::tools::git::Git;
19use crate::tools::search_and_web::Search;
20use crate::tools::{ToolCtx, ToolRegistry};
21
22fn prompt_cache_key(scope: &str, workspace_root: &std::path::Path, tools: &[ToolSpec]) -> String {
23    use std::hash::{Hash, Hasher};
24
25    let mut hasher = std::collections::hash_map::DefaultHasher::new();
26    scope.hash(&mut hasher);
27    workspace_root.display().to_string().hash(&mut hasher);
28    for tool in tools {
29        tool.name.hash(&mut hasher);
30        tool.description.hash(&mut hasher);
31        tool.input_schema.to_string().hash(&mut hasher);
32    }
33    format!("sparrow-{}-{:016x}", scope, hasher.finish())
34}
35
36// ─── Fallback brain ──────────────────────────────────────────────────────────
37// Wraps an ordered chain of brains and tries each on `complete()` until one
38// succeeds. Lets each swarm role (planner/coder/verifier) survive a model that
39// 404s or rate-limits — discovery sometimes lists models that aren't callable.
40
41struct FallbackBrain {
42    id: String,
43    caps: ModelCaps,
44    chain: Vec<Arc<dyn Brain>>,
45}
46
47impl FallbackBrain {
48    fn new(chain: Vec<Arc<dyn Brain>>) -> Self {
49        let id = chain
50            .first()
51            .map(|b| b.id().to_string())
52            .unwrap_or_else(|| "none".into());
53        let caps = chain.first().map(|b| b.caps()).unwrap_or_default();
54        Self { id, caps, chain }
55    }
56}
57
58#[async_trait::async_trait]
59impl Brain for FallbackBrain {
60    fn id(&self) -> &str {
61        &self.id
62    }
63    fn caps(&self) -> ModelCaps {
64        self.caps.clone()
65    }
66    async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
67        let mut last_err: Option<anyhow::Error> = None;
68        for brain in &self.chain {
69            match brain.complete(req.clone()).await {
70                Ok(stream) => return Ok(stream),
71                Err(e) => {
72                    tracing::warn!("swarm brain {} failed, trying next: {}", brain.id(), e);
73                    last_err = Some(e);
74                }
75            }
76        }
77        Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no brains in fallback chain")))
78    }
79}
80
81// ─── Swarm types ────────────────────────────────────────────────────────────────
82
/// Input to a swarm run: what to do, where, and how many rework rounds to allow.
#[derive(Debug, Clone)]
pub struct SwarmPlan {
    /// Free-form task description.
    pub task: String,
    /// Workspace directory the swarm operates in.
    pub workspace: PathBuf,
    /// Rework limit (defaults to 3).
    pub max_reworks: u32,
}

impl Default for SwarmPlan {
    /// An empty task in the current directory (`.`) with three reworks allowed.
    fn default() -> Self {
        let workspace: PathBuf = ".".into();
        SwarmPlan {
            task: String::new(),
            workspace,
            max_reworks: 3,
        }
    }
}
99
100#[derive(Debug, Clone)]
101pub struct SwarmOutcome {
102    pub status: String,
103    pub plan: Option<String>,
104    pub diffs: Vec<crate::event::FileDiff>,
105    pub passes: u32,
106    pub reworks: u32,
107    pub cost_usd: f64,
108}
109
/// Outcome of a verification round.
#[derive(Debug, Clone)]
pub enum Verdict {
    /// The work was accepted.
    Pass,
    /// The work must be redone; `findings` lists the issues to address.
    Rework {
        /// One entry per finding.
        findings: Vec<String>,
    },
}
115
/// Phases a swarm run can be in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SwarmPhase {
    Planning,
    Coding,
    Verifying,
    Reworking,
    Done,
}

impl std::fmt::Display for SwarmPhase {
    /// Writes the lowercase phase name (`planning`, `coding`, `verifying`,
    /// `reworking`, `done`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            SwarmPhase::Planning => "planning",
            SwarmPhase::Coding => "coding",
            SwarmPhase::Verifying => "verifying",
            SwarmPhase::Reworking => "reworking",
            SwarmPhase::Done => "done",
        };
        // Written verbatim: width/fill flags on the formatter are not applied.
        f.write_str(label)
    }
}
136
137// ─── Anti-collision: file-level locks ───────────────────────────────────────────
138
139pub struct FileLocks {
140    locked: Mutex<HashSet<String>>,
141}
142
143impl FileLocks {
144    pub fn new() -> Self {
145        Self {
146            locked: Mutex::new(HashSet::new()),
147        }
148    }
149
150    pub async fn try_lock(&self, files: &[String]) -> Result<FileLockGuard, Vec<String>> {
151        let mut locked = self.locked.lock().await;
152        let mut conflicts = Vec::new();
153        for f in files {
154            if locked.contains(f) {
155                conflicts.push(f.clone());
156            }
157        }
158        if !conflicts.is_empty() {
159            return Err(conflicts);
160        }
161        for f in files {
162            locked.insert(f.clone());
163        }
164        Ok(FileLockGuard {
165            _files: files.to_vec(),
166        })
167    }
168
169    pub async fn release(&self, files: &[String]) {
170        let mut locked = self.locked.lock().await;
171        for f in files {
172            locked.remove(f);
173        }
174    }
175}
176
177pub struct FileLockGuard {
178    _files: Vec<String>,
179}
180
181fn swarm_tool_registry(workspace: &Workspace, write_enabled: bool) -> Arc<ToolRegistry> {
182    let mut registry = ToolRegistry::new();
183    registry.register(Arc::new(FsRead));
184    registry.register(Arc::new(FsList));
185    if write_enabled {
186        registry.register(Arc::new(FsWrite));
187        registry.register(Arc::new(Edit));
188        registry.register(Arc::new(MultiEdit));
189        registry.register(Arc::new(Search));
190        registry.register(Arc::new(Git));
191        registry.register(Arc::new(Exec::new(workspace.sandbox.clone())));
192    }
193    Arc::new(registry)
194}
195
196fn tool_blocks_text(blocks: &[Block]) -> String {
197    blocks
198        .iter()
199        .map(|block| match block {
200            Block::Text(text) => text.clone(),
201            Block::Json(value) => value.to_string(),
202            Block::Image { mime, data } => format!("[image: {}, {} bytes]", mime, data.len()),
203            Block::Diff { file, patch } => format!("diff for {}\n{}", file, patch),
204        })
205        .collect::<Vec<_>>()
206        .join("\n")
207}
208
209fn track_tool_diff(
210    diffs: &mut Vec<crate::event::FileDiff>,
211    tool_name: &str,
212    args: &serde_json::Value,
213    blocks: &[Block],
214) {
215    for block in blocks {
216        if let Block::Diff { file, patch } = block {
217            let plus = patch
218                .lines()
219                .filter(|line| line.starts_with('+') && !line.starts_with("+++"))
220                .count() as u32;
221            let minus = patch
222                .lines()
223                .filter(|line| line.starts_with('-') && !line.starts_with("---"))
224                .count() as u32;
225            if !diffs.iter().any(|diff| diff.file == *file) {
226                diffs.push(crate::event::FileDiff {
227                    file: file.clone(),
228                    plus,
229                    minus,
230                });
231            }
232        }
233    }
234
235    if matches!(tool_name, "fs_write" | "edit" | "multi_edit") {
236        if let Some(path) = args.get("path").and_then(|value| value.as_str()) {
237            if !diffs.iter().any(|diff| diff.file == path) {
238                diffs.push(crate::event::FileDiff {
239                    file: path.to_string(),
240                    plus: 0,
241                    minus: 0,
242                });
243            }
244        }
245    }
246}
247
248// ─── THE ORCHESTRATOR TRAIT ─────────────────────────────────────────────────────
249
250#[async_trait::async_trait]
251pub trait Orchestrator: Send + Sync {
252    async fn run_swarm(
253        &self,
254        plan: SwarmPlan,
255        event_tx: mpsc::UnboundedSender<Event>,
256    ) -> anyhow::Result<SwarmOutcome>;
257}
258
259// ─── Default orchestrator: Planner → Coder → Verifier ───────────────────────────
260
261pub struct DefaultOrchestrator {
262    router: Arc<dyn Router>,
263    config: Config,
264    memory: Arc<dyn Memory>,
265    file_locks: Arc<FileLocks>,
266}
267
268impl DefaultOrchestrator {
269    pub fn new(router: Arc<dyn Router>, config: Config, memory: Arc<dyn Memory>) -> Self {
270        Self {
271            router,
272            config,
273            memory,
274            file_locks: Arc::new(FileLocks::new()),
275        }
276    }
277
278    /// Classify task tier for model selection
279    fn classify(&self, task: &str) -> TaskTier {
280        let lower = task.to_lowercase();
281        if lower.len() < 20 {
282            TaskTier::Trivial
283        } else if lower.contains("refactor") || lower.contains("architecture") {
284            TaskTier::Hard
285        } else if lower.contains("bug") || lower.contains("fix") {
286            TaskTier::Small
287        } else {
288            TaskTier::Medium
289        }
290    }
291
292    /// Select a brain for a given role and tier
293    fn select_brain(&self, role: &str, tier: TaskTier) -> Option<Arc<dyn Brain>> {
294        let need = match role {
295            "planner" => crate::router::RoutingNeed {
296                tier: TaskTier::Hard, // Planner always uses a strong model
297                required_tools: false,
298                required_vision: false,
299                prefer_local: false,
300            },
301            "verifier" => crate::router::RoutingNeed {
302                tier: TaskTier::Medium, // Verifier uses medium model
303                required_tools: true,
304                required_vision: false,
305                prefer_local: false,
306            },
307            _ => crate::router::RoutingNeed {
308                // Coder writes files via tools — it must get a tool-capable,
309                // competent model. A "trivial"/"small" classification would route
310                // to the cheapest free model that often emits prose instead of
311                // tool calls (no diff lands). Floor the coder at Medium.
312                tier: match tier {
313                    TaskTier::Trivial | TaskTier::Small => TaskTier::Medium,
314                    other => other,
315                },
316                required_tools: true,
317                required_vision: false,
318                prefer_local: false,
319            },
320        };
321
322        let budget = BudgetState {
323            daily_limit_usd: self.config.budget.daily_usd,
324            daily_spent_usd: 0.0,
325            session_limit_usd: self.config.budget.session_usd,
326            session_spent_usd: 0.0,
327        };
328
329        // Wrap the whole fallback chain so a 404/ratelimit on the top model
330        // transparently advances to the next instead of killing the role.
331        let chain = self.router.select(&need, &budget);
332        if chain.is_empty() {
333            None
334        } else {
335            Some(Arc::new(FallbackBrain::new(chain)) as Arc<dyn Brain>)
336        }
337    }
338
339    /// Run the planner agent
340    async fn run_planner(
341        &self,
342        task: &str,
343        workspace: &Workspace,
344        brain: Arc<dyn Brain>,
345        event_tx: &mpsc::UnboundedSender<Event>,
346        parent_run: &RunId,
347    ) -> anyhow::Result<(String, f64, TokenUsage)> {
348        let planner_identity = Identity {
349            name: "planner".into(),
350            role: "technical architect and planner".into(),
351            personality: "analytical, thorough, produces clear structured plans with concrete steps and acceptance criteria.".into(),
352        };
353
354        let system = format!(
355            r#"You are the PLANNER agent in a swarm.
356
357{personality}
358
359Your job: take a task and produce a detailed implementation SPEC.
360- Break the task into clear, numbered steps.
361- For each step, specify what files to create/modify.
362- Include acceptance criteria for the verifier.
363- Output ONLY the spec. No code. No implementation.
364
365Output format:
366## SPEC: <title>
367
368### Step 1: <description>
369- Files: <list>
370- Changes: <what changes>
371- Acceptance: <verification criteria>
372
373### Step 2: ...
374"#,
375            personality = planner_identity.personality,
376        );
377
378        let messages = vec![Msg {
379            role: "user".into(),
380            content: vec![ContentBlock::Text {
381                text: format!("Task to plan:\n\n{}", task),
382            }],
383        }];
384
385        let tools = swarm_tool_registry(workspace, false);
386
387        let tool_specs = tools.to_specs();
388        let req = BrainRequest {
389            system: Some(system),
390            messages,
391            tools: tool_specs.clone(),
392            max_tokens: 4096,
393            temperature: 0.0,
394            stop: vec![],
395            cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
396                "swarm-planner",
397                &workspace.root,
398                &tool_specs,
399            ))),
400        };
401
402        let _ = event_tx.send(Event::AgentStatus {
403            run: parent_run.clone(),
404            role: "planner".into(),
405            status: AgentStatus::Thinking,
406            note: format!("planning with {}", brain.id()),
407        });
408
409        let mut stream = brain.complete(req).await?;
410        let mut plan = String::new();
411        let caps = brain.caps();
412        let mut cost = 0.0_f64;
413        let mut tokens = TokenUsage {
414            input: 0,
415            output: 0,
416        };
417
418        while let Some(ev) = futures::StreamExt::next(&mut stream).await {
419            match ev {
420                crate::provider::BrainEvent::TextDelta(text) => {
421                    plan.push_str(&text);
422                    let _ = event_tx.send(Event::ThinkingDelta {
423                        run: parent_run.clone(),
424                        text,
425                    });
426                }
427                crate::provider::BrainEvent::Usage(usage) => {
428                    tokens.input = tokens.input.saturating_add(usage.input);
429                    tokens.output = tokens.output.saturating_add(usage.output);
430                    cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
431                        + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
432                }
433                crate::provider::BrainEvent::Done(_) => break,
434                crate::provider::BrainEvent::Error(e) => {
435                    anyhow::bail!("Planner error: {}", e)
436                }
437                _ => {}
438            }
439        }
440
441        let _ = event_tx.send(Event::AgentStatus {
442            run: parent_run.clone(),
443            role: "planner".into(),
444            status: AgentStatus::Done,
445            note: "plan complete".into(),
446        });
447
448        Ok((plan, cost, tokens))
449    }
450
451    /// Run the coder agent with a given spec
452    async fn run_coder(
453        &self,
454        spec: &str,
455        rework_notes: Option<&[String]>,
456        workspace: &Workspace,
457        brain: Arc<dyn Brain>,
458        event_tx: &mpsc::UnboundedSender<Event>,
459        parent_run: &RunId,
460    ) -> anyhow::Result<(Vec<crate::event::FileDiff>, f64, TokenUsage)> {
461        let coder_identity = Identity {
462            name: "coder".into(),
463            role: "implementation engineer".into(),
464            personality:
465                "precise, produces clean working code, uses exact file edits with the edit tool."
466                    .into(),
467        };
468
469        let rework_section = if let Some(notes) = rework_notes {
470            if notes.is_empty() {
471                String::new()
472            } else {
473                format!(
474                    "\n## REWORK NOTES (from verifier)\nThe previous implementation had issues. Fix these:\n{}",
475                    notes
476                        .iter()
477                        .enumerate()
478                        .map(|(i, n)| format!("{}. {}", i + 1, n))
479                        .collect::<Vec<_>>()
480                        .join("\n")
481                )
482            }
483        } else {
484            String::new()
485        };
486
487        let system = format!(
488            r#"You are the CODER agent in a swarm.
489
490{}
491
492Your job: implement the SPEC exactly. Use tools to read existing files and write edits.
493- Follow the spec steps in order.
494- Use the edit or fs_write tool to make changes.
495- After each file edit, note what you changed.
496- Produce working, compilable code.
497{}
498"#,
499            coder_identity.personality, rework_section,
500        );
501
502        // Build repo map context
503        let repo_map = self.memory.repo_map(&workspace.root);
504        let file_list: Vec<String> = repo_map
505            .files
506            .iter()
507            .map(|f| format!("  {}", f.path))
508            .collect();
509
510        let context_msg = format!(
511            "## SPEC TO IMPLEMENT\n\n{}\n\n## WORKSPACE FILES\n{}",
512            spec,
513            file_list.join("\n"),
514        );
515
516        let mut messages = vec![Msg {
517            role: "user".into(),
518            content: vec![ContentBlock::Text { text: context_msg }],
519        }];
520        let tools = swarm_tool_registry(workspace, true);
521        let tool_specs = tools.to_specs();
522
523        let _ = event_tx.send(Event::AgentStatus {
524            run: parent_run.clone(),
525            role: "coder".into(),
526            status: AgentStatus::Working,
527            note: format!("implementing with {}", brain.id()),
528        });
529
530        let mut output = String::new();
531        let mut diffs = Vec::new();
532        let caps = brain.caps();
533        let mut cost = 0.0_f64;
534        let mut tokens = TokenUsage {
535            input: 0,
536            output: 0,
537        };
538
539        for _turn in 0..8 {
540            let req = BrainRequest {
541                system: Some(system.clone()),
542                messages: messages.clone(),
543                tools: tool_specs.clone(),
544                max_tokens: 8192,
545                temperature: 0.0,
546                stop: vec![],
547                cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
548                    "swarm-coder",
549                    &workspace.root,
550                    &tool_specs,
551                ))),
552            };
553
554            let mut stream = brain.complete(req).await?;
555            let mut assistant_text = String::new();
556            let mut assistant_blocks = Vec::new();
557            let mut tool_result_blocks = Vec::new();
558            let mut current_tool_id = String::new();
559            let mut current_tool_name = String::new();
560            let mut current_tool_json = String::new();
561
562            while let Some(ev) = futures::StreamExt::next(&mut stream).await {
563                match ev {
564                    crate::provider::BrainEvent::TextDelta(text) => {
565                        output.push_str(&text);
566                        assistant_text.push_str(&text);
567                        let _ = event_tx.send(Event::ThinkingDelta {
568                            run: parent_run.clone(),
569                            text,
570                        });
571                    }
572                    // The orchestrator's coder loop doesn't need to round-trip
573                    // reasoning content (each phase uses a fresh BrainRequest),
574                    // so we just swallow it here to keep the match exhaustive.
575                    crate::provider::BrainEvent::ReasoningDelta(_) => {}
576                    crate::provider::BrainEvent::ToolUseStart { id, name } => {
577                        current_tool_id = id.clone();
578                        current_tool_name = name.clone();
579                        current_tool_json.clear();
580                        let risk = tools
581                            .get(&name)
582                            .map(|tool| tool.risk())
583                            .unwrap_or(RiskLevel::ReadOnly);
584                        let _ = event_tx.send(Event::ToolUseProposed {
585                            run: parent_run.clone(),
586                            id,
587                            name,
588                            args: serde_json::json!({}),
589                            risk,
590                        });
591                    }
592                    crate::provider::BrainEvent::ToolUseDelta { id: _, json } => {
593                        current_tool_json.push_str(&json);
594                    }
595                    crate::provider::BrainEvent::ToolUseEnd { id } => {
596                        let args = serde_json::from_str::<serde_json::Value>(&current_tool_json)
597                            .unwrap_or_else(|_| serde_json::json!({}));
598                        let tool_name = if current_tool_name.is_empty() {
599                            "unknown".to_string()
600                        } else {
601                            current_tool_name.clone()
602                        };
603                        assistant_blocks.push(ContentBlock::ToolUse {
604                            id: id.clone(),
605                            name: tool_name.clone(),
606                            input: args.clone(),
607                        });
608                        let _ = event_tx.send(Event::ToolUseStarted {
609                            run: parent_run.clone(),
610                            id: id.clone(),
611                        });
612                        let result = if let Some(tool) = tools.get(&tool_name) {
613                            let ctx = ToolCtx {
614                                workspace_root: workspace.root.clone(),
615                                run_id: parent_run.clone(),
616                            };
617                            match tool.call(args.clone(), &ctx).await {
618                                Ok(result) => result,
619                                Err(err) => crate::tools::ToolResult::error(format!(
620                                    "Tool {} failed: {}",
621                                    tool_name, err
622                                )),
623                            }
624                        } else {
625                            crate::tools::ToolResult::error(format!("Unknown tool: {}", tool_name))
626                        };
627                        track_tool_diff(&mut diffs, &tool_name, &args, &result.content);
628                        for diff in &diffs {
629                            let _ = event_tx.send(Event::DiffProposed {
630                                run: parent_run.clone(),
631                                file: diff.file.clone(),
632                                patch: String::new(),
633                                plus: diff.plus,
634                                minus: diff.minus,
635                            });
636                        }
637                        let blocks = result.content.clone();
638                        let text = tool_blocks_text(&blocks);
639                        let _ = event_tx.send(Event::ToolOutput {
640                            run: parent_run.clone(),
641                            id: id.clone(),
642                            blocks,
643                        });
644                        tool_result_blocks.push(ContentBlock::ToolResult {
645                            tool_use_id: id,
646                            content: vec![ContentBlock::Text { text }],
647                            is_error: Some(result.is_error),
648                        });
649                        current_tool_id.clear();
650                        current_tool_name.clear();
651                        current_tool_json.clear();
652                    }
653                    crate::provider::BrainEvent::Done(_) => break,
654                    crate::provider::BrainEvent::Error(e) => anyhow::bail!("Coder error: {}", e),
655                    crate::provider::BrainEvent::Usage(usage) => {
656                        tokens.input = tokens.input.saturating_add(usage.input);
657                        tokens.output = tokens.output.saturating_add(usage.output);
658                        cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
659                            + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
660                    }
661                }
662            }
663
664            if !assistant_text.is_empty() {
665                assistant_blocks.insert(
666                    0,
667                    ContentBlock::Text {
668                        text: assistant_text,
669                    },
670                );
671            }
672            if tool_result_blocks.is_empty() {
673                break;
674            }
675            messages.push(Msg {
676                role: "assistant".into(),
677                content: assistant_blocks,
678            });
679            messages.push(Msg {
680                role: "user".into(),
681                content: tool_result_blocks,
682            });
683        }
684
685        if diffs.is_empty() {
686            for line in output.lines() {
687                if let Some(file) = line.strip_prefix("Edited ") {
688                    let file = file
689                        .trim()
690                        .split(':')
691                        .next()
692                        .unwrap_or("")
693                        .trim()
694                        .to_string();
695                    if !file.is_empty() && !diffs.iter().any(|d| d.file == file) {
696                        diffs.push(crate::event::FileDiff {
697                            file,
698                            plus: 1,
699                            minus: 1,
700                        });
701                    }
702                }
703            }
704        }
705
706        // Emit diff events
707        for diff in &diffs {
708            let _ = event_tx.send(Event::DiffProposed {
709                run: parent_run.clone(),
710                file: diff.file.clone(),
711                patch: String::new(),
712                plus: diff.plus,
713                minus: diff.minus,
714            });
715        }
716
717        let _ = event_tx.send(Event::AgentStatus {
718            run: parent_run.clone(),
719            role: "coder".into(),
720            status: AgentStatus::Done,
721            note: format!("{} files changed", diffs.len()),
722        });
723
724        Ok((diffs, cost, tokens))
725    }
726
727    /// Run the verifier agent
728    async fn run_verifier(
729        &self,
730        spec: &str,
731        diffs: &[crate::event::FileDiff],
732        workspace: &Workspace,
733        brain: Arc<dyn Brain>,
734        event_tx: &mpsc::UnboundedSender<Event>,
735        parent_run: &RunId,
736    ) -> anyhow::Result<(Verdict, f64, TokenUsage)> {
737        let verifier_identity = Identity {
738            name: "verifier".into(),
739            role: "code reviewer and quality assurance".into(),
740            personality: "adversarial, meticulous, catches issues the coder missed. Checks correctness, style, edge cases, and spec compliance.".into(),
741        };
742
743        let diff_summary: String = diffs
744            .iter()
745            .map(|d| format!("  {}: +{} -{}", d.file, d.plus, d.minus))
746            .collect::<Vec<_>>()
747            .join("\n");
748
749        let tools = swarm_tool_registry(workspace, false);
750        let read_tool = tools.get("fs_read");
751        let mut files_to_check = Vec::new();
752        for d in diffs {
753            let content = if let Some(tool) = &read_tool {
754                let ctx = ToolCtx {
755                    workspace_root: workspace.root.clone(),
756                    run_id: parent_run.clone(),
757                };
758                let args = serde_json::json!({ "path": d.file, "limit": 220 });
759                match tool.call(args, &ctx).await {
760                    Ok(result) => tool_blocks_text(&result.content),
761                    Err(_) => format!("[cannot read {}]", d.file),
762                }
763            } else {
764                format!("[cannot read {}]", d.file)
765            };
766            files_to_check.push(content);
767        }
768
769        let files_context: String = diffs
770            .iter()
771            .zip(files_to_check.iter())
772            .map(|(d, content)| {
773                format!(
774                    "### {}\n```\n{}\n```",
775                    d.file,
776                    if content.len() > 3000 {
777                        format!("{}... [truncated]", &content[..3000])
778                    } else {
779                        content.clone()
780                    }
781                )
782            })
783            .collect::<Vec<_>>()
784            .join("\n\n");
785
786        let system = format!(
787            r#"You are the VERIFIER agent in a swarm.
788
789{personality}
790
791Your job: review the coder's implementation against the SPEC.
792- For each spec requirement, check if it's satisfied.
793- Find bugs, style issues, missing edge cases, spec violations.
794- Output EXACTLY one of:
795  ✓ PASS — if everything is correct and complete.
796  ✗ REWORK — followed by numbered concrete findings.
797
798Format:
799✓ PASS
800(no issues found)
801
802or:
803
804✗ REWORK
8051. <specific finding with file:line>
8062. <another finding>
807"#,
808            personality = verifier_identity.personality,
809        );
810
811        let context = format!(
812            "## SPEC\n{}\n\n## CHANGED FILES\n{}\n\n## FILE CONTENTS\n{}",
813            spec, diff_summary, files_context,
814        );
815
816        let messages = vec![Msg {
817            role: "user".into(),
818            content: vec![ContentBlock::Text { text: context }],
819        }];
820
821        let tool_specs = tools.to_specs();
822        let req = BrainRequest {
823            system: Some(system),
824            messages,
825            tools: tool_specs.clone(),
826            max_tokens: 4096,
827            temperature: 0.0,
828            stop: vec![],
829            cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
830                "swarm-verifier",
831                &workspace.root,
832                &tool_specs,
833            ))),
834        };
835
836        let _ = event_tx.send(Event::AgentStatus {
837            run: parent_run.clone(),
838            role: "verifier".into(),
839            status: AgentStatus::Working,
840            note: format!("reviewing with {}", brain.id()),
841        });
842
843        let mut stream = brain.complete(req).await?;
844        let mut verdict_text = String::new();
845        let caps = brain.caps();
846        let mut cost = 0.0_f64;
847        let mut tokens = TokenUsage {
848            input: 0,
849            output: 0,
850        };
851
852        while let Some(ev) = futures::StreamExt::next(&mut stream).await {
853            match ev {
854                crate::provider::BrainEvent::TextDelta(text) => {
855                    verdict_text.push_str(&text);
856                }
857                crate::provider::BrainEvent::Usage(usage) => {
858                    tokens.input = tokens.input.saturating_add(usage.input);
859                    tokens.output = tokens.output.saturating_add(usage.output);
860                    cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
861                        + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
862                }
863                crate::provider::BrainEvent::Done(_) => break,
864                _ => {}
865            }
866        }
867
868        // Case-insensitive PASS/REWORK detection. "PASS" wins only if no rework signal.
869        let upper = verdict_text.to_uppercase();
870        let has_rework = upper.contains("REWORK")
871            || upper.contains("NEEDS REWORK")
872            || verdict_text.contains("✗");
873        let has_pass = (upper.contains("PASS") || verdict_text.contains("✓")) && !has_rework;
874
875        let verdict = if has_rework {
876            let findings: Vec<String> = verdict_text
877                .lines()
878                .filter(|l| l.trim().starts_with(|c: char| c.is_ascii_digit()) && l.contains('.'))
879                .map(|l| l.trim().to_string())
880                .collect();
881
882            if findings.is_empty() {
883                Verdict::Rework {
884                    findings: vec![verdict_text.clone()],
885                }
886            } else {
887                Verdict::Rework { findings }
888            }
889        } else if has_pass {
890            Verdict::Pass
891        } else {
892            // No clear verdict — treat as rework with the raw text to be safe.
893            Verdict::Rework {
894                findings: vec![format!("Verifier verdict unclear: {}", verdict_text)],
895            }
896        };
897
898        let _ = event_tx.send(Event::AgentStatus {
899            run: parent_run.clone(),
900            role: "verifier".into(),
901            status: AgentStatus::Done,
902            note: match &verdict {
903                Verdict::Pass => "PASS".into(),
904                Verdict::Rework { findings } => format!("REWORK ({} issues)", findings.len()),
905            },
906        });
907
908        Ok((verdict, cost, tokens))
909    }
910}
911
912#[async_trait::async_trait]
913impl Orchestrator for DefaultOrchestrator {
914    async fn run_swarm(
915        &self,
916        plan: SwarmPlan,
917        event_tx: mpsc::UnboundedSender<Event>,
918    ) -> anyhow::Result<SwarmOutcome> {
919        let run_id = RunId::new();
920        let task = plan.task.clone();
921
922        let _ = event_tx.send(Event::RunStarted {
923            run: run_id.clone(),
924            task: task.clone(),
925            agent: "swarm".into(),
926        });
927
928        // Select brains for each role
929        let planner_brain = self
930            .select_brain("planner", self.classify(&task))
931            .ok_or_else(|| anyhow::anyhow!("No model available for planner"))?;
932
933        let coder_brain = self
934            .select_brain("coder", self.classify(&task))
935            .unwrap_or_else(|| planner_brain.clone());
936
937        let verifier_brain = self
938            .select_brain("verifier", self.classify(&task))
939            .unwrap_or_else(|| coder_brain.clone());
940
941        // Create workspace
942        let sandbox = LocalSandbox::new(plan.workspace.clone());
943        let workspace = Workspace {
944            root: plan.workspace.clone(),
945            sandbox: Arc::new(sandbox),
946        };
947
948        let mut total_cost: f64 = 0.0;
949        let mut total_tokens = TokenUsage {
950            input: 0,
951            output: 0,
952        };
953
954        // ▸ PHASE 1: PLANNING
955        let _ = event_tx.send(Event::AgentSpawned {
956            run: run_id.clone(),
957            role: "planner".into(),
958            model: planner_brain.id().to_string(),
959        });
960
961        let (spec, planner_cost, planner_tokens) = self
962            .run_planner(&task, &workspace, planner_brain, &event_tx, &run_id)
963            .await?;
964        total_cost += planner_cost;
965        total_tokens.input = total_tokens.input.saturating_add(planner_tokens.input);
966        total_tokens.output = total_tokens.output.saturating_add(planner_tokens.output);
967        let _ = event_tx.send(Event::CostUpdate {
968            run: run_id.clone(),
969            usd: total_cost,
970        });
971
972        // Post plan to shared memory
973        let _ = self.memory.upsert_doc(crate::memory::WorkingDoc {
974            id: format!("plan-{}", run_id.0),
975            title: format!("Plan: {}", &task[..task.len().min(60)]),
976            content: spec.clone(),
977            updated_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
978        });
979
980        // ▸ PHASE 2–3: CODING + VERIFYING (with REWORK loop)
981        let _ = event_tx.send(Event::AgentSpawned {
982            run: run_id.clone(),
983            role: "coder".into(),
984            model: coder_brain.id().to_string(),
985        });
986
987        let _ = event_tx.send(Event::AgentSpawned {
988            run: run_id.clone(),
989            role: "verifier".into(),
990            model: verifier_brain.id().to_string(),
991        });
992
993        let mut rework_notes: Option<Vec<String>> = None;
994        let mut all_diffs: Vec<crate::event::FileDiff> = Vec::new();
995        let mut reworks = 0u32;
996        let mut passes = 0u32;
997
998        for attempt in 0..=plan.max_reworks {
999            // ▸ CODER
1000            let (diffs, coder_cost, coder_tokens) = self
1001                .run_coder(
1002                    &spec,
1003                    rework_notes.as_deref(),
1004                    &workspace,
1005                    coder_brain.clone(),
1006                    &event_tx,
1007                    &run_id,
1008                )
1009                .await?;
1010            total_cost += coder_cost;
1011            total_tokens.input = total_tokens.input.saturating_add(coder_tokens.input);
1012            total_tokens.output = total_tokens.output.saturating_add(coder_tokens.output);
1013            let _ = event_tx.send(Event::CostUpdate {
1014                run: run_id.clone(),
1015                usd: total_cost,
1016            });
1017
1018            // Release any locks from previous attempt
1019            if attempt > 0 {
1020                let prev_files: Vec<String> = all_diffs.iter().map(|d| d.file.clone()).collect();
1021                self.file_locks.release(&prev_files).await;
1022            }
1023
1024            // Acquire locks for new diffs
1025            let new_files: Vec<String> = diffs.iter().map(|d| d.file.clone()).collect();
1026            if let Err(conflicts) = self.file_locks.try_lock(&new_files).await {
1027                let _ = event_tx.send(Event::Error {
1028                    run: run_id.clone(),
1029                    message: format!("File collision detected: {:?}", conflicts),
1030                });
1031            }
1032
1033            all_diffs = diffs.clone();
1034
1035            // ▸ Empty-diff guard: if the coder changed no files, don't waste a
1036            // verify pass (and never let it PASS) — force a REWORK with an
1037            // explicit instruction to actually call the tools. This rescues weak
1038            // models that describe a change in prose instead of emitting tool calls.
1039            if diffs.is_empty() && attempt < plan.max_reworks {
1040                reworks += 1;
1041                let note = "You made NO file changes. You MUST call the fs_write or edit \
1042                    tool to create/modify the files in the spec — do not merely describe \
1043                    the change in prose. Emit the tool call now."
1044                    .to_string();
1045                rework_notes = Some(vec![note.clone()]);
1046                let _ = event_tx.send(Event::TestResult {
1047                    run: run_id.clone(),
1048                    passed: 0,
1049                    failed: 1,
1050                    detail: "no file changes — coder must use tools".into(),
1051                });
1052                let _ = event_tx.send(Event::AgentStatus {
1053                    run: run_id.clone(),
1054                    role: "coder".into(),
1055                    status: AgentStatus::Working,
1056                    note: format!("no diff — forcing tool use (attempt {})", attempt + 1),
1057                });
1058                continue;
1059            }
1060
1061            // ▸ VERIFIER
1062            let (verdict, verifier_cost, verifier_tokens) = self
1063                .run_verifier(
1064                    &spec,
1065                    &diffs,
1066                    &workspace,
1067                    verifier_brain.clone(),
1068                    &event_tx,
1069                    &run_id,
1070                )
1071                .await?;
1072            total_cost += verifier_cost;
1073            total_tokens.input = total_tokens.input.saturating_add(verifier_tokens.input);
1074            total_tokens.output = total_tokens.output.saturating_add(verifier_tokens.output);
1075            let _ = event_tx.send(Event::CostUpdate {
1076                run: run_id.clone(),
1077                usd: total_cost,
1078            });
1079
1080            match verdict {
1081                Verdict::Pass => {
1082                    passes += 1;
1083                    // Signal PASS to shared memory
1084                    let _ = self.memory.post_signal(crate::memory::SharedSignal {
1085                        id: format!("pass-{}-{}", run_id.0, attempt),
1086                        from_agent: "verifier".into(),
1087                        to_agent: "coder".into(),
1088                        content: "PASS — all checks satisfied".into(),
1089                        timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1090                    });
1091
1092                    let _ = event_tx.send(Event::TestResult {
1093                        run: run_id.clone(),
1094                        passed: 1,
1095                        failed: 0,
1096                        detail: "Verifier PASS".into(),
1097                    });
1098                    for diff in &diffs {
1099                        let _ = event_tx.send(Event::DiffApplied {
1100                            run: run_id.clone(),
1101                            file: diff.file.clone(),
1102                        });
1103                    }
1104                    break; // Done!
1105                }
1106                Verdict::Rework { findings } => {
1107                    reworks += 1;
1108                    rework_notes = Some(findings.clone());
1109
1110                    // Signal REWORK to shared memory
1111                    let _ = self.memory.post_signal(crate::memory::SharedSignal {
1112                        id: format!("rework-{}-{}", run_id.0, attempt),
1113                        from_agent: "verifier".into(),
1114                        to_agent: "coder".into(),
1115                        content: format!("REWORK: {}", findings.join("; ")),
1116                        timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1117                    });
1118
1119                    let _ = event_tx.send(Event::TestResult {
1120                        run: run_id.clone(),
1121                        passed: 0,
1122                        failed: findings.len() as u32,
1123                        detail: findings.join("\n"),
1124                    });
1125
1126                    let _ = event_tx.send(Event::AgentStatus {
1127                        run: run_id.clone(),
1128                        role: "coder".into(),
1129                        status: AgentStatus::Working,
1130                        note: format!("rework attempt {}/{}", attempt + 1, plan.max_reworks),
1131                    });
1132                }
1133            }
1134        }
1135
1136        let outcome = SwarmOutcome {
1137            status: if passes > 0 {
1138                "PASS".into()
1139            } else {
1140                format!("FAILED after {} reworks", reworks)
1141            },
1142            plan: Some(spec),
1143            diffs: all_diffs,
1144            passes,
1145            reworks,
1146            cost_usd: total_cost,
1147        };
1148
1149        let outcome_summary = OutcomeSummary {
1150            status: outcome.status.clone(),
1151            diffs: outcome.diffs.clone(),
1152            cost_usd: outcome.cost_usd,
1153            tokens: total_tokens,
1154        };
1155
1156        let _ = event_tx.send(Event::RunFinished {
1157            run: run_id,
1158            outcome: outcome_summary,
1159        });
1160
1161        Ok(outcome)
1162    }
1163}