Skip to main content

a3s_code_core/
agent_teams.rs

1//! Agent Teams — Peer-to-peer multi-agent coordination
2//!
3//! Enables multiple `AgentSession` instances to collaborate on complex tasks
4//! through a shared task board and message passing. Each agent has a role
5//! (Lead, Worker, Reviewer) and can post/claim tasks on the board.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! AgentTeam
11//!   +-- TeamTaskBoard (shared task queue)
12//!   +-- TeamMember[] (role + session reference)
13//!   +-- mpsc channels (peer-to-peer messaging)
14//! ```
15//!
16//! ## Usage
17//!
18//! ```rust,no_run
19//! use a3s_code_core::agent_teams::{AgentTeam, TeamConfig, TeamRole};
20//!
21//! # async fn run() -> anyhow::Result<()> {
22//! let config = TeamConfig::default();
23//! let mut team = AgentTeam::new("refactor-auth", config);
24//!
25//! // Add members (each wraps an AgentSession)
26//! team.add_member("lead", TeamRole::Lead);
27//! team.add_member("worker-1", TeamRole::Worker);
28//! team.add_member("reviewer", TeamRole::Reviewer);
29//!
30//! // Post a task to the board
31//! team.task_board().post("Refactor auth module", "lead", None);
32//!
33//! // Worker claims and works on it
34//! let task = team.task_board().claim("worker-1");
35//! # Ok(())
36//! # }
37//! ```
38
39use std::collections::HashMap;
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
/// Tunable limits for an agent team.
#[derive(Debug, Clone)]
pub struct TeamConfig {
    /// Maximum concurrent tasks on the board (default: 50).
    pub max_tasks: usize,
    /// Buffer size of each member's message channel (default: 128).
    pub channel_buffer: usize,
    /// Maximum coordination rounds before `run_until_done` exits (default: 10).
    pub max_rounds: usize,
    /// Worker/Reviewer polling interval, in milliseconds (default: 200).
    pub poll_interval_ms: u64,
}
60
61impl Default for TeamConfig {
62    fn default() -> Self {
63        Self {
64            max_tasks: 50,
65            channel_buffer: 128,
66            max_rounds: 10,
67            poll_interval_ms: 200,
68        }
69    }
70}
71
72/// Role of a team member.
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
74#[serde(rename_all = "snake_case")]
75pub enum TeamRole {
76    /// Decomposes goals into tasks, assigns work.
77    Lead,
78    /// Executes assigned tasks.
79    Worker,
80    /// Reviews completed work, provides feedback.
81    Reviewer,
82}
83
84impl std::fmt::Display for TeamRole {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            TeamRole::Lead => write!(f, "lead"),
88            TeamRole::Worker => write!(f, "worker"),
89            TeamRole::Reviewer => write!(f, "reviewer"),
90        }
91    }
92}
93
/// A single message exchanged between two team members.
#[derive(Debug, Clone)]
pub struct TeamMessage {
    /// ID of the member that sent the message.
    pub from: String,
    /// ID of the member the message is addressed to.
    pub to: String,
    /// Body of the message.
    pub content: String,
    /// ID of the task this message refers to, if any.
    pub task_id: Option<String>,
    /// Time the message was created, as Unix epoch seconds.
    pub timestamp: i64,
}
108
/// Lifecycle state of a task on the board.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not yet claimed by anyone.
    Open,
    /// Claimed by a worker and being worked on.
    InProgress,
    /// Work has been submitted and awaits review.
    InReview,
    /// Approved by a reviewer.
    Done,
    /// Rejected by a reviewer; needs rework.
    Rejected,
}
123
124impl std::fmt::Display for TaskStatus {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match self {
127            TaskStatus::Open => write!(f, "open"),
128            TaskStatus::InProgress => write!(f, "in_progress"),
129            TaskStatus::InReview => write!(f, "in_review"),
130            TaskStatus::Done => write!(f, "done"),
131            TaskStatus::Rejected => write!(f, "rejected"),
132        }
133    }
134}
135
136/// A task on the team board.
137#[derive(Debug, Clone)]
138pub struct TeamTask {
139    /// Unique task ID.
140    pub id: String,
141    /// Task description.
142    pub description: String,
143    /// Who posted it.
144    pub posted_by: String,
145    /// Who is working on it (if claimed).
146    pub assigned_to: Option<String>,
147    /// Current status.
148    pub status: TaskStatus,
149    /// Optional result/output when completed.
150    pub result: Option<String>,
151    /// Created timestamp.
152    pub created_at: i64,
153    /// Last updated timestamp.
154    pub updated_at: i64,
155}
156
157/// Shared task board for team coordination.
158#[derive(Debug)]
159pub struct TeamTaskBoard {
160    tasks: RwLock<Vec<TeamTask>>,
161    max_tasks: usize,
162    next_id: RwLock<u64>,
163}
164
165impl TeamTaskBoard {
166    /// Create a new task board.
167    pub fn new(max_tasks: usize) -> Self {
168        Self {
169            tasks: RwLock::new(Vec::new()),
170            max_tasks,
171            next_id: RwLock::new(1),
172        }
173    }
174
175    /// Post a new task to the board. Returns the task ID.
176    pub fn post(
177        &self,
178        description: &str,
179        posted_by: &str,
180        assign_to: Option<&str>,
181    ) -> Option<String> {
182        let mut tasks = self.tasks.write().unwrap();
183        if tasks.len() >= self.max_tasks {
184            return None;
185        }
186
187        let mut id_counter = self.next_id.write().unwrap();
188        let id = format!("task-{}", *id_counter);
189        *id_counter += 1;
190
191        let now = chrono::Utc::now().timestamp();
192        let status = if assign_to.is_some() {
193            TaskStatus::InProgress
194        } else {
195            TaskStatus::Open
196        };
197
198        tasks.push(TeamTask {
199            id: id.clone(),
200            description: description.to_string(),
201            posted_by: posted_by.to_string(),
202            assigned_to: assign_to.map(|s| s.to_string()),
203            status,
204            result: None,
205            created_at: now,
206            updated_at: now,
207        });
208
209        Some(id)
210    }
211
212    /// Claim the next open or rejected task for a member. Returns the task if available.
213    ///
214    /// Rejected tasks are treated as retriable: a worker can claim them again
215    /// for another execution attempt.
216    pub fn claim(&self, member_id: &str) -> Option<TeamTask> {
217        let mut tasks = self.tasks.write().unwrap();
218        let task = tasks
219            .iter_mut()
220            .find(|t| t.status == TaskStatus::Open || t.status == TaskStatus::Rejected)?;
221        task.assigned_to = Some(member_id.to_string());
222        task.status = TaskStatus::InProgress;
223        task.updated_at = chrono::Utc::now().timestamp();
224        Some(task.clone())
225    }
226
227    /// Mark a task as complete with a result.
228    pub fn complete(&self, task_id: &str, result: &str) -> bool {
229        let mut tasks = self.tasks.write().unwrap();
230        if let Some(task) = tasks.iter_mut().find(|t| t.id == task_id) {
231            task.status = TaskStatus::InReview;
232            task.result = Some(result.to_string());
233            task.updated_at = chrono::Utc::now().timestamp();
234            true
235        } else {
236            false
237        }
238    }
239
240    /// Approve a task (reviewer action).
241    pub fn approve(&self, task_id: &str) -> bool {
242        let mut tasks = self.tasks.write().unwrap();
243        if let Some(task) = tasks
244            .iter_mut()
245            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
246        {
247            task.status = TaskStatus::Done;
248            task.updated_at = chrono::Utc::now().timestamp();
249            true
250        } else {
251            false
252        }
253    }
254
255    /// Reject a task back to open (reviewer action).
256    pub fn reject(&self, task_id: &str) -> bool {
257        let mut tasks = self.tasks.write().unwrap();
258        if let Some(task) = tasks
259            .iter_mut()
260            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
261        {
262            task.status = TaskStatus::Rejected;
263            task.assigned_to = None;
264            task.updated_at = chrono::Utc::now().timestamp();
265            true
266        } else {
267            false
268        }
269    }
270
271    /// Get all tasks with a given status.
272    pub fn by_status(&self, status: TaskStatus) -> Vec<TeamTask> {
273        self.tasks
274            .read()
275            .unwrap()
276            .iter()
277            .filter(|t| t.status == status)
278            .cloned()
279            .collect()
280    }
281
282    /// Get all tasks assigned to a member.
283    pub fn by_assignee(&self, member_id: &str) -> Vec<TeamTask> {
284        self.tasks
285            .read()
286            .unwrap()
287            .iter()
288            .filter(|t| t.assigned_to.as_deref() == Some(member_id))
289            .cloned()
290            .collect()
291    }
292
293    /// Get a task by ID.
294    pub fn get(&self, task_id: &str) -> Option<TeamTask> {
295        self.tasks
296            .read()
297            .unwrap()
298            .iter()
299            .find(|t| t.id == task_id)
300            .cloned()
301    }
302
303    /// Number of tasks on the board.
304    pub fn len(&self) -> usize {
305        self.tasks.read().unwrap().len()
306    }
307
308    /// Whether the board is empty.
309    pub fn is_empty(&self) -> bool {
310        self.tasks.read().unwrap().is_empty()
311    }
312
313    /// Summary stats: (open, in_progress, in_review, done, rejected).
314    pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
315        let tasks = self.tasks.read().unwrap();
316        let open = tasks
317            .iter()
318            .filter(|t| t.status == TaskStatus::Open)
319            .count();
320        let progress = tasks
321            .iter()
322            .filter(|t| t.status == TaskStatus::InProgress)
323            .count();
324        let review = tasks
325            .iter()
326            .filter(|t| t.status == TaskStatus::InReview)
327            .count();
328        let done = tasks
329            .iter()
330            .filter(|t| t.status == TaskStatus::Done)
331            .count();
332        let rejected = tasks
333            .iter()
334            .filter(|t| t.status == TaskStatus::Rejected)
335            .count();
336        (open, progress, review, done, rejected)
337    }
338}
339
340/// A team member.
341#[derive(Debug, Clone)]
342pub struct TeamMember {
343    /// Unique member ID.
344    pub id: String,
345    /// Member role.
346    pub role: TeamRole,
347}
348
349/// Multi-agent team coordinator.
350pub struct AgentTeam {
351    /// Team name.
352    name: String,
353    /// Configuration.
354    config: TeamConfig,
355    /// Registered members.
356    members: HashMap<String, TeamMember>,
357    /// Shared task board.
358    task_board: Arc<TeamTaskBoard>,
359    /// Message senders per member.
360    senders: HashMap<String, mpsc::Sender<TeamMessage>>,
361    /// Message receivers per member (taken on first access).
362    receivers: HashMap<String, mpsc::Receiver<TeamMessage>>,
363}
364
365impl AgentTeam {
366    /// Create a new team.
367    pub fn new(name: &str, config: TeamConfig) -> Self {
368        Self {
369            name: name.to_string(),
370            config,
371            members: HashMap::new(),
372            task_board: Arc::new(TeamTaskBoard::new(50)),
373            senders: HashMap::new(),
374            receivers: HashMap::new(),
375        }
376    }
377
378    /// Team name.
379    pub fn name(&self) -> &str {
380        &self.name
381    }
382
383    /// Add a member to the team.
384    pub fn add_member(&mut self, id: &str, role: TeamRole) {
385        let (tx, rx) = mpsc::channel(self.config.channel_buffer);
386        self.members.insert(
387            id.to_string(),
388            TeamMember {
389                id: id.to_string(),
390                role,
391            },
392        );
393        self.senders.insert(id.to_string(), tx);
394        self.receivers.insert(id.to_string(), rx);
395    }
396
397    /// Remove a member from the team.
398    pub fn remove_member(&mut self, id: &str) -> bool {
399        self.senders.remove(id);
400        self.receivers.remove(id);
401        self.members.remove(id).is_some()
402    }
403
404    /// Get a reference to the shared task board.
405    pub fn task_board(&self) -> &TeamTaskBoard {
406        &self.task_board
407    }
408
409    /// Get a cloneable Arc to the task board.
410    pub fn task_board_arc(&self) -> Arc<TeamTaskBoard> {
411        Arc::clone(&self.task_board)
412    }
413
414    /// Send a message to a team member.
415    pub async fn send_message(
416        &self,
417        from: &str,
418        to: &str,
419        content: &str,
420        task_id: Option<&str>,
421    ) -> bool {
422        let sender = match self.senders.get(to) {
423            Some(s) => s,
424            None => return false,
425        };
426
427        let msg = TeamMessage {
428            from: from.to_string(),
429            to: to.to_string(),
430            content: content.to_string(),
431            task_id: task_id.map(|s| s.to_string()),
432            timestamp: chrono::Utc::now().timestamp(),
433        };
434
435        sender.send(msg).await.is_ok()
436    }
437
438    /// Take the message receiver for a member (can only be called once per member).
439    pub fn take_receiver(&mut self, member_id: &str) -> Option<mpsc::Receiver<TeamMessage>> {
440        self.receivers.remove(member_id)
441    }
442
443    /// Broadcast a message to all members except the sender.
444    pub async fn broadcast(&self, from: &str, content: &str, task_id: Option<&str>) {
445        for (id, sender) in &self.senders {
446            if id == from {
447                continue;
448            }
449            let msg = TeamMessage {
450                from: from.to_string(),
451                to: id.clone(),
452                content: content.to_string(),
453                task_id: task_id.map(|s| s.to_string()),
454                timestamp: chrono::Utc::now().timestamp(),
455            };
456            let _ = sender.send(msg).await;
457        }
458    }
459
460    /// Get all members.
461    pub fn members(&self) -> Vec<&TeamMember> {
462        self.members.values().collect()
463    }
464
465    /// Get members by role.
466    pub fn members_by_role(&self, role: TeamRole) -> Vec<&TeamMember> {
467        self.members.values().filter(|m| m.role == role).collect()
468    }
469
470    /// Number of members.
471    pub fn member_count(&self) -> usize {
472        self.members.len()
473    }
474}
475
476impl std::fmt::Debug for AgentTeam {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        f.debug_struct("AgentTeam")
479            .field("name", &self.name)
480            .field("members", &self.members.len())
481            .field("tasks", &self.task_board.len())
482            .finish()
483    }
484}
485
486// ---------------------------------------------------------------------------
487// AgentExecutor — abstraction over AgentSession for testability
488// ---------------------------------------------------------------------------
489
490/// Minimal execution interface for a team member.
491///
492/// `AgentSession` implements this trait. In tests, a mock can be used instead.
493#[async_trait::async_trait]
494pub trait AgentExecutor: Send + Sync {
495    /// Execute a prompt and return the text response.
496    async fn execute(&self, prompt: &str) -> crate::error::Result<String>;
497}
498
499#[async_trait::async_trait]
500impl AgentExecutor for crate::agent_api::AgentSession {
501    async fn execute(&self, prompt: &str) -> crate::error::Result<String> {
502        let result = self.send(prompt, None).await?;
503        Ok(result.text)
504    }
505}
506
507// ---------------------------------------------------------------------------
508// TeamRunResult
509// ---------------------------------------------------------------------------
510
511/// Result returned by `TeamRunner::run_until_done`.
512#[derive(Debug)]
513pub struct TeamRunResult {
514    /// Tasks that reached the `Done` state.
515    pub done_tasks: Vec<TeamTask>,
516    /// Tasks that are still `Rejected` after all rounds (could not be approved).
517    pub rejected_tasks: Vec<TeamTask>,
518    /// Number of reviewer polling rounds completed.
519    pub rounds: usize,
520}
521
522// ---------------------------------------------------------------------------
523// TeamRunner — binds AgentSession execution to AgentTeam coordination
524// ---------------------------------------------------------------------------
525
/// Prompt template sent to the Lead member. `run_until_done` substitutes the
/// caller's goal for every `{goal}` placeholder before executing it.
const LEAD_PROMPT: &str = crate::prompts::TEAM_LEAD;

/// Prompt template for the Reviewer member (alias of
/// `crate::prompts::TEAM_REVIEWER`).
const REVIEWER_PROMPT: &str = crate::prompts::TEAM_REVIEWER;
529
530/// Per-member session overrides for [`TeamRunner::add_lead`], [`TeamRunner::add_worker`],
531/// and [`TeamRunner::add_reviewer`].
532#[derive(Debug, Default, Clone)]
533pub struct TeamMemberOptions {
534    /// Override the workspace for this member.
535    ///
536    /// When set, this member uses the given path (e.g. a git worktree) instead
537    /// of the default workspace from [`TeamRunner::with_agent`].
538    pub workspace: Option<String>,
539    /// Model override. Format: `"provider/model"` (e.g. `"openai/gpt-4o"`).
540    pub model: Option<String>,
541    /// Prompt slot customization (role, guidelines, response style, extra).
542    pub prompt_slots: Option<crate::prompts::SystemPromptSlots>,
543    /// Override maximum tool-call rounds for this member's session.
544    pub max_tool_rounds: Option<usize>,
545}
546
547impl TeamMemberOptions {
548    fn into_session_options(self) -> Option<crate::agent_api::SessionOptions> {
549        if self.model.is_none() && self.prompt_slots.is_none() && self.max_tool_rounds.is_none() {
550            return None;
551        }
552        let mut opts = crate::agent_api::SessionOptions::new();
553        if let Some(m) = self.model {
554            opts = opts.with_model(m);
555        }
556        if let Some(slots) = self.prompt_slots {
557            opts = opts.with_prompt_slots(slots);
558        }
559        if let Some(rounds) = self.max_tool_rounds {
560            opts = opts.with_max_tool_rounds(rounds);
561        }
562        Some(opts)
563    }
564}
565
566/// Default agent context stored on [`TeamRunner`] to support simplified member
567/// addition via [`TeamRunner::add_lead`], [`TeamRunner::add_worker`], and
568/// [`TeamRunner::add_reviewer`].
569struct DefaultAgentContext {
570    agent: Arc<crate::agent_api::Agent>,
571    workspace: String,
572    registry: Arc<crate::subagent::AgentRegistry>,
573}
574
575/// Binds an `AgentTeam` to concrete `AgentExecutor` sessions, enabling
576/// Lead → Worker → Reviewer automated workflows.
577pub struct TeamRunner {
578    team: AgentTeam,
579    sessions: HashMap<String, Arc<dyn AgentExecutor>>,
580    default_ctx: Option<DefaultAgentContext>,
581    worker_count: usize,
582}
583
584impl TeamRunner {
585    /// Create a new runner wrapping the given team.
586    pub fn new(team: AgentTeam) -> Self {
587        Self {
588            team,
589            sessions: HashMap::new(),
590            default_ctx: None,
591            worker_count: 0,
592        }
593    }
594
595    /// Create a runner with a default agent context for simplified member addition.
596    ///
597    /// Unlike [`TeamRunner::new`], this constructor lets you call
598    /// [`add_lead`](Self::add_lead), [`add_worker`](Self::add_worker), and
599    /// [`add_reviewer`](Self::add_reviewer) without repeating the agent,
600    /// workspace, and registry on every call.
601    pub fn with_agent(
602        team: AgentTeam,
603        agent: Arc<crate::agent_api::Agent>,
604        workspace: &str,
605        registry: Arc<crate::subagent::AgentRegistry>,
606    ) -> Self {
607        Self {
608            team,
609            sessions: HashMap::new(),
610            default_ctx: Some(DefaultAgentContext {
611                agent,
612                workspace: workspace.to_string(),
613                registry,
614            }),
615            worker_count: 0,
616        }
617    }
618
619    /// Bind an executor to a team member.
620    ///
621    /// Returns an error if `member_id` is not registered in the team.
622    pub fn bind_session(
623        &mut self,
624        member_id: &str,
625        executor: Arc<dyn AgentExecutor>,
626    ) -> crate::error::Result<()> {
627        if !self.team.members.contains_key(member_id) {
628            return Err(anyhow::anyhow!(
629                "member '{}' not found in team '{}'",
630                member_id,
631                self.team.name()
632            )
633            .into());
634        }
635        self.sessions.insert(member_id.to_string(), executor);
636        Ok(())
637    }
638
639    /// Bind a team member to a session created from a named agent definition.
640    ///
641    /// Looks up `agent_name` in `registry`, applies the definition's prompt,
642    /// permissions, model, and `max_steps` to a new [`AgentSession`] via
643    /// [`Agent::session_for_agent`], then binds it to `member_id`.
644    ///
645    /// This is the primary integration point between the `AgentTeam` coordination
646    /// layer and the markdown/YAML-defined subagent capability layer.
647    ///
648    /// # Errors
649    ///
650    /// Returns an error if `member_id` is not in the team, `agent_name` is not
651    /// in `registry`, or session creation fails.
652    pub fn bind_agent(
653        &mut self,
654        member_id: &str,
655        agent: &crate::agent_api::Agent,
656        workspace: &str,
657        agent_name: &str,
658        registry: &crate::subagent::AgentRegistry,
659    ) -> crate::error::Result<()> {
660        let def = registry
661            .get(agent_name)
662            .ok_or_else(|| anyhow::anyhow!("agent '{}' not found in registry", agent_name))?;
663        let session = agent.session_for_agent(workspace, &def, None)?;
664        self.bind_session(member_id, Arc::new(session))
665    }
666
667    /// Create a session from the default agent context, applying optional member overrides.
668    ///
669    /// Returns an error if no default context is set or the agent name is not
670    /// found in the registry.
671    fn create_session_from_default(
672        &self,
673        agent_name: &str,
674        member_opts: Option<TeamMemberOptions>,
675    ) -> crate::error::Result<crate::agent_api::AgentSession> {
676        let ctx = self.default_ctx.as_ref().ok_or_else(|| {
677            anyhow::anyhow!("no default agent context; use TeamRunner::with_agent")
678        })?;
679        let def = ctx
680            .registry
681            .get(agent_name)
682            .ok_or_else(|| anyhow::anyhow!("agent '{}' not found in registry", agent_name))?;
683        let workspace = member_opts
684            .as_ref()
685            .and_then(|o| o.workspace.clone())
686            .unwrap_or_else(|| ctx.workspace.clone());
687        let session_opts = member_opts.and_then(|o| o.into_session_options());
688        ctx.agent.session_for_agent(workspace, &def, session_opts)
689    }
690
691    /// Add a Lead member and bind it to the named agent definition.
692    ///
693    /// Requires a default agent context set via [`TeamRunner::with_agent`].
694    /// The member ID is fixed to `"lead"`.
695    ///
696    /// Use `opts` to override the model, prompt slots, or workspace (e.g. a git worktree).
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if no default context is set or `agent_name` is not
701    /// found in the registry.
702    pub fn add_lead(
703        &mut self,
704        agent_name: &str,
705        opts: Option<TeamMemberOptions>,
706    ) -> crate::error::Result<()> {
707        let session = self.create_session_from_default(agent_name, opts)?;
708        self.team.add_member("lead", TeamRole::Lead);
709        self.sessions.insert("lead".to_string(), Arc::new(session));
710        Ok(())
711    }
712
713    /// Add a Worker member and bind it to the named agent definition.
714    ///
715    /// Requires a default agent context set via [`TeamRunner::with_agent`].
716    /// Member IDs are auto-generated as `"worker-1"`, `"worker-2"`, etc.
717    /// Multiple workers can be added; they run concurrently during execution.
718    ///
719    /// Use `opts` to override the model, prompt slots, or workspace — set
720    /// `workspace` to an isolated git worktree path to prevent filesystem
721    /// conflicts between concurrent workers.
722    ///
723    /// # Errors
724    ///
725    /// Returns an error if no default context is set or `agent_name` is not
726    /// found in the registry.
727    pub fn add_worker(
728        &mut self,
729        agent_name: &str,
730        opts: Option<TeamMemberOptions>,
731    ) -> crate::error::Result<()> {
732        self.worker_count += 1;
733        let id = format!("worker-{}", self.worker_count);
734        let session = self.create_session_from_default(agent_name, opts)?;
735        self.team.add_member(&id, TeamRole::Worker);
736        self.sessions.insert(id, Arc::new(session));
737        Ok(())
738    }
739
740    /// Add a Reviewer member and bind it to the named agent definition.
741    ///
742    /// Requires a default agent context set via [`TeamRunner::with_agent`].
743    /// The member ID is fixed to `"reviewer"`.
744    ///
745    /// Use `opts` to override the model, prompt slots, or workspace.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if no default context is set or `agent_name` is not
750    /// found in the registry.
751    pub fn add_reviewer(
752        &mut self,
753        agent_name: &str,
754        opts: Option<TeamMemberOptions>,
755    ) -> crate::error::Result<()> {
756        let session = self.create_session_from_default(agent_name, opts)?;
757        self.team.add_member("reviewer", TeamRole::Reviewer);
758        self.sessions
759            .insert("reviewer".to_string(), Arc::new(session));
760        Ok(())
761    }
762
763    /// Get a mutable reference to the underlying team.
764    pub fn team_mut(&mut self) -> &mut AgentTeam {
765        &mut self.team
766    }
767
768    /// Access the shared task board.
769    pub fn task_board(&self) -> Arc<TeamTaskBoard> {
770        self.team.task_board_arc()
771    }
772
773    /// Run the full Lead → Worker → Reviewer workflow until all tasks are done
774    /// or `max_rounds` retry cycles are exceeded.
775    ///
776    /// Steps:
777    /// 1. Lead decomposes `goal` into tasks via JSON response.
778    /// 2. Workers run concurrently until all tasks are in review or done.
779    /// 3. Reviewer processes all InReview tasks (after workers finish).
780    /// 4. If rejected tasks remain, workers retry them (back to step 2).
781    ///
782    /// Running the reviewer after workers (rather than concurrently) prevents a
783    /// race where the reviewer's polling timeout fires before long-running agent
784    /// calls have produced any InReview tasks.
785    pub async fn run_until_done(&self, goal: &str) -> crate::error::Result<TeamRunResult> {
786        // --- Step 1: Lead decomposes the goal ---
787        let lead = self
788            .team
789            .members_by_role(TeamRole::Lead)
790            .into_iter()
791            .next()
792            .ok_or_else(|| anyhow::anyhow!("team has no Lead member"))?;
793
794        let lead_executor = self
795            .sessions
796            .get(&lead.id)
797            .ok_or_else(|| anyhow::anyhow!("no executor bound for lead member '{}'", lead.id))?;
798
799        let lead_prompt = LEAD_PROMPT.replace("{goal}", goal);
800        let raw = lead_executor.execute(&lead_prompt).await?;
801        let task_descriptions = parse_task_list(&raw)?;
802
803        let board = self.team.task_board_arc();
804        for desc in &task_descriptions {
805            board.post(desc, &lead.id, None);
806        }
807
808        // --- Step 2 & 3: Workers then reviewer, cycling until all tasks Done ---
809        //
810        // Workers run to completion first, then the reviewer processes all
811        // InReview tasks. If the reviewer rejects any tasks, workers are
812        // re-spawned to retry them. This sequential-then-cycle approach avoids
813        // a race where the reviewer's polling timeout fires before long-running
814        // LLM agent calls have produced any InReview tasks.
815        let poll = Duration::from_millis(self.team.config.poll_interval_ms);
816        let max_rounds = self.team.config.max_rounds;
817
818        let workers: Vec<(String, Arc<dyn AgentExecutor>)> = self
819            .team
820            .members_by_role(TeamRole::Worker)
821            .into_iter()
822            .filter_map(|m| {
823                self.sessions
824                    .get(&m.id)
825                    .map(|e| (m.id.clone(), Arc::clone(e)))
826            })
827            .collect();
828
829        let reviewer: Option<(String, Arc<dyn AgentExecutor>)> = self
830            .team
831            .members_by_role(TeamRole::Reviewer)
832            .into_iter()
833            .next()
834            .and_then(|m| {
835                self.sessions
836                    .get(&m.id)
837                    .map(|e| (m.id.clone(), Arc::clone(e)))
838            });
839
840        let mut total_reviewer_rounds = 0usize;
841
842        for _cycle in 0..max_rounds {
843            // Run all workers concurrently until no claimable work remains.
844            let mut worker_handles = Vec::new();
845            for (id, executor) in &workers {
846                let b = Arc::clone(&board);
847                let id = id.clone();
848                let executor = Arc::clone(executor);
849                let handle = tokio::spawn(async move {
850                    run_worker(id, executor, b, max_rounds, poll).await;
851                });
852                worker_handles.push(handle);
853            }
854            for h in worker_handles {
855                let _ = h.await;
856            }
857
858            // Run reviewer on all InReview tasks (workers are done; no race).
859            if let Some((ref id, ref executor)) = reviewer {
860                let rounds = run_reviewer(
861                    id.clone(),
862                    Arc::clone(executor),
863                    Arc::clone(&board),
864                    max_rounds,
865                    poll,
866                )
867                .await;
868                total_reviewer_rounds += rounds;
869            }
870
871            // Stop if no rejected tasks remain to retry.
872            let (open, in_progress, in_review, _, rejected) = board.stats();
873            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
874                break;
875            }
876            if rejected == 0 {
877                break;
878            }
879        }
880
881        let done_tasks = board.by_status(TaskStatus::Done);
882        let rejected_tasks = board.by_status(TaskStatus::Rejected);
883
884        Ok(TeamRunResult {
885            done_tasks,
886            rejected_tasks,
887            rounds: total_reviewer_rounds,
888        })
889    }
890}
891
892impl std::fmt::Debug for TeamRunner {
893    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
894        f.debug_struct("TeamRunner")
895            .field("team", &self.team.name())
896            .field("bound_sessions", &self.sessions.len())
897            .finish()
898    }
899}
900
901/// Parse `{"tasks": ["...", "..."]}` from a Lead response.
902fn parse_task_list(response: &str) -> crate::error::Result<Vec<String>> {
903    // Find JSON object boundaries in case there is extra whitespace or newlines.
904    let start = response
905        .find('{')
906        .ok_or_else(|| anyhow::anyhow!("lead response contains no JSON object: {}", response))?;
907    let end = response
908        .rfind('}')
909        .ok_or_else(|| anyhow::anyhow!("lead response JSON object is unclosed"))?;
910    let json_str = &response[start..=end];
911
912    let value: serde_json::Value = serde_json::from_str(json_str)
913        .map_err(|e| anyhow::anyhow!("failed to parse lead JSON response: {e}"))?;
914
915    let tasks: Vec<String> = value["tasks"]
916        .as_array()
917        .ok_or_else(|| anyhow::anyhow!("lead JSON response missing 'tasks' array"))?
918        .iter()
919        .filter_map(|v: &serde_json::Value| v.as_str().map(|s| s.to_string()))
920        .collect();
921
922    Ok(tasks)
923}
924
925/// Worker loop: claim and execute tasks until the board is quiescent.
926async fn run_worker(
927    member_id: String,
928    executor: Arc<dyn AgentExecutor>,
929    board: Arc<TeamTaskBoard>,
930    max_rounds: usize,
931    poll: Duration,
932) {
933    let mut idle = 0usize;
934    loop {
935        if let Some(task) = board.claim(&member_id) {
936            idle = 0;
937            let result = executor
938                .execute(&task.description)
939                .await
940                .unwrap_or_else(|e| format!("execution error: {e}"));
941            board.complete(&task.id, &result);
942        } else {
943            let (open, in_progress, in_review, _, rejected) = board.stats();
944            // No claimable work and no tasks that could re-enter the queue → stop.
945            // Include in_review so we wait for the reviewer's verdict (which may
946            // produce a Rejected task for us to retry).
947            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
948                break;
949            }
950            idle += 1;
951            if idle >= max_rounds {
952                break;
953            }
954            tokio::time::sleep(poll).await;
955        }
956    }
957}
958
959/// Reviewer loop: review InReview tasks and approve or reject them.
960/// Returns the number of rounds completed.
961async fn run_reviewer(
962    _member_id: String,
963    executor: Arc<dyn AgentExecutor>,
964    board: Arc<TeamTaskBoard>,
965    max_rounds: usize,
966    poll: Duration,
967) -> usize {
968    let mut rounds = 0usize;
969    loop {
970        let in_review = board.by_status(TaskStatus::InReview);
971        for task in in_review {
972            let result_text = task.result.as_deref().unwrap_or("");
973            let prompt = REVIEWER_PROMPT
974                .replace("{task}", &task.description)
975                .replace("{result}", result_text);
976            let verdict = executor
977                .execute(&prompt)
978                .await
979                .unwrap_or_else(|_| "REJECTED: execution error".to_string());
980            if verdict.contains("APPROVED") {
981                board.approve(&task.id);
982            } else {
983                board.reject(&task.id);
984            }
985            // Yield after each decision so workers can pick up rejected tasks.
986            tokio::task::yield_now().await;
987        }
988
989        let (open, in_progress, in_review_count, _, rejected) = board.stats();
990        if open == 0 && in_progress == 0 && in_review_count == 0 && rejected == 0 {
991            break;
992        }
993        rounds += 1;
994        if rounds >= max_rounds {
995            break;
996        }
997        tokio::time::sleep(poll).await;
998    }
999    rounds
1000}
1001
#[cfg(test)]
mod tests {
    use super::*;

    /// Runner configuration with a 1 ms poll interval so the async tests
    /// finish quickly; everything else keeps its default.
    fn fast_config(max_rounds: usize) -> TeamConfig {
        TeamConfig {
            poll_interval_ms: 1,
            max_rounds,
            ..TeamConfig::default()
        }
    }

    // -----------------------------------------------------------------------
    // AgentTeam membership
    // -----------------------------------------------------------------------

    #[test]
    fn test_team_creation() {
        let fresh = AgentTeam::new("test-team", TeamConfig::default());
        assert_eq!(fresh.name(), "test-team");
        assert_eq!(fresh.member_count(), 0);
    }

    #[test]
    fn test_add_remove_members() {
        let mut team = AgentTeam::new("test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("w2", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        assert_eq!(team.member_count(), 4);
        let worker_count = team.members_by_role(TeamRole::Worker).len();
        assert_eq!(worker_count, 2);

        // Removing an existing member succeeds and shrinks the team.
        let removed = team.remove_member("w2");
        assert!(removed);
        assert_eq!(team.member_count(), 3);

        // Removing an unknown member reports failure.
        let removed_unknown = team.remove_member("nonexistent");
        assert!(!removed_unknown);
    }

    // -----------------------------------------------------------------------
    // TeamTaskBoard
    // -----------------------------------------------------------------------

    #[test]
    fn test_task_board_post_and_claim() {
        let board = TeamTaskBoard::new(10);
        let posted_id = board.post("Fix auth bug", "lead", None).unwrap();
        assert_eq!(board.len(), 1);

        let claimed = board.claim("worker-1").unwrap();
        assert_eq!(claimed.id, posted_id);
        assert_eq!(claimed.assigned_to.as_deref(), Some("worker-1"));
        assert_eq!(claimed.status, TaskStatus::InProgress);

        // The only task is taken, so a second claim finds nothing.
        assert!(board.claim("worker-2").is_none());
    }

    #[test]
    fn test_task_board_workflow() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Write tests", "lead", None).unwrap();

        // Claim -> complete moves the task into review.
        let _ = board.claim("worker-1");
        assert!(board.complete(&id, "Added 5 tests"));
        assert_eq!(board.get(&id).unwrap().status, TaskStatus::InReview);

        // Approval finishes it.
        assert!(board.approve(&id));
        assert_eq!(board.get(&id).unwrap().status, TaskStatus::Done);
    }

    #[test]
    fn test_task_board_reject() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Refactor module", "lead", None).unwrap();
        let _ = board.claim("worker-1");
        board.complete(&id, "Done");

        assert!(board.reject(&id));

        // A rejected task loses its assignee.
        let rejected = board.get(&id).unwrap();
        assert_eq!(rejected.status, TaskStatus::Rejected);
        assert!(rejected.assigned_to.is_none());
    }

    #[test]
    fn test_task_board_max_capacity() {
        let board = TeamTaskBoard::new(2);
        for title in ["Task 1", "Task 2"] {
            assert!(board.post(title, "lead", None).is_some());
        }
        // The board is full: a third post is refused.
        assert!(board.post("Task 3", "lead", None).is_none());
    }

    #[test]
    fn test_task_board_stats() {
        let board = TeamTaskBoard::new(10);
        board.post("T1", "lead", None);
        board.post("T2", "lead", None);
        let pre_assigned = board.post("T3", "lead", Some("w1")).unwrap();
        board.complete(&pre_assigned, "done");

        // (open, in_progress, in_review, done, rejected)
        assert_eq!(board.stats(), (2, 0, 1, 0, 0));
    }

    #[test]
    fn test_task_board_by_assignee() {
        let board = TeamTaskBoard::new(10);
        for (title, assignee) in [("T1", "w1"), ("T2", "w2"), ("T3", "w1")] {
            board.post(title, "lead", Some(assignee));
        }

        assert_eq!(board.by_assignee("w1").len(), 2);
    }

    // -----------------------------------------------------------------------
    // Messaging
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_send_message() {
        let mut team = AgentTeam::new("msg-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("worker", TeamRole::Worker);

        let mut inbox = team.take_receiver("worker").unwrap();

        let delivered = team
            .send_message("lead", "worker", "Please fix the bug", Some("task-1"))
            .await;
        assert!(delivered);

        let received = inbox.recv().await.unwrap();
        assert_eq!(received.from, "lead");
        assert_eq!(received.to, "worker");
        assert_eq!(received.content, "Please fix the bug");
        assert_eq!(received.task_id.as_deref(), Some("task-1"));
    }

    #[tokio::test]
    async fn test_broadcast() {
        let mut team = AgentTeam::new("broadcast-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("w2", TeamRole::Worker);

        let mut first_inbox = team.take_receiver("w1").unwrap();
        let mut second_inbox = team.take_receiver("w2").unwrap();

        team.broadcast("lead", "New task available", None).await;

        // Both workers receive the same announcement.
        let first = first_inbox.recv().await.unwrap();
        let second = second_inbox.recv().await.unwrap();
        assert_eq!(first.content, "New task available");
        assert_eq!(second.content, "New task available");
    }

    // -----------------------------------------------------------------------
    // Display implementations
    // -----------------------------------------------------------------------

    #[test]
    fn test_role_display() {
        let expected = [
            (TeamRole::Lead, "lead"),
            (TeamRole::Worker, "worker"),
            (TeamRole::Reviewer, "reviewer"),
        ];
        for (role, text) in expected {
            assert_eq!(role.to_string(), text);
        }
    }

    #[test]
    fn test_task_status_display() {
        let expected = [
            (TaskStatus::Open, "open"),
            (TaskStatus::InProgress, "in_progress"),
            (TaskStatus::InReview, "in_review"),
            (TaskStatus::Done, "done"),
            (TaskStatus::Rejected, "rejected"),
        ];
        for (status, text) in expected {
            assert_eq!(status.to_string(), text);
        }
    }

    // -----------------------------------------------------------------------
    // TeamRunner tests (mock executor)
    // -----------------------------------------------------------------------

    /// Executor stub that answers every prompt with the same canned text.
    struct MockExecutor {
        reply: String,
    }

    impl MockExecutor {
        fn new(response: impl Into<String>) -> Arc<Self> {
            let reply = response.into();
            Arc::new(Self { reply })
        }
    }

    #[async_trait::async_trait]
    impl AgentExecutor for MockExecutor {
        async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
            Ok(self.reply.clone())
        }
    }

    #[test]
    fn test_team_runner_session_binding() {
        let mut team = AgentTeam::new("bind-test", TeamConfig::default());
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);

        let mut runner = TeamRunner::new(team);

        // Members that exist in the team can be bound.
        for member in ["lead", "w1"] {
            assert!(runner.bind_session(member, MockExecutor::new("ok")).is_ok());
        }

        // A member the team does not know is refused.
        let unknown = runner.bind_session("nobody", MockExecutor::new("ok"));
        assert!(unknown.is_err());
    }

    #[test]
    fn test_parse_task_list() {
        let lead_reply = r#"{"tasks": ["Write tests", "Fix lints", "Update docs"]}"#;
        let parsed = parse_task_list(lead_reply).unwrap();

        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed.first().map(String::as_str), Some("Write tests"));
        assert_eq!(parsed.last().map(String::as_str), Some("Update docs"));
    }

    #[test]
    fn test_parse_task_list_no_json() {
        let outcome = parse_task_list("no json here");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_claim_rejected_tasks() {
        let board = TeamTaskBoard::new(10);
        let id = board.post("Refactor module", "lead", None).unwrap();

        // Drive the task through claim -> complete -> reject.
        let _ = board.claim("worker-1");
        board.complete(&id, "initial attempt");
        board.reject(&id);
        assert_eq!(board.get(&id).unwrap().status, TaskStatus::Rejected);

        // The rejected task is claimable again, by a different worker.
        let reclaimed = board.claim("worker-2").unwrap();
        assert_eq!(reclaimed.id, id);
        assert_eq!(reclaimed.assigned_to.as_deref(), Some("worker-2"));
        assert_eq!(reclaimed.status, TaskStatus::InProgress);
    }

    #[tokio::test]
    async fn test_team_runner_goal_decomposition() {
        let mut team = AgentTeam::new("decomp-test", fast_config(3));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);
        // The lead decomposes the goal into two tasks.
        let lead = MockExecutor::new(r#"{"tasks": ["Task A", "Task B"]}"#);
        runner.bind_session("lead", lead).unwrap();
        // The worker always produces a result.
        runner
            .bind_session("w1", MockExecutor::new("done"))
            .unwrap();
        // The reviewer approves everything.
        runner
            .bind_session("rev", MockExecutor::new("APPROVED: looks good"))
            .unwrap();

        let outcome = runner.run_until_done("Build the feature").await.unwrap();

        assert_eq!(outcome.done_tasks.len(), 2);
        assert!(outcome.rejected_tasks.is_empty());
    }

    #[tokio::test]
    async fn test_team_runner_worker_execution() {
        let mut team = AgentTeam::new("worker-exec-test", fast_config(3));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);

        let mut runner = TeamRunner::new(team);
        let lead = MockExecutor::new(r#"{"tasks": ["Write unit tests"]}"#);
        runner.bind_session("lead", lead).unwrap();
        runner
            .bind_session("w1", MockExecutor::new("Added 3 tests"))
            .unwrap();

        // Without a bound reviewer nothing approves the task, so it is
        // expected to remain InReview; the run result itself is not checked.
        let board = runner.task_board();
        let _ = runner.run_until_done("Test the module").await;

        // The worker claimed the task and recorded its result.
        let awaiting_review = board.by_status(TaskStatus::InReview);
        assert_eq!(awaiting_review.len(), 1);
        assert_eq!(awaiting_review[0].result.as_deref(), Some("Added 3 tests"));
    }

    #[tokio::test]
    async fn test_team_runner_reviewer_approval() {
        let mut team = AgentTeam::new("reviewer-test", fast_config(5));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);
        let lead = MockExecutor::new(r#"{"tasks": ["Implement feature X"]}"#);
        runner.bind_session("lead", lead).unwrap();
        runner
            .bind_session("w1", MockExecutor::new("Feature X implemented"))
            .unwrap();
        runner
            .bind_session("rev", MockExecutor::new("APPROVED: complete"))
            .unwrap();

        let outcome = runner.run_until_done("Ship feature X").await.unwrap();

        // The approved task carries the worker's result.
        assert_eq!(outcome.done_tasks.len(), 1);
        let approved = &outcome.done_tasks[0];
        assert_eq!(approved.result.as_deref(), Some("Feature X implemented"));
    }

    #[tokio::test]
    async fn test_team_runner_rejection_and_retry() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Reviewer stub that rejects its first call and approves every
        /// later one.
        struct ConditionalReviewer {
            calls: AtomicUsize,
        }

        #[async_trait::async_trait]
        impl AgentExecutor for ConditionalReviewer {
            async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
                let verdict = match self.calls.fetch_add(1, Ordering::SeqCst) {
                    0 => "REJECTED: needs improvement",
                    _ => "APPROVED: now correct",
                };
                Ok(verdict.to_string())
            }
        }

        let mut team = AgentTeam::new("retry-test", fast_config(10));
        team.add_member("lead", TeamRole::Lead);
        team.add_member("w1", TeamRole::Worker);
        team.add_member("rev", TeamRole::Reviewer);

        let mut runner = TeamRunner::new(team);
        runner
            .bind_session("lead", MockExecutor::new(r#"{"tasks": ["Do the thing"]}"#))
            .unwrap();
        runner
            .bind_session("w1", MockExecutor::new("attempt result"))
            .unwrap();
        let reviewer = Arc::new(ConditionalReviewer {
            calls: AtomicUsize::new(0),
        });
        runner.bind_session("rev", reviewer).unwrap();

        let outcome = runner.run_until_done("Complete the thing").await.unwrap();

        // The first verdict rejects, the retry is approved.
        assert_eq!(outcome.done_tasks.len(), 1);
        assert!(outcome.rejected_tasks.is_empty());
    }
}