agent_teams/backend/
claude_code.rs1use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::mpsc;
15use tokio::task::JoinHandle;
16use tracing::{debug, info, warn};
17
18use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
19use crate::{Error, Result};
20
/// Capacity of the bounded channel that carries `AgentOutput` values from the
/// session task to whoever holds the session's output receiver.
const OUTPUT_CHANNEL_SIZE: usize = 256;

/// Capacity of the bounded channel that carries `SessionCommand`s from the
/// session handle to the session task.
const CMD_CHANNEL_SIZE: usize = 16;
26
/// Commands sent from a session handle to its background session task.
enum SessionCommand {
    /// Forward the contained text to the agent as its next message.
    SendMessage(String),
    /// Leave the command loop so the task can disconnect and finish.
    Shutdown,
}
32
33#[derive(Debug)]
39pub struct ClaudeCodeBackend {
40 default_options: Option<cc_sdk::ClaudeCodeOptions>,
43}
44
45impl ClaudeCodeBackend {
46 pub fn new() -> Self {
48 Self {
49 default_options: None,
50 }
51 }
52
53 pub fn with_options(options: cc_sdk::ClaudeCodeOptions) -> Self {
55 Self {
56 default_options: Some(options),
57 }
58 }
59
60 #[allow(deprecated)] fn build_options(&self, config: &SpawnConfig) -> cc_sdk::ClaudeCodeOptions {
64 let mut opts = self.default_options.clone().unwrap_or_default();
65
66 if let Some(ref model) = config.model {
72 opts.model = Some(model.clone());
73 }
74
75 if let Some(ref cwd) = config.cwd {
77 opts.cwd = Some(cwd.clone());
78 }
79
80 if let Some(turns) = config.max_turns {
82 opts.max_turns = Some(turns);
83 }
84
85 if !config.allowed_tools.is_empty() {
87 opts.allowed_tools = config.allowed_tools.clone();
88 }
89
90 if let Some(ref mode) = config.permission_mode {
92 opts.permission_mode = match mode.as_str() {
93 "plan" => cc_sdk::PermissionMode::Plan,
94 "acceptEdits" => cc_sdk::PermissionMode::AcceptEdits,
95 "bypassPermissions" => cc_sdk::PermissionMode::BypassPermissions,
96 _ => cc_sdk::PermissionMode::Default,
97 };
98 }
99
100 if !config.env.is_empty() {
102 opts.env.extend(config.env.clone());
103 }
104
105 opts
106 }
107}
108
109impl Default for ClaudeCodeBackend {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115#[async_trait]
116impl AgentBackend for ClaudeCodeBackend {
117 fn backend_type(&self) -> BackendType {
118 BackendType::ClaudeCode
119 }
120
121 async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
122 let agent_name = config.name.clone();
123 let initial_prompt = config.prompt.clone();
124 let options = self.build_options(&config);
125
126 info!(agent = %agent_name, "Spawning Claude Code agent");
127
128 let mut client =
130 cc_sdk::InteractiveClient::new(options).map_err(|e| Error::SpawnFailed {
131 name: agent_name.clone(),
132 reason: format!("Failed to create InteractiveClient: {e}"),
133 })?;
134
135 client.connect().await.map_err(|e| Error::SpawnFailed {
136 name: agent_name.clone(),
137 reason: format!("Failed to connect: {e}"),
138 })?;
139
140 client
142 .send_message(initial_prompt)
143 .await
144 .map_err(|e| Error::SpawnFailed {
145 name: agent_name.clone(),
146 reason: format!("Failed to send initial prompt: {e}"),
147 })?;
148
149 let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE);
151 let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
152 let alive = Arc::new(AtomicBool::new(true));
153
154 let task_alive = alive.clone();
156 let task_name = agent_name.clone();
157 let task_handle = tokio::spawn(session_task(
158 client, cmd_rx, output_tx, task_alive, task_name,
159 ));
160
161 let session = ClaudeCodeSession {
162 name: agent_name,
163 cmd_tx,
164 output_rx: Some(output_rx),
165 alive,
166 task_handle: Some(task_handle),
167 };
168
169 Ok(Box::new(session))
170 }
171}
172
173async fn session_task(
180 mut client: cc_sdk::InteractiveClient,
181 mut cmd_rx: mpsc::Receiver<SessionCommand>,
182 output_tx: mpsc::Sender<AgentOutput>,
183 alive: Arc<AtomicBool>,
184 agent_name: String,
185) {
186 debug!(agent = %agent_name, "Session task started");
187
188 match client.receive_response().await {
190 Ok(msgs) => {
191 for msg in msgs {
192 if let Some(out) = message_to_output(&msg)
193 && send_agent_output(&output_tx, out, &alive, &agent_name).await.is_err()
194 {
195 return;
196 }
197 }
198 }
199 Err(e) => {
200 let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
201 alive.store(false, Ordering::Relaxed);
202 return;
203 }
204 }
205
206 #[allow(clippy::while_let_loop)]
210 loop {
211 match cmd_rx.recv().await {
212 Some(SessionCommand::SendMessage(msg)) => {
213 if let Err(e) = client.send_message(msg).await {
214 let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Send error: {e}")), &alive, &agent_name).await;
215 break;
216 }
217 match client.receive_response().await {
218 Ok(msgs) => {
219 for msg in msgs {
220 if let Some(out) = message_to_output(&msg)
221 && send_agent_output(&output_tx, out, &alive, &agent_name)
222 .await
223 .is_err()
224 {
225 let _ = client.disconnect().await;
226 return;
227 }
228 }
229 }
230 Err(e) => {
231 let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
232 break;
233 }
234 }
235 }
236 Some(SessionCommand::Shutdown) | None => break,
237 }
238 }
239
240 let _ = client.disconnect().await;
241 alive.store(false, Ordering::Relaxed);
242 debug!(agent = %agent_name, "Session task stopped");
243}
244
245fn message_to_output(msg: &cc_sdk::Message) -> Option<AgentOutput> {
251 match msg {
252 cc_sdk::Message::Assistant { message } => {
253 let text: String = message
254 .content
255 .iter()
256 .filter_map(|block| match block {
257 cc_sdk::ContentBlock::Text(t) => Some(t.text.as_str()),
258 _ => None,
259 })
260 .collect::<Vec<_>>()
261 .join("");
262
263 if text.is_empty() {
264 None
265 } else {
266 Some(AgentOutput::Message(text))
267 }
268 }
269 cc_sdk::Message::Result { is_error, .. } => {
270 if *is_error {
271 Some(AgentOutput::Error(
272 "Agent turn completed with error".into(),
273 ))
274 } else {
275 Some(AgentOutput::TurnComplete)
276 }
277 }
278 _ => None,
279 }
280}
281
282struct ClaudeCodeSession {
288 name: String,
290 cmd_tx: mpsc::Sender<SessionCommand>,
292 output_rx: Option<mpsc::Receiver<AgentOutput>>,
294 alive: Arc<AtomicBool>,
296 task_handle: Option<JoinHandle<()>>,
298}
299
300#[async_trait]
301impl AgentSession for ClaudeCodeSession {
302 fn name(&self) -> &str {
303 &self.name
304 }
305
306 async fn send_input(&mut self, input: &str) -> Result<()> {
307 if !self.alive.load(Ordering::Relaxed) {
308 return Err(Error::AgentNotAlive {
309 name: self.name.clone(),
310 });
311 }
312
313 self.cmd_tx
314 .send(SessionCommand::SendMessage(input.to_string()))
315 .await
316 .map_err(|_| Error::AgentNotAlive {
317 name: self.name.clone(),
318 })?;
319 Ok(())
320 }
321
322 fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
323 self.output_rx.take()
324 }
325
326 async fn is_alive(&self) -> bool {
327 self.alive.load(Ordering::Relaxed)
328 }
329
330 async fn shutdown(&mut self) -> Result<()> {
331 info!(agent = %self.name, "Shutting down Claude Code session");
332 self.alive.store(false, Ordering::Relaxed);
333 let _ = self.cmd_tx.send(SessionCommand::Shutdown).await;
334 if let Some(handle) = self.task_handle.take() {
335 let abort_handle = handle.abort_handle();
336 if tokio::time::timeout(Duration::from_secs(10), handle)
337 .await
338 .is_err()
339 {
340 warn!(agent = %self.name, "Session task timed out during shutdown, aborting");
341 abort_handle.abort();
342 }
343 }
344 Ok(())
345 }
346
347 async fn force_kill(&mut self) -> Result<()> {
348 info!(agent = %self.name, "Force-killing Claude Code session");
349 self.alive.store(false, Ordering::Relaxed);
350 if let Some(handle) = self.task_handle.take() {
351 handle.abort();
352 let _ = handle.await;
353 }
354 Ok(())
355 }
356}