agent-teams 0.1.0

Generic Rust agent teams framework replicating Claude Code Agent Teams architecture with pluggable backends for Claude Code, Codex, and Gemini CLI
Documentation
//! Claude Code backend -- spawns agents via the `cc-sdk` interactive client.
//!
//! Each session owns a subprocess running `claude-code` (found via PATH
//! or auto-downloaded), communicating through the SDK's transport layer.
//! A dedicated session task owns the client exclusively and processes commands from
//! the session handle via an mpsc channel, forwarding output events through a
//! separate mpsc channel that the orchestrator can `select!` on.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
use crate::{Error, Result};

/// Channel buffer size for agent output events.
///
/// Used as the capacity of the bounded `mpsc` channel that carries
/// `AgentOutput` values out of the session task.
const OUTPUT_CHANNEL_SIZE: usize = 256;

/// Channel buffer size for session commands.
///
/// Used as the capacity of the bounded `mpsc` channel that carries
/// `SessionCommand` values from the session handle to the session task.
const CMD_CHANNEL_SIZE: usize = 16;

/// Commands sent from the session handle to the background task.
enum SessionCommand {
    /// Send the contained text to the agent as the next message; the session
    /// task then waits for and forwards the response.
    SendMessage(String),
    /// Ask the session task to stop processing commands and disconnect.
    Shutdown,
}

// ---------------------------------------------------------------------------
// ClaudeCodeBackend  (factory)
// ---------------------------------------------------------------------------

/// Factory that creates Claude Code agent sessions using `cc-sdk`.
///
/// The backend itself holds no running process; it only carries the base
/// options that each spawned session starts from.
#[derive(Debug)]
pub struct ClaudeCodeBackend {
    /// Base options applied to every spawned session.
    /// Per-session overrides from [`SpawnConfig`] are merged on top.
    ///
    /// When `None`, the SDK's `Default` options are used as the base.
    default_options: Option<cc_sdk::ClaudeCodeOptions>,
}

impl ClaudeCodeBackend {
    /// Create a backend with default SDK options.
    pub fn new() -> Self {
        Self {
            default_options: None,
        }
    }

    /// Create a backend with explicit base options.
    pub fn with_options(options: cc_sdk::ClaudeCodeOptions) -> Self {
        Self {
            default_options: Some(options),
        }
    }

    /// Build [`ClaudeCodeOptions`](cc_sdk::ClaudeCodeOptions) from a [`SpawnConfig`],
    /// layering config values on top of the factory defaults.
    #[allow(deprecated)] // system_prompt field is deprecated but we need it
    fn build_options(&self, config: &SpawnConfig) -> cc_sdk::ClaudeCodeOptions {
        let mut opts = self.default_options.clone().unwrap_or_default();

        // The prompt is sent as the first user message (via send_message),
        // NOT as system_prompt. Setting both wastes tokens and may confuse
        // the model with duplicate instructions.

        // Model
        if let Some(ref model) = config.model {
            opts.model = Some(model.clone());
        }

        // Working directory
        if let Some(ref cwd) = config.cwd {
            opts.cwd = Some(cwd.clone());
        }

        // Max turns
        if let Some(turns) = config.max_turns {
            opts.max_turns = Some(turns);
        }

        // Allowed tools
        if !config.allowed_tools.is_empty() {
            opts.allowed_tools = config.allowed_tools.clone();
        }

        // Permission mode
        if let Some(ref mode) = config.permission_mode {
            opts.permission_mode = match mode.as_str() {
                "plan" => cc_sdk::PermissionMode::Plan,
                "acceptEdits" => cc_sdk::PermissionMode::AcceptEdits,
                "bypassPermissions" => cc_sdk::PermissionMode::BypassPermissions,
                _ => cc_sdk::PermissionMode::Default,
            };
        }

        // Extra environment variables
        if !config.env.is_empty() {
            opts.env.extend(config.env.clone());
        }

        opts
    }
}

impl Default for ClaudeCodeBackend {
    /// Produce a backend with no base options, the same value that
    /// `ClaudeCodeBackend::new` returns.
    fn default() -> Self {
        Self {
            default_options: None,
        }
    }
}

#[async_trait]
impl AgentBackend for ClaudeCodeBackend {
    fn backend_type(&self) -> BackendType {
        BackendType::ClaudeCode
    }

    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
        let agent_name = config.name.clone();
        let initial_prompt = config.prompt.clone();
        let options = self.build_options(&config);

        info!(agent = %agent_name, "Spawning Claude Code agent");

        // Create and connect the interactive client
        let mut client =
            cc_sdk::InteractiveClient::new(options).map_err(|e| Error::SpawnFailed {
                name: agent_name.clone(),
                reason: format!("Failed to create InteractiveClient: {e}"),
            })?;

        client.connect().await.map_err(|e| Error::SpawnFailed {
            name: agent_name.clone(),
            reason: format!("Failed to connect: {e}"),
        })?;

        // Send the initial prompt before handing the client to the session task
        client
            .send_message(initial_prompt)
            .await
            .map_err(|e| Error::SpawnFailed {
                name: agent_name.clone(),
                reason: format!("Failed to send initial prompt: {e}"),
            })?;

        // Create channels
        let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE);
        let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
        let alive = Arc::new(AtomicBool::new(true));

        // Spawn the session task (owns the client exclusively)
        let task_alive = alive.clone();
        let task_name = agent_name.clone();
        let task_handle = tokio::spawn(session_task(
            client, cmd_rx, output_tx, task_alive, task_name,
        ));

        let session = ClaudeCodeSession {
            name: agent_name,
            cmd_tx,
            output_rx: Some(output_rx),
            alive,
            task_handle: Some(task_handle),
        };

        Ok(Box::new(session))
    }
}

// ---------------------------------------------------------------------------
// Session task  (owns the client exclusively)
// ---------------------------------------------------------------------------

/// Background task that owns the `InteractiveClient` and serialises all
/// send/receive operations. No Mutex needed -- the client lives here only.
async fn session_task(
    mut client: cc_sdk::InteractiveClient,
    mut cmd_rx: mpsc::Receiver<SessionCommand>,
    output_tx: mpsc::Sender<AgentOutput>,
    alive: Arc<AtomicBool>,
    agent_name: String,
) {
    debug!(agent = %agent_name, "Session task started");

    // Phase 1: Receive response for the initial prompt that was already sent.
    match client.receive_response().await {
        Ok(msgs) => {
            for msg in msgs {
                if let Some(out) = message_to_output(&msg)
                    && send_agent_output(&output_tx, out, &alive, &agent_name).await.is_err()
                {
                    return;
                }
            }
        }
        Err(e) => {
            let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
            alive.store(false, Ordering::Relaxed);
            return;
        }
    }

    // Phase 2: Process commands -- wait for next command, then send+receive.
    // Uses explicit match instead of `while let` so new SessionCommand variants
    // trigger a compile error rather than silently exiting the loop.
    #[allow(clippy::while_let_loop)]
    loop {
        match cmd_rx.recv().await {
            Some(SessionCommand::SendMessage(msg)) => {
                if let Err(e) = client.send_message(msg).await {
                    let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Send error: {e}")), &alive, &agent_name).await;
                    break;
                }
                match client.receive_response().await {
                    Ok(msgs) => {
                        for msg in msgs {
                            if let Some(out) = message_to_output(&msg)
                                && send_agent_output(&output_tx, out, &alive, &agent_name)
                                    .await
                                    .is_err()
                            {
                                let _ = client.disconnect().await;
                                return;
                            }
                        }
                    }
                    Err(e) => {
                        let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
                        break;
                    }
                }
            }
            Some(SessionCommand::Shutdown) | None => break,
        }
    }

    let _ = client.disconnect().await;
    alive.store(false, Ordering::Relaxed);
    debug!(agent = %agent_name, "Session task stopped");
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Convert a `cc_sdk::Message` to an `AgentOutput`, or `None` if irrelevant.
///
/// * `Assistant` messages: the text of every `Text` content block is
///   concatenated (no separator) into one `AgentOutput::Message`. Non-text
///   blocks are skipped, and a message with no text at all yields `None`.
/// * `Result` messages: `AgentOutput::Error` when `is_error` is set,
///   otherwise `AgentOutput::TurnComplete`.
/// * Any other message kind yields `None`.
fn message_to_output(msg: &cc_sdk::Message) -> Option<AgentOutput> {
    match msg {
        cc_sdk::Message::Assistant { message } => {
            // Collect straight into a String; no intermediate Vec is needed
            // just to join with an empty separator.
            let text: String = message
                .content
                .iter()
                .filter_map(|block| match block {
                    cc_sdk::ContentBlock::Text(t) => Some(t.text.as_str()),
                    _ => None,
                })
                .collect();

            if text.is_empty() {
                None
            } else {
                Some(AgentOutput::Message(text))
            }
        }
        cc_sdk::Message::Result { is_error, .. } => {
            if *is_error {
                Some(AgentOutput::Error(
                    "Agent turn completed with error".into(),
                ))
            } else {
                Some(AgentOutput::TurnComplete)
            }
        }
        _ => None,
    }
}

// ---------------------------------------------------------------------------
// ClaudeCodeSession
// ---------------------------------------------------------------------------

/// A running Claude Code agent session.
///
/// This is only a handle: the SDK client itself is owned by the background
/// session task, and this struct talks to that task through channels.
struct ClaudeCodeSession {
    /// Agent name.
    name: String,
    /// Channel to send commands to the background session task.
    cmd_tx: mpsc::Sender<SessionCommand>,
    /// Output receiver (taken once by the orchestrator).
    /// Becomes `None` after it has been taken.
    output_rx: Option<mpsc::Receiver<AgentOutput>>,
    /// Liveness flag shared with the session task.
    alive: Arc<AtomicBool>,
    /// Handle to the background session task.
    /// Becomes `None` once `shutdown` or `force_kill` has consumed it.
    task_handle: Option<JoinHandle<()>>,
}

#[async_trait]
impl AgentSession for ClaudeCodeSession {
    /// The agent's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Queue `input` as the next message for the agent.
    ///
    /// The message is handed to the session task through the command
    /// channel; this returns once it is queued, not once it is answered.
    ///
    /// # Errors
    ///
    /// Returns [`Error::AgentNotAlive`] if the liveness flag is already
    /// cleared or the command channel has been closed.
    async fn send_input(&mut self, input: &str) -> Result<()> {
        if !self.alive.load(Ordering::Relaxed) {
            return Err(Error::AgentNotAlive {
                name: self.name.clone(),
            });
        }

        self.cmd_tx
            .send(SessionCommand::SendMessage(input.to_string()))
            .await
            .map_err(|_| Error::AgentNotAlive {
                name: self.name.clone(),
            })?;
        Ok(())
    }

    /// Take the output receiver. Returns `Some` on the first call and `None`
    /// on every call after that.
    fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
        self.output_rx.take()
    }

    /// Report the current value of the shared liveness flag.
    async fn is_alive(&self) -> bool {
        self.alive.load(Ordering::Relaxed)
    }

    /// Shut the session down gracefully, falling back to aborting the task.
    ///
    /// Clears the liveness flag, asks the session task to stop, and waits
    /// for it to finish. Queueing the shutdown command and waiting for the
    /// task share a single 10-second deadline, so this method returns in
    /// bounded time even when the command channel is full or the task is
    /// busy. If the deadline passes, the task is aborted.
    async fn shutdown(&mut self) -> Result<()> {
        info!(agent = %self.name, "Shutting down Claude Code session");
        self.alive.store(false, Ordering::Relaxed);

        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);

        // Sending on a bounded channel waits for free capacity, so it must be
        // covered by the deadline too. A closed channel (`Ok(Err(_))`) just
        // means the task has already stopped and is deliberately ignored.
        if tokio::time::timeout_at(deadline, self.cmd_tx.send(SessionCommand::Shutdown))
            .await
            .is_err()
        {
            warn!(agent = %self.name, "Timed out queueing shutdown command");
        }

        if let Some(handle) = self.task_handle.take() {
            let abort_handle = handle.abort_handle();
            if tokio::time::timeout_at(deadline, handle).await.is_err() {
                warn!(agent = %self.name, "Session task timed out during shutdown, aborting");
                abort_handle.abort();
            }
        }
        Ok(())
    }

    /// Stop the session immediately by aborting the session task.
    ///
    /// Clears the liveness flag, aborts the task, and waits for the abort to
    /// take effect. The task is not asked to stop, so its own disconnect
    /// step may not run.
    async fn force_kill(&mut self) -> Result<()> {
        info!(agent = %self.name, "Force-killing Claude Code session");
        self.alive.store(false, Ordering::Relaxed);
        if let Some(handle) = self.task_handle.take() {
            handle.abort();
            let _ = handle.await;
        }
        Ok(())
    }
}