Skip to main content

a3s_code_core/
agent_teams.rs

1//! Agent Teams — Peer-to-peer multi-agent coordination
2//!
3//! Enables multiple `AgentSession` instances to collaborate on complex tasks
4//! through a shared task board and message passing. Each agent has a role
5//! (Lead, Worker, Reviewer) and can post/claim tasks on the board.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! AgentTeam
11//!   +-- TeamTaskBoard (shared task queue)
12//!   +-- TeamMember[] (role + session reference)
13//!   +-- mpsc channels (peer-to-peer messaging)
14//! ```
15//!
16//! ## Usage
17//!
18//! ```rust,no_run
19//! use a3s_code_core::agent_teams::{AgentTeam, TeamConfig, TeamRole};
20//!
21//! # async fn run() -> anyhow::Result<()> {
22//! let config = TeamConfig::default();
23//! let mut team = AgentTeam::new("refactor-auth", config);
24//!
25//! // Add members (each wraps an AgentSession)
26//! team.add_member("lead", TeamRole::Lead);
27//! team.add_member("worker-1", TeamRole::Worker);
28//! team.add_member("reviewer", TeamRole::Reviewer);
29//!
30//! // Post a task to the board
31//! team.task_board().post("Refactor auth module", "lead", None);
32//!
33//! // Worker claims and works on it
34//! let task = team.task_board().claim("worker-1");
35//! # Ok(())
36//! # }
37//! ```
38
39use std::collections::HashMap;
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
44/// Team configuration.
45#[derive(Debug, Clone)]
46pub struct TeamConfig {
47    /// Maximum concurrent tasks on the board.
48    /// Default: 50
49    pub max_tasks: usize,
50    /// Message channel buffer size.
51    /// Default: 128
52    pub channel_buffer: usize,
53    /// Maximum coordination rounds before `run_until_done` exits.
54    /// Default: 10
55    pub max_rounds: usize,
56    /// Worker/Reviewer polling interval in milliseconds.
57    /// Default: 200
58    pub poll_interval_ms: u64,
59}
60
61impl Default for TeamConfig {
62    fn default() -> Self {
63        Self {
64            max_tasks: 50,
65            channel_buffer: 128,
66            max_rounds: 10,
67            poll_interval_ms: 200,
68        }
69    }
70}
71
72/// Role of a team member.
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
74pub enum TeamRole {
75    /// Decomposes goals into tasks, assigns work.
76    Lead,
77    /// Executes assigned tasks.
78    Worker,
79    /// Reviews completed work, provides feedback.
80    Reviewer,
81}
82
83impl std::fmt::Display for TeamRole {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        match self {
86            TeamRole::Lead => write!(f, "lead"),
87            TeamRole::Worker => write!(f, "worker"),
88            TeamRole::Reviewer => write!(f, "reviewer"),
89        }
90    }
91}
92
93/// A message passed between team members.
94#[derive(Debug, Clone)]
95pub struct TeamMessage {
96    /// Sender member ID.
97    pub from: String,
98    /// Recipient member ID.
99    pub to: String,
100    /// Message content.
101    pub content: String,
102    /// Optional task ID this message relates to.
103    pub task_id: Option<String>,
104    /// Timestamp (Unix epoch seconds).
105    pub timestamp: i64,
106}
107
108/// Task status on the board.
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum TaskStatus {
111    /// Waiting to be claimed.
112    Open,
113    /// Claimed by a worker.
114    InProgress,
115    /// Work done, awaiting review.
116    InReview,
117    /// Approved by reviewer.
118    Done,
119    /// Rejected, needs rework.
120    Rejected,
121}
122
123impl std::fmt::Display for TaskStatus {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        match self {
126            TaskStatus::Open => write!(f, "open"),
127            TaskStatus::InProgress => write!(f, "in_progress"),
128            TaskStatus::InReview => write!(f, "in_review"),
129            TaskStatus::Done => write!(f, "done"),
130            TaskStatus::Rejected => write!(f, "rejected"),
131        }
132    }
133}
134
135/// A task on the team board.
136#[derive(Debug, Clone)]
137pub struct TeamTask {
138    /// Unique task ID.
139    pub id: String,
140    /// Task description.
141    pub description: String,
142    /// Who posted it.
143    pub posted_by: String,
144    /// Who is working on it (if claimed).
145    pub assigned_to: Option<String>,
146    /// Current status.
147    pub status: TaskStatus,
148    /// Optional result/output when completed.
149    pub result: Option<String>,
150    /// Created timestamp.
151    pub created_at: i64,
152    /// Last updated timestamp.
153    pub updated_at: i64,
154}
155
156/// Shared task board for team coordination.
157#[derive(Debug)]
158pub struct TeamTaskBoard {
159    tasks: RwLock<Vec<TeamTask>>,
160    max_tasks: usize,
161    next_id: RwLock<u64>,
162}
163
164impl TeamTaskBoard {
165    /// Create a new task board.
166    pub fn new(max_tasks: usize) -> Self {
167        Self {
168            tasks: RwLock::new(Vec::new()),
169            max_tasks,
170            next_id: RwLock::new(1),
171        }
172    }
173
174    /// Post a new task to the board. Returns the task ID.
175    pub fn post(
176        &self,
177        description: &str,
178        posted_by: &str,
179        assign_to: Option<&str>,
180    ) -> Option<String> {
181        let mut tasks = self.tasks.write().unwrap();
182        if tasks.len() >= self.max_tasks {
183            return None;
184        }
185
186        let mut id_counter = self.next_id.write().unwrap();
187        let id = format!("task-{}", *id_counter);
188        *id_counter += 1;
189
190        let now = chrono::Utc::now().timestamp();
191        let status = if assign_to.is_some() {
192            TaskStatus::InProgress
193        } else {
194            TaskStatus::Open
195        };
196
197        tasks.push(TeamTask {
198            id: id.clone(),
199            description: description.to_string(),
200            posted_by: posted_by.to_string(),
201            assigned_to: assign_to.map(|s| s.to_string()),
202            status,
203            result: None,
204            created_at: now,
205            updated_at: now,
206        });
207
208        Some(id)
209    }
210
211    /// Claim the next open or rejected task for a member. Returns the task if available.
212    ///
213    /// Rejected tasks are treated as retriable: a worker can claim them again
214    /// for another execution attempt.
215    pub fn claim(&self, member_id: &str) -> Option<TeamTask> {
216        let mut tasks = self.tasks.write().unwrap();
217        let task = tasks
218            .iter_mut()
219            .find(|t| t.status == TaskStatus::Open || t.status == TaskStatus::Rejected)?;
220        task.assigned_to = Some(member_id.to_string());
221        task.status = TaskStatus::InProgress;
222        task.updated_at = chrono::Utc::now().timestamp();
223        Some(task.clone())
224    }
225
226    /// Mark a task as complete with a result.
227    pub fn complete(&self, task_id: &str, result: &str) -> bool {
228        let mut tasks = self.tasks.write().unwrap();
229        if let Some(task) = tasks.iter_mut().find(|t| t.id == task_id) {
230            task.status = TaskStatus::InReview;
231            task.result = Some(result.to_string());
232            task.updated_at = chrono::Utc::now().timestamp();
233            true
234        } else {
235            false
236        }
237    }
238
239    /// Approve a task (reviewer action).
240    pub fn approve(&self, task_id: &str) -> bool {
241        let mut tasks = self.tasks.write().unwrap();
242        if let Some(task) = tasks
243            .iter_mut()
244            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
245        {
246            task.status = TaskStatus::Done;
247            task.updated_at = chrono::Utc::now().timestamp();
248            true
249        } else {
250            false
251        }
252    }
253
254    /// Reject a task back to open (reviewer action).
255    pub fn reject(&self, task_id: &str) -> bool {
256        let mut tasks = self.tasks.write().unwrap();
257        if let Some(task) = tasks
258            .iter_mut()
259            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
260        {
261            task.status = TaskStatus::Rejected;
262            task.assigned_to = None;
263            task.updated_at = chrono::Utc::now().timestamp();
264            true
265        } else {
266            false
267        }
268    }
269
270    /// Get all tasks with a given status.
271    pub fn by_status(&self, status: TaskStatus) -> Vec<TeamTask> {
272        self.tasks
273            .read()
274            .unwrap()
275            .iter()
276            .filter(|t| t.status == status)
277            .cloned()
278            .collect()
279    }
280
281    /// Get all tasks assigned to a member.
282    pub fn by_assignee(&self, member_id: &str) -> Vec<TeamTask> {
283        self.tasks
284            .read()
285            .unwrap()
286            .iter()
287            .filter(|t| t.assigned_to.as_deref() == Some(member_id))
288            .cloned()
289            .collect()
290    }
291
292    /// Get a task by ID.
293    pub fn get(&self, task_id: &str) -> Option<TeamTask> {
294        self.tasks
295            .read()
296            .unwrap()
297            .iter()
298            .find(|t| t.id == task_id)
299            .cloned()
300    }
301
302    /// Number of tasks on the board.
303    pub fn len(&self) -> usize {
304        self.tasks.read().unwrap().len()
305    }
306
307    /// Whether the board is empty.
308    pub fn is_empty(&self) -> bool {
309        self.tasks.read().unwrap().is_empty()
310    }
311
312    /// Summary stats: (open, in_progress, in_review, done, rejected).
313    pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
314        let tasks = self.tasks.read().unwrap();
315        let open = tasks
316            .iter()
317            .filter(|t| t.status == TaskStatus::Open)
318            .count();
319        let progress = tasks
320            .iter()
321            .filter(|t| t.status == TaskStatus::InProgress)
322            .count();
323        let review = tasks
324            .iter()
325            .filter(|t| t.status == TaskStatus::InReview)
326            .count();
327        let done = tasks
328            .iter()
329            .filter(|t| t.status == TaskStatus::Done)
330            .count();
331        let rejected = tasks
332            .iter()
333            .filter(|t| t.status == TaskStatus::Rejected)
334            .count();
335        (open, progress, review, done, rejected)
336    }
337}
338
339/// A team member.
340#[derive(Debug, Clone)]
341pub struct TeamMember {
342    /// Unique member ID.
343    pub id: String,
344    /// Member role.
345    pub role: TeamRole,
346}
347
348/// Multi-agent team coordinator.
349pub struct AgentTeam {
350    /// Team name.
351    name: String,
352    /// Configuration.
353    config: TeamConfig,
354    /// Registered members.
355    members: HashMap<String, TeamMember>,
356    /// Shared task board.
357    task_board: Arc<TeamTaskBoard>,
358    /// Message senders per member.
359    senders: HashMap<String, mpsc::Sender<TeamMessage>>,
360    /// Message receivers per member (taken on first access).
361    receivers: HashMap<String, mpsc::Receiver<TeamMessage>>,
362}
363
364impl AgentTeam {
365    /// Create a new team.
366    pub fn new(name: &str, config: TeamConfig) -> Self {
367        Self {
368            name: name.to_string(),
369            config,
370            members: HashMap::new(),
371            task_board: Arc::new(TeamTaskBoard::new(50)),
372            senders: HashMap::new(),
373            receivers: HashMap::new(),
374        }
375    }
376
377    /// Team name.
378    pub fn name(&self) -> &str {
379        &self.name
380    }
381
382    /// Add a member to the team.
383    pub fn add_member(&mut self, id: &str, role: TeamRole) {
384        let (tx, rx) = mpsc::channel(self.config.channel_buffer);
385        self.members.insert(
386            id.to_string(),
387            TeamMember {
388                id: id.to_string(),
389                role,
390            },
391        );
392        self.senders.insert(id.to_string(), tx);
393        self.receivers.insert(id.to_string(), rx);
394    }
395
396    /// Remove a member from the team.
397    pub fn remove_member(&mut self, id: &str) -> bool {
398        self.senders.remove(id);
399        self.receivers.remove(id);
400        self.members.remove(id).is_some()
401    }
402
403    /// Get a reference to the shared task board.
404    pub fn task_board(&self) -> &TeamTaskBoard {
405        &self.task_board
406    }
407
408    /// Get a cloneable Arc to the task board.
409    pub fn task_board_arc(&self) -> Arc<TeamTaskBoard> {
410        Arc::clone(&self.task_board)
411    }
412
413    /// Send a message to a team member.
414    pub async fn send_message(
415        &self,
416        from: &str,
417        to: &str,
418        content: &str,
419        task_id: Option<&str>,
420    ) -> bool {
421        let sender = match self.senders.get(to) {
422            Some(s) => s,
423            None => return false,
424        };
425
426        let msg = TeamMessage {
427            from: from.to_string(),
428            to: to.to_string(),
429            content: content.to_string(),
430            task_id: task_id.map(|s| s.to_string()),
431            timestamp: chrono::Utc::now().timestamp(),
432        };
433
434        sender.send(msg).await.is_ok()
435    }
436
437    /// Take the message receiver for a member (can only be called once per member).
438    pub fn take_receiver(&mut self, member_id: &str) -> Option<mpsc::Receiver<TeamMessage>> {
439        self.receivers.remove(member_id)
440    }
441
442    /// Broadcast a message to all members except the sender.
443    pub async fn broadcast(&self, from: &str, content: &str, task_id: Option<&str>) {
444        for (id, sender) in &self.senders {
445            if id == from {
446                continue;
447            }
448            let msg = TeamMessage {
449                from: from.to_string(),
450                to: id.clone(),
451                content: content.to_string(),
452                task_id: task_id.map(|s| s.to_string()),
453                timestamp: chrono::Utc::now().timestamp(),
454            };
455            let _ = sender.send(msg).await;
456        }
457    }
458
459    /// Get all members.
460    pub fn members(&self) -> Vec<&TeamMember> {
461        self.members.values().collect()
462    }
463
464    /// Get members by role.
465    pub fn members_by_role(&self, role: TeamRole) -> Vec<&TeamMember> {
466        self.members.values().filter(|m| m.role == role).collect()
467    }
468
469    /// Number of members.
470    pub fn member_count(&self) -> usize {
471        self.members.len()
472    }
473}
474
475impl std::fmt::Debug for AgentTeam {
476    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477        f.debug_struct("AgentTeam")
478            .field("name", &self.name)
479            .field("members", &self.members.len())
480            .field("tasks", &self.task_board.len())
481            .finish()
482    }
483}
484
485// ---------------------------------------------------------------------------
486// AgentExecutor — abstraction over AgentSession for testability
487// ---------------------------------------------------------------------------
488
489/// Minimal execution interface for a team member.
490///
491/// `AgentSession` implements this trait. In tests, a mock can be used instead.
492#[async_trait::async_trait]
493pub trait AgentExecutor: Send + Sync {
494    /// Execute a prompt and return the text response.
495    async fn execute(&self, prompt: &str) -> crate::error::Result<String>;
496}
497
498#[async_trait::async_trait]
499impl AgentExecutor for crate::agent_api::AgentSession {
500    async fn execute(&self, prompt: &str) -> crate::error::Result<String> {
501        let result = self.send(prompt, None).await?;
502        Ok(result.text)
503    }
504}
505
506// ---------------------------------------------------------------------------
507// TeamRunResult
508// ---------------------------------------------------------------------------
509
510/// Result returned by `TeamRunner::run_until_done`.
511#[derive(Debug)]
512pub struct TeamRunResult {
513    /// Tasks that reached the `Done` state.
514    pub done_tasks: Vec<TeamTask>,
515    /// Tasks that are still `Rejected` after all rounds (could not be approved).
516    pub rejected_tasks: Vec<TeamTask>,
517    /// Number of reviewer polling rounds completed.
518    pub rounds: usize,
519}
520
521// ---------------------------------------------------------------------------
522// TeamRunner — binds AgentSession execution to AgentTeam coordination
523// ---------------------------------------------------------------------------
524
525const LEAD_PROMPT: &str = "You are the lead agent in a team. Your goal is: {goal}
526
527Decompose this goal into concrete, self-contained tasks for your team workers.
528Each task should be independently executable by an AI coding agent.
529
530Respond with ONLY valid JSON in this exact format:
531{{\"tasks\": [\"task description 1\", \"task description 2\", ...]}}
532
533No markdown, no explanation, just the JSON.";
534
535const REVIEWER_PROMPT: &str = "Review the following completed task:
536
537Task: {task}
538Result: {result}
539
540If the result satisfactorily completes the task, respond with \"APPROVED: <brief reason>\".
541If the result is incomplete or incorrect, respond with \"REJECTED: <specific feedback for improvement>\".";
542
543/// Binds an `AgentTeam` to concrete `AgentExecutor` sessions, enabling
544/// Lead → Worker → Reviewer automated workflows.
545pub struct TeamRunner {
546    team: AgentTeam,
547    sessions: HashMap<String, Arc<dyn AgentExecutor>>,
548}
549
550impl TeamRunner {
551    /// Create a new runner wrapping the given team.
552    pub fn new(team: AgentTeam) -> Self {
553        Self {
554            team,
555            sessions: HashMap::new(),
556        }
557    }
558
559    /// Bind an executor to a team member.
560    ///
561    /// Returns an error if `member_id` is not registered in the team.
562    pub fn bind_session(
563        &mut self,
564        member_id: &str,
565        executor: Arc<dyn AgentExecutor>,
566    ) -> crate::error::Result<()> {
567        if !self.team.members.contains_key(member_id) {
568            return Err(anyhow::anyhow!(
569                "member '{}' not found in team '{}'",
570                member_id,
571                self.team.name()
572            )
573            .into());
574        }
575        self.sessions.insert(member_id.to_string(), executor);
576        Ok(())
577    }
578
579    /// Bind a team member to a session created from a named agent definition.
580    ///
581    /// Looks up `agent_name` in `registry`, applies the definition's prompt,
582    /// permissions, model, and `max_steps` to a new [`AgentSession`] via
583    /// [`Agent::session_for_agent`], then binds it to `member_id`.
584    ///
585    /// This is the primary integration point between the `AgentTeam` coordination
586    /// layer and the markdown/YAML-defined subagent capability layer.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if `member_id` is not in the team, `agent_name` is not
591    /// in `registry`, or session creation fails.
592    pub fn bind_agent(
593        &mut self,
594        member_id: &str,
595        agent: &crate::agent_api::Agent,
596        workspace: &str,
597        agent_name: &str,
598        registry: &crate::subagent::AgentRegistry,
599    ) -> crate::error::Result<()> {
600        let def = registry
601            .get(agent_name)
602            .ok_or_else(|| anyhow::anyhow!("agent '{}' not found in registry", agent_name))?;
603        let session = agent.session_for_agent(workspace, &def, None)?;
604        self.bind_session(member_id, Arc::new(session))
605    }
606
607    /// Access the shared task board.
608    pub fn task_board(&self) -> Arc<TeamTaskBoard> {
609        self.team.task_board_arc()
610    }
611
612    /// Run the full Lead → Worker → Reviewer workflow until all tasks are done
613    /// or `max_rounds` is exceeded.
614    ///
615    /// Steps:
616    /// 1. Lead decomposes `goal` into tasks via JSON response.
617    /// 2. Workers concurrently claim and execute tasks.
618    /// 3. Reviewer approves or rejects completed tasks.
619    /// 4. Rejected tasks re-enter the work queue for retry.
620    pub async fn run_until_done(&self, goal: &str) -> crate::error::Result<TeamRunResult> {
621        // --- Step 1: Lead decomposes the goal ---
622        let lead = self
623            .team
624            .members_by_role(TeamRole::Lead)
625            .into_iter()
626            .next()
627            .ok_or_else(|| anyhow::anyhow!("team has no Lead member"))?;
628
629        let lead_executor = self
630            .sessions
631            .get(&lead.id)
632            .ok_or_else(|| anyhow::anyhow!("no executor bound for lead member '{}'", lead.id))?;
633
634        let lead_prompt = LEAD_PROMPT.replace("{goal}", goal);
635        let raw = lead_executor.execute(&lead_prompt).await?;
636        let task_descriptions = parse_task_list(&raw)?;
637
638        let board = self.team.task_board_arc();
639        for desc in &task_descriptions {
640            board.post(desc, &lead.id, None);
641        }
642
643        // --- Step 2 & 3: Spawn workers and reviewer concurrently ---
644        let poll = Duration::from_millis(self.team.config.poll_interval_ms);
645        let max_rounds = self.team.config.max_rounds;
646
647        let workers: Vec<(String, Arc<dyn AgentExecutor>)> = self
648            .team
649            .members_by_role(TeamRole::Worker)
650            .into_iter()
651            .filter_map(|m| {
652                self.sessions
653                    .get(&m.id)
654                    .map(|e| (m.id.clone(), Arc::clone(e)))
655            })
656            .collect();
657
658        let reviewer: Option<(String, Arc<dyn AgentExecutor>)> = self
659            .team
660            .members_by_role(TeamRole::Reviewer)
661            .into_iter()
662            .next()
663            .and_then(|m| {
664                self.sessions
665                    .get(&m.id)
666                    .map(|e| (m.id.clone(), Arc::clone(e)))
667            });
668
669        let mut worker_handles = Vec::new();
670        for (id, executor) in workers {
671            let b = Arc::clone(&board);
672            let handle = tokio::spawn(async move {
673                run_worker(id, executor, b, max_rounds, poll).await;
674            });
675            worker_handles.push(handle);
676        }
677
678        let reviewer_rounds = if let Some((id, executor)) = reviewer {
679            let b = Arc::clone(&board);
680            let handle =
681                tokio::spawn(async move { run_reviewer(id, executor, b, max_rounds, poll).await });
682            for h in worker_handles {
683                let _ = h.await;
684            }
685            handle.await.unwrap_or(0)
686        } else {
687            for h in worker_handles {
688                let _ = h.await;
689            }
690            0
691        };
692
693        let done_tasks = board.by_status(TaskStatus::Done);
694        let rejected_tasks = board.by_status(TaskStatus::Rejected);
695
696        Ok(TeamRunResult {
697            done_tasks,
698            rejected_tasks,
699            rounds: reviewer_rounds,
700        })
701    }
702}
703
704impl std::fmt::Debug for TeamRunner {
705    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
706        f.debug_struct("TeamRunner")
707            .field("team", &self.team.name())
708            .field("bound_sessions", &self.sessions.len())
709            .finish()
710    }
711}
712
713/// Parse `{"tasks": ["...", "..."]}` from a Lead response.
714fn parse_task_list(response: &str) -> crate::error::Result<Vec<String>> {
715    // Find JSON object boundaries in case there is extra whitespace or newlines.
716    let start = response
717        .find('{')
718        .ok_or_else(|| anyhow::anyhow!("lead response contains no JSON object: {}", response))?;
719    let end = response
720        .rfind('}')
721        .ok_or_else(|| anyhow::anyhow!("lead response JSON object is unclosed"))?;
722    let json_str = &response[start..=end];
723
724    let value: serde_json::Value = serde_json::from_str(json_str)
725        .map_err(|e| anyhow::anyhow!("failed to parse lead JSON response: {e}"))?;
726
727    let tasks: Vec<String> = value["tasks"]
728        .as_array()
729        .ok_or_else(|| anyhow::anyhow!("lead JSON response missing 'tasks' array"))?
730        .iter()
731        .filter_map(|v: &serde_json::Value| v.as_str().map(|s| s.to_string()))
732        .collect();
733
734    Ok(tasks)
735}
736
737/// Worker loop: claim and execute tasks until the board is quiescent.
738async fn run_worker(
739    member_id: String,
740    executor: Arc<dyn AgentExecutor>,
741    board: Arc<TeamTaskBoard>,
742    max_rounds: usize,
743    poll: Duration,
744) {
745    let mut idle = 0usize;
746    loop {
747        if let Some(task) = board.claim(&member_id) {
748            idle = 0;
749            let result = executor
750                .execute(&task.description)
751                .await
752                .unwrap_or_else(|e| format!("execution error: {e}"));
753            board.complete(&task.id, &result);
754        } else {
755            let (open, in_progress, in_review, _, rejected) = board.stats();
756            // No claimable work and no tasks that could re-enter the queue → stop.
757            // Include in_review so we wait for the reviewer's verdict (which may
758            // produce a Rejected task for us to retry).
759            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
760                break;
761            }
762            idle += 1;
763            if idle >= max_rounds {
764                break;
765            }
766            tokio::time::sleep(poll).await;
767        }
768    }
769}
770
771/// Reviewer loop: review InReview tasks and approve or reject them.
772/// Returns the number of rounds completed.
773async fn run_reviewer(
774    _member_id: String,
775    executor: Arc<dyn AgentExecutor>,
776    board: Arc<TeamTaskBoard>,
777    max_rounds: usize,
778    poll: Duration,
779) -> usize {
780    let mut rounds = 0usize;
781    loop {
782        let in_review = board.by_status(TaskStatus::InReview);
783        for task in in_review {
784            let result_text = task.result.as_deref().unwrap_or("");
785            let prompt = REVIEWER_PROMPT
786                .replace("{task}", &task.description)
787                .replace("{result}", result_text);
788            let verdict = executor
789                .execute(&prompt)
790                .await
791                .unwrap_or_else(|_| "REJECTED: execution error".to_string());
792            if verdict.contains("APPROVED") {
793                board.approve(&task.id);
794            } else {
795                board.reject(&task.id);
796            }
797            // Yield after each decision so workers can pick up rejected tasks.
798            tokio::task::yield_now().await;
799        }
800
801        let (open, in_progress, in_review_count, _, rejected) = board.stats();
802        if open == 0 && in_progress == 0 && in_review_count == 0 && rejected == 0 {
803            break;
804        }
805        rounds += 1;
806        if rounds >= max_rounds {
807            break;
808        }
809        tokio::time::sleep(poll).await;
810    }
811    rounds
812}
813
814#[cfg(test)]
815mod tests {
816    use super::*;
817
818    #[test]
819    fn test_team_creation() {
820        let team = AgentTeam::new("test-team", TeamConfig::default());
821        assert_eq!(team.name(), "test-team");
822        assert_eq!(team.member_count(), 0);
823    }
824
825    #[test]
826    fn test_add_remove_members() {
827        let mut team = AgentTeam::new("test", TeamConfig::default());
828        team.add_member("lead", TeamRole::Lead);
829        team.add_member("w1", TeamRole::Worker);
830        team.add_member("w2", TeamRole::Worker);
831        team.add_member("rev", TeamRole::Reviewer);
832        assert_eq!(team.member_count(), 4);
833        assert_eq!(team.members_by_role(TeamRole::Worker).len(), 2);
834
835        assert!(team.remove_member("w2"));
836        assert_eq!(team.member_count(), 3);
837        assert!(!team.remove_member("nonexistent"));
838    }
839
840    #[test]
841    fn test_task_board_post_and_claim() {
842        let board = TeamTaskBoard::new(10);
843        let id = board.post("Fix auth bug", "lead", None).unwrap();
844        assert_eq!(board.len(), 1);
845
846        let task = board.claim("worker-1").unwrap();
847        assert_eq!(task.id, id);
848        assert_eq!(task.assigned_to.as_deref(), Some("worker-1"));
849        assert_eq!(task.status, TaskStatus::InProgress);
850
851        // No more open tasks
852        assert!(board.claim("worker-2").is_none());
853    }
854
855    #[test]
856    fn test_task_board_workflow() {
857        let board = TeamTaskBoard::new(10);
858        let id = board.post("Write tests", "lead", None).unwrap();
859
860        // Claim
861        board.claim("worker-1");
862
863        // Complete
864        assert!(board.complete(&id, "Added 5 tests"));
865        let task = board.get(&id).unwrap();
866        assert_eq!(task.status, TaskStatus::InReview);
867
868        // Approve
869        assert!(board.approve(&id));
870        let task = board.get(&id).unwrap();
871        assert_eq!(task.status, TaskStatus::Done);
872    }
873
874    #[test]
875    fn test_task_board_reject() {
876        let board = TeamTaskBoard::new(10);
877        let id = board.post("Refactor module", "lead", None).unwrap();
878        board.claim("worker-1");
879        board.complete(&id, "Done");
880
881        assert!(board.reject(&id));
882        let task = board.get(&id).unwrap();
883        assert_eq!(task.status, TaskStatus::Rejected);
884        assert!(task.assigned_to.is_none());
885    }
886
887    #[test]
888    fn test_task_board_max_capacity() {
889        let board = TeamTaskBoard::new(2);
890        assert!(board.post("Task 1", "lead", None).is_some());
891        assert!(board.post("Task 2", "lead", None).is_some());
892        assert!(board.post("Task 3", "lead", None).is_none()); // Full
893    }
894
895    #[test]
896    fn test_task_board_stats() {
897        let board = TeamTaskBoard::new(10);
898        board.post("T1", "lead", None);
899        board.post("T2", "lead", None);
900        let id3 = board.post("T3", "lead", Some("w1")).unwrap();
901        board.complete(&id3, "done");
902
903        let (open, progress, review, done, rejected) = board.stats();
904        assert_eq!(open, 2);
905        assert_eq!(progress, 0);
906        assert_eq!(review, 1);
907        assert_eq!(done, 0);
908        assert_eq!(rejected, 0);
909    }
910
911    #[test]
912    fn test_task_board_by_assignee() {
913        let board = TeamTaskBoard::new(10);
914        board.post("T1", "lead", Some("w1"));
915        board.post("T2", "lead", Some("w2"));
916        board.post("T3", "lead", Some("w1"));
917
918        let w1_tasks = board.by_assignee("w1");
919        assert_eq!(w1_tasks.len(), 2);
920    }
921
922    #[tokio::test]
923    async fn test_send_message() {
924        let mut team = AgentTeam::new("msg-test", TeamConfig::default());
925        team.add_member("lead", TeamRole::Lead);
926        team.add_member("worker", TeamRole::Worker);
927
928        let mut rx = team.take_receiver("worker").unwrap();
929
930        assert!(
931            team.send_message("lead", "worker", "Please fix the bug", Some("task-1"))
932                .await
933        );
934
935        let msg = rx.recv().await.unwrap();
936        assert_eq!(msg.from, "lead");
937        assert_eq!(msg.to, "worker");
938        assert_eq!(msg.content, "Please fix the bug");
939        assert_eq!(msg.task_id.as_deref(), Some("task-1"));
940    }
941
942    #[tokio::test]
943    async fn test_broadcast() {
944        let mut team = AgentTeam::new("broadcast-test", TeamConfig::default());
945        team.add_member("lead", TeamRole::Lead);
946        team.add_member("w1", TeamRole::Worker);
947        team.add_member("w2", TeamRole::Worker);
948
949        let mut rx1 = team.take_receiver("w1").unwrap();
950        let mut rx2 = team.take_receiver("w2").unwrap();
951
952        team.broadcast("lead", "New task available", None).await;
953
954        let m1 = rx1.recv().await.unwrap();
955        let m2 = rx2.recv().await.unwrap();
956        assert_eq!(m1.content, "New task available");
957        assert_eq!(m2.content, "New task available");
958    }
959
960    #[test]
961    fn test_role_display() {
962        assert_eq!(TeamRole::Lead.to_string(), "lead");
963        assert_eq!(TeamRole::Worker.to_string(), "worker");
964        assert_eq!(TeamRole::Reviewer.to_string(), "reviewer");
965    }
966
967    #[test]
968    fn test_task_status_display() {
969        assert_eq!(TaskStatus::Open.to_string(), "open");
970        assert_eq!(TaskStatus::InProgress.to_string(), "in_progress");
971        assert_eq!(TaskStatus::InReview.to_string(), "in_review");
972        assert_eq!(TaskStatus::Done.to_string(), "done");
973        assert_eq!(TaskStatus::Rejected.to_string(), "rejected");
974    }
975
976    // -----------------------------------------------------------------------
977    // TeamRunner tests (mock executor)
978    // -----------------------------------------------------------------------
979
980    /// Minimal mock executor for unit tests.
981    struct MockExecutor {
982        response: String,
983    }
984
985    impl MockExecutor {
986        fn new(response: impl Into<String>) -> Arc<Self> {
987            Arc::new(Self {
988                response: response.into(),
989            })
990        }
991    }
992
993    #[async_trait::async_trait]
994    impl AgentExecutor for MockExecutor {
995        async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
996            Ok(self.response.clone())
997        }
998    }
999
1000    #[test]
1001    fn test_team_runner_session_binding() {
1002        let mut team = AgentTeam::new("bind-test", TeamConfig::default());
1003        team.add_member("lead", TeamRole::Lead);
1004        team.add_member("w1", TeamRole::Worker);
1005
1006        let mut runner = TeamRunner::new(team);
1007
1008        // Binding to a known member succeeds.
1009        assert!(runner.bind_session("lead", MockExecutor::new("ok")).is_ok());
1010        assert!(runner.bind_session("w1", MockExecutor::new("ok")).is_ok());
1011
1012        // Binding to an unknown member fails.
1013        assert!(runner
1014            .bind_session("nobody", MockExecutor::new("ok"))
1015            .is_err());
1016    }
1017
1018    #[test]
1019    fn test_parse_task_list() {
1020        let json = r#"{"tasks": ["Write tests", "Fix lints", "Update docs"]}"#;
1021        let tasks = parse_task_list(json).unwrap();
1022        assert_eq!(tasks.len(), 3);
1023        assert_eq!(tasks[0], "Write tests");
1024        assert_eq!(tasks[2], "Update docs");
1025    }
1026
1027    #[test]
1028    fn test_parse_task_list_no_json() {
1029        assert!(parse_task_list("no json here").is_err());
1030    }
1031
1032    #[test]
1033    fn test_claim_rejected_tasks() {
1034        let board = TeamTaskBoard::new(10);
1035        let id = board.post("Refactor module", "lead", None).unwrap();
1036
1037        // Simulate workflow: claim → complete → reject
1038        board.claim("worker-1");
1039        board.complete(&id, "initial attempt");
1040        board.reject(&id);
1041
1042        assert_eq!(board.get(&id).unwrap().status, TaskStatus::Rejected);
1043
1044        // A new worker can re-claim the rejected task.
1045        let task = board.claim("worker-2");
1046        assert!(task.is_some());
1047        let task = task.unwrap();
1048        assert_eq!(task.id, id);
1049        assert_eq!(task.assigned_to.as_deref(), Some("worker-2"));
1050        assert_eq!(task.status, TaskStatus::InProgress);
1051    }
1052
1053    #[tokio::test]
1054    async fn test_team_runner_goal_decomposition() {
1055        let config = TeamConfig {
1056            poll_interval_ms: 1,
1057            max_rounds: 3,
1058            ..TeamConfig::default()
1059        };
1060        let mut team = AgentTeam::new("decomp-test", config);
1061        team.add_member("lead", TeamRole::Lead);
1062        team.add_member("w1", TeamRole::Worker);
1063        team.add_member("rev", TeamRole::Reviewer);
1064
1065        let mut runner = TeamRunner::new(team);
1066        // Lead returns two tasks as JSON.
1067        runner
1068            .bind_session(
1069                "lead",
1070                MockExecutor::new(r#"{"tasks": ["Task A", "Task B"]}"#),
1071            )
1072            .unwrap();
1073        // Worker always succeeds.
1074        runner
1075            .bind_session("w1", MockExecutor::new("done"))
1076            .unwrap();
1077        // Reviewer always approves.
1078        runner
1079            .bind_session("rev", MockExecutor::new("APPROVED: looks good"))
1080            .unwrap();
1081
1082        let result = runner.run_until_done("Build the feature").await.unwrap();
1083
1084        assert_eq!(result.done_tasks.len(), 2);
1085        assert!(result.rejected_tasks.is_empty());
1086    }
1087
1088    #[tokio::test]
1089    async fn test_team_runner_worker_execution() {
1090        let config = TeamConfig {
1091            poll_interval_ms: 1,
1092            max_rounds: 3,
1093            ..TeamConfig::default()
1094        };
1095        let mut team = AgentTeam::new("worker-exec-test", config);
1096        team.add_member("lead", TeamRole::Lead);
1097        team.add_member("w1", TeamRole::Worker);
1098
1099        let mut runner = TeamRunner::new(team);
1100        runner
1101            .bind_session(
1102                "lead",
1103                MockExecutor::new(r#"{"tasks": ["Write unit tests"]}"#),
1104            )
1105            .unwrap();
1106        runner
1107            .bind_session("w1", MockExecutor::new("Added 3 tests"))
1108            .unwrap();
1109
1110        // No reviewer bound → tasks end up InReview (not Done), which is fine for this test.
1111        let board = runner.task_board();
1112        let _ = runner.run_until_done("Test the module").await;
1113
1114        // The task must have been claimed and completed (InReview or Done).
1115        let tasks = board.by_status(TaskStatus::InReview);
1116        assert_eq!(tasks.len(), 1);
1117        assert_eq!(tasks[0].result.as_deref(), Some("Added 3 tests"));
1118    }
1119
1120    #[tokio::test]
1121    async fn test_team_runner_reviewer_approval() {
1122        let config = TeamConfig {
1123            poll_interval_ms: 1,
1124            max_rounds: 5,
1125            ..TeamConfig::default()
1126        };
1127        let mut team = AgentTeam::new("reviewer-test", config);
1128        team.add_member("lead", TeamRole::Lead);
1129        team.add_member("w1", TeamRole::Worker);
1130        team.add_member("rev", TeamRole::Reviewer);
1131
1132        let mut runner = TeamRunner::new(team);
1133        runner
1134            .bind_session(
1135                "lead",
1136                MockExecutor::new(r#"{"tasks": ["Implement feature X"]}"#),
1137            )
1138            .unwrap();
1139        runner
1140            .bind_session("w1", MockExecutor::new("Feature X implemented"))
1141            .unwrap();
1142        runner
1143            .bind_session("rev", MockExecutor::new("APPROVED: complete"))
1144            .unwrap();
1145
1146        let result = runner.run_until_done("Ship feature X").await.unwrap();
1147
1148        assert_eq!(result.done_tasks.len(), 1);
1149        assert_eq!(
1150            result.done_tasks[0].result.as_deref(),
1151            Some("Feature X implemented")
1152        );
1153    }
1154
1155    #[tokio::test]
1156    async fn test_team_runner_rejection_and_retry() {
1157        use std::sync::atomic::{AtomicUsize, Ordering};
1158
1159        // Reviewer approves on the second attempt.
1160        struct ConditionalReviewer {
1161            calls: AtomicUsize,
1162        }
1163
1164        #[async_trait::async_trait]
1165        impl AgentExecutor for ConditionalReviewer {
1166            async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
1167                let n = self.calls.fetch_add(1, Ordering::SeqCst);
1168                if n == 0 {
1169                    Ok("REJECTED: needs improvement".to_string())
1170                } else {
1171                    Ok("APPROVED: now correct".to_string())
1172                }
1173            }
1174        }
1175
1176        let config = TeamConfig {
1177            poll_interval_ms: 1,
1178            max_rounds: 10,
1179            ..TeamConfig::default()
1180        };
1181        let mut team = AgentTeam::new("retry-test", config);
1182        team.add_member("lead", TeamRole::Lead);
1183        team.add_member("w1", TeamRole::Worker);
1184        team.add_member("rev", TeamRole::Reviewer);
1185
1186        let mut runner = TeamRunner::new(team);
1187        runner
1188            .bind_session("lead", MockExecutor::new(r#"{"tasks": ["Do the thing"]}"#))
1189            .unwrap();
1190        runner
1191            .bind_session("w1", MockExecutor::new("attempt result"))
1192            .unwrap();
1193        runner
1194            .bind_session(
1195                "rev",
1196                Arc::new(ConditionalReviewer {
1197                    calls: AtomicUsize::new(0),
1198                }),
1199            )
1200            .unwrap();
1201
1202        let result = runner.run_until_done("Complete the thing").await.unwrap();
1203
1204        // After retry, the task is eventually approved.
1205        assert_eq!(result.done_tasks.len(), 1);
1206        assert!(result.rejected_tasks.is_empty());
1207    }
1208}