Skip to main content

a3s_code_core/
agent_teams.rs

1//! Agent Teams — Peer-to-peer multi-agent coordination
2//!
3//! Enables multiple `AgentSession` instances to collaborate on complex tasks
4//! through a shared task board and message passing. Each agent has a role
5//! (Lead, Worker, Reviewer) and can post/claim tasks on the board.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! AgentTeam
11//!   +-- TeamTaskBoard (shared task queue)
//!   +-- TeamMember[] (id + role; executors are bound via `TeamRunner`)
13//!   +-- mpsc channels (peer-to-peer messaging)
14//! ```
15//!
16//! ## Usage
17//!
18//! ```rust,no_run
19//! use a3s_code_core::agent_teams::{AgentTeam, TeamConfig, TeamRole};
20//!
21//! # async fn run() -> anyhow::Result<()> {
22//! let config = TeamConfig::default();
23//! let mut team = AgentTeam::new("refactor-auth", config);
24//!
//! // Add members (executors are bound separately through `TeamRunner`)
26//! team.add_member("lead", TeamRole::Lead);
27//! team.add_member("worker-1", TeamRole::Worker);
28//! team.add_member("reviewer", TeamRole::Reviewer);
29//!
30//! // Post a task to the board
31//! team.task_board().post("Refactor auth module", "lead", None);
32//!
33//! // Worker claims and works on it
34//! let task = team.task_board().claim("worker-1");
35//! # Ok(())
36//! # }
37//! ```
38
39use std::collections::HashMap;
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
/// Tunable limits for an agent team and the runner that drives it.
#[derive(Clone, Debug)]
pub struct TeamConfig {
    /// Capacity limit for the shared task board.
    ///
    /// Default: 50
    pub max_tasks: usize,
    /// Buffer size of each member's message channel.
    ///
    /// Default: 128
    pub channel_buffer: usize,
    /// Maximum coordination rounds before `run_until_done` exits.
    ///
    /// Default: 10
    pub max_rounds: usize,
    /// Polling interval, in milliseconds, for the worker and reviewer loops.
    ///
    /// Default: 200
    pub poll_interval_ms: u64,
}

impl Default for TeamConfig {
    /// Returns the stock configuration: 50 tasks, a 128-message channel
    /// buffer, 10 rounds, and a 200 ms polling interval.
    fn default() -> Self {
        TeamConfig {
            poll_interval_ms: 200,
            max_rounds: 10,
            channel_buffer: 128,
            max_tasks: 50,
        }
    }
}
71
72/// Role of a team member.
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
74#[serde(rename_all = "snake_case")]
75pub enum TeamRole {
76    /// Decomposes goals into tasks, assigns work.
77    Lead,
78    /// Executes assigned tasks.
79    Worker,
80    /// Reviews completed work, provides feedback.
81    Reviewer,
82}
83
84impl std::fmt::Display for TeamRole {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            TeamRole::Lead => write!(f, "lead"),
88            TeamRole::Worker => write!(f, "worker"),
89            TeamRole::Reviewer => write!(f, "reviewer"),
90        }
91    }
92}
93
/// One message exchanged between two team members.
#[derive(Clone, Debug)]
pub struct TeamMessage {
    /// ID of the member that sent the message.
    pub from: String,
    /// ID of the member the message is addressed to.
    pub to: String,
    /// Body of the message.
    pub content: String,
    /// ID of the task this message is about, if any.
    pub task_id: Option<String>,
    /// Send time as Unix epoch seconds.
    pub timestamp: i64,
}
108
/// Lifecycle state of a task on the board.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not yet claimed by anyone.
    Open,
    /// Claimed and being worked on.
    InProgress,
    /// Work finished; waiting for a review verdict.
    InReview,
    /// Approved by a reviewer.
    Done,
    /// Turned down by a reviewer; needs another attempt.
    Rejected,
}

impl std::fmt::Display for TaskStatus {
    /// Writes the `snake_case` name of the status (e.g. `in_progress`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            TaskStatus::Open => "open",
            TaskStatus::InProgress => "in_progress",
            TaskStatus::InReview => "in_review",
            TaskStatus::Done => "done",
            TaskStatus::Rejected => "rejected",
        };
        f.write_str(label)
    }
}
135
136/// A task on the team board.
137#[derive(Debug, Clone)]
138pub struct TeamTask {
139    /// Unique task ID.
140    pub id: String,
141    /// Task description.
142    pub description: String,
143    /// Who posted it.
144    pub posted_by: String,
145    /// Who is working on it (if claimed).
146    pub assigned_to: Option<String>,
147    /// Current status.
148    pub status: TaskStatus,
149    /// Optional result/output when completed.
150    pub result: Option<String>,
151    /// Created timestamp.
152    pub created_at: i64,
153    /// Last updated timestamp.
154    pub updated_at: i64,
155}
156
157/// Shared task board for team coordination.
158#[derive(Debug)]
159pub struct TeamTaskBoard {
160    tasks: RwLock<Vec<TeamTask>>,
161    max_tasks: usize,
162    next_id: RwLock<u64>,
163}
164
165impl TeamTaskBoard {
166    /// Create a new task board.
167    pub fn new(max_tasks: usize) -> Self {
168        Self {
169            tasks: RwLock::new(Vec::new()),
170            max_tasks,
171            next_id: RwLock::new(1),
172        }
173    }
174
175    /// Post a new task to the board. Returns the task ID.
176    pub fn post(
177        &self,
178        description: &str,
179        posted_by: &str,
180        assign_to: Option<&str>,
181    ) -> Option<String> {
182        let mut tasks = self.tasks.write().unwrap();
183        if tasks.len() >= self.max_tasks {
184            return None;
185        }
186
187        let mut id_counter = self.next_id.write().unwrap();
188        let id = format!("task-{}", *id_counter);
189        *id_counter += 1;
190
191        let now = chrono::Utc::now().timestamp();
192        let status = if assign_to.is_some() {
193            TaskStatus::InProgress
194        } else {
195            TaskStatus::Open
196        };
197
198        tasks.push(TeamTask {
199            id: id.clone(),
200            description: description.to_string(),
201            posted_by: posted_by.to_string(),
202            assigned_to: assign_to.map(|s| s.to_string()),
203            status,
204            result: None,
205            created_at: now,
206            updated_at: now,
207        });
208
209        Some(id)
210    }
211
212    /// Claim the next open or rejected task for a member. Returns the task if available.
213    ///
214    /// Rejected tasks are treated as retriable: a worker can claim them again
215    /// for another execution attempt.
216    pub fn claim(&self, member_id: &str) -> Option<TeamTask> {
217        let mut tasks = self.tasks.write().unwrap();
218        let task = tasks
219            .iter_mut()
220            .find(|t| t.status == TaskStatus::Open || t.status == TaskStatus::Rejected)?;
221        task.assigned_to = Some(member_id.to_string());
222        task.status = TaskStatus::InProgress;
223        task.updated_at = chrono::Utc::now().timestamp();
224        Some(task.clone())
225    }
226
227    /// Mark a task as complete with a result.
228    pub fn complete(&self, task_id: &str, result: &str) -> bool {
229        let mut tasks = self.tasks.write().unwrap();
230        if let Some(task) = tasks.iter_mut().find(|t| t.id == task_id) {
231            task.status = TaskStatus::InReview;
232            task.result = Some(result.to_string());
233            task.updated_at = chrono::Utc::now().timestamp();
234            true
235        } else {
236            false
237        }
238    }
239
240    /// Approve a task (reviewer action).
241    pub fn approve(&self, task_id: &str) -> bool {
242        let mut tasks = self.tasks.write().unwrap();
243        if let Some(task) = tasks
244            .iter_mut()
245            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
246        {
247            task.status = TaskStatus::Done;
248            task.updated_at = chrono::Utc::now().timestamp();
249            true
250        } else {
251            false
252        }
253    }
254
255    /// Reject a task back to open (reviewer action).
256    pub fn reject(&self, task_id: &str) -> bool {
257        let mut tasks = self.tasks.write().unwrap();
258        if let Some(task) = tasks
259            .iter_mut()
260            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
261        {
262            task.status = TaskStatus::Rejected;
263            task.assigned_to = None;
264            task.updated_at = chrono::Utc::now().timestamp();
265            true
266        } else {
267            false
268        }
269    }
270
271    /// Get all tasks with a given status.
272    pub fn by_status(&self, status: TaskStatus) -> Vec<TeamTask> {
273        self.tasks
274            .read()
275            .unwrap()
276            .iter()
277            .filter(|t| t.status == status)
278            .cloned()
279            .collect()
280    }
281
282    /// Get all tasks assigned to a member.
283    pub fn by_assignee(&self, member_id: &str) -> Vec<TeamTask> {
284        self.tasks
285            .read()
286            .unwrap()
287            .iter()
288            .filter(|t| t.assigned_to.as_deref() == Some(member_id))
289            .cloned()
290            .collect()
291    }
292
293    /// Get a task by ID.
294    pub fn get(&self, task_id: &str) -> Option<TeamTask> {
295        self.tasks
296            .read()
297            .unwrap()
298            .iter()
299            .find(|t| t.id == task_id)
300            .cloned()
301    }
302
303    /// Number of tasks on the board.
304    pub fn len(&self) -> usize {
305        self.tasks.read().unwrap().len()
306    }
307
308    /// Whether the board is empty.
309    pub fn is_empty(&self) -> bool {
310        self.tasks.read().unwrap().is_empty()
311    }
312
313    /// Summary stats: (open, in_progress, in_review, done, rejected).
314    pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
315        let tasks = self.tasks.read().unwrap();
316        let open = tasks
317            .iter()
318            .filter(|t| t.status == TaskStatus::Open)
319            .count();
320        let progress = tasks
321            .iter()
322            .filter(|t| t.status == TaskStatus::InProgress)
323            .count();
324        let review = tasks
325            .iter()
326            .filter(|t| t.status == TaskStatus::InReview)
327            .count();
328        let done = tasks
329            .iter()
330            .filter(|t| t.status == TaskStatus::Done)
331            .count();
332        let rejected = tasks
333            .iter()
334            .filter(|t| t.status == TaskStatus::Rejected)
335            .count();
336        (open, progress, review, done, rejected)
337    }
338}
339
340/// A team member.
341#[derive(Debug, Clone)]
342pub struct TeamMember {
343    /// Unique member ID.
344    pub id: String,
345    /// Member role.
346    pub role: TeamRole,
347}
348
349/// Multi-agent team coordinator.
350pub struct AgentTeam {
351    /// Team name.
352    name: String,
353    /// Configuration.
354    config: TeamConfig,
355    /// Registered members.
356    members: HashMap<String, TeamMember>,
357    /// Shared task board.
358    task_board: Arc<TeamTaskBoard>,
359    /// Message senders per member.
360    senders: HashMap<String, mpsc::Sender<TeamMessage>>,
361    /// Message receivers per member (taken on first access).
362    receivers: HashMap<String, mpsc::Receiver<TeamMessage>>,
363}
364
365impl AgentTeam {
366    /// Create a new team.
367    pub fn new(name: &str, config: TeamConfig) -> Self {
368        Self {
369            name: name.to_string(),
370            config,
371            members: HashMap::new(),
372            task_board: Arc::new(TeamTaskBoard::new(50)),
373            senders: HashMap::new(),
374            receivers: HashMap::new(),
375        }
376    }
377
378    /// Team name.
379    pub fn name(&self) -> &str {
380        &self.name
381    }
382
383    /// Add a member to the team.
384    pub fn add_member(&mut self, id: &str, role: TeamRole) {
385        let (tx, rx) = mpsc::channel(self.config.channel_buffer);
386        self.members.insert(
387            id.to_string(),
388            TeamMember {
389                id: id.to_string(),
390                role,
391            },
392        );
393        self.senders.insert(id.to_string(), tx);
394        self.receivers.insert(id.to_string(), rx);
395    }
396
397    /// Remove a member from the team.
398    pub fn remove_member(&mut self, id: &str) -> bool {
399        self.senders.remove(id);
400        self.receivers.remove(id);
401        self.members.remove(id).is_some()
402    }
403
404    /// Get a reference to the shared task board.
405    pub fn task_board(&self) -> &TeamTaskBoard {
406        &self.task_board
407    }
408
409    /// Get a cloneable Arc to the task board.
410    pub fn task_board_arc(&self) -> Arc<TeamTaskBoard> {
411        Arc::clone(&self.task_board)
412    }
413
414    /// Send a message to a team member.
415    pub async fn send_message(
416        &self,
417        from: &str,
418        to: &str,
419        content: &str,
420        task_id: Option<&str>,
421    ) -> bool {
422        let sender = match self.senders.get(to) {
423            Some(s) => s,
424            None => return false,
425        };
426
427        let msg = TeamMessage {
428            from: from.to_string(),
429            to: to.to_string(),
430            content: content.to_string(),
431            task_id: task_id.map(|s| s.to_string()),
432            timestamp: chrono::Utc::now().timestamp(),
433        };
434
435        sender.send(msg).await.is_ok()
436    }
437
438    /// Take the message receiver for a member (can only be called once per member).
439    pub fn take_receiver(&mut self, member_id: &str) -> Option<mpsc::Receiver<TeamMessage>> {
440        self.receivers.remove(member_id)
441    }
442
443    /// Broadcast a message to all members except the sender.
444    pub async fn broadcast(&self, from: &str, content: &str, task_id: Option<&str>) {
445        for (id, sender) in &self.senders {
446            if id == from {
447                continue;
448            }
449            let msg = TeamMessage {
450                from: from.to_string(),
451                to: id.clone(),
452                content: content.to_string(),
453                task_id: task_id.map(|s| s.to_string()),
454                timestamp: chrono::Utc::now().timestamp(),
455            };
456            let _ = sender.send(msg).await;
457        }
458    }
459
460    /// Get all members.
461    pub fn members(&self) -> Vec<&TeamMember> {
462        self.members.values().collect()
463    }
464
465    /// Get members by role.
466    pub fn members_by_role(&self, role: TeamRole) -> Vec<&TeamMember> {
467        self.members.values().filter(|m| m.role == role).collect()
468    }
469
470    /// Number of members.
471    pub fn member_count(&self) -> usize {
472        self.members.len()
473    }
474}
475
476impl std::fmt::Debug for AgentTeam {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        f.debug_struct("AgentTeam")
479            .field("name", &self.name)
480            .field("members", &self.members.len())
481            .field("tasks", &self.task_board.len())
482            .finish()
483    }
484}
485
486// ---------------------------------------------------------------------------
487// AgentExecutor — abstraction over AgentSession for testability
488// ---------------------------------------------------------------------------
489
490/// Minimal execution interface for a team member.
491///
492/// `AgentSession` implements this trait. In tests, a mock can be used instead.
493#[async_trait::async_trait]
494pub trait AgentExecutor: Send + Sync {
495    /// Execute a prompt and return the text response.
496    async fn execute(&self, prompt: &str) -> crate::error::Result<String>;
497}
498
499#[async_trait::async_trait]
500impl AgentExecutor for crate::agent_api::AgentSession {
501    async fn execute(&self, prompt: &str) -> crate::error::Result<String> {
502        let result = self.send(prompt, None).await?;
503        Ok(result.text)
504    }
505}
506
507// ---------------------------------------------------------------------------
508// TeamRunResult
509// ---------------------------------------------------------------------------
510
511/// Result returned by `TeamRunner::run_until_done`.
512#[derive(Debug)]
513pub struct TeamRunResult {
514    /// Tasks that reached the `Done` state.
515    pub done_tasks: Vec<TeamTask>,
516    /// Tasks that are still `Rejected` after all rounds (could not be approved).
517    pub rejected_tasks: Vec<TeamTask>,
518    /// Number of reviewer polling rounds completed.
519    pub rounds: usize,
520}
521
522// ---------------------------------------------------------------------------
523// TeamRunner — binds AgentSession execution to AgentTeam coordination
524// ---------------------------------------------------------------------------
525
/// Prompt template sent to the lead agent to decompose a goal into tasks.
///
/// `{goal}` is substituted with `str::replace`, not `format!`, so braces in
/// the JSON example must be written singly: doubled `{{`/`}}` would reach the
/// model verbatim and show it an invalid JSON shape.
const LEAD_PROMPT: &str = "You are the lead agent in a team. Your goal is: {goal}

Decompose this goal into concrete, self-contained tasks for your team workers.
Each task should be independently executable by an AI coding agent.

Respond with ONLY valid JSON in this exact format:
{\"tasks\": [\"task description 1\", \"task description 2\", ...]}

No markdown, no explanation, just the JSON.";
535
/// Prompt template for the reviewer agent.
///
/// Contains the `{task}` and `{result}` placeholders; presumably they are
/// filled in by the reviewer loop, which is not visible here (confirm).
const REVIEWER_PROMPT: &str = r#"Review the following completed task:

Task: {task}
Result: {result}

If the result satisfactorily completes the task, respond with "APPROVED: <brief reason>".
If the result is incomplete or incorrect, respond with "REJECTED: <specific feedback for improvement>"."#;
543
544/// Per-member session overrides for [`TeamRunner::add_lead`], [`TeamRunner::add_worker`],
545/// and [`TeamRunner::add_reviewer`].
546#[derive(Debug, Default, Clone)]
547pub struct TeamMemberOptions {
548    /// Override the workspace for this member.
549    ///
550    /// When set, this member uses the given path (e.g. a git worktree) instead
551    /// of the default workspace from [`TeamRunner::with_agent`].
552    pub workspace: Option<String>,
553    /// Model override. Format: `"provider/model"` (e.g. `"openai/gpt-4o"`).
554    pub model: Option<String>,
555    /// Prompt slot customization (role, guidelines, response style, extra).
556    pub prompt_slots: Option<crate::prompts::SystemPromptSlots>,
557    /// Override maximum tool-call rounds for this member's session.
558    pub max_tool_rounds: Option<usize>,
559}
560
561impl TeamMemberOptions {
562    fn into_session_options(self) -> Option<crate::agent_api::SessionOptions> {
563        if self.model.is_none() && self.prompt_slots.is_none() && self.max_tool_rounds.is_none() {
564            return None;
565        }
566        let mut opts = crate::agent_api::SessionOptions::new();
567        if let Some(m) = self.model {
568            opts = opts.with_model(m);
569        }
570        if let Some(slots) = self.prompt_slots {
571            opts = opts.with_prompt_slots(slots);
572        }
573        if let Some(rounds) = self.max_tool_rounds {
574            opts = opts.with_max_tool_rounds(rounds);
575        }
576        Some(opts)
577    }
578}
579
580/// Default agent context stored on [`TeamRunner`] to support simplified member
581/// addition via [`TeamRunner::add_lead`], [`TeamRunner::add_worker`], and
582/// [`TeamRunner::add_reviewer`].
583struct DefaultAgentContext {
584    agent: Arc<crate::agent_api::Agent>,
585    workspace: String,
586    registry: Arc<crate::subagent::AgentRegistry>,
587}
588
589/// Binds an `AgentTeam` to concrete `AgentExecutor` sessions, enabling
590/// Lead → Worker → Reviewer automated workflows.
591pub struct TeamRunner {
592    team: AgentTeam,
593    sessions: HashMap<String, Arc<dyn AgentExecutor>>,
594    default_ctx: Option<DefaultAgentContext>,
595    worker_count: usize,
596}
597
598impl TeamRunner {
599    /// Create a new runner wrapping the given team.
600    pub fn new(team: AgentTeam) -> Self {
601        Self {
602            team,
603            sessions: HashMap::new(),
604            default_ctx: None,
605            worker_count: 0,
606        }
607    }
608
609    /// Create a runner with a default agent context for simplified member addition.
610    ///
611    /// Unlike [`TeamRunner::new`], this constructor lets you call
612    /// [`add_lead`](Self::add_lead), [`add_worker`](Self::add_worker), and
613    /// [`add_reviewer`](Self::add_reviewer) without repeating the agent,
614    /// workspace, and registry on every call.
615    pub fn with_agent(
616        team: AgentTeam,
617        agent: Arc<crate::agent_api::Agent>,
618        workspace: &str,
619        registry: Arc<crate::subagent::AgentRegistry>,
620    ) -> Self {
621        Self {
622            team,
623            sessions: HashMap::new(),
624            default_ctx: Some(DefaultAgentContext {
625                agent,
626                workspace: workspace.to_string(),
627                registry,
628            }),
629            worker_count: 0,
630        }
631    }
632
633    /// Bind an executor to a team member.
634    ///
635    /// Returns an error if `member_id` is not registered in the team.
636    pub fn bind_session(
637        &mut self,
638        member_id: &str,
639        executor: Arc<dyn AgentExecutor>,
640    ) -> crate::error::Result<()> {
641        if !self.team.members.contains_key(member_id) {
642            return Err(anyhow::anyhow!(
643                "member '{}' not found in team '{}'",
644                member_id,
645                self.team.name()
646            )
647            .into());
648        }
649        self.sessions.insert(member_id.to_string(), executor);
650        Ok(())
651    }
652
653    /// Bind a team member to a session created from a named agent definition.
654    ///
655    /// Looks up `agent_name` in `registry`, applies the definition's prompt,
656    /// permissions, model, and `max_steps` to a new [`AgentSession`] via
657    /// [`Agent::session_for_agent`], then binds it to `member_id`.
658    ///
659    /// This is the primary integration point between the `AgentTeam` coordination
660    /// layer and the markdown/YAML-defined subagent capability layer.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if `member_id` is not in the team, `agent_name` is not
665    /// in `registry`, or session creation fails.
666    pub fn bind_agent(
667        &mut self,
668        member_id: &str,
669        agent: &crate::agent_api::Agent,
670        workspace: &str,
671        agent_name: &str,
672        registry: &crate::subagent::AgentRegistry,
673    ) -> crate::error::Result<()> {
674        let def = registry
675            .get(agent_name)
676            .ok_or_else(|| anyhow::anyhow!("agent '{}' not found in registry", agent_name))?;
677        let session = agent.session_for_agent(workspace, &def, None)?;
678        self.bind_session(member_id, Arc::new(session))
679    }
680
681    /// Create a session from the default agent context, applying optional member overrides.
682    ///
683    /// Returns an error if no default context is set or the agent name is not
684    /// found in the registry.
685    fn create_session_from_default(
686        &self,
687        agent_name: &str,
688        member_opts: Option<TeamMemberOptions>,
689    ) -> crate::error::Result<crate::agent_api::AgentSession> {
690        let ctx = self.default_ctx.as_ref().ok_or_else(|| {
691            anyhow::anyhow!("no default agent context; use TeamRunner::with_agent")
692        })?;
693        let def = ctx
694            .registry
695            .get(agent_name)
696            .ok_or_else(|| anyhow::anyhow!("agent '{}' not found in registry", agent_name))?;
697        let workspace = member_opts
698            .as_ref()
699            .and_then(|o| o.workspace.clone())
700            .unwrap_or_else(|| ctx.workspace.clone());
701        let session_opts = member_opts.and_then(|o| o.into_session_options());
702        ctx.agent.session_for_agent(workspace, &def, session_opts)
703    }
704
705    /// Add a Lead member and bind it to the named agent definition.
706    ///
707    /// Requires a default agent context set via [`TeamRunner::with_agent`].
708    /// The member ID is fixed to `"lead"`.
709    ///
710    /// Use `opts` to override the model, prompt slots, or workspace (e.g. a git worktree).
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if no default context is set or `agent_name` is not
715    /// found in the registry.
716    pub fn add_lead(
717        &mut self,
718        agent_name: &str,
719        opts: Option<TeamMemberOptions>,
720    ) -> crate::error::Result<()> {
721        let session = self.create_session_from_default(agent_name, opts)?;
722        self.team.add_member("lead", TeamRole::Lead);
723        self.sessions.insert("lead".to_string(), Arc::new(session));
724        Ok(())
725    }
726
727    /// Add a Worker member and bind it to the named agent definition.
728    ///
729    /// Requires a default agent context set via [`TeamRunner::with_agent`].
730    /// Member IDs are auto-generated as `"worker-1"`, `"worker-2"`, etc.
731    /// Multiple workers can be added; they run concurrently during execution.
732    ///
733    /// Use `opts` to override the model, prompt slots, or workspace — set
734    /// `workspace` to an isolated git worktree path to prevent filesystem
735    /// conflicts between concurrent workers.
736    ///
737    /// # Errors
738    ///
739    /// Returns an error if no default context is set or `agent_name` is not
740    /// found in the registry.
741    pub fn add_worker(
742        &mut self,
743        agent_name: &str,
744        opts: Option<TeamMemberOptions>,
745    ) -> crate::error::Result<()> {
746        self.worker_count += 1;
747        let id = format!("worker-{}", self.worker_count);
748        let session = self.create_session_from_default(agent_name, opts)?;
749        self.team.add_member(&id, TeamRole::Worker);
750        self.sessions.insert(id, Arc::new(session));
751        Ok(())
752    }
753
754    /// Add a Reviewer member and bind it to the named agent definition.
755    ///
756    /// Requires a default agent context set via [`TeamRunner::with_agent`].
757    /// The member ID is fixed to `"reviewer"`.
758    ///
759    /// Use `opts` to override the model, prompt slots, or workspace.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if no default context is set or `agent_name` is not
764    /// found in the registry.
765    pub fn add_reviewer(
766        &mut self,
767        agent_name: &str,
768        opts: Option<TeamMemberOptions>,
769    ) -> crate::error::Result<()> {
770        let session = self.create_session_from_default(agent_name, opts)?;
771        self.team.add_member("reviewer", TeamRole::Reviewer);
772        self.sessions
773            .insert("reviewer".to_string(), Arc::new(session));
774        Ok(())
775    }
776
777    /// Get a mutable reference to the underlying team.
778    pub fn team_mut(&mut self) -> &mut AgentTeam {
779        &mut self.team
780    }
781
782    /// Access the shared task board.
783    pub fn task_board(&self) -> Arc<TeamTaskBoard> {
784        self.team.task_board_arc()
785    }
786
787    /// Run the full Lead → Worker → Reviewer workflow until all tasks are done
788    /// or `max_rounds` retry cycles are exceeded.
789    ///
790    /// Steps:
791    /// 1. Lead decomposes `goal` into tasks via JSON response.
792    /// 2. Workers run concurrently until all tasks are in review or done.
793    /// 3. Reviewer processes all InReview tasks (after workers finish).
794    /// 4. If rejected tasks remain, workers retry them (back to step 2).
795    ///
796    /// Running the reviewer after workers (rather than concurrently) prevents a
797    /// race where the reviewer's polling timeout fires before long-running agent
798    /// calls have produced any InReview tasks.
799    pub async fn run_until_done(&self, goal: &str) -> crate::error::Result<TeamRunResult> {
800        // --- Step 1: Lead decomposes the goal ---
801        let lead = self
802            .team
803            .members_by_role(TeamRole::Lead)
804            .into_iter()
805            .next()
806            .ok_or_else(|| anyhow::anyhow!("team has no Lead member"))?;
807
808        let lead_executor = self
809            .sessions
810            .get(&lead.id)
811            .ok_or_else(|| anyhow::anyhow!("no executor bound for lead member '{}'", lead.id))?;
812
813        let lead_prompt = LEAD_PROMPT.replace("{goal}", goal);
814        let raw = lead_executor.execute(&lead_prompt).await?;
815        let task_descriptions = parse_task_list(&raw)?;
816
817        let board = self.team.task_board_arc();
818        for desc in &task_descriptions {
819            board.post(desc, &lead.id, None);
820        }
821
822        // --- Step 2 & 3: Workers then reviewer, cycling until all tasks Done ---
823        //
824        // Workers run to completion first, then the reviewer processes all
825        // InReview tasks. If the reviewer rejects any tasks, workers are
826        // re-spawned to retry them. This sequential-then-cycle approach avoids
827        // a race where the reviewer's polling timeout fires before long-running
828        // LLM agent calls have produced any InReview tasks.
829        let poll = Duration::from_millis(self.team.config.poll_interval_ms);
830        let max_rounds = self.team.config.max_rounds;
831
832        let workers: Vec<(String, Arc<dyn AgentExecutor>)> = self
833            .team
834            .members_by_role(TeamRole::Worker)
835            .into_iter()
836            .filter_map(|m| {
837                self.sessions
838                    .get(&m.id)
839                    .map(|e| (m.id.clone(), Arc::clone(e)))
840            })
841            .collect();
842
843        let reviewer: Option<(String, Arc<dyn AgentExecutor>)> = self
844            .team
845            .members_by_role(TeamRole::Reviewer)
846            .into_iter()
847            .next()
848            .and_then(|m| {
849                self.sessions
850                    .get(&m.id)
851                    .map(|e| (m.id.clone(), Arc::clone(e)))
852            });
853
854        let mut total_reviewer_rounds = 0usize;
855
856        for _cycle in 0..max_rounds {
857            // Run all workers concurrently until no claimable work remains.
858            let mut worker_handles = Vec::new();
859            for (id, executor) in &workers {
860                let b = Arc::clone(&board);
861                let id = id.clone();
862                let executor = Arc::clone(executor);
863                let handle = tokio::spawn(async move {
864                    run_worker(id, executor, b, max_rounds, poll).await;
865                });
866                worker_handles.push(handle);
867            }
868            for h in worker_handles {
869                let _ = h.await;
870            }
871
872            // Run reviewer on all InReview tasks (workers are done; no race).
873            if let Some((ref id, ref executor)) = reviewer {
874                let rounds = run_reviewer(
875                    id.clone(),
876                    Arc::clone(executor),
877                    Arc::clone(&board),
878                    max_rounds,
879                    poll,
880                )
881                .await;
882                total_reviewer_rounds += rounds;
883            }
884
885            // Stop if no rejected tasks remain to retry.
886            let (open, in_progress, in_review, _, rejected) = board.stats();
887            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
888                break;
889            }
890            if rejected == 0 {
891                break;
892            }
893        }
894
895        let done_tasks = board.by_status(TaskStatus::Done);
896        let rejected_tasks = board.by_status(TaskStatus::Rejected);
897
898        Ok(TeamRunResult {
899            done_tasks,
900            rejected_tasks,
901            rounds: total_reviewer_rounds,
902        })
903    }
904}
905
906impl std::fmt::Debug for TeamRunner {
907    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
908        f.debug_struct("TeamRunner")
909            .field("team", &self.team.name())
910            .field("bound_sessions", &self.sessions.len())
911            .finish()
912    }
913}
914
915/// Parse `{"tasks": ["...", "..."]}` from a Lead response.
916fn parse_task_list(response: &str) -> crate::error::Result<Vec<String>> {
917    // Find JSON object boundaries in case there is extra whitespace or newlines.
918    let start = response
919        .find('{')
920        .ok_or_else(|| anyhow::anyhow!("lead response contains no JSON object: {}", response))?;
921    let end = response
922        .rfind('}')
923        .ok_or_else(|| anyhow::anyhow!("lead response JSON object is unclosed"))?;
924    let json_str = &response[start..=end];
925
926    let value: serde_json::Value = serde_json::from_str(json_str)
927        .map_err(|e| anyhow::anyhow!("failed to parse lead JSON response: {e}"))?;
928
929    let tasks: Vec<String> = value["tasks"]
930        .as_array()
931        .ok_or_else(|| anyhow::anyhow!("lead JSON response missing 'tasks' array"))?
932        .iter()
933        .filter_map(|v: &serde_json::Value| v.as_str().map(|s| s.to_string()))
934        .collect();
935
936    Ok(tasks)
937}
938
939/// Worker loop: claim and execute tasks until the board is quiescent.
940async fn run_worker(
941    member_id: String,
942    executor: Arc<dyn AgentExecutor>,
943    board: Arc<TeamTaskBoard>,
944    max_rounds: usize,
945    poll: Duration,
946) {
947    let mut idle = 0usize;
948    loop {
949        if let Some(task) = board.claim(&member_id) {
950            idle = 0;
951            let result = executor
952                .execute(&task.description)
953                .await
954                .unwrap_or_else(|e| format!("execution error: {e}"));
955            board.complete(&task.id, &result);
956        } else {
957            let (open, in_progress, in_review, _, rejected) = board.stats();
958            // No claimable work and no tasks that could re-enter the queue → stop.
959            // Include in_review so we wait for the reviewer's verdict (which may
960            // produce a Rejected task for us to retry).
961            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
962                break;
963            }
964            idle += 1;
965            if idle >= max_rounds {
966                break;
967            }
968            tokio::time::sleep(poll).await;
969        }
970    }
971}
972
973/// Reviewer loop: review InReview tasks and approve or reject them.
974/// Returns the number of rounds completed.
975async fn run_reviewer(
976    _member_id: String,
977    executor: Arc<dyn AgentExecutor>,
978    board: Arc<TeamTaskBoard>,
979    max_rounds: usize,
980    poll: Duration,
981) -> usize {
982    let mut rounds = 0usize;
983    loop {
984        let in_review = board.by_status(TaskStatus::InReview);
985        for task in in_review {
986            let result_text = task.result.as_deref().unwrap_or("");
987            let prompt = REVIEWER_PROMPT
988                .replace("{task}", &task.description)
989                .replace("{result}", result_text);
990            let verdict = executor
991                .execute(&prompt)
992                .await
993                .unwrap_or_else(|_| "REJECTED: execution error".to_string());
994            if verdict.contains("APPROVED") {
995                board.approve(&task.id);
996            } else {
997                board.reject(&task.id);
998            }
999            // Yield after each decision so workers can pick up rejected tasks.
1000            tokio::task::yield_now().await;
1001        }
1002
1003        let (open, in_progress, in_review_count, _, rejected) = board.stats();
1004        if open == 0 && in_progress == 0 && in_review_count == 0 && rejected == 0 {
1005            break;
1006        }
1007        rounds += 1;
1008        if rounds >= max_rounds {
1009            break;
1010        }
1011        tokio::time::sleep(poll).await;
1012    }
1013    rounds
1014}
1015
#[cfg(test)]
mod tests {
    use super::*;

    /// A new team reports the name it was given and starts with no members.
    #[test]
    fn test_team_creation() {
        let fresh = AgentTeam::new("test-team", TeamConfig::default());
        assert_eq!(fresh.name(), "test-team");
        assert_eq!(fresh.member_count(), 0);
    }

    /// Members can be added, filtered by role, and removed by id.
    #[test]
    fn test_add_remove_members() {
        let mut roster = AgentTeam::new("test", TeamConfig::default());
        for (id, role) in [
            ("lead", TeamRole::Lead),
            ("w1", TeamRole::Worker),
            ("w2", TeamRole::Worker),
            ("rev", TeamRole::Reviewer),
        ] {
            roster.add_member(id, role);
        }
        assert_eq!(roster.member_count(), 4);
        assert_eq!(roster.members_by_role(TeamRole::Worker).len(), 2);

        let removed_known = roster.remove_member("w2");
        assert!(removed_known);
        assert_eq!(roster.member_count(), 3);

        let removed_unknown = roster.remove_member("nonexistent");
        assert!(!removed_unknown);
    }

    /// Claiming hands the posted task to the claimant and empties the queue.
    #[test]
    fn test_task_board_post_and_claim() {
        let board = TeamTaskBoard::new(10);
        let posted_id = board.post("Fix auth bug", "lead", None).unwrap();
        assert_eq!(board.len(), 1);

        let claimed = board.claim("worker-1").unwrap();
        assert_eq!(claimed.id, posted_id);
        assert_eq!(claimed.assigned_to.as_deref(), Some("worker-1"));
        assert_eq!(claimed.status, TaskStatus::InProgress);

        // Nothing is left for a second worker.
        assert!(board.claim("worker-2").is_none());
    }

    /// Happy path: claim, complete (moves to review), approve (moves to done).
    #[test]
    fn test_task_board_workflow() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Write tests", "lead", None).unwrap();

        board.claim("worker-1");

        let completed = board.complete(&id, "Added 5 tests");
        assert!(completed);
        assert_eq!(board.get(&id).unwrap().status, TaskStatus::InReview);

        let approved = board.approve(&id);
        assert!(approved);
        assert_eq!(board.get(&id).unwrap().status, TaskStatus::Done);
    }

    /// Rejecting a reviewed task marks it rejected and clears its assignee.
    #[test]
    fn test_task_board_reject() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Refactor module", "lead", None).unwrap();
        board.claim("worker-1");
        board.complete(&id, "Done");

        let was_rejected = board.reject(&id);
        assert!(was_rejected);

        let after = board.get(&id).unwrap();
        assert_eq!(after.status, TaskStatus::Rejected);
        assert!(after.assigned_to.is_none());
    }

    /// Posting beyond the board's capacity is refused.
    #[test]
    fn test_task_board_max_capacity() {
        let board = TeamTaskBoard::new(2);
        for title in ["Task 1", "Task 2"] {
            assert!(board.post(title, "lead", None).is_some());
        }
        // The board is full now.
        assert!(board.post("Task 3", "lead", None).is_none());
    }

    /// `stats` counts tasks per status as
    /// (open, in progress, in review, done, rejected).
    #[test]
    fn test_task_board_stats() {
        let board = TeamTaskBoard::new(10);
        for title in ["T1", "T2"] {
            board.post(title, "lead", None);
        }
        let assigned_id = board.post("T3", "lead", Some("w1")).unwrap();
        board.complete(&assigned_id, "done");

        assert_eq!(board.stats(), (2, 0, 1, 0, 0));
    }

    /// `by_assignee` returns only the tasks assigned to the given member.
    #[test]
    fn test_task_board_by_assignee() {
        let board = TeamTaskBoard::new(10);
        for (title, assignee) in [("T1", "w1"), ("T2", "w2"), ("T3", "w1")] {
            board.post(title, "lead", Some(assignee));
        }

        assert_eq!(board.by_assignee("w1").len(), 2);
    }

    /// A direct message arrives on the recipient's receiver with all fields intact.
    #[tokio::test]
    async fn test_send_message() {
        let mut team = AgentTeam::new("msg-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("worker", TeamRole::Worker);

        let mut inbox = team.take_receiver("worker").unwrap();

        let delivered = team
            .send_message("lead", "worker", "Please fix the bug", Some("task-1"))
            .await;
        assert!(delivered);

        let received = inbox.recv().await.unwrap();
        assert_eq!(received.from, "lead");
        assert_eq!(received.to, "worker");
        assert_eq!(received.content, "Please fix the bug");
        assert_eq!(received.task_id.as_deref(), Some("task-1"));
    }

    /// A broadcast from the lead reaches both workers.
    #[tokio::test]
    async fn test_broadcast() {
        let mut team = AgentTeam::new("broadcast-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("w2", TeamRole::Worker);

        let mut first_inbox = team.take_receiver("w1").unwrap();
        let mut second_inbox = team.take_receiver("w2").unwrap();

        team.broadcast("lead", "New task available", None).await;

        let first = first_inbox.recv().await.unwrap();
        let second = second_inbox.recv().await.unwrap();
        assert_eq!(first.content, "New task available");
        assert_eq!(second.content, "New task available");
    }

    /// Roles render as their lowercase names.
    #[test]
    fn test_role_display() {
        for (role, expected) in [
            (TeamRole::Lead, "lead"),
            (TeamRole::Worker, "worker"),
            (TeamRole::Reviewer, "reviewer"),
        ] {
            assert_eq!(role.to_string(), expected);
        }
    }

    /// Task statuses render as snake_case names.
    #[test]
    fn test_task_status_display() {
        for (status, expected) in [
            (TaskStatus::Open, "open"),
            (TaskStatus::InProgress, "in_progress"),
            (TaskStatus::InReview, "in_review"),
            (TaskStatus::Done, "done"),
            (TaskStatus::Rejected, "rejected"),
        ] {
            assert_eq!(status.to_string(), expected);
        }
    }

    // -----------------------------------------------------------------------
    // TeamRunner tests (mock executor)
    // -----------------------------------------------------------------------

    /// Executor stub that answers every prompt with the same canned text.
    struct MockExecutor {
        reply: String,
    }

    impl MockExecutor {
        fn new(response: impl Into<String>) -> Arc<Self> {
            let reply = response.into();
            Arc::new(Self { reply })
        }
    }

    #[async_trait::async_trait]
    impl AgentExecutor for MockExecutor {
        async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
            Ok(self.reply.to_owned())
        }
    }

    /// Build a config with a 1 ms poll interval and the given round limit;
    /// every other setting keeps its default.
    fn quick_config(max_rounds: usize) -> TeamConfig {
        TeamConfig {
            poll_interval_ms: 1,
            max_rounds,
            ..TeamConfig::default()
        }
    }

    /// Sessions can only be bound to ids that belong to the team.
    #[test]
    fn test_team_runner_session_binding() {
        let mut team = AgentTeam::new("bind-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);

        let mut runner = TeamRunner::new(team);

        // Known members accept a session.
        for member in ["lead", "w1"] {
            assert!(runner.bind_session(member, MockExecutor::new("ok")).is_ok());
        }

        // An id that was never added is refused.
        let unknown = runner.bind_session("nobody", MockExecutor::new("ok"));
        assert!(unknown.is_err());
    }

    /// A well-formed lead response yields its tasks in order.
    #[test]
    fn test_parse_task_list() {
        let lead_reply = r#"{"tasks": ["Write tests", "Fix lints", "Update docs"]}"#;
        let parsed = parse_task_list(lead_reply).unwrap();
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed[0], "Write tests");
        assert_eq!(parsed[2], "Update docs");
    }

    /// A response without any JSON object is an error.
    #[test]
    fn test_parse_task_list_no_json() {
        let outcome = parse_task_list("no json here");
        assert!(outcome.is_err());
    }

    /// A rejected task goes back into the pool and can be claimed again.
    #[test]
    fn test_claim_rejected_tasks() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Refactor module", "lead", None).unwrap();

        // Walk the task through claim → complete → reject.
        board.claim("worker-1");
        board.complete(&id, "initial attempt");
        board.reject(&id);

        let after_reject = board.get(&id).unwrap();
        assert_eq!(after_reject.status, TaskStatus::Rejected);

        // A different worker picks the same task up again.
        let reclaimed = board.claim("worker-2");
        assert!(reclaimed.is_some());
        let reclaimed = reclaimed.unwrap();
        assert_eq!(reclaimed.id, id);
        assert_eq!(reclaimed.assigned_to.as_deref(), Some("worker-2"));
        assert_eq!(reclaimed.status, TaskStatus::InProgress);
    }

    /// The lead's JSON reply becomes two tasks that both end up done.
    #[tokio::test]
    async fn test_team_runner_goal_decomposition() {
        let mut team = AgentTeam::new("decomp-test", quick_config(3));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);

        // The lead answers with a two-item task list.
        let lead = MockExecutor::new(r#"{"tasks": ["Task A", "Task B"]}"#);
        runner.bind_session("lead", lead).unwrap();
        // The worker always returns a result.
        let worker = MockExecutor::new("done");
        runner.bind_session("w1", worker).unwrap();
        // The reviewer approves everything.
        let reviewer = MockExecutor::new("APPROVED: looks good");
        runner.bind_session("rev", reviewer).unwrap();

        let outcome = runner.run_until_done("Build the feature").await.unwrap();

        assert_eq!(outcome.done_tasks.len(), 2);
        assert!(outcome.rejected_tasks.is_empty());
    }

    /// Without a reviewer, the worker's output is stored and the task waits in review.
    #[tokio::test]
    async fn test_team_runner_worker_execution() {
        let mut team = AgentTeam::new("worker-exec-test", quick_config(3));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);

        let mut runner = TeamRunner::new(team);
        let lead = MockExecutor::new(r#"{"tasks": ["Write unit tests"]}"#);
        runner.bind_session("lead", lead).unwrap();
        let worker = MockExecutor::new("Added 3 tests");
        runner.bind_session("w1", worker).unwrap();

        // No reviewer is bound, so the run's own result is not inspected here;
        // the board is checked directly instead.
        let board = runner.task_board();
        let _ = runner.run_until_done("Test the module").await;

        // The task was claimed and completed; with nobody to review it, it
        // stays InReview and carries the worker's result.
        let awaiting_review = board.by_status(TaskStatus::InReview);
        assert_eq!(awaiting_review.len(), 1);
        assert_eq!(awaiting_review[0].result.as_deref(), Some("Added 3 tests"));
    }

    /// An approving reviewer moves the worker's result to done.
    #[tokio::test]
    async fn test_team_runner_reviewer_approval() {
        let mut team = AgentTeam::new("reviewer-test", quick_config(5));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);
        let lead = MockExecutor::new(r#"{"tasks": ["Implement feature X"]}"#);
        runner.bind_session("lead", lead).unwrap();
        let worker = MockExecutor::new("Feature X implemented");
        runner.bind_session("w1", worker).unwrap();
        let reviewer = MockExecutor::new("APPROVED: complete");
        runner.bind_session("rev", reviewer).unwrap();

        let outcome = runner.run_until_done("Ship feature X").await.unwrap();

        assert_eq!(outcome.done_tasks.len(), 1);
        let stored = outcome.done_tasks[0].result.as_deref();
        assert_eq!(stored, Some("Feature X implemented"));
    }

    /// A task rejected on first review is retried and then approved.
    #[tokio::test]
    async fn test_team_runner_rejection_and_retry() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Reviewer stub: rejects its first call and approves every later one.
        struct ConditionalReviewer {
            calls: AtomicUsize,
        }

        #[async_trait::async_trait]
        impl AgentExecutor for ConditionalReviewer {
            async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
                let earlier_calls = self.calls.fetch_add(1, Ordering::SeqCst);
                let verdict = match earlier_calls {
                    0 => "REJECTED: needs improvement",
                    _ => "APPROVED: now correct",
                };
                Ok(verdict.to_string())
            }
        }

        let mut team = AgentTeam::new("retry-test", quick_config(10));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);
        let lead = MockExecutor::new(r#"{"tasks": ["Do the thing"]}"#);
        runner.bind_session("lead", lead).unwrap();
        let worker = MockExecutor::new("attempt result");
        runner.bind_session("w1", worker).unwrap();
        let reviewer = Arc::new(ConditionalReviewer {
            calls: AtomicUsize::new(0),
        });
        runner.bind_session("rev", reviewer).unwrap();

        let outcome = runner.run_until_done("Complete the thing").await.unwrap();

        // The retry gets approved, so nothing is left rejected.
        assert_eq!(outcome.done_tasks.len(), 1);
        assert!(outcome.rejected_tasks.is_empty());
    }
}