collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! Plan-Review-Execute strategy: planner proposes, reviewers critique, planner revises.

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::agent::r#loop::AgentEvent;
use crate::agent::subagent::SubagentTask;
use crate::agent::swarm::knowledge::BlackboardKind;

use super::*;

impl SwarmCoordinator {
    /// Plan-Review protocol: one planner proposes, reviewers critique, planner revises.
    ///
    /// Fork mode only: all workers self-select their role from the available agent roster.
    /// Returns the final approved (or best-effort) plan string, or `None` if the planner
    /// failed or the task was cancelled.
    pub async fn run_plan_review(
        &self,
        user_msg: &str,
        system_prompt: &str,
        event_tx: &mpsc::UnboundedSender<AgentEvent>,
        cancel: &CancellationToken,
    ) -> Option<String> {
        // ── Phase 1: Planner ────────────────────────────────────────────────────
        // run_plan_review is only called from Fork mode.
        // Phase prompts ([PLANNING PHASE] / [REVIEW PHASE] / [REVISION N/M]) guide behavior.
        // Agents self-select from the full roster — no tag-based forcing.
        let planner_task = SubagentTask {
            id: "planner".to_string(),
            prompt: format!(
                "[PLANNING PHASE] Analyze the codebase thoroughly and produce a detailed \
                 implementation plan for: {user_msg}"
            ),
            agent_name: None,
            available_agents: self.available_agents.clone(),
            model_override: self.hive_config.coordinator_model.clone(),
            working_dir_override: None,
            outer_event_tx: None,
            cancel_token: None,
            iteration_budget: None,
            instruction_rx: None,
        };

        let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
            agent_id: "planner".to_string(),
            agent_name: "planner".to_string(),
            task_preview: truncate(user_msg, 80),
        });

        let plan_result = crate::agent::subagent::spawn(
            planner_task,
            self.client.clone(),
            self.config.clone(),
            system_prompt.to_string(),
            self.working_dir.clone(),
            self.lsp_manager.clone(),
            None,
        )
        .await;

        if cancel.is_cancelled() || !plan_result.success {
            return None;
        }

        let mut current_plan = plan_result.response;

        // Consensus tier+: post initial plan as a proposal on the blackboard
        if self.hive_config.mode.has_consensus() {
            self.knowledge
                .post_to_blackboard(
                    "plan:main",
                    &current_plan,
                    "planner",
                    BlackboardKind::Proposal,
                )
                .await;
        }

        // ── Phase 2: Reviewer agents (parallel) ────────────────────────────────
        // Each reviewer self-selects from the full roster based on the phase prompt.
        let review_aspects = [
            "correctness and completeness",
            "risks and conflicts",
            "performance",
        ];
        let max_reviewers = (self.hive_config.max_agents - 1).min(review_aspects.len());

        let make_reviewer_tasks = |plan: &str, suffix: &str| -> Vec<SubagentTask> {
            review_aspects[..max_reviewers]
                .iter()
                .enumerate()
                .map(|(i, aspect)| SubagentTask {
                    id: format!("reviewer-{i}{suffix}"),
                    prompt: format!(
                        "[REVIEW PHASE] Review this implementation plan for {aspect}.\n\n\
                         Mark issues as CRITICAL (must fix) or SUGGESTION (optional).\n\n\
                         Plan:\n{plan}"
                    ),
                    agent_name: None,
                    available_agents: self.available_agents.clone(),
                    model_override: self.hive_config.worker_model.clone(),
                    working_dir_override: None,
                    outer_event_tx: None,
                    cancel_token: None,
                    iteration_budget: None,
                    instruction_rx: None,
                })
                .collect()
        };

        let reviewer_tasks = make_reviewer_tasks(&current_plan, "");
        for task in &reviewer_tasks {
            let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
                agent_id: task.id.clone(),
                agent_name: task.id.clone(),
                task_preview: "Reviewing plan".to_string(),
            });
        }

        let mut reviews = crate::agent::subagent::spawn_parallel(
            reviewer_tasks,
            self.client.clone(),
            self.config.clone(),
            system_prompt.to_string(),
            self.working_dir.clone(),
            self.lsp_manager.clone(),
            None,
        )
        .await;

        // Consensus tier+: vote on the initial plan
        if self.hive_config.mode.has_consensus() {
            for review in &reviews {
                let any_critical = review.response.to_uppercase().contains("CRITICAL");
                self.knowledge
                    .vote_on_proposal("plan:main", &review.id, !any_critical)
                    .await;
            }

            let total = reviews.len() + 1;
            if let Some(result) = self.knowledge.check_consensus("plan:main", total).await {
                tracing::debug!(proposal = %result.key, total_agents = result.total_agents, approved = result.approved, "Flock consensus result");
                if result.approved {
                    self.knowledge.finalize_proposal("plan:main").await;
                    let _ = event_tx.send(AgentEvent::PhaseChange {
                        label: format!(
                            "{} — initial plan approved ({}/{} votes for)",
                            self.mode_label(),
                            result.votes_for,
                            total
                        ),
                    });
                } else {
                    let _ = event_tx.send(AgentEvent::PhaseChange {
                        label: format!(
                            "{} — initial plan rejected ({}/{} against) — revising…",
                            self.mode_label(),
                            result.votes_against,
                            total
                        ),
                    });
                }
            }
        }

        let mut has_critical = reviews
            .iter()
            .any(|r| r.response.to_uppercase().contains("CRITICAL"));

        // ── Phase 3: Revision loop ──────────────────────────────────────────────
        // When require_consensus is true and reviewers flag CRITICAL issues, the
        // planner revises the plan and the reviewers re-evaluate (up to MAX_REVISIONS).
        const MAX_REVISIONS: u32 = 3;
        let mut revision = 0u32;

        while has_critical && revision < MAX_REVISIONS && self.hive_config.require_consensus {
            if cancel.is_cancelled() {
                return None;
            }
            revision += 1;

            // Collect only CRITICAL feedback
            let critical_feedback: String = reviews
                .iter()
                .filter(|r| r.response.to_uppercase().contains("CRITICAL"))
                .map(|r| format!("Reviewer ({}): {}", r.id, truncate(&r.response, 800)))
                .collect::<Vec<_>>()
                .join("\n\n---\n\n");

            let _ = event_tx.send(AgentEvent::PhaseChange {
                label: format!(
                    "{} — plan revision {revision}/{MAX_REVISIONS} (addressing critical issues)…",
                    self.mode_label()
                ),
            });

            let revision_task = SubagentTask {
                id: format!("planner-r{revision}"),
                prompt: format!(
                    "[REVISION {revision}/{MAX_REVISIONS}] Your implementation plan has CRITICAL issues \
                     that must be fixed before execution.\n\n\
                      CRITICAL FEEDBACK:\n{critical_feedback}\n\n\
                      Produce a revised plan that addresses ALL critical issues.\n\
                      Preserve correct parts of the original plan.\n\n\
                      Original task: {user_msg}\n\n\
                      Previous plan:\n{current_plan}"
                ),
                agent_name: None,
                available_agents: self.available_agents.clone(),
                model_override: self.hive_config.coordinator_model.clone(),
                working_dir_override: None,
                outer_event_tx: None,
                cancel_token: None,
                iteration_budget: None,
                instruction_rx: None,
            };

            let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
                agent_id: format!("planner-r{revision}"),
                agent_name: "planner".to_string(),
                task_preview: format!("Revising plan (attempt {revision}/{MAX_REVISIONS})"),
            });

            let revised = crate::agent::subagent::spawn(
                revision_task,
                self.client.clone(),
                self.config.clone(),
                system_prompt.to_string(),
                self.working_dir.clone(),
                self.lsp_manager.clone(),
                None,
            )
            .await;

            if !revised.success {
                break;
            }

            current_plan = revised.response;

            // Post revised plan to blackboard for transparency
            if self.hive_config.mode.has_consensus() {
                let rev_key = format!("plan:revision-{revision}");
                self.knowledge
                    .post_to_blackboard(
                        &rev_key,
                        &current_plan,
                        &format!("planner-r{revision}"),
                        BlackboardKind::Proposal,
                    )
                    .await;
            }

            // Re-run reviewers on the revised plan
            let re_review_tasks = make_reviewer_tasks(&current_plan, &format!("-r{revision}"));
            for task in &re_review_tasks {
                let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
                    agent_id: task.id.clone(),
                    agent_name: task.id.clone(),
                    task_preview: format!("Re-reviewing revised plan (r{revision})"),
                });
            }

            reviews = crate::agent::subagent::spawn_parallel(
                re_review_tasks,
                self.client.clone(),
                self.config.clone(),
                system_prompt.to_string(),
                self.working_dir.clone(),
                self.lsp_manager.clone(),
                None,
            )
            .await;

            // Consensus votes for revised plan
            if self.hive_config.mode.has_consensus() {
                let rev_key = format!("plan:revision-{revision}");
                for review in &reviews {
                    let any_critical = review.response.to_uppercase().contains("CRITICAL");
                    self.knowledge
                        .vote_on_proposal(&rev_key, &review.id, !any_critical)
                        .await;
                }
                let total = reviews.len() + 1;
                if let Some(result) = self.knowledge.check_consensus(&rev_key, total).await {
                    if result.approved {
                        self.knowledge.finalize_proposal(&rev_key).await;
                        let _ = event_tx.send(AgentEvent::PhaseChange {
                            label: format!(
                                "{} — revised plan approved (r{revision}, {}/{} for)",
                                self.mode_label(),
                                result.votes_for,
                                total
                            ),
                        });
                    } else {
                        let _ = event_tx.send(AgentEvent::PhaseChange {
                            label: format!(
                                "{} — revised plan still has issues (r{revision}, {}/{} against)",
                                self.mode_label(),
                                result.votes_against,
                                total
                            ),
                        });
                    }
                }
            }

            has_critical = reviews
                .iter()
                .any(|r| r.response.to_uppercase().contains("CRITICAL"));
        }

        if has_critical {
            let _ = event_tx.send(AgentEvent::PhaseChange {
                label: format!(
                    "{} — plan has unresolved critical issues after {revision} revision(s); proceeding with best effort",
                    self.mode_label()
                ),
            });
        } else {
            let _ = event_tx.send(AgentEvent::PhaseChange {
                label: format!(
                    "{} — plan finalized, ready for execution",
                    self.mode_label()
                ),
            });
        }

        Some(current_plan)
    }
}