Skip to main content

a3s_code_core/
agent_teams.rs

1//! Agent Teams — Peer-to-peer multi-agent coordination
2//!
3//! Enables multiple `AgentSession` instances to collaborate on complex tasks
4//! through a shared task board and message passing. Each agent has a role
5//! (Lead, Worker, Reviewer) and can post/claim tasks on the board.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! AgentTeam
11//!   +-- TeamTaskBoard (shared task queue)
12//!   +-- TeamMember[] (role + session reference)
13//!   +-- mpsc channels (peer-to-peer messaging)
14//! ```
15//!
16//! ## Usage
17//!
18//! ```rust,no_run
19//! use a3s_code_core::agent_teams::{AgentTeam, TeamConfig, TeamRole};
20//!
21//! # async fn run() -> anyhow::Result<()> {
22//! let config = TeamConfig::default();
23//! let mut team = AgentTeam::new("refactor-auth", config);
24//!
//! // Add members (id + role only; executors are bound later via `TeamRunner::bind_session`)
26//! team.add_member("lead", TeamRole::Lead);
27//! team.add_member("worker-1", TeamRole::Worker);
28//! team.add_member("reviewer", TeamRole::Reviewer);
29//!
30//! // Post a task to the board
31//! team.task_board().post("Refactor auth module", "lead", None);
32//!
33//! // Worker claims and works on it
34//! let task = team.task_board().claim("worker-1");
35//! # Ok(())
36//! # }
37//! ```
38
39use std::collections::HashMap;
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
/// Tunable settings for an agent team.
#[derive(Debug, Clone)]
pub struct TeamConfig {
    /// Maximum number of tasks the board should hold.
    /// Default: 50
    pub max_tasks: usize,
    /// Buffer size of each member's message channel.
    /// Default: 128
    pub channel_buffer: usize,
    /// Upper bound on coordination rounds before `run_until_done` exits.
    /// Default: 10
    pub max_rounds: usize,
    /// Delay between Worker/Reviewer polls, in milliseconds.
    /// Default: 200
    pub poll_interval_ms: u64,
}

impl Default for TeamConfig {
    /// Build a configuration carrying the documented default values.
    fn default() -> Self {
        // Named here so the numbers are not bare literals in the struct body.
        const MAX_TASKS: usize = 50;
        const CHANNEL_BUFFER: usize = 128;
        const MAX_ROUNDS: usize = 10;
        const POLL_INTERVAL_MS: u64 = 200;

        TeamConfig {
            max_tasks: MAX_TASKS,
            channel_buffer: CHANNEL_BUFFER,
            max_rounds: MAX_ROUNDS,
            poll_interval_ms: POLL_INTERVAL_MS,
        }
    }
}
71
/// The part a member plays within a team.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TeamRole {
    /// Breaks goals down into tasks and hands out work.
    Lead,
    /// Carries out assigned tasks.
    Worker,
    /// Checks completed work and gives feedback.
    Reviewer,
}

impl std::fmt::Display for TeamRole {
    /// Write the lowercase role name (`lead`, `worker` or `reviewer`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            TeamRole::Lead => "lead",
            TeamRole::Worker => "worker",
            TeamRole::Reviewer => "reviewer",
        };
        f.write_str(label)
    }
}
92
/// A message passed between team members.
///
/// Instances are built by `AgentTeam::send_message` and `AgentTeam::broadcast`
/// and delivered through the recipient's `tokio::sync::mpsc` channel.
#[derive(Debug, Clone)]
pub struct TeamMessage {
    /// Sender member ID, exactly as supplied by the caller.
    pub from: String,
    /// Recipient member ID.
    pub to: String,
    /// Message content.
    pub content: String,
    /// Optional task ID this message relates to.
    pub task_id: Option<String>,
    /// Creation time as Unix epoch seconds (taken from `chrono::Utc::now()`).
    pub timestamp: i64,
}
107
/// Lifecycle state of a task on the board.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not yet claimed by anyone.
    Open,
    /// Claimed by a worker.
    InProgress,
    /// Work finished; waiting for a review verdict.
    InReview,
    /// Approved by the reviewer.
    Done,
    /// Turned down by the reviewer; needs rework.
    Rejected,
}

impl std::fmt::Display for TaskStatus {
    /// Write the snake_case status name (e.g. `in_progress`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            TaskStatus::Open => "open",
            TaskStatus::InProgress => "in_progress",
            TaskStatus::InReview => "in_review",
            TaskStatus::Done => "done",
            TaskStatus::Rejected => "rejected",
        };
        f.write_str(label)
    }
}
134
/// A task on the team board.
///
/// `TeamTaskBoard` hands out clones of this struct, so a value held by a caller
/// is a snapshot and does not track later changes on the board.
#[derive(Debug, Clone)]
pub struct TeamTask {
    /// Unique task ID (the board generates these as `task-<n>`).
    pub id: String,
    /// Task description.
    pub description: String,
    /// Who posted it.
    pub posted_by: String,
    /// Who is working on it (if claimed). Cleared again when the task is rejected.
    pub assigned_to: Option<String>,
    /// Current status.
    pub status: TaskStatus,
    /// Optional result/output, stored when the task is completed.
    pub result: Option<String>,
    /// Creation time as Unix epoch seconds.
    pub created_at: i64,
    /// Time of the last status change as Unix epoch seconds.
    pub updated_at: i64,
}
155
/// Shared task board for team coordination.
///
/// All state sits behind `std::sync::RwLock`s, so the board can be shared
/// between members as `Arc<TeamTaskBoard>` and used through `&self`.
#[derive(Debug)]
pub struct TeamTaskBoard {
    /// Every task ever posted, in posting order.
    tasks: RwLock<Vec<TeamTask>>,
    /// Capacity limit checked by `post`.
    max_tasks: usize,
    /// Counter used to build the next `task-<n>` ID; starts at 1.
    next_id: RwLock<u64>,
}
163
164impl TeamTaskBoard {
165    /// Create a new task board.
166    pub fn new(max_tasks: usize) -> Self {
167        Self {
168            tasks: RwLock::new(Vec::new()),
169            max_tasks,
170            next_id: RwLock::new(1),
171        }
172    }
173
174    /// Post a new task to the board. Returns the task ID.
175    pub fn post(
176        &self,
177        description: &str,
178        posted_by: &str,
179        assign_to: Option<&str>,
180    ) -> Option<String> {
181        let mut tasks = self.tasks.write().unwrap();
182        if tasks.len() >= self.max_tasks {
183            return None;
184        }
185
186        let mut id_counter = self.next_id.write().unwrap();
187        let id = format!("task-{}", *id_counter);
188        *id_counter += 1;
189
190        let now = chrono::Utc::now().timestamp();
191        let status = if assign_to.is_some() {
192            TaskStatus::InProgress
193        } else {
194            TaskStatus::Open
195        };
196
197        tasks.push(TeamTask {
198            id: id.clone(),
199            description: description.to_string(),
200            posted_by: posted_by.to_string(),
201            assigned_to: assign_to.map(|s| s.to_string()),
202            status,
203            result: None,
204            created_at: now,
205            updated_at: now,
206        });
207
208        Some(id)
209    }
210
211    /// Claim the next open or rejected task for a member. Returns the task if available.
212    ///
213    /// Rejected tasks are treated as retriable: a worker can claim them again
214    /// for another execution attempt.
215    pub fn claim(&self, member_id: &str) -> Option<TeamTask> {
216        let mut tasks = self.tasks.write().unwrap();
217        let task = tasks
218            .iter_mut()
219            .find(|t| t.status == TaskStatus::Open || t.status == TaskStatus::Rejected)?;
220        task.assigned_to = Some(member_id.to_string());
221        task.status = TaskStatus::InProgress;
222        task.updated_at = chrono::Utc::now().timestamp();
223        Some(task.clone())
224    }
225
226    /// Mark a task as complete with a result.
227    pub fn complete(&self, task_id: &str, result: &str) -> bool {
228        let mut tasks = self.tasks.write().unwrap();
229        if let Some(task) = tasks.iter_mut().find(|t| t.id == task_id) {
230            task.status = TaskStatus::InReview;
231            task.result = Some(result.to_string());
232            task.updated_at = chrono::Utc::now().timestamp();
233            true
234        } else {
235            false
236        }
237    }
238
239    /// Approve a task (reviewer action).
240    pub fn approve(&self, task_id: &str) -> bool {
241        let mut tasks = self.tasks.write().unwrap();
242        if let Some(task) = tasks
243            .iter_mut()
244            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
245        {
246            task.status = TaskStatus::Done;
247            task.updated_at = chrono::Utc::now().timestamp();
248            true
249        } else {
250            false
251        }
252    }
253
254    /// Reject a task back to open (reviewer action).
255    pub fn reject(&self, task_id: &str) -> bool {
256        let mut tasks = self.tasks.write().unwrap();
257        if let Some(task) = tasks
258            .iter_mut()
259            .find(|t| t.id == task_id && t.status == TaskStatus::InReview)
260        {
261            task.status = TaskStatus::Rejected;
262            task.assigned_to = None;
263            task.updated_at = chrono::Utc::now().timestamp();
264            true
265        } else {
266            false
267        }
268    }
269
270    /// Get all tasks with a given status.
271    pub fn by_status(&self, status: TaskStatus) -> Vec<TeamTask> {
272        self.tasks
273            .read()
274            .unwrap()
275            .iter()
276            .filter(|t| t.status == status)
277            .cloned()
278            .collect()
279    }
280
281    /// Get all tasks assigned to a member.
282    pub fn by_assignee(&self, member_id: &str) -> Vec<TeamTask> {
283        self.tasks
284            .read()
285            .unwrap()
286            .iter()
287            .filter(|t| t.assigned_to.as_deref() == Some(member_id))
288            .cloned()
289            .collect()
290    }
291
292    /// Get a task by ID.
293    pub fn get(&self, task_id: &str) -> Option<TeamTask> {
294        self.tasks
295            .read()
296            .unwrap()
297            .iter()
298            .find(|t| t.id == task_id)
299            .cloned()
300    }
301
302    /// Number of tasks on the board.
303    pub fn len(&self) -> usize {
304        self.tasks.read().unwrap().len()
305    }
306
307    /// Whether the board is empty.
308    pub fn is_empty(&self) -> bool {
309        self.tasks.read().unwrap().is_empty()
310    }
311
312    /// Summary stats: (open, in_progress, in_review, done, rejected).
313    pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
314        let tasks = self.tasks.read().unwrap();
315        let open = tasks
316            .iter()
317            .filter(|t| t.status == TaskStatus::Open)
318            .count();
319        let progress = tasks
320            .iter()
321            .filter(|t| t.status == TaskStatus::InProgress)
322            .count();
323        let review = tasks
324            .iter()
325            .filter(|t| t.status == TaskStatus::InReview)
326            .count();
327        let done = tasks
328            .iter()
329            .filter(|t| t.status == TaskStatus::Done)
330            .count();
331        let rejected = tasks
332            .iter()
333            .filter(|t| t.status == TaskStatus::Rejected)
334            .count();
335        (open, progress, review, done, rejected)
336    }
337}
338
/// A team member.
///
/// Holds only identity and role; the executor that does the member's work is
/// bound separately through `TeamRunner::bind_session`.
#[derive(Debug, Clone)]
pub struct TeamMember {
    /// Unique member ID (also the key under which the team stores the member).
    pub id: String,
    /// Member role.
    pub role: TeamRole,
}
347
/// Multi-agent team coordinator.
///
/// Owns the member roster, the shared task board and one bounded
/// `tokio::sync::mpsc` channel per member for direct messages.
pub struct AgentTeam {
    /// Team name.
    name: String,
    /// Configuration.
    config: TeamConfig,
    /// Registered members, keyed by member ID.
    members: HashMap<String, TeamMember>,
    /// Shared task board.
    task_board: Arc<TeamTaskBoard>,
    /// Message senders per member, keyed by member ID.
    senders: HashMap<String, mpsc::Sender<TeamMessage>>,
    /// Message receivers per member; an entry is removed when it is handed out
    /// by `take_receiver`.
    receivers: HashMap<String, mpsc::Receiver<TeamMessage>>,
}
363
364impl AgentTeam {
365    /// Create a new team.
366    pub fn new(name: &str, config: TeamConfig) -> Self {
367        Self {
368            name: name.to_string(),
369            config,
370            members: HashMap::new(),
371            task_board: Arc::new(TeamTaskBoard::new(50)),
372            senders: HashMap::new(),
373            receivers: HashMap::new(),
374        }
375    }
376
377    /// Team name.
378    pub fn name(&self) -> &str {
379        &self.name
380    }
381
382    /// Add a member to the team.
383    pub fn add_member(&mut self, id: &str, role: TeamRole) {
384        let (tx, rx) = mpsc::channel(self.config.channel_buffer);
385        self.members.insert(
386            id.to_string(),
387            TeamMember {
388                id: id.to_string(),
389                role,
390            },
391        );
392        self.senders.insert(id.to_string(), tx);
393        self.receivers.insert(id.to_string(), rx);
394    }
395
396    /// Remove a member from the team.
397    pub fn remove_member(&mut self, id: &str) -> bool {
398        self.senders.remove(id);
399        self.receivers.remove(id);
400        self.members.remove(id).is_some()
401    }
402
403    /// Get a reference to the shared task board.
404    pub fn task_board(&self) -> &TeamTaskBoard {
405        &self.task_board
406    }
407
408    /// Get a cloneable Arc to the task board.
409    pub fn task_board_arc(&self) -> Arc<TeamTaskBoard> {
410        Arc::clone(&self.task_board)
411    }
412
413    /// Send a message to a team member.
414    pub async fn send_message(
415        &self,
416        from: &str,
417        to: &str,
418        content: &str,
419        task_id: Option<&str>,
420    ) -> bool {
421        let sender = match self.senders.get(to) {
422            Some(s) => s,
423            None => return false,
424        };
425
426        let msg = TeamMessage {
427            from: from.to_string(),
428            to: to.to_string(),
429            content: content.to_string(),
430            task_id: task_id.map(|s| s.to_string()),
431            timestamp: chrono::Utc::now().timestamp(),
432        };
433
434        sender.send(msg).await.is_ok()
435    }
436
437    /// Take the message receiver for a member (can only be called once per member).
438    pub fn take_receiver(&mut self, member_id: &str) -> Option<mpsc::Receiver<TeamMessage>> {
439        self.receivers.remove(member_id)
440    }
441
442    /// Broadcast a message to all members except the sender.
443    pub async fn broadcast(&self, from: &str, content: &str, task_id: Option<&str>) {
444        for (id, sender) in &self.senders {
445            if id == from {
446                continue;
447            }
448            let msg = TeamMessage {
449                from: from.to_string(),
450                to: id.clone(),
451                content: content.to_string(),
452                task_id: task_id.map(|s| s.to_string()),
453                timestamp: chrono::Utc::now().timestamp(),
454            };
455            let _ = sender.send(msg).await;
456        }
457    }
458
459    /// Get all members.
460    pub fn members(&self) -> Vec<&TeamMember> {
461        self.members.values().collect()
462    }
463
464    /// Get members by role.
465    pub fn members_by_role(&self, role: TeamRole) -> Vec<&TeamMember> {
466        self.members.values().filter(|m| m.role == role).collect()
467    }
468
469    /// Number of members.
470    pub fn member_count(&self) -> usize {
471        self.members.len()
472    }
473}
474
475impl std::fmt::Debug for AgentTeam {
476    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477        f.debug_struct("AgentTeam")
478            .field("name", &self.name)
479            .field("members", &self.members.len())
480            .field("tasks", &self.task_board.len())
481            .finish()
482    }
483}
484
485// ---------------------------------------------------------------------------
486// AgentExecutor — abstraction over AgentSession for testability
487// ---------------------------------------------------------------------------
488
/// Minimal execution interface for a team member.
///
/// `AgentSession` implements this trait. In tests, a mock can be used instead.
/// The `Send + Sync` bounds allow an executor to be shared as
/// `Arc<dyn AgentExecutor>` with the tasks that `TeamRunner` spawns.
#[async_trait::async_trait]
pub trait AgentExecutor: Send + Sync {
    /// Execute a prompt and return the text response.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports for a failed run.
    async fn execute(&self, prompt: &str) -> crate::error::Result<String>;
}
497
498#[async_trait::async_trait]
499impl AgentExecutor for crate::agent_api::AgentSession {
500    async fn execute(&self, prompt: &str) -> crate::error::Result<String> {
501        let result = self.send(prompt, None).await?;
502        Ok(result.text)
503    }
504}
505
506// ---------------------------------------------------------------------------
507// TeamRunResult
508// ---------------------------------------------------------------------------
509
/// Result returned by `TeamRunner::run_until_done`.
///
/// Only `Done` and `Rejected` tasks are reported here; tasks left in any other
/// state when the run ends can still be read from the task board.
#[derive(Debug)]
pub struct TeamRunResult {
    /// Tasks that reached the `Done` state.
    pub done_tasks: Vec<TeamTask>,
    /// Tasks that are still `Rejected` after all rounds (could not be approved).
    pub rejected_tasks: Vec<TeamTask>,
    /// Number of reviewer polling rounds completed (0 when no reviewer ran).
    pub rounds: usize,
}
520
521// ---------------------------------------------------------------------------
522// TeamRunner — binds AgentSession execution to AgentTeam coordination
523// ---------------------------------------------------------------------------
524
/// Prompt template sent to the Lead to decompose a goal into tasks.
///
/// `{goal}` is filled in with `str::replace`, not `format!`, so the braces of
/// the JSON example are written once: doubled `{{`/`}}` would reach the model
/// literally and show it a shape that is not valid JSON.
const LEAD_PROMPT: &str = "You are the lead agent in a team. Your goal is: {goal}

Decompose this goal into concrete, self-contained tasks for your team workers.
Each task should be independently executable by an AI coding agent.

Respond with ONLY valid JSON in this exact format:
{\"tasks\": [\"task description 1\", \"task description 2\", ...]}

No markdown, no explanation, just the JSON.";
534
/// Prompt template sent to the Reviewer for one completed task.
///
/// `run_reviewer` substitutes `{task}` with the task description and `{result}`
/// with the stored result text, then looks for the `APPROVED` marker in the reply.
const REVIEWER_PROMPT: &str = "Review the following completed task:

Task: {task}
Result: {result}

If the result satisfactorily completes the task, respond with \"APPROVED: <brief reason>\".
If the result is incomplete or incorrect, respond with \"REJECTED: <specific feedback for improvement>\".";
542
/// Binds an `AgentTeam` to concrete `AgentExecutor` sessions, enabling
/// Lead → Worker → Reviewer automated workflows.
pub struct TeamRunner {
    /// The team whose roster, configuration and task board drive the run.
    team: AgentTeam,
    /// Executors keyed by member ID; members without an entry do not take part.
    sessions: HashMap<String, Arc<dyn AgentExecutor>>,
}
549
550impl TeamRunner {
551    /// Create a new runner wrapping the given team.
552    pub fn new(team: AgentTeam) -> Self {
553        Self {
554            team,
555            sessions: HashMap::new(),
556        }
557    }
558
559    /// Bind an executor to a team member.
560    ///
561    /// Returns an error if `member_id` is not registered in the team.
562    pub fn bind_session(
563        &mut self,
564        member_id: &str,
565        executor: Arc<dyn AgentExecutor>,
566    ) -> crate::error::Result<()> {
567        if !self.team.members.contains_key(member_id) {
568            return Err(anyhow::anyhow!(
569                "member '{}' not found in team '{}'",
570                member_id,
571                self.team.name()
572            )
573            .into());
574        }
575        self.sessions.insert(member_id.to_string(), executor);
576        Ok(())
577    }
578
579    /// Access the shared task board.
580    pub fn task_board(&self) -> Arc<TeamTaskBoard> {
581        self.team.task_board_arc()
582    }
583
584    /// Run the full Lead → Worker → Reviewer workflow until all tasks are done
585    /// or `max_rounds` is exceeded.
586    ///
587    /// Steps:
588    /// 1. Lead decomposes `goal` into tasks via JSON response.
589    /// 2. Workers concurrently claim and execute tasks.
590    /// 3. Reviewer approves or rejects completed tasks.
591    /// 4. Rejected tasks re-enter the work queue for retry.
592    pub async fn run_until_done(&self, goal: &str) -> crate::error::Result<TeamRunResult> {
593        // --- Step 1: Lead decomposes the goal ---
594        let lead = self
595            .team
596            .members_by_role(TeamRole::Lead)
597            .into_iter()
598            .next()
599            .ok_or_else(|| anyhow::anyhow!("team has no Lead member"))?;
600
601        let lead_executor = self
602            .sessions
603            .get(&lead.id)
604            .ok_or_else(|| anyhow::anyhow!("no executor bound for lead member '{}'", lead.id))?;
605
606        let lead_prompt = LEAD_PROMPT.replace("{goal}", goal);
607        let raw = lead_executor.execute(&lead_prompt).await?;
608        let task_descriptions = parse_task_list(&raw)?;
609
610        let board = self.team.task_board_arc();
611        for desc in &task_descriptions {
612            board.post(desc, &lead.id, None);
613        }
614
615        // --- Step 2 & 3: Spawn workers and reviewer concurrently ---
616        let poll = Duration::from_millis(self.team.config.poll_interval_ms);
617        let max_rounds = self.team.config.max_rounds;
618
619        let workers: Vec<(String, Arc<dyn AgentExecutor>)> = self
620            .team
621            .members_by_role(TeamRole::Worker)
622            .into_iter()
623            .filter_map(|m| {
624                self.sessions
625                    .get(&m.id)
626                    .map(|e| (m.id.clone(), Arc::clone(e)))
627            })
628            .collect();
629
630        let reviewer: Option<(String, Arc<dyn AgentExecutor>)> = self
631            .team
632            .members_by_role(TeamRole::Reviewer)
633            .into_iter()
634            .next()
635            .and_then(|m| {
636                self.sessions
637                    .get(&m.id)
638                    .map(|e| (m.id.clone(), Arc::clone(e)))
639            });
640
641        let mut worker_handles = Vec::new();
642        for (id, executor) in workers {
643            let b = Arc::clone(&board);
644            let handle = tokio::spawn(async move {
645                run_worker(id, executor, b, max_rounds, poll).await;
646            });
647            worker_handles.push(handle);
648        }
649
650        let reviewer_rounds = if let Some((id, executor)) = reviewer {
651            let b = Arc::clone(&board);
652            let handle =
653                tokio::spawn(async move { run_reviewer(id, executor, b, max_rounds, poll).await });
654            for h in worker_handles {
655                let _ = h.await;
656            }
657            handle.await.unwrap_or(0)
658        } else {
659            for h in worker_handles {
660                let _ = h.await;
661            }
662            0
663        };
664
665        let done_tasks = board.by_status(TaskStatus::Done);
666        let rejected_tasks = board.by_status(TaskStatus::Rejected);
667
668        Ok(TeamRunResult {
669            done_tasks,
670            rejected_tasks,
671            rounds: reviewer_rounds,
672        })
673    }
674}
675
676impl std::fmt::Debug for TeamRunner {
677    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
678        f.debug_struct("TeamRunner")
679            .field("team", &self.team.name())
680            .field("bound_sessions", &self.sessions.len())
681            .finish()
682    }
683}
684
685/// Parse `{"tasks": ["...", "..."]}` from a Lead response.
686fn parse_task_list(response: &str) -> crate::error::Result<Vec<String>> {
687    // Find JSON object boundaries in case there is extra whitespace or newlines.
688    let start = response
689        .find('{')
690        .ok_or_else(|| anyhow::anyhow!("lead response contains no JSON object: {}", response))?;
691    let end = response
692        .rfind('}')
693        .ok_or_else(|| anyhow::anyhow!("lead response JSON object is unclosed"))?;
694    let json_str = &response[start..=end];
695
696    let value: serde_json::Value = serde_json::from_str(json_str)
697        .map_err(|e| anyhow::anyhow!("failed to parse lead JSON response: {e}"))?;
698
699    let tasks: Vec<String> = value["tasks"]
700        .as_array()
701        .ok_or_else(|| anyhow::anyhow!("lead JSON response missing 'tasks' array"))?
702        .iter()
703        .filter_map(|v: &serde_json::Value| v.as_str().map(|s| s.to_string()))
704        .collect();
705
706    Ok(tasks)
707}
708
709/// Worker loop: claim and execute tasks until the board is quiescent.
710async fn run_worker(
711    member_id: String,
712    executor: Arc<dyn AgentExecutor>,
713    board: Arc<TeamTaskBoard>,
714    max_rounds: usize,
715    poll: Duration,
716) {
717    let mut idle = 0usize;
718    loop {
719        if let Some(task) = board.claim(&member_id) {
720            idle = 0;
721            let result = executor
722                .execute(&task.description)
723                .await
724                .unwrap_or_else(|e| format!("execution error: {e}"));
725            board.complete(&task.id, &result);
726        } else {
727            let (open, in_progress, in_review, _, rejected) = board.stats();
728            // No claimable work and no tasks that could re-enter the queue → stop.
729            // Include in_review so we wait for the reviewer's verdict (which may
730            // produce a Rejected task for us to retry).
731            if open == 0 && in_progress == 0 && in_review == 0 && rejected == 0 {
732                break;
733            }
734            idle += 1;
735            if idle >= max_rounds {
736                break;
737            }
738            tokio::time::sleep(poll).await;
739        }
740    }
741}
742
743/// Reviewer loop: review InReview tasks and approve or reject them.
744/// Returns the number of rounds completed.
745async fn run_reviewer(
746    _member_id: String,
747    executor: Arc<dyn AgentExecutor>,
748    board: Arc<TeamTaskBoard>,
749    max_rounds: usize,
750    poll: Duration,
751) -> usize {
752    let mut rounds = 0usize;
753    loop {
754        let in_review = board.by_status(TaskStatus::InReview);
755        for task in in_review {
756            let result_text = task.result.as_deref().unwrap_or("");
757            let prompt = REVIEWER_PROMPT
758                .replace("{task}", &task.description)
759                .replace("{result}", result_text);
760            let verdict = executor
761                .execute(&prompt)
762                .await
763                .unwrap_or_else(|_| "REJECTED: execution error".to_string());
764            if verdict.contains("APPROVED") {
765                board.approve(&task.id);
766            } else {
767                board.reject(&task.id);
768            }
769            // Yield after each decision so workers can pick up rejected tasks.
770            tokio::task::yield_now().await;
771        }
772
773        let (open, in_progress, in_review_count, _, rejected) = board.stats();
774        if open == 0 && in_progress == 0 && in_review_count == 0 && rejected == 0 {
775            break;
776        }
777        rounds += 1;
778        if rounds >= max_rounds {
779            break;
780        }
781        tokio::time::sleep(poll).await;
782    }
783    rounds
784}
785
786#[cfg(test)]
787mod tests {
788    use super::*;
789
790    #[test]
791    fn test_team_creation() {
792        let team = AgentTeam::new("test-team", TeamConfig::default());
793        assert_eq!(team.name(), "test-team");
794        assert_eq!(team.member_count(), 0);
795    }
796
797    #[test]
798    fn test_add_remove_members() {
799        let mut team = AgentTeam::new("test", TeamConfig::default());
800        team.add_member("lead", TeamRole::Lead);
801        team.add_member("w1", TeamRole::Worker);
802        team.add_member("w2", TeamRole::Worker);
803        team.add_member("rev", TeamRole::Reviewer);
804        assert_eq!(team.member_count(), 4);
805        assert_eq!(team.members_by_role(TeamRole::Worker).len(), 2);
806
807        assert!(team.remove_member("w2"));
808        assert_eq!(team.member_count(), 3);
809        assert!(!team.remove_member("nonexistent"));
810    }
811
812    #[test]
813    fn test_task_board_post_and_claim() {
814        let board = TeamTaskBoard::new(10);
815        let id = board.post("Fix auth bug", "lead", None).unwrap();
816        assert_eq!(board.len(), 1);
817
818        let task = board.claim("worker-1").unwrap();
819        assert_eq!(task.id, id);
820        assert_eq!(task.assigned_to.as_deref(), Some("worker-1"));
821        assert_eq!(task.status, TaskStatus::InProgress);
822
823        // No more open tasks
824        assert!(board.claim("worker-2").is_none());
825    }
826
827    #[test]
828    fn test_task_board_workflow() {
829        let board = TeamTaskBoard::new(10);
830        let id = board.post("Write tests", "lead", None).unwrap();
831
832        // Claim
833        board.claim("worker-1");
834
835        // Complete
836        assert!(board.complete(&id, "Added 5 tests"));
837        let task = board.get(&id).unwrap();
838        assert_eq!(task.status, TaskStatus::InReview);
839
840        // Approve
841        assert!(board.approve(&id));
842        let task = board.get(&id).unwrap();
843        assert_eq!(task.status, TaskStatus::Done);
844    }
845
846    #[test]
847    fn test_task_board_reject() {
848        let board = TeamTaskBoard::new(10);
849        let id = board.post("Refactor module", "lead", None).unwrap();
850        board.claim("worker-1");
851        board.complete(&id, "Done");
852
853        assert!(board.reject(&id));
854        let task = board.get(&id).unwrap();
855        assert_eq!(task.status, TaskStatus::Rejected);
856        assert!(task.assigned_to.is_none());
857    }
858
859    #[test]
860    fn test_task_board_max_capacity() {
861        let board = TeamTaskBoard::new(2);
862        assert!(board.post("Task 1", "lead", None).is_some());
863        assert!(board.post("Task 2", "lead", None).is_some());
864        assert!(board.post("Task 3", "lead", None).is_none()); // Full
865    }
866
867    #[test]
868    fn test_task_board_stats() {
869        let board = TeamTaskBoard::new(10);
870        board.post("T1", "lead", None);
871        board.post("T2", "lead", None);
872        let id3 = board.post("T3", "lead", Some("w1")).unwrap();
873        board.complete(&id3, "done");
874
875        let (open, progress, review, done, rejected) = board.stats();
876        assert_eq!(open, 2);
877        assert_eq!(progress, 0);
878        assert_eq!(review, 1);
879        assert_eq!(done, 0);
880        assert_eq!(rejected, 0);
881    }
882
883    #[test]
884    fn test_task_board_by_assignee() {
885        let board = TeamTaskBoard::new(10);
886        board.post("T1", "lead", Some("w1"));
887        board.post("T2", "lead", Some("w2"));
888        board.post("T3", "lead", Some("w1"));
889
890        let w1_tasks = board.by_assignee("w1");
891        assert_eq!(w1_tasks.len(), 2);
892    }
893
894    #[tokio::test]
895    async fn test_send_message() {
896        let mut team = AgentTeam::new("msg-test", TeamConfig::default());
897        team.add_member("lead", TeamRole::Lead);
898        team.add_member("worker", TeamRole::Worker);
899
900        let mut rx = team.take_receiver("worker").unwrap();
901
902        assert!(
903            team.send_message("lead", "worker", "Please fix the bug", Some("task-1"))
904                .await
905        );
906
907        let msg = rx.recv().await.unwrap();
908        assert_eq!(msg.from, "lead");
909        assert_eq!(msg.to, "worker");
910        assert_eq!(msg.content, "Please fix the bug");
911        assert_eq!(msg.task_id.as_deref(), Some("task-1"));
912    }
913
914    #[tokio::test]
915    async fn test_broadcast() {
916        let mut team = AgentTeam::new("broadcast-test", TeamConfig::default());
917        team.add_member("lead", TeamRole::Lead);
918        team.add_member("w1", TeamRole::Worker);
919        team.add_member("w2", TeamRole::Worker);
920
921        let mut rx1 = team.take_receiver("w1").unwrap();
922        let mut rx2 = team.take_receiver("w2").unwrap();
923
924        team.broadcast("lead", "New task available", None).await;
925
926        let m1 = rx1.recv().await.unwrap();
927        let m2 = rx2.recv().await.unwrap();
928        assert_eq!(m1.content, "New task available");
929        assert_eq!(m2.content, "New task available");
930    }
931
932    #[test]
933    fn test_role_display() {
934        assert_eq!(TeamRole::Lead.to_string(), "lead");
935        assert_eq!(TeamRole::Worker.to_string(), "worker");
936        assert_eq!(TeamRole::Reviewer.to_string(), "reviewer");
937    }
938
939    #[test]
940    fn test_task_status_display() {
941        assert_eq!(TaskStatus::Open.to_string(), "open");
942        assert_eq!(TaskStatus::InProgress.to_string(), "in_progress");
943        assert_eq!(TaskStatus::InReview.to_string(), "in_review");
944        assert_eq!(TaskStatus::Done.to_string(), "done");
945        assert_eq!(TaskStatus::Rejected.to_string(), "rejected");
946    }
947
948    // -----------------------------------------------------------------------
949    // TeamRunner tests (mock executor)
950    // -----------------------------------------------------------------------
951
952    /// Minimal mock executor for unit tests.
953    struct MockExecutor {
954        response: String,
955    }
956
957    impl MockExecutor {
958        fn new(response: impl Into<String>) -> Arc<Self> {
959            Arc::new(Self {
960                response: response.into(),
961            })
962        }
963    }
964
965    #[async_trait::async_trait]
966    impl AgentExecutor for MockExecutor {
967        async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
968            Ok(self.response.clone())
969        }
970    }
971
972    #[test]
973    fn test_team_runner_session_binding() {
974        let mut team = AgentTeam::new("bind-test", TeamConfig::default());
975        team.add_member("lead", TeamRole::Lead);
976        team.add_member("w1", TeamRole::Worker);
977
978        let mut runner = TeamRunner::new(team);
979
980        // Binding to a known member succeeds.
981        assert!(runner.bind_session("lead", MockExecutor::new("ok")).is_ok());
982        assert!(runner.bind_session("w1", MockExecutor::new("ok")).is_ok());
983
984        // Binding to an unknown member fails.
985        assert!(runner
986            .bind_session("nobody", MockExecutor::new("ok"))
987            .is_err());
988    }
989
990    #[test]
991    fn test_parse_task_list() {
992        let json = r#"{"tasks": ["Write tests", "Fix lints", "Update docs"]}"#;
993        let tasks = parse_task_list(json).unwrap();
994        assert_eq!(tasks.len(), 3);
995        assert_eq!(tasks[0], "Write tests");
996        assert_eq!(tasks[2], "Update docs");
997    }
998
999    #[test]
1000    fn test_parse_task_list_no_json() {
1001        assert!(parse_task_list("no json here").is_err());
1002    }
1003
1004    #[test]
1005    fn test_claim_rejected_tasks() {
1006        let board = TeamTaskBoard::new(10);
1007        let id = board.post("Refactor module", "lead", None).unwrap();
1008
1009        // Simulate workflow: claim → complete → reject
1010        board.claim("worker-1");
1011        board.complete(&id, "initial attempt");
1012        board.reject(&id);
1013
1014        assert_eq!(board.get(&id).unwrap().status, TaskStatus::Rejected);
1015
1016        // A new worker can re-claim the rejected task.
1017        let task = board.claim("worker-2");
1018        assert!(task.is_some());
1019        let task = task.unwrap();
1020        assert_eq!(task.id, id);
1021        assert_eq!(task.assigned_to.as_deref(), Some("worker-2"));
1022        assert_eq!(task.status, TaskStatus::InProgress);
1023    }
1024
1025    #[tokio::test]
1026    async fn test_team_runner_goal_decomposition() {
1027        let config = TeamConfig {
1028            poll_interval_ms: 1,
1029            max_rounds: 3,
1030            ..TeamConfig::default()
1031        };
1032        let mut team = AgentTeam::new("decomp-test", config);
1033        team.add_member("lead", TeamRole::Lead);
1034        team.add_member("w1", TeamRole::Worker);
1035        team.add_member("rev", TeamRole::Reviewer);
1036
1037        let mut runner = TeamRunner::new(team);
1038        // Lead returns two tasks as JSON.
1039        runner
1040            .bind_session(
1041                "lead",
1042                MockExecutor::new(r#"{"tasks": ["Task A", "Task B"]}"#),
1043            )
1044            .unwrap();
1045        // Worker always succeeds.
1046        runner
1047            .bind_session("w1", MockExecutor::new("done"))
1048            .unwrap();
1049        // Reviewer always approves.
1050        runner
1051            .bind_session("rev", MockExecutor::new("APPROVED: looks good"))
1052            .unwrap();
1053
1054        let result = runner.run_until_done("Build the feature").await.unwrap();
1055
1056        assert_eq!(result.done_tasks.len(), 2);
1057        assert!(result.rejected_tasks.is_empty());
1058    }
1059
1060    #[tokio::test]
1061    async fn test_team_runner_worker_execution() {
1062        let config = TeamConfig {
1063            poll_interval_ms: 1,
1064            max_rounds: 3,
1065            ..TeamConfig::default()
1066        };
1067        let mut team = AgentTeam::new("worker-exec-test", config);
1068        team.add_member("lead", TeamRole::Lead);
1069        team.add_member("w1", TeamRole::Worker);
1070
1071        let mut runner = TeamRunner::new(team);
1072        runner
1073            .bind_session(
1074                "lead",
1075                MockExecutor::new(r#"{"tasks": ["Write unit tests"]}"#),
1076            )
1077            .unwrap();
1078        runner
1079            .bind_session("w1", MockExecutor::new("Added 3 tests"))
1080            .unwrap();
1081
1082        // No reviewer bound → tasks end up InReview (not Done), which is fine for this test.
1083        let board = runner.task_board();
1084        let _ = runner.run_until_done("Test the module").await;
1085
1086        // The task must have been claimed and completed (InReview or Done).
1087        let tasks = board.by_status(TaskStatus::InReview);
1088        assert_eq!(tasks.len(), 1);
1089        assert_eq!(tasks[0].result.as_deref(), Some("Added 3 tests"));
1090    }
1091
1092    #[tokio::test]
1093    async fn test_team_runner_reviewer_approval() {
1094        let config = TeamConfig {
1095            poll_interval_ms: 1,
1096            max_rounds: 5,
1097            ..TeamConfig::default()
1098        };
1099        let mut team = AgentTeam::new("reviewer-test", config);
1100        team.add_member("lead", TeamRole::Lead);
1101        team.add_member("w1", TeamRole::Worker);
1102        team.add_member("rev", TeamRole::Reviewer);
1103
1104        let mut runner = TeamRunner::new(team);
1105        runner
1106            .bind_session(
1107                "lead",
1108                MockExecutor::new(r#"{"tasks": ["Implement feature X"]}"#),
1109            )
1110            .unwrap();
1111        runner
1112            .bind_session("w1", MockExecutor::new("Feature X implemented"))
1113            .unwrap();
1114        runner
1115            .bind_session("rev", MockExecutor::new("APPROVED: complete"))
1116            .unwrap();
1117
1118        let result = runner.run_until_done("Ship feature X").await.unwrap();
1119
1120        assert_eq!(result.done_tasks.len(), 1);
1121        assert_eq!(
1122            result.done_tasks[0].result.as_deref(),
1123            Some("Feature X implemented")
1124        );
1125    }
1126
1127    #[tokio::test]
1128    async fn test_team_runner_rejection_and_retry() {
1129        use std::sync::atomic::{AtomicUsize, Ordering};
1130
1131        // Reviewer approves on the second attempt.
1132        struct ConditionalReviewer {
1133            calls: AtomicUsize,
1134        }
1135
1136        #[async_trait::async_trait]
1137        impl AgentExecutor for ConditionalReviewer {
1138            async fn execute(&self, _prompt: &str) -> crate::error::Result<String> {
1139                let n = self.calls.fetch_add(1, Ordering::SeqCst);
1140                if n == 0 {
1141                    Ok("REJECTED: needs improvement".to_string())
1142                } else {
1143                    Ok("APPROVED: now correct".to_string())
1144                }
1145            }
1146        }
1147
1148        let config = TeamConfig {
1149            poll_interval_ms: 1,
1150            max_rounds: 10,
1151            ..TeamConfig::default()
1152        };
1153        let mut team = AgentTeam::new("retry-test", config);
1154        team.add_member("lead", TeamRole::Lead);
1155        team.add_member("w1", TeamRole::Worker);
1156        team.add_member("rev", TeamRole::Reviewer);
1157
1158        let mut runner = TeamRunner::new(team);
1159        runner
1160            .bind_session("lead", MockExecutor::new(r#"{"tasks": ["Do the thing"]}"#))
1161            .unwrap();
1162        runner
1163            .bind_session("w1", MockExecutor::new("attempt result"))
1164            .unwrap();
1165        runner
1166            .bind_session(
1167                "rev",
1168                Arc::new(ConditionalReviewer {
1169                    calls: AtomicUsize::new(0),
1170                }),
1171            )
1172            .unwrap();
1173
1174        let result = runner.run_until_done("Complete the thing").await.unwrap();
1175
1176        // After retry, the task is eventually approved.
1177        assert_eq!(result.done_tasks.len(), 1);
1178        assert!(result.rejected_tasks.is_empty());
1179    }
1180}