Skip to main content

sparrow/orchestrator/
mod.rs

1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::config::Config;
7use crate::engine::{Identity, Workspace};
8use crate::event::{AgentStatus, Block, Event, OutcomeSummary, RiskLevel, RunId, TokenUsage};
9use crate::memory::Memory;
10use crate::provider::{
11    Brain, BrainRequest, BrainStream, ContentBlock, ModelCaps, Msg, PromptCacheConfig, ToolSpec,
12};
13use crate::router::{BudgetState, Router, TaskTier};
14use crate::sandbox::LocalSandbox;
15use crate::tools::edit::{Edit, MultiEdit};
16use crate::tools::exec::Exec;
17use crate::tools::fs::{FsList, FsRead, FsWrite};
18use crate::tools::git::Git;
19use crate::tools::search_and_web::Search;
20use crate::tools::{ToolCtx, ToolRegistry};
21
22fn prompt_cache_key(scope: &str, workspace_root: &std::path::Path, tools: &[ToolSpec]) -> String {
23    use std::hash::{Hash, Hasher};
24
25    let mut hasher = std::collections::hash_map::DefaultHasher::new();
26    scope.hash(&mut hasher);
27    workspace_root.display().to_string().hash(&mut hasher);
28    for tool in tools {
29        tool.name.hash(&mut hasher);
30        tool.description.hash(&mut hasher);
31        tool.input_schema.to_string().hash(&mut hasher);
32    }
33    format!("sparrow-{}-{:016x}", scope, hasher.finish())
34}
35
36// ─── Fallback brain ──────────────────────────────────────────────────────────
37// Wraps an ordered chain of brains and tries each on `complete()` until one
38// succeeds. Lets each swarm role (planner/coder/verifier) survive a model that
39// 404s or rate-limits — discovery sometimes lists models that aren't callable.
40
41struct FallbackBrain {
42    id: String,
43    caps: ModelCaps,
44    chain: Vec<Arc<dyn Brain>>,
45}
46
47impl FallbackBrain {
48    fn new(chain: Vec<Arc<dyn Brain>>) -> Self {
49        let id = chain
50            .first()
51            .map(|b| b.id().to_string())
52            .unwrap_or_else(|| "none".into());
53        let caps = chain.first().map(|b| b.caps()).unwrap_or_default();
54        Self { id, caps, chain }
55    }
56}
57
58#[async_trait::async_trait]
59impl Brain for FallbackBrain {
60    fn id(&self) -> &str {
61        &self.id
62    }
63    fn caps(&self) -> ModelCaps {
64        self.caps.clone()
65    }
66    async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
67        let mut last_err: Option<anyhow::Error> = None;
68        for brain in &self.chain {
69            match brain.complete(req.clone()).await {
70                Ok(stream) => return Ok(stream),
71                Err(e) => {
72                    tracing::warn!("swarm brain {} failed, trying next: {}", brain.id(), e);
73                    last_err = Some(e);
74                }
75            }
76        }
77        Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no brains in fallback chain")))
78    }
79}
80
81// ─── Swarm types ────────────────────────────────────────────────────────────────
82
83#[derive(Debug, Clone)]
84pub struct SwarmPlan {
85    pub task: String,
86    pub workspace: PathBuf,
87    pub max_reworks: u32,
88}
89
90impl Default for SwarmPlan {
91    fn default() -> Self {
92        Self {
93            task: String::new(),
94            workspace: PathBuf::from("."),
95            max_reworks: 3,
96        }
97    }
98}
99
100#[derive(Debug, Clone)]
101pub struct SwarmOutcome {
102    pub status: String,
103    pub plan: Option<String>,
104    pub diffs: Vec<crate::event::FileDiff>,
105    pub passes: u32,
106    pub reworks: u32,
107    pub cost_usd: f64,
108}
109
110#[derive(Debug, Clone)]
111pub enum Verdict {
112    Pass,
113    Rework { findings: Vec<String> },
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub enum SwarmPhase {
118    Planning,
119    Coding,
120    Verifying,
121    Reworking,
122    Done,
123}
124
125impl std::fmt::Display for SwarmPhase {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        match self {
128            SwarmPhase::Planning => write!(f, "planning"),
129            SwarmPhase::Coding => write!(f, "coding"),
130            SwarmPhase::Verifying => write!(f, "verifying"),
131            SwarmPhase::Reworking => write!(f, "reworking"),
132            SwarmPhase::Done => write!(f, "done"),
133        }
134    }
135}
136
137// ─── Anti-collision: file-level locks ───────────────────────────────────────────
138
139pub struct FileLocks {
140    locked: Mutex<HashSet<String>>,
141}
142
143impl FileLocks {
144    pub fn new() -> Self {
145        Self {
146            locked: Mutex::new(HashSet::new()),
147        }
148    }
149
150    pub async fn try_lock(&self, files: &[String]) -> Result<FileLockGuard, Vec<String>> {
151        let mut locked = self.locked.lock().await;
152        let mut conflicts = Vec::new();
153        for f in files {
154            if locked.contains(f) {
155                conflicts.push(f.clone());
156            }
157        }
158        if !conflicts.is_empty() {
159            return Err(conflicts);
160        }
161        for f in files {
162            locked.insert(f.clone());
163        }
164        Ok(FileLockGuard {
165            _files: files.to_vec(),
166        })
167    }
168
169    pub async fn release(&self, files: &[String]) {
170        let mut locked = self.locked.lock().await;
171        for f in files {
172            locked.remove(f);
173        }
174    }
175}
176
177pub struct FileLockGuard {
178    _files: Vec<String>,
179}
180
181fn swarm_tool_registry(workspace: &Workspace, write_enabled: bool) -> Arc<ToolRegistry> {
182    let mut registry = ToolRegistry::new();
183    registry.register(Arc::new(FsRead));
184    registry.register(Arc::new(FsList));
185    if write_enabled {
186        registry.register(Arc::new(FsWrite));
187        registry.register(Arc::new(Edit));
188        registry.register(Arc::new(MultiEdit));
189        registry.register(Arc::new(Search));
190        registry.register(Arc::new(Git));
191        registry.register(Arc::new(Exec::new(workspace.sandbox.clone())));
192    }
193    Arc::new(registry)
194}
195
196fn tool_blocks_text(blocks: &[Block]) -> String {
197    blocks
198        .iter()
199        .map(|block| match block {
200            Block::Text(text) => text.clone(),
201            Block::Json(value) => value.to_string(),
202            Block::Image { mime, data } => format!("[image: {}, {} bytes]", mime, data.len()),
203            Block::Diff { file, patch } => format!("diff for {}\n{}", file, patch),
204        })
205        .collect::<Vec<_>>()
206        .join("\n")
207}
208
209fn track_tool_diff(
210    diffs: &mut Vec<crate::event::FileDiff>,
211    tool_name: &str,
212    args: &serde_json::Value,
213    blocks: &[Block],
214) {
215    for block in blocks {
216        if let Block::Diff { file, patch } = block {
217            let plus = patch
218                .lines()
219                .filter(|line| line.starts_with('+') && !line.starts_with("+++"))
220                .count() as u32;
221            let minus = patch
222                .lines()
223                .filter(|line| line.starts_with('-') && !line.starts_with("---"))
224                .count() as u32;
225            if !diffs.iter().any(|diff| diff.file == *file) {
226                diffs.push(crate::event::FileDiff {
227                    file: file.clone(),
228                    plus,
229                    minus,
230                });
231            }
232        }
233    }
234
235    if matches!(tool_name, "fs_write" | "edit" | "multi_edit") {
236        if let Some(path) = args.get("path").and_then(|value| value.as_str()) {
237            if !diffs.iter().any(|diff| diff.file == path) {
238                diffs.push(crate::event::FileDiff {
239                    file: path.to_string(),
240                    plus: 0,
241                    minus: 0,
242                });
243            }
244        }
245    }
246}
247
248// ─── THE ORCHESTRATOR TRAIT ─────────────────────────────────────────────────────
249
250#[async_trait::async_trait]
251pub trait Orchestrator: Send + Sync {
252    async fn run_swarm(
253        &self,
254        plan: SwarmPlan,
255        event_tx: mpsc::UnboundedSender<Event>,
256    ) -> anyhow::Result<SwarmOutcome>;
257}
258
259// ─── Default orchestrator: Planner → Coder → Verifier ───────────────────────────
260
261pub struct DefaultOrchestrator {
262    router: Arc<dyn Router>,
263    config: Config,
264    memory: Arc<dyn Memory>,
265    file_locks: Arc<FileLocks>,
266}
267
268impl DefaultOrchestrator {
269    pub fn new(router: Arc<dyn Router>, config: Config, memory: Arc<dyn Memory>) -> Self {
270        Self {
271            router,
272            config,
273            memory,
274            file_locks: Arc::new(FileLocks::new()),
275        }
276    }
277
278    /// Classify task tier for model selection
279    fn classify(&self, task: &str) -> TaskTier {
280        let lower = task.to_lowercase();
281        if lower.len() < 20 {
282            TaskTier::Trivial
283        } else if lower.contains("refactor") || lower.contains("architecture") {
284            TaskTier::Hard
285        } else if lower.contains("bug") || lower.contains("fix") {
286            TaskTier::Small
287        } else {
288            TaskTier::Medium
289        }
290    }
291
292    /// Select a brain for a given role and tier
293    fn select_brain(&self, role: &str, tier: TaskTier) -> Option<Arc<dyn Brain>> {
294        let need = match role {
295            "planner" => crate::router::RoutingNeed {
296                tier: TaskTier::Hard, // Planner always uses a strong model
297                required_tools: false,
298                required_vision: false,
299                prefer_local: false,
300            },
301            "verifier" => crate::router::RoutingNeed {
302                tier: TaskTier::Medium, // Verifier uses medium model
303                required_tools: true,
304                required_vision: false,
305                prefer_local: false,
306            },
307            _ => crate::router::RoutingNeed {
308                // Coder writes files via tools — it must get a tool-capable,
309                // competent model. A "trivial"/"small" classification would route
310                // to the cheapest free model that often emits prose instead of
311                // tool calls (no diff lands). Floor the coder at Medium.
312                tier: match tier {
313                    TaskTier::Trivial | TaskTier::Small => TaskTier::Medium,
314                    other => other,
315                },
316                required_tools: true,
317                required_vision: false,
318                prefer_local: false,
319            },
320        };
321
322        let budget = BudgetState {
323            daily_limit_usd: self.config.budget.daily_usd,
324            daily_spent_usd: 0.0,
325            session_limit_usd: self.config.budget.session_usd,
326            session_spent_usd: 0.0,
327        };
328
329        // Wrap the whole fallback chain so a 404/ratelimit on the top model
330        // transparently advances to the next instead of killing the role.
331        let chain = self.router.select(&need, &budget);
332        if chain.is_empty() {
333            None
334        } else {
335            Some(Arc::new(FallbackBrain::new(chain)) as Arc<dyn Brain>)
336        }
337    }
338
339    /// Run the planner agent
340    async fn run_planner(
341        &self,
342        task: &str,
343        workspace: &Workspace,
344        brain: Arc<dyn Brain>,
345        event_tx: &mpsc::UnboundedSender<Event>,
346        parent_run: &RunId,
347    ) -> anyhow::Result<(String, f64, TokenUsage)> {
348        let planner_identity = Identity {
349            name: "planner".into(),
350            role: "technical architect and planner".into(),
351            personality: "analytical, thorough, produces clear structured plans with concrete steps and acceptance criteria.".into(),
352        };
353
354        let system = format!(
355            r#"You are the PLANNER agent in a swarm.
356
357{personality}
358
359Your job: take a task and produce a detailed implementation SPEC.
360- Break the task into clear, numbered steps.
361- For each step, specify what files to create/modify.
362- Include acceptance criteria for the verifier.
363- Output ONLY the spec. No code. No implementation.
364
365Output format:
366## SPEC: <title>
367
368### Step 1: <description>
369- Files: <list>
370- Changes: <what changes>
371- Acceptance: <verification criteria>
372
373### Step 2: ...
374"#,
375            personality = planner_identity.personality,
376        );
377
378        let messages = vec![Msg {
379            role: "user".into(),
380            content: vec![ContentBlock::Text {
381                text: format!("Task to plan:\n\n{}", task),
382            }],
383        }];
384
385        let tools = swarm_tool_registry(workspace, false);
386
387        let tool_specs = tools.to_specs();
388        let req = BrainRequest {
389            system: Some(system),
390            messages,
391            tools: tool_specs.clone(),
392            max_tokens: 4096,
393            temperature: 0.0,
394            stop: vec![],
395            cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
396                "swarm-planner",
397                &workspace.root,
398                &tool_specs,
399            ))),
400        };
401
402        let _ = event_tx.send(Event::AgentStatus {
403            run: parent_run.clone(),
404            role: "planner".into(),
405            status: AgentStatus::Thinking,
406            note: format!("planning with {}", brain.id()),
407        });
408
409        let mut stream = brain.complete(req).await?;
410        let mut plan = String::new();
411        let caps = brain.caps();
412        let mut cost = 0.0_f64;
413        let mut tokens = TokenUsage {
414            input: 0,
415            output: 0,
416        };
417
418        while let Some(ev) = futures::StreamExt::next(&mut stream).await {
419            match ev {
420                crate::provider::BrainEvent::TextDelta(text) => {
421                    plan.push_str(&text);
422                    let _ = event_tx.send(Event::ThinkingDelta {
423                        run: parent_run.clone(),
424                        text,
425                    });
426                }
427                crate::provider::BrainEvent::Usage(usage) => {
428                    tokens.input = tokens.input.saturating_add(usage.input);
429                    tokens.output = tokens.output.saturating_add(usage.output);
430                    cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
431                        + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
432                }
433                crate::provider::BrainEvent::Done(_) => break,
434                crate::provider::BrainEvent::Error(e) => {
435                    anyhow::bail!("Planner error: {}", e)
436                }
437                _ => {}
438            }
439        }
440
441        let _ = event_tx.send(Event::AgentStatus {
442            run: parent_run.clone(),
443            role: "planner".into(),
444            status: AgentStatus::Done,
445            note: "plan complete".into(),
446        });
447
448        Ok((plan, cost, tokens))
449    }
450
451    /// Run the coder agent with a given spec
452    async fn run_coder(
453        &self,
454        spec: &str,
455        rework_notes: Option<&[String]>,
456        workspace: &Workspace,
457        brain: Arc<dyn Brain>,
458        event_tx: &mpsc::UnboundedSender<Event>,
459        parent_run: &RunId,
460    ) -> anyhow::Result<(Vec<crate::event::FileDiff>, f64, TokenUsage)> {
461        let coder_identity = Identity {
462            name: "coder".into(),
463            role: "implementation engineer".into(),
464            personality:
465                "precise, produces clean working code, uses exact file edits with the edit tool."
466                    .into(),
467        };
468
469        let rework_section = if let Some(notes) = rework_notes {
470            if notes.is_empty() {
471                String::new()
472            } else {
473                format!(
474                    "\n## REWORK NOTES (from verifier)\nThe previous implementation had issues. Fix these:\n{}",
475                    notes
476                        .iter()
477                        .enumerate()
478                        .map(|(i, n)| format!("{}. {}", i + 1, n))
479                        .collect::<Vec<_>>()
480                        .join("\n")
481                )
482            }
483        } else {
484            String::new()
485        };
486
487        let system = format!(
488            r#"You are the CODER agent in a swarm.
489
490{}
491
492Your job: implement the SPEC exactly. Use tools to read existing files and write edits.
493- Follow the spec steps in order.
494- Use the edit or fs_write tool to make changes.
495- After each file edit, note what you changed.
496- Produce working, compilable code.
497{}
498"#,
499            coder_identity.personality, rework_section,
500        );
501
502        // Build repo map context
503        let repo_map = self.memory.repo_map(&workspace.root);
504        let file_list: Vec<String> = repo_map
505            .files
506            .iter()
507            .map(|f| format!("  {}", f.path))
508            .collect();
509
510        let context_msg = format!(
511            "## SPEC TO IMPLEMENT\n\n{}\n\n## WORKSPACE FILES\n{}",
512            spec,
513            file_list.join("\n"),
514        );
515
516        let mut messages = vec![Msg {
517            role: "user".into(),
518            content: vec![ContentBlock::Text { text: context_msg }],
519        }];
520        let tools = swarm_tool_registry(workspace, true);
521        let tool_specs = tools.to_specs();
522
523        let _ = event_tx.send(Event::AgentStatus {
524            run: parent_run.clone(),
525            role: "coder".into(),
526            status: AgentStatus::Working,
527            note: format!("implementing with {}", brain.id()),
528        });
529
530        let mut output = String::new();
531        let mut diffs = Vec::new();
532        let caps = brain.caps();
533        let mut cost = 0.0_f64;
534        let mut tokens = TokenUsage {
535            input: 0,
536            output: 0,
537        };
538
539        for _turn in 0..8 {
540            let req = BrainRequest {
541                system: Some(system.clone()),
542                messages: messages.clone(),
543                tools: tool_specs.clone(),
544                max_tokens: 8192,
545                temperature: 0.0,
546                stop: vec![],
547                cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
548                    "swarm-coder",
549                    &workspace.root,
550                    &tool_specs,
551                ))),
552            };
553
554            let mut stream = brain.complete(req).await?;
555            let mut assistant_text = String::new();
556            let mut assistant_blocks = Vec::new();
557            let mut tool_result_blocks = Vec::new();
558            let mut current_tool_id = String::new();
559            let mut current_tool_name = String::new();
560            let mut current_tool_json = String::new();
561
562            while let Some(ev) = futures::StreamExt::next(&mut stream).await {
563                match ev {
564                    crate::provider::BrainEvent::TextDelta(text) => {
565                        output.push_str(&text);
566                        assistant_text.push_str(&text);
567                        let _ = event_tx.send(Event::ThinkingDelta {
568                            run: parent_run.clone(),
569                            text,
570                        });
571                    }
572                    // The orchestrator's coder loop doesn't need to round-trip
573                    // reasoning content (each phase uses a fresh BrainRequest),
574                    // so we just swallow it here to keep the match exhaustive.
575                    crate::provider::BrainEvent::ReasoningDelta(_) => {}
576                    crate::provider::BrainEvent::ToolUseStart { id, name } => {
577                        current_tool_id = id.clone();
578                        current_tool_name = name.clone();
579                        current_tool_json.clear();
580                        let risk = tools
581                            .get(&name)
582                            .map(|tool| tool.risk())
583                            .unwrap_or(RiskLevel::ReadOnly);
584                        let _ = event_tx.send(Event::ToolUseProposed {
585                            run: parent_run.clone(),
586                            id,
587                            name,
588                            args: serde_json::json!({}),
589                            risk,
590                        });
591                    }
592                    crate::provider::BrainEvent::ToolUseDelta { id: _, json } => {
593                        current_tool_json.push_str(&json);
594                    }
595                    crate::provider::BrainEvent::ToolUseEnd { id } => {
596                        let args = serde_json::from_str::<serde_json::Value>(&current_tool_json)
597                            .unwrap_or_else(|_| serde_json::json!({}));
598                        let tool_name = if current_tool_name.is_empty() {
599                            "unknown".to_string()
600                        } else {
601                            current_tool_name.clone()
602                        };
603                        assistant_blocks.push(ContentBlock::ToolUse {
604                            id: id.clone(),
605                            name: tool_name.clone(),
606                            input: args.clone(),
607                        });
608                        let _ = event_tx.send(Event::ToolUseStarted {
609                            run: parent_run.clone(),
610                            id: id.clone(),
611                        });
612                        let result = if let Some(tool) = tools.get(&tool_name) {
613                            let ctx = ToolCtx {
614                                workspace_root: workspace.root.clone(),
615                                run_id: parent_run.clone(),
616                            };
617                            match tool.call(args.clone(), &ctx).await {
618                                Ok(result) => result,
619                                Err(err) => crate::tools::ToolResult::error(format!(
620                                    "Tool {} failed: {}",
621                                    tool_name, err
622                                )),
623                            }
624                        } else {
625                            crate::tools::ToolResult::error(format!("Unknown tool: {}", tool_name))
626                        };
627                        track_tool_diff(&mut diffs, &tool_name, &args, &result.content);
628                        for diff in &diffs {
629                            let _ = event_tx.send(Event::DiffProposed {
630                                run: parent_run.clone(),
631                                file: diff.file.clone(),
632                                patch: String::new(),
633                                plus: diff.plus,
634                                minus: diff.minus,
635                            });
636                        }
637                        let blocks = result.content.clone();
638                        let text = tool_blocks_text(&blocks);
639                        let _ = event_tx.send(Event::ToolOutput {
640                            run: parent_run.clone(),
641                            id: id.clone(),
642                            blocks,
643                            is_error: result.is_error,
644                        });
645                        tool_result_blocks.push(ContentBlock::ToolResult {
646                            tool_use_id: id,
647                            content: vec![ContentBlock::Text { text }],
648                            is_error: Some(result.is_error),
649                        });
650                        current_tool_id.clear();
651                        current_tool_name.clear();
652                        current_tool_json.clear();
653                    }
654                    crate::provider::BrainEvent::Done(_) => break,
655                    crate::provider::BrainEvent::Error(e) => anyhow::bail!("Coder error: {}", e),
656                    crate::provider::BrainEvent::Usage(usage) => {
657                        tokens.input = tokens.input.saturating_add(usage.input);
658                        tokens.output = tokens.output.saturating_add(usage.output);
659                        cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
660                            + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
661                    }
662                }
663            }
664
665            if !assistant_text.is_empty() {
666                assistant_blocks.insert(
667                    0,
668                    ContentBlock::Text {
669                        text: assistant_text,
670                    },
671                );
672            }
673            if tool_result_blocks.is_empty() {
674                break;
675            }
676            messages.push(Msg {
677                role: "assistant".into(),
678                content: assistant_blocks,
679            });
680            messages.push(Msg {
681                role: "user".into(),
682                content: tool_result_blocks,
683            });
684        }
685
686        if diffs.is_empty() {
687            for line in output.lines() {
688                if let Some(file) = line.strip_prefix("Edited ") {
689                    let file = file
690                        .trim()
691                        .split(':')
692                        .next()
693                        .unwrap_or("")
694                        .trim()
695                        .to_string();
696                    if !file.is_empty() && !diffs.iter().any(|d| d.file == file) {
697                        diffs.push(crate::event::FileDiff {
698                            file,
699                            plus: 1,
700                            minus: 1,
701                        });
702                    }
703                }
704            }
705        }
706
707        // Emit diff events
708        for diff in &diffs {
709            let _ = event_tx.send(Event::DiffProposed {
710                run: parent_run.clone(),
711                file: diff.file.clone(),
712                patch: String::new(),
713                plus: diff.plus,
714                minus: diff.minus,
715            });
716        }
717
718        let _ = event_tx.send(Event::AgentStatus {
719            run: parent_run.clone(),
720            role: "coder".into(),
721            status: AgentStatus::Done,
722            note: format!("{} files changed", diffs.len()),
723        });
724
725        Ok((diffs, cost, tokens))
726    }
727
728    /// Run the verifier agent
729    async fn run_verifier(
730        &self,
731        spec: &str,
732        diffs: &[crate::event::FileDiff],
733        workspace: &Workspace,
734        brain: Arc<dyn Brain>,
735        event_tx: &mpsc::UnboundedSender<Event>,
736        parent_run: &RunId,
737    ) -> anyhow::Result<(Verdict, f64, TokenUsage)> {
738        let verifier_identity = Identity {
739            name: "verifier".into(),
740            role: "code reviewer and quality assurance".into(),
741            personality: "adversarial, meticulous, catches issues the coder missed. Checks correctness, style, edge cases, and spec compliance.".into(),
742        };
743
744        let diff_summary: String = diffs
745            .iter()
746            .map(|d| format!("  {}: +{} -{}", d.file, d.plus, d.minus))
747            .collect::<Vec<_>>()
748            .join("\n");
749
750        let tools = swarm_tool_registry(workspace, false);
751        let read_tool = tools.get("fs_read");
752        let mut files_to_check = Vec::new();
753        for d in diffs {
754            let content = if let Some(tool) = &read_tool {
755                let ctx = ToolCtx {
756                    workspace_root: workspace.root.clone(),
757                    run_id: parent_run.clone(),
758                };
759                let args = serde_json::json!({ "path": d.file, "limit": 220 });
760                match tool.call(args, &ctx).await {
761                    Ok(result) => tool_blocks_text(&result.content),
762                    Err(_) => format!("[cannot read {}]", d.file),
763                }
764            } else {
765                format!("[cannot read {}]", d.file)
766            };
767            files_to_check.push(content);
768        }
769
770        let files_context: String = diffs
771            .iter()
772            .zip(files_to_check.iter())
773            .map(|(d, content)| {
774                format!(
775                    "### {}\n```\n{}\n```",
776                    d.file,
777                    if content.len() > 3000 {
778                        format!("{}... [truncated]", &content[..3000])
779                    } else {
780                        content.clone()
781                    }
782                )
783            })
784            .collect::<Vec<_>>()
785            .join("\n\n");
786
787        let system = format!(
788            r#"You are the VERIFIER agent in a swarm.
789
790{personality}
791
792Your job: review the coder's implementation against the SPEC.
793- For each spec requirement, check if it's satisfied.
794- Find bugs, style issues, missing edge cases, spec violations.
795- Output EXACTLY one of:
796  ✓ PASS — if everything is correct and complete.
797  ✗ REWORK — followed by numbered concrete findings.
798
799Format:
800✓ PASS
801(no issues found)
802
803or:
804
805✗ REWORK
8061. <specific finding with file:line>
8072. <another finding>
808"#,
809            personality = verifier_identity.personality,
810        );
811
812        let context = format!(
813            "## SPEC\n{}\n\n## CHANGED FILES\n{}\n\n## FILE CONTENTS\n{}",
814            spec, diff_summary, files_context,
815        );
816
817        let messages = vec![Msg {
818            role: "user".into(),
819            content: vec![ContentBlock::Text { text: context }],
820        }];
821
822        let tool_specs = tools.to_specs();
823        let req = BrainRequest {
824            system: Some(system),
825            messages,
826            tools: tool_specs.clone(),
827            max_tokens: 4096,
828            temperature: 0.0,
829            stop: vec![],
830            cache: PromptCacheConfig::enabled(Some(prompt_cache_key(
831                "swarm-verifier",
832                &workspace.root,
833                &tool_specs,
834            ))),
835        };
836
837        let _ = event_tx.send(Event::AgentStatus {
838            run: parent_run.clone(),
839            role: "verifier".into(),
840            status: AgentStatus::Working,
841            note: format!("reviewing with {}", brain.id()),
842        });
843
844        let mut stream = brain.complete(req).await?;
845        let mut verdict_text = String::new();
846        let caps = brain.caps();
847        let mut cost = 0.0_f64;
848        let mut tokens = TokenUsage {
849            input: 0,
850            output: 0,
851        };
852
853        while let Some(ev) = futures::StreamExt::next(&mut stream).await {
854            match ev {
855                crate::provider::BrainEvent::TextDelta(text) => {
856                    verdict_text.push_str(&text);
857                }
858                crate::provider::BrainEvent::Usage(usage) => {
859                    tokens.input = tokens.input.saturating_add(usage.input);
860                    tokens.output = tokens.output.saturating_add(usage.output);
861                    cost += caps.cost_input_per_mtok * (usage.input as f64) / 1_000_000.0
862                        + caps.cost_output_per_mtok * (usage.output as f64) / 1_000_000.0;
863                }
864                crate::provider::BrainEvent::Done(_) => break,
865                _ => {}
866            }
867        }
868
869        // Case-insensitive PASS/REWORK detection. "PASS" wins only if no rework signal.
870        let upper = verdict_text.to_uppercase();
871        let has_rework = upper.contains("REWORK")
872            || upper.contains("NEEDS REWORK")
873            || verdict_text.contains("✗");
874        let has_pass = (upper.contains("PASS") || verdict_text.contains("✓")) && !has_rework;
875
876        let verdict = if has_rework {
877            let findings: Vec<String> = verdict_text
878                .lines()
879                .filter(|l| l.trim().starts_with(|c: char| c.is_ascii_digit()) && l.contains('.'))
880                .map(|l| l.trim().to_string())
881                .collect();
882
883            if findings.is_empty() {
884                Verdict::Rework {
885                    findings: vec![verdict_text.clone()],
886                }
887            } else {
888                Verdict::Rework { findings }
889            }
890        } else if has_pass {
891            Verdict::Pass
892        } else {
893            // No clear verdict — treat as rework with the raw text to be safe.
894            Verdict::Rework {
895                findings: vec![format!("Verifier verdict unclear: {}", verdict_text)],
896            }
897        };
898
899        let _ = event_tx.send(Event::AgentStatus {
900            run: parent_run.clone(),
901            role: "verifier".into(),
902            status: AgentStatus::Done,
903            note: match &verdict {
904                Verdict::Pass => "PASS".into(),
905                Verdict::Rework { findings } => format!("REWORK ({} issues)", findings.len()),
906            },
907        });
908
909        Ok((verdict, cost, tokens))
910    }
911}
912
913#[async_trait::async_trait]
914impl Orchestrator for DefaultOrchestrator {
915    async fn run_swarm(
916        &self,
917        plan: SwarmPlan,
918        event_tx: mpsc::UnboundedSender<Event>,
919    ) -> anyhow::Result<SwarmOutcome> {
920        let run_id = RunId::new();
921        let task = plan.task.clone();
922
923        let _ = event_tx.send(Event::RunStarted {
924            run: run_id.clone(),
925            task: task.clone(),
926            agent: "swarm".into(),
927        });
928
929        // Select brains for each role
930        let planner_brain = self
931            .select_brain("planner", self.classify(&task))
932            .ok_or_else(|| anyhow::anyhow!("No model available for planner"))?;
933
934        let coder_brain = self
935            .select_brain("coder", self.classify(&task))
936            .unwrap_or_else(|| planner_brain.clone());
937
938        let verifier_brain = self
939            .select_brain("verifier", self.classify(&task))
940            .unwrap_or_else(|| coder_brain.clone());
941
942        // Create workspace
943        let sandbox = LocalSandbox::new(plan.workspace.clone());
944        let workspace = Workspace {
945            root: plan.workspace.clone(),
946            sandbox: Arc::new(sandbox),
947        };
948
949        let mut total_cost: f64 = 0.0;
950        let mut total_tokens = TokenUsage {
951            input: 0,
952            output: 0,
953        };
954
955        // ▸ PHASE 1: PLANNING
956        let _ = event_tx.send(Event::AgentSpawned {
957            run: run_id.clone(),
958            role: "planner".into(),
959            model: planner_brain.id().to_string(),
960        });
961
962        let (spec, planner_cost, planner_tokens) = self
963            .run_planner(&task, &workspace, planner_brain, &event_tx, &run_id)
964            .await?;
965        total_cost += planner_cost;
966        total_tokens.input = total_tokens.input.saturating_add(planner_tokens.input);
967        total_tokens.output = total_tokens.output.saturating_add(planner_tokens.output);
968        let _ = event_tx.send(Event::CostUpdate {
969            run: run_id.clone(),
970            usd: total_cost,
971        });
972
973        // Post plan to shared memory
974        let _ = self.memory.upsert_doc(crate::memory::WorkingDoc {
975            id: format!("plan-{}", run_id.0),
976            title: format!("Plan: {}", &task[..task.len().min(60)]),
977            content: spec.clone(),
978            updated_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
979        });
980
981        // ▸ PHASE 2–3: CODING + VERIFYING (with REWORK loop)
982        let _ = event_tx.send(Event::AgentSpawned {
983            run: run_id.clone(),
984            role: "coder".into(),
985            model: coder_brain.id().to_string(),
986        });
987
988        let _ = event_tx.send(Event::AgentSpawned {
989            run: run_id.clone(),
990            role: "verifier".into(),
991            model: verifier_brain.id().to_string(),
992        });
993
994        let mut rework_notes: Option<Vec<String>> = None;
995        let mut all_diffs: Vec<crate::event::FileDiff> = Vec::new();
996        let mut reworks = 0u32;
997        let mut passes = 0u32;
998
999        for attempt in 0..=plan.max_reworks {
1000            // ▸ CODER
1001            let (diffs, coder_cost, coder_tokens) = self
1002                .run_coder(
1003                    &spec,
1004                    rework_notes.as_deref(),
1005                    &workspace,
1006                    coder_brain.clone(),
1007                    &event_tx,
1008                    &run_id,
1009                )
1010                .await?;
1011            total_cost += coder_cost;
1012            total_tokens.input = total_tokens.input.saturating_add(coder_tokens.input);
1013            total_tokens.output = total_tokens.output.saturating_add(coder_tokens.output);
1014            let _ = event_tx.send(Event::CostUpdate {
1015                run: run_id.clone(),
1016                usd: total_cost,
1017            });
1018
1019            // Release any locks from previous attempt
1020            if attempt > 0 {
1021                let prev_files: Vec<String> = all_diffs.iter().map(|d| d.file.clone()).collect();
1022                self.file_locks.release(&prev_files).await;
1023            }
1024
1025            // Acquire locks for new diffs
1026            let new_files: Vec<String> = diffs.iter().map(|d| d.file.clone()).collect();
1027            if let Err(conflicts) = self.file_locks.try_lock(&new_files).await {
1028                let _ = event_tx.send(Event::Error {
1029                    run: run_id.clone(),
1030                    message: format!("File collision detected: {:?}", conflicts),
1031                });
1032            }
1033
1034            all_diffs = diffs.clone();
1035
1036            // ▸ Empty-diff guard: if the coder changed no files, don't waste a
1037            // verify pass (and never let it PASS) — force a REWORK with an
1038            // explicit instruction to actually call the tools. This rescues weak
1039            // models that describe a change in prose instead of emitting tool calls.
1040            if diffs.is_empty() && attempt < plan.max_reworks {
1041                reworks += 1;
1042                let note = "You made NO file changes. You MUST call the fs_write or edit \
1043                    tool to create/modify the files in the spec — do not merely describe \
1044                    the change in prose. Emit the tool call now."
1045                    .to_string();
1046                rework_notes = Some(vec![note.clone()]);
1047                let _ = event_tx.send(Event::TestResult {
1048                    run: run_id.clone(),
1049                    passed: 0,
1050                    failed: 1,
1051                    detail: "no file changes — coder must use tools".into(),
1052                });
1053                let _ = event_tx.send(Event::AgentStatus {
1054                    run: run_id.clone(),
1055                    role: "coder".into(),
1056                    status: AgentStatus::Working,
1057                    note: format!("no diff — forcing tool use (attempt {})", attempt + 1),
1058                });
1059                continue;
1060            }
1061
1062            // ▸ VERIFIER
1063            let (verdict, verifier_cost, verifier_tokens) = self
1064                .run_verifier(
1065                    &spec,
1066                    &diffs,
1067                    &workspace,
1068                    verifier_brain.clone(),
1069                    &event_tx,
1070                    &run_id,
1071                )
1072                .await?;
1073            total_cost += verifier_cost;
1074            total_tokens.input = total_tokens.input.saturating_add(verifier_tokens.input);
1075            total_tokens.output = total_tokens.output.saturating_add(verifier_tokens.output);
1076            let _ = event_tx.send(Event::CostUpdate {
1077                run: run_id.clone(),
1078                usd: total_cost,
1079            });
1080
1081            match verdict {
1082                Verdict::Pass => {
1083                    passes += 1;
1084                    // Signal PASS to shared memory
1085                    let _ = self.memory.post_signal(crate::memory::SharedSignal {
1086                        id: format!("pass-{}-{}", run_id.0, attempt),
1087                        from_agent: "verifier".into(),
1088                        to_agent: "coder".into(),
1089                        content: "PASS — all checks satisfied".into(),
1090                        timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1091                    });
1092
1093                    let _ = event_tx.send(Event::TestResult {
1094                        run: run_id.clone(),
1095                        passed: 1,
1096                        failed: 0,
1097                        detail: "Verifier PASS".into(),
1098                    });
1099                    for diff in &diffs {
1100                        let _ = event_tx.send(Event::DiffApplied {
1101                            run: run_id.clone(),
1102                            file: diff.file.clone(),
1103                        });
1104                    }
1105                    break; // Done!
1106                }
1107                Verdict::Rework { findings } => {
1108                    reworks += 1;
1109                    rework_notes = Some(findings.clone());
1110
1111                    // Signal REWORK to shared memory
1112                    let _ = self.memory.post_signal(crate::memory::SharedSignal {
1113                        id: format!("rework-{}-{}", run_id.0, attempt),
1114                        from_agent: "verifier".into(),
1115                        to_agent: "coder".into(),
1116                        content: format!("REWORK: {}", findings.join("; ")),
1117                        timestamp: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
1118                    });
1119
1120                    let _ = event_tx.send(Event::TestResult {
1121                        run: run_id.clone(),
1122                        passed: 0,
1123                        failed: findings.len() as u32,
1124                        detail: findings.join("\n"),
1125                    });
1126
1127                    let _ = event_tx.send(Event::AgentStatus {
1128                        run: run_id.clone(),
1129                        role: "coder".into(),
1130                        status: AgentStatus::Working,
1131                        note: format!("rework attempt {}/{}", attempt + 1, plan.max_reworks),
1132                    });
1133                }
1134            }
1135        }
1136
1137        let outcome = SwarmOutcome {
1138            status: if passes > 0 {
1139                "PASS".into()
1140            } else {
1141                format!("FAILED after {} reworks", reworks)
1142            },
1143            plan: Some(spec),
1144            diffs: all_diffs,
1145            passes,
1146            reworks,
1147            cost_usd: total_cost,
1148        };
1149
1150        let cost_comparison = crate::cost::format_comparison(outcome.cost_usd, &total_tokens);
1151        let outcome_summary = OutcomeSummary {
1152            status: outcome.status.clone(),
1153            diffs: outcome.diffs.clone(),
1154            cost_usd: outcome.cost_usd,
1155            tokens: total_tokens,
1156            cost_comparison,
1157            duration_ms: None,
1158        };
1159
1160        let _ = event_tx.send(Event::RunFinished {
1161            run: run_id,
1162            outcome: outcome_summary,
1163        });
1164
1165        Ok(outcome)
1166    }
1167}