Skip to main content

agent_teams/backend/
claude_code.rs

1//! Claude Code backend -- spawns agents via the `cc-sdk` interactive client.
2//!
3//! Each session owns a subprocess running `claude-code` (found via PATH
4//! or auto-downloaded), communicating through the SDK's transport layer.
5//! A dedicated session task owns the client exclusively and processes commands from
6//! the session handle via an mpsc channel, forwarding output events through a
7//! separate mpsc channel that the orchestrator can `select!` on.
8
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::mpsc;
15use tokio::task::JoinHandle;
16use tracing::{debug, info, warn};
17
18use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
19use crate::{Error, Result};
20
21/// Channel buffer size for agent output events.
22const OUTPUT_CHANNEL_SIZE: usize = 256;
23
24/// Channel buffer size for session commands.
25const CMD_CHANNEL_SIZE: usize = 16;
26
27/// Commands sent from the session handle to the background task.
28enum SessionCommand {
29    SendMessage(String),
30    Shutdown,
31}
32
33// ---------------------------------------------------------------------------
34// ClaudeCodeBackend  (factory)
35// ---------------------------------------------------------------------------
36
37/// Factory that creates Claude Code agent sessions using `cc-sdk`.
38#[derive(Debug)]
39pub struct ClaudeCodeBackend {
40    /// Base options applied to every spawned session.
41    /// Per-session overrides from [`SpawnConfig`] are merged on top.
42    default_options: Option<cc_sdk::ClaudeCodeOptions>,
43}
44
45impl ClaudeCodeBackend {
46    /// Create a backend with default SDK options.
47    pub fn new() -> Self {
48        Self {
49            default_options: None,
50        }
51    }
52
53    /// Create a backend with explicit base options.
54    pub fn with_options(options: cc_sdk::ClaudeCodeOptions) -> Self {
55        Self {
56            default_options: Some(options),
57        }
58    }
59
60    /// Build [`ClaudeCodeOptions`](cc_sdk::ClaudeCodeOptions) from a [`SpawnConfig`],
61    /// layering config values on top of the factory defaults.
62    #[allow(deprecated)] // system_prompt field is deprecated but we need it
63    fn build_options(&self, config: &SpawnConfig) -> cc_sdk::ClaudeCodeOptions {
64        let mut opts = self.default_options.clone().unwrap_or_default();
65
66        // The prompt is sent as the first user message (via send_message),
67        // NOT as system_prompt. Setting both wastes tokens and may confuse
68        // the model with duplicate instructions.
69
70        // Model
71        if let Some(ref model) = config.model {
72            opts.model = Some(model.clone());
73        }
74
75        // Working directory
76        if let Some(ref cwd) = config.cwd {
77            opts.cwd = Some(cwd.clone());
78        }
79
80        // Max turns
81        if let Some(turns) = config.max_turns {
82            opts.max_turns = Some(turns);
83        }
84
85        // Allowed tools
86        if !config.allowed_tools.is_empty() {
87            opts.allowed_tools = config.allowed_tools.clone();
88        }
89
90        // Permission mode
91        if let Some(ref mode) = config.permission_mode {
92            opts.permission_mode = match mode.as_str() {
93                "plan" => cc_sdk::PermissionMode::Plan,
94                "acceptEdits" => cc_sdk::PermissionMode::AcceptEdits,
95                "bypassPermissions" => cc_sdk::PermissionMode::BypassPermissions,
96                _ => cc_sdk::PermissionMode::Default,
97            };
98        }
99
100        // Extra environment variables
101        if !config.env.is_empty() {
102            opts.env.extend(config.env.clone());
103        }
104
105        opts
106    }
107}
108
109impl Default for ClaudeCodeBackend {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115#[async_trait]
116impl AgentBackend for ClaudeCodeBackend {
117    fn backend_type(&self) -> BackendType {
118        BackendType::ClaudeCode
119    }
120
121    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
122        let agent_name = config.name.clone();
123        let initial_prompt = config.prompt.clone();
124        let options = self.build_options(&config);
125
126        info!(agent = %agent_name, "Spawning Claude Code agent");
127
128        // Create and connect the interactive client
129        let mut client =
130            cc_sdk::InteractiveClient::new(options).map_err(|e| Error::SpawnFailed {
131                name: agent_name.clone(),
132                reason: format!("Failed to create InteractiveClient: {e}"),
133            })?;
134
135        client.connect().await.map_err(|e| Error::SpawnFailed {
136            name: agent_name.clone(),
137            reason: format!("Failed to connect: {e}"),
138        })?;
139
140        // Send the initial prompt before handing the client to the session task
141        client
142            .send_message(initial_prompt)
143            .await
144            .map_err(|e| Error::SpawnFailed {
145                name: agent_name.clone(),
146                reason: format!("Failed to send initial prompt: {e}"),
147            })?;
148
149        // Create channels
150        let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE);
151        let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
152        let alive = Arc::new(AtomicBool::new(true));
153
154        // Spawn the session task (owns the client exclusively)
155        let task_alive = alive.clone();
156        let task_name = agent_name.clone();
157        let task_handle = tokio::spawn(session_task(
158            client, cmd_rx, output_tx, task_alive, task_name,
159        ));
160
161        let session = ClaudeCodeSession {
162            name: agent_name,
163            cmd_tx,
164            output_rx: Some(output_rx),
165            alive,
166            task_handle: Some(task_handle),
167        };
168
169        Ok(Box::new(session))
170    }
171}
172
173// ---------------------------------------------------------------------------
174// Session task  (owns the client exclusively)
175// ---------------------------------------------------------------------------
176
177/// Background task that owns the `InteractiveClient` and serialises all
178/// send/receive operations. No Mutex needed -- the client lives here only.
179async fn session_task(
180    mut client: cc_sdk::InteractiveClient,
181    mut cmd_rx: mpsc::Receiver<SessionCommand>,
182    output_tx: mpsc::Sender<AgentOutput>,
183    alive: Arc<AtomicBool>,
184    agent_name: String,
185) {
186    debug!(agent = %agent_name, "Session task started");
187
188    // Phase 1: Receive response for the initial prompt that was already sent.
189    match client.receive_response().await {
190        Ok(msgs) => {
191            for msg in msgs {
192                if let Some(out) = message_to_output(&msg)
193                    && send_agent_output(&output_tx, out, &alive, &agent_name).await.is_err()
194                {
195                    return;
196                }
197            }
198        }
199        Err(e) => {
200            let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
201            alive.store(false, Ordering::Relaxed);
202            return;
203        }
204    }
205
206    // Phase 2: Process commands -- wait for next command, then send+receive.
207    // Uses explicit match instead of `while let` so new SessionCommand variants
208    // trigger a compile error rather than silently exiting the loop.
209    #[allow(clippy::while_let_loop)]
210    loop {
211        match cmd_rx.recv().await {
212            Some(SessionCommand::SendMessage(msg)) => {
213                if let Err(e) = client.send_message(msg).await {
214                    let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Send error: {e}")), &alive, &agent_name).await;
215                    break;
216                }
217                match client.receive_response().await {
218                    Ok(msgs) => {
219                        for msg in msgs {
220                            if let Some(out) = message_to_output(&msg)
221                                && send_agent_output(&output_tx, out, &alive, &agent_name)
222                                    .await
223                                    .is_err()
224                            {
225                                let _ = client.disconnect().await;
226                                return;
227                            }
228                        }
229                    }
230                    Err(e) => {
231                        let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
232                        break;
233                    }
234                }
235            }
236            Some(SessionCommand::Shutdown) | None => break,
237        }
238    }
239
240    let _ = client.disconnect().await;
241    alive.store(false, Ordering::Relaxed);
242    debug!(agent = %agent_name, "Session task stopped");
243}
244
245// ---------------------------------------------------------------------------
246// Helpers
247// ---------------------------------------------------------------------------
248
249/// Convert a `cc_sdk::Message` to an `AgentOutput`, or `None` if irrelevant.
250fn message_to_output(msg: &cc_sdk::Message) -> Option<AgentOutput> {
251    match msg {
252        cc_sdk::Message::Assistant { message } => {
253            let text: String = message
254                .content
255                .iter()
256                .filter_map(|block| match block {
257                    cc_sdk::ContentBlock::Text(t) => Some(t.text.as_str()),
258                    _ => None,
259                })
260                .collect::<Vec<_>>()
261                .join("");
262
263            if text.is_empty() {
264                None
265            } else {
266                Some(AgentOutput::Message(text))
267            }
268        }
269        cc_sdk::Message::Result { is_error, .. } => {
270            if *is_error {
271                Some(AgentOutput::Error(
272                    "Agent turn completed with error".into(),
273                ))
274            } else {
275                Some(AgentOutput::TurnComplete)
276            }
277        }
278        _ => None,
279    }
280}
281
282// ---------------------------------------------------------------------------
283// ClaudeCodeSession
284// ---------------------------------------------------------------------------
285
286/// A running Claude Code agent session.
287struct ClaudeCodeSession {
288    /// Agent name.
289    name: String,
290    /// Channel to send commands to the background session task.
291    cmd_tx: mpsc::Sender<SessionCommand>,
292    /// Output receiver (taken once by the orchestrator).
293    output_rx: Option<mpsc::Receiver<AgentOutput>>,
294    /// Liveness flag shared with the session task.
295    alive: Arc<AtomicBool>,
296    /// Handle to the background session task.
297    task_handle: Option<JoinHandle<()>>,
298}
299
300#[async_trait]
301impl AgentSession for ClaudeCodeSession {
302    fn name(&self) -> &str {
303        &self.name
304    }
305
306    async fn send_input(&mut self, input: &str) -> Result<()> {
307        if !self.alive.load(Ordering::Relaxed) {
308            return Err(Error::AgentNotAlive {
309                name: self.name.clone(),
310            });
311        }
312
313        self.cmd_tx
314            .send(SessionCommand::SendMessage(input.to_string()))
315            .await
316            .map_err(|_| Error::AgentNotAlive {
317                name: self.name.clone(),
318            })?;
319        Ok(())
320    }
321
322    fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
323        self.output_rx.take()
324    }
325
326    async fn is_alive(&self) -> bool {
327        self.alive.load(Ordering::Relaxed)
328    }
329
330    async fn shutdown(&mut self) -> Result<()> {
331        info!(agent = %self.name, "Shutting down Claude Code session");
332        self.alive.store(false, Ordering::Relaxed);
333        let _ = self.cmd_tx.send(SessionCommand::Shutdown).await;
334        if let Some(handle) = self.task_handle.take() {
335            let abort_handle = handle.abort_handle();
336            if tokio::time::timeout(Duration::from_secs(10), handle)
337                .await
338                .is_err()
339            {
340                warn!(agent = %self.name, "Session task timed out during shutdown, aborting");
341                abort_handle.abort();
342            }
343        }
344        Ok(())
345    }
346
347    async fn force_kill(&mut self) -> Result<()> {
348        info!(agent = %self.name, "Force-killing Claude Code session");
349        self.alive.store(false, Ordering::Relaxed);
350        if let Some(handle) = self.task_handle.take() {
351            handle.abort();
352            let _ = handle.await;
353        }
354        Ok(())
355    }
356}