collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! Worker control handles for swarm coordination.

use crate::agent::swarm::knowledge::SharedKnowledge;
use crate::agent::swarm::knowledge::types::WorkerInstruction;

impl SharedKnowledge {
    /// Registers the control handles (cancel token, iteration budget,
    /// instruction channel and pause flag) for a worker under `agent_id`.
    ///
    /// A fresh unbounded instruction channel is created here: the sending
    /// half is stored with the handles and the receiving half is returned so
    /// the caller can pass it on to the worker's agent loop. The pause flag
    /// starts out as `false`.
    ///
    /// Inserting under an `agent_id` that is already registered replaces the
    /// previous entry.
    pub async fn register_worker_handles(
        &self,
        agent_id: &str,
        cancel: tokio_util::sync::CancellationToken,
        budget: crate::agent::guard::IterationBudget,
    ) -> tokio::sync::mpsc::UnboundedReceiver<WorkerInstruction> {
        use std::sync::Arc;
        use std::sync::atomic::AtomicBool;

        let (instruction_tx, instruction_rx) = tokio::sync::mpsc::unbounded_channel();
        let entry = crate::agent::swarm::knowledge::types::WorkerHandles {
            cancel,
            budget,
            instruction_tx,
            paused: Arc::new(AtomicBool::new(false)),
        };
        self.worker_handles
            .write()
            .await
            .insert(agent_id.to_string(), entry);
        instruction_rx
    }

    /// Triggers the cancellation token of the worker registered as `agent_id`.
    ///
    /// Returns `true` when handles for that worker exist, `false` otherwise.
    pub async fn cancel_worker(&self, agent_id: &str) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(worker) => {
                worker.cancel.cancel();
                true
            }
            None => false,
        }
    }

    /// Adds `extra_iterations` to the iteration budget of the worker
    /// registered as `agent_id`.
    ///
    /// Returns `true` when handles for that worker exist, `false` otherwise.
    pub async fn extend_worker(&self, agent_id: &str, extra_iterations: u32) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(worker) => {
                worker.budget.extend(extra_iterations);
                true
            }
            None => false,
        }
    }

    /// Queues `instruction` on the instruction channel of the worker
    /// registered as `agent_id`.
    ///
    /// Returns `true` only when the worker is registered *and* the send
    /// succeeded; an unknown worker or a failed send both yield `false`.
    pub async fn send_instruction(&self, agent_id: &str, instruction: WorkerInstruction) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(worker) => worker.instruction_tx.send(instruction).is_ok(),
            None => false,
        }
    }

    /// Records the paused state of a worker (called from the TUI event
    /// handler on SwarmWorkerPaused).
    ///
    /// Does nothing when `agent_id` has no registered handles. When the flag
    /// is cleared (`paused == false`), tasks waiting on the resume
    /// notification are woken.
    pub async fn set_worker_paused(&self, agent_id: &str, paused: bool) {
        use std::sync::atomic::Ordering;

        let registry = self.worker_handles.read().await;
        let pause_flag = match registry.get(agent_id) {
            Some(worker) => &worker.paused,
            None => return,
        };
        pause_flag.store(paused, Ordering::Relaxed);
        if paused {
            return;
        }
        // A worker resumed: wake wait_for_any if it is sleeping on the
        // all-paused condition.
        // NOTE(review): if `resume_notify` is a `tokio::sync::Notify`,
        // `notify_waiters` only wakes tasks that are already waiting, so a
        // waiter that starts waiting after this call would miss it — confirm
        // callers re-check the paused state.
        self.resume_notify.notify_waiters();
    }

    /// Reports whether every worker listed in `agent_ids` is currently paused.
    ///
    /// An empty list yields `false`, and an id without registered handles
    /// counts as not paused.
    pub async fn all_workers_paused(&self, agent_ids: &[String]) -> bool {
        use std::sync::atomic::Ordering;

        if agent_ids.is_empty() {
            return false;
        }
        let registry = self.worker_handles.read().await;
        for id in agent_ids {
            match registry.get(id) {
                Some(worker) if worker.paused.load(Ordering::Relaxed) => {}
                // Unknown worker or one that is still running.
                _ => return false,
            }
        }
        true
    }

    /// Suspends the caller until a resume notification arrives.
    /// Used by `wait_for_any` to sleep efficiently when all workers are paused.
    pub async fn wait_for_resume(&self) {
        let resumed = self.resume_notify.notified();
        resumed.await;
    }

    /// Drops the handles stored for `agent_id`, e.g. once the worker has
    /// completed. Removing an id that is not registered is a no-op.
    pub async fn remove_worker_handles(&self, agent_id: &str) {
        let mut registry = self.worker_handles.write().await;
        registry.remove(agent_id);
    }

    /// Returns the IDs of all workers that currently have handles registered.
    pub async fn active_worker_ids(&self) -> Vec<String> {
        let registry = self.worker_handles.read().await;
        let mut ids = Vec::with_capacity(registry.len());
        ids.extend(registry.keys().cloned());
        ids
    }
}