use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
use crate::{Error, Result};
const OUTPUT_CHANNEL_SIZE: usize = 256;
const CMD_CHANNEL_SIZE: usize = 16;
enum SessionCommand {
SendMessage(String),
Shutdown,
}
#[derive(Debug)]
pub struct ClaudeCodeBackend {
default_options: Option<cc_sdk::ClaudeCodeOptions>,
}
impl ClaudeCodeBackend {
pub fn new() -> Self {
Self {
default_options: None,
}
}
pub fn with_options(options: cc_sdk::ClaudeCodeOptions) -> Self {
Self {
default_options: Some(options),
}
}
#[allow(deprecated)] fn build_options(&self, config: &SpawnConfig) -> cc_sdk::ClaudeCodeOptions {
let mut opts = self.default_options.clone().unwrap_or_default();
if let Some(ref model) = config.model {
opts.model = Some(model.clone());
}
if let Some(ref cwd) = config.cwd {
opts.cwd = Some(cwd.clone());
}
if let Some(turns) = config.max_turns {
opts.max_turns = Some(turns);
}
if !config.allowed_tools.is_empty() {
opts.allowed_tools = config.allowed_tools.clone();
}
if let Some(ref mode) = config.permission_mode {
opts.permission_mode = match mode.as_str() {
"plan" => cc_sdk::PermissionMode::Plan,
"acceptEdits" => cc_sdk::PermissionMode::AcceptEdits,
"bypassPermissions" => cc_sdk::PermissionMode::BypassPermissions,
_ => cc_sdk::PermissionMode::Default,
};
}
if !config.env.is_empty() {
opts.env.extend(config.env.clone());
}
opts
}
}
impl Default for ClaudeCodeBackend {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl AgentBackend for ClaudeCodeBackend {
fn backend_type(&self) -> BackendType {
BackendType::ClaudeCode
}
async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
let agent_name = config.name.clone();
let initial_prompt = config.prompt.clone();
let options = self.build_options(&config);
info!(agent = %agent_name, "Spawning Claude Code agent");
let mut client =
cc_sdk::InteractiveClient::new(options).map_err(|e| Error::SpawnFailed {
name: agent_name.clone(),
reason: format!("Failed to create InteractiveClient: {e}"),
})?;
client.connect().await.map_err(|e| Error::SpawnFailed {
name: agent_name.clone(),
reason: format!("Failed to connect: {e}"),
})?;
client
.send_message(initial_prompt)
.await
.map_err(|e| Error::SpawnFailed {
name: agent_name.clone(),
reason: format!("Failed to send initial prompt: {e}"),
})?;
let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE);
let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
let alive = Arc::new(AtomicBool::new(true));
let task_alive = alive.clone();
let task_name = agent_name.clone();
let task_handle = tokio::spawn(session_task(
client, cmd_rx, output_tx, task_alive, task_name,
));
let session = ClaudeCodeSession {
name: agent_name,
cmd_tx,
output_rx: Some(output_rx),
alive,
task_handle: Some(task_handle),
};
Ok(Box::new(session))
}
}
async fn session_task(
mut client: cc_sdk::InteractiveClient,
mut cmd_rx: mpsc::Receiver<SessionCommand>,
output_tx: mpsc::Sender<AgentOutput>,
alive: Arc<AtomicBool>,
agent_name: String,
) {
debug!(agent = %agent_name, "Session task started");
match client.receive_response().await {
Ok(msgs) => {
for msg in msgs {
if let Some(out) = message_to_output(&msg)
&& send_agent_output(&output_tx, out, &alive, &agent_name).await.is_err()
{
return;
}
}
}
Err(e) => {
let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
alive.store(false, Ordering::Relaxed);
return;
}
}
#[allow(clippy::while_let_loop)]
loop {
match cmd_rx.recv().await {
Some(SessionCommand::SendMessage(msg)) => {
if let Err(e) = client.send_message(msg).await {
let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Send error: {e}")), &alive, &agent_name).await;
break;
}
match client.receive_response().await {
Ok(msgs) => {
for msg in msgs {
if let Some(out) = message_to_output(&msg)
&& send_agent_output(&output_tx, out, &alive, &agent_name)
.await
.is_err()
{
let _ = client.disconnect().await;
return;
}
}
}
Err(e) => {
let _ = send_agent_output(&output_tx, AgentOutput::Error(format!("Receive error: {e}")), &alive, &agent_name).await;
break;
}
}
}
Some(SessionCommand::Shutdown) | None => break,
}
}
let _ = client.disconnect().await;
alive.store(false, Ordering::Relaxed);
debug!(agent = %agent_name, "Session task stopped");
}
fn message_to_output(msg: &cc_sdk::Message) -> Option<AgentOutput> {
match msg {
cc_sdk::Message::Assistant { message } => {
let text: String = message
.content
.iter()
.filter_map(|block| match block {
cc_sdk::ContentBlock::Text(t) => Some(t.text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
if text.is_empty() {
None
} else {
Some(AgentOutput::Message(text))
}
}
cc_sdk::Message::Result { is_error, .. } => {
if *is_error {
Some(AgentOutput::Error(
"Agent turn completed with error".into(),
))
} else {
Some(AgentOutput::TurnComplete)
}
}
_ => None,
}
}
struct ClaudeCodeSession {
name: String,
cmd_tx: mpsc::Sender<SessionCommand>,
output_rx: Option<mpsc::Receiver<AgentOutput>>,
alive: Arc<AtomicBool>,
task_handle: Option<JoinHandle<()>>,
}
#[async_trait]
impl AgentSession for ClaudeCodeSession {
fn name(&self) -> &str {
&self.name
}
async fn send_input(&mut self, input: &str) -> Result<()> {
if !self.alive.load(Ordering::Relaxed) {
return Err(Error::AgentNotAlive {
name: self.name.clone(),
});
}
self.cmd_tx
.send(SessionCommand::SendMessage(input.to_string()))
.await
.map_err(|_| Error::AgentNotAlive {
name: self.name.clone(),
})?;
Ok(())
}
fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
self.output_rx.take()
}
async fn is_alive(&self) -> bool {
self.alive.load(Ordering::Relaxed)
}
async fn shutdown(&mut self) -> Result<()> {
info!(agent = %self.name, "Shutting down Claude Code session");
self.alive.store(false, Ordering::Relaxed);
let _ = self.cmd_tx.send(SessionCommand::Shutdown).await;
if let Some(handle) = self.task_handle.take() {
let abort_handle = handle.abort_handle();
if tokio::time::timeout(Duration::from_secs(10), handle)
.await
.is_err()
{
warn!(agent = %self.name, "Session task timed out during shutdown, aborting");
abort_handle.abort();
}
}
Ok(())
}
async fn force_kill(&mut self) -> Result<()> {
info!(agent = %self.name, "Force-killing Claude Code session");
self.alive.store(false, Ordering::Relaxed);
if let Some(handle) = self.task_handle.take() {
handle.abort();
let _ = handle.await;
}
Ok(())
}
}