use std::path::PathBuf;
use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::state::TokenUsage;
use super::{
subprocess::{self, SubprocessOutcome},
Agent, AgentEvent, AgentOutcome, AgentRequest, StopReason,
};
const DEFAULT_BINARY: &str = "claude";
const ERROR_TAIL_LINES: usize = 8;
#[derive(Debug, Clone)]
pub struct ClaudeCodeAgent {
binary: PathBuf,
permission_mode: Option<String>,
extra_args: Vec<String>,
model_override: Option<String>,
}
impl ClaudeCodeAgent {
pub fn new() -> Self {
Self {
binary: PathBuf::from(DEFAULT_BINARY),
permission_mode: None,
extra_args: Vec::new(),
model_override: None,
}
}
pub fn with_binary(binary: impl Into<PathBuf>) -> Self {
Self {
binary: binary.into(),
permission_mode: None,
extra_args: Vec::new(),
model_override: None,
}
}
pub fn with_permission_mode(mut self, mode: impl Into<String>) -> Self {
self.permission_mode = Some(mode.into());
self
}
pub fn with_extra_args(mut self, args: Vec<String>) -> Self {
self.extra_args = args;
self
}
pub fn with_model_override(mut self, model: impl Into<String>) -> Self {
self.model_override = Some(model.into());
self
}
pub fn binary(&self) -> &PathBuf {
&self.binary
}
}
impl Default for ClaudeCodeAgent {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Agent for ClaudeCodeAgent {
fn name(&self) -> &str {
"claude-code"
}
async fn run(
&self,
req: AgentRequest,
events: mpsc::Sender<AgentEvent>,
cancel: CancellationToken,
) -> Result<AgentOutcome> {
let log_path = req.log_path.clone();
let cmd = self.build_command(&req);
let (raw_tx, mut raw_rx) = mpsc::channel::<AgentEvent>(64);
let outbound = events.clone();
let forwarder = tokio::spawn(async move {
let mut tokens = TokenUsage::default();
let mut error_message: Option<String> = None;
let mut stderr_tail: Vec<String> = Vec::new();
while let Some(ev) = raw_rx.recv().await {
match ev {
AgentEvent::Stdout(line) => {
handle_stdout_line(&line, &outbound, &mut tokens, &mut error_message).await;
}
AgentEvent::Stderr(line) => {
push_tail(&mut stderr_tail, line.clone(), ERROR_TAIL_LINES);
let _ = outbound.send(AgentEvent::Stderr(line)).await;
}
other => {
let _ = outbound.send(other).await;
}
}
}
ForwarderResult {
tokens,
error_message,
stderr_tail,
}
});
let sub_outcome: SubprocessOutcome =
subprocess::run_logged(cmd, &log_path, raw_tx, cancel, req.timeout).await?;
let ForwarderResult {
mut tokens,
error_message,
stderr_tail,
} = forwarder.await.unwrap_or(ForwarderResult {
tokens: TokenUsage::default(),
error_message: None,
stderr_tail: Vec::new(),
});
if tokens.input > 0 || tokens.output > 0 {
tokens
.by_role
.entry(req.role.as_str().to_string())
.or_default();
let entry = tokens
.by_role
.get_mut(req.role.as_str())
.expect("just inserted");
entry.input = tokens.input;
entry.output = tokens.output;
}
let stop_reason = match sub_outcome.stop_reason {
StopReason::Completed => {
if sub_outcome.exit_code == 0 && error_message.is_none() {
StopReason::Completed
} else {
StopReason::Error(format_error_message(
sub_outcome.exit_code,
error_message.as_deref(),
&stderr_tail,
))
}
}
other => other,
};
Ok(AgentOutcome {
exit_code: sub_outcome.exit_code,
stop_reason,
tokens,
log_path,
})
}
}
pub fn resolve_permission_mode(model: &str) -> &'static str {
if model.to_ascii_lowercase().contains("opus") {
"auto"
} else {
"acceptEdits"
}
}
impl ClaudeCodeAgent {
fn build_command(&self, req: &AgentRequest) -> Command {
let mut cmd = Command::new(&self.binary);
cmd.current_dir(&req.workdir);
if !req.env.is_empty() {
cmd.envs(req.env.iter());
}
cmd.args(["--print", "--output-format", "stream-json", "--verbose"]);
let model = self.model_override.as_deref().unwrap_or(&req.model);
cmd.args(["--model", model]);
let permission_mode = self
.permission_mode
.as_deref()
.unwrap_or_else(|| resolve_permission_mode(model));
cmd.args(["--permission-mode", permission_mode]);
if !req.system_prompt.is_empty() {
cmd.arg("--append-system-prompt").arg(&req.system_prompt);
}
for arg in &self.extra_args {
cmd.arg(arg);
}
cmd.arg("--").arg(&req.user_prompt);
cmd
}
}
struct ForwarderResult {
tokens: TokenUsage,
error_message: Option<String>,
stderr_tail: Vec<String>,
}
async fn handle_stdout_line(
line: &str,
outbound: &mpsc::Sender<AgentEvent>,
tokens: &mut TokenUsage,
error_message: &mut Option<String>,
) {
let trimmed = line.trim();
if trimmed.is_empty() {
return;
}
let parsed: Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
let _ = outbound.send(AgentEvent::Stdout(line.to_string())).await;
return;
}
};
let kind = parsed.get("type").and_then(Value::as_str).unwrap_or("");
match kind {
"assistant" => {
if let Some(content) = parsed
.get("message")
.and_then(|m| m.get("content"))
.and_then(Value::as_array)
{
for block in content {
let bk = block.get("type").and_then(Value::as_str).unwrap_or("");
match bk {
"text" => {
if let Some(text) = block.get("text").and_then(Value::as_str) {
if !text.is_empty() {
let _ =
outbound.send(AgentEvent::Stdout(text.to_string())).await;
}
}
}
"tool_use" => {
if let Some(name) = block.get("name").and_then(Value::as_str) {
let _ = outbound.send(AgentEvent::ToolUse(name.to_string())).await;
}
}
_ => {}
}
}
}
}
"result" => {
if let Some(usage) = parsed.get("usage") {
let new_input = sum_input_tokens(usage);
let new_output = read_u64(usage, "output_tokens");
if new_input != tokens.input || new_output != tokens.output {
tokens.input = new_input;
tokens.output = new_output;
let _ = outbound.send(AgentEvent::TokenDelta(tokens.clone())).await;
}
}
let is_error = parsed
.get("is_error")
.and_then(Value::as_bool)
.unwrap_or(false);
if is_error {
let msg = parsed
.get("result")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| {
parsed
.get("subtype")
.and_then(Value::as_str)
.unwrap_or("claude reported an error")
.to_string()
});
*error_message = Some(msg);
}
}
_ => {}
}
}
fn sum_input_tokens(usage: &Value) -> u64 {
read_u64(usage, "input_tokens")
+ read_u64(usage, "cache_creation_input_tokens")
+ read_u64(usage, "cache_read_input_tokens")
}
fn read_u64(v: &Value, key: &str) -> u64 {
v.get(key).and_then(Value::as_u64).unwrap_or(0)
}
fn push_tail(buf: &mut Vec<String>, line: String, max: usize) {
if buf.len() == max {
buf.remove(0);
}
buf.push(line);
}
fn format_error_message(exit_code: i32, parsed: Option<&str>, stderr_tail: &[String]) -> String {
let mut out = match parsed {
Some(m) if !m.is_empty() => format!("claude: {} (exit {})", m, exit_code),
_ => format!("claude exited with code {}", exit_code),
};
if !stderr_tail.is_empty() {
out.push_str("\nstderr tail:\n");
for line in stderr_tail {
out.push_str(line);
out.push('\n');
}
}
out
}
#[cfg(all(test, unix))]
mod tests {
use super::*;
use crate::agent::Role;
use std::path::PathBuf;
use std::time::Duration;
fn fixture_path(name: &str) -> PathBuf {
let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
manifest.join("tests").join("fixtures").join(name)
}
fn req_with_log(log_path: PathBuf, timeout: Duration) -> AgentRequest {
AgentRequest {
role: Role::Implementer,
model: "claude-haiku-test".into(),
system_prompt: "be brief".into(),
user_prompt: "say hi".into(),
workdir: std::env::temp_dir(),
log_path,
timeout,
env: std::collections::HashMap::new(),
}
}
async fn drain<T>(mut rx: mpsc::Receiver<T>) -> Vec<T> {
let mut out = Vec::new();
while let Some(v) = rx.recv().await {
out.push(v);
}
out
}
#[tokio::test]
async fn parses_assistant_text_and_tool_use_blocks() {
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::with_binary(fixture_path("fake-claude-success.sh"));
let (tx, rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let outcome = agent
.run(
req_with_log(log.clone(), Duration::from_secs(5)),
tx,
cancel,
)
.await
.unwrap();
assert_eq!(outcome.stop_reason, StopReason::Completed);
assert_eq!(outcome.exit_code, 0);
let evs = drain(rx).await;
let stdouts: Vec<&str> = evs
.iter()
.filter_map(|e| match e {
AgentEvent::Stdout(s) => Some(s.as_str()),
_ => None,
})
.collect();
let tool_uses: Vec<&str> = evs
.iter()
.filter_map(|e| match e {
AgentEvent::ToolUse(s) => Some(s.as_str()),
_ => None,
})
.collect();
let token_deltas: Vec<&TokenUsage> = evs
.iter()
.filter_map(|e| match e {
AgentEvent::TokenDelta(t) => Some(t),
_ => None,
})
.collect();
assert!(
stdouts.iter().any(|s| s.contains("Hello from Claude")),
"missing assistant text: {stdouts:?}"
);
assert_eq!(tool_uses, vec!["Bash", "Read"]);
assert_eq!(token_deltas.len(), 1);
let total = token_deltas[0];
assert_eq!(total.input, 35);
assert_eq!(total.output, 51);
assert_eq!(outcome.tokens.input, 35);
assert_eq!(outcome.tokens.output, 51);
let role_usage = outcome
.tokens
.by_role
.get("implementer")
.expect("implementer role usage");
assert_eq!(role_usage.input, 35);
assert_eq!(role_usage.output, 51);
let log_text = std::fs::read_to_string(&log).unwrap();
assert!(log_text.contains("\"type\":\"assistant\""), "{log_text}");
assert!(log_text.contains("\"type\":\"result\""), "{log_text}");
}
#[tokio::test]
async fn maps_is_error_result_to_error_stop_reason() {
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::with_binary(fixture_path("fake-claude-error.sh"));
let (tx, _rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let outcome = agent
.run(req_with_log(log, Duration::from_secs(5)), tx, cancel)
.await
.unwrap();
match outcome.stop_reason {
StopReason::Error(msg) => {
assert!(
msg.contains("rate limit"),
"expected rate limit message, got: {msg}"
);
}
other => panic!("expected Error, got {other:?}"),
}
assert_eq!(outcome.exit_code, 2);
}
#[tokio::test]
async fn nonjson_stdout_is_forwarded_verbatim() {
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::with_binary(fixture_path("fake-claude-nonjson.sh"));
let (tx, rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let outcome = agent
.run(req_with_log(log, Duration::from_secs(5)), tx, cancel)
.await
.unwrap();
assert_eq!(outcome.stop_reason, StopReason::Completed);
let evs = drain(rx).await;
let stdouts: Vec<&str> = evs
.iter()
.filter_map(|e| match e {
AgentEvent::Stdout(s) => Some(s.as_str()),
_ => None,
})
.collect();
assert!(
stdouts.contains(&"not-json output line"),
"expected raw line, got {stdouts:?}"
);
}
#[tokio::test]
async fn nonzero_exit_without_result_event_maps_to_error() {
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::with_binary(fixture_path("fake-claude-crash.sh"));
let (tx, _rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let outcome = agent
.run(req_with_log(log, Duration::from_secs(5)), tx, cancel)
.await
.unwrap();
match outcome.stop_reason {
StopReason::Error(msg) => {
assert!(msg.contains("exit"), "{msg}");
assert!(msg.contains("authentication required"), "{msg}");
}
other => panic!("expected Error, got {other:?}"),
}
assert_eq!(outcome.exit_code, 1);
}
#[tokio::test]
async fn cancellation_propagates_to_child_process() {
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::with_binary(fixture_path("fake-claude-hang.sh"));
let (tx, _rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let canceler = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
canceler.cancel();
});
let outcome = agent
.run(req_with_log(log, Duration::from_secs(30)), tx, cancel)
.await
.unwrap();
assert_eq!(outcome.stop_reason, StopReason::Cancelled);
assert_eq!(outcome.exit_code, -1);
}
#[tokio::test]
async fn build_command_includes_required_flags_and_workdir() {
let agent = ClaudeCodeAgent::with_binary("/usr/local/bin/claude")
.with_permission_mode("acceptEdits");
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let req = AgentRequest {
role: Role::Auditor,
model: "claude-opus-4-7".into(),
system_prompt: "system body".into(),
user_prompt: "user body".into(),
workdir: dir.path().to_path_buf(),
log_path: log,
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let cmd = agent.build_command(&req);
let std_cmd = cmd.as_std();
let args: Vec<String> = std_cmd
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(args.iter().any(|a| a == "--print"));
assert!(args
.windows(2)
.any(|w| w[0] == "--output-format" && w[1] == "stream-json"));
assert!(args.iter().any(|a| a == "--verbose"));
assert!(args
.windows(2)
.any(|w| w[0] == "--model" && w[1] == "claude-opus-4-7"));
assert!(args
.windows(2)
.any(|w| w[0] == "--permission-mode" && w[1] == "acceptEdits"));
assert!(args
.windows(2)
.any(|w| w[0] == "--append-system-prompt" && w[1] == "system body"));
assert!(args.windows(2).any(|w| w[0] == "--" && w[1] == "user body"));
assert_eq!(std_cmd.get_program(), "/usr/local/bin/claude");
assert_eq!(std_cmd.get_current_dir(), Some(dir.path()));
}
#[tokio::test]
async fn build_command_applies_model_override_and_extra_args() {
let agent = ClaudeCodeAgent::with_binary("claude")
.with_extra_args(vec!["--max-turns".into(), "50".into()])
.with_model_override("claude-opus-4-7");
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let req = AgentRequest {
role: Role::Implementer,
model: "role-default-model".into(),
system_prompt: "sys".into(),
user_prompt: "u".into(),
workdir: dir.path().to_path_buf(),
log_path: log,
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let cmd = agent.build_command(&req);
let args: Vec<String> = cmd
.as_std()
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(
args.windows(2)
.any(|w| w[0] == "--model" && w[1] == "claude-opus-4-7"),
"model override should win over req.model: {args:?}"
);
assert!(
!args.iter().any(|a| a == "role-default-model"),
"req.model must not leak when override is set: {args:?}"
);
assert!(
args.windows(2)
.any(|w| w[0] == "--max-turns" && w[1] == "50"),
"extra_args missing: {args:?}"
);
let max_turns_idx = args.iter().position(|a| a == "--max-turns").unwrap();
let dashdash_idx = args.iter().position(|a| a == "--").unwrap();
assert!(
max_turns_idx < dashdash_idx,
"extra_args must appear before the positional `--` sigil: {args:?}"
);
}
#[test]
fn resolve_permission_mode_picks_auto_for_opus_and_accept_edits_otherwise() {
assert_eq!(resolve_permission_mode("claude-opus-4-7"), "auto");
assert_eq!(resolve_permission_mode("claude-opus-4-6"), "auto");
assert_eq!(resolve_permission_mode("opus"), "auto");
assert_eq!(resolve_permission_mode("CLAUDE-OPUS-4-7"), "auto");
assert_eq!(resolve_permission_mode("claude-sonnet-4-6"), "acceptEdits");
assert_eq!(resolve_permission_mode("claude-haiku-4-5"), "acceptEdits");
assert_eq!(resolve_permission_mode("gpt-5"), "acceptEdits");
}
#[tokio::test]
async fn build_command_picks_per_model_permission_mode_without_explicit_override() {
let dir = tempfile::tempdir().unwrap();
let agent = ClaudeCodeAgent::with_binary("claude");
let opus_req = AgentRequest {
role: Role::Implementer,
model: "claude-opus-4-7".into(),
system_prompt: String::new(),
user_prompt: "u".into(),
workdir: dir.path().to_path_buf(),
log_path: dir.path().join("opus.log"),
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let opus_args: Vec<String> = agent
.build_command(&opus_req)
.as_std()
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(
opus_args
.windows(2)
.any(|w| w[0] == "--permission-mode" && w[1] == "auto"),
"opus dispatch must default to auto: {opus_args:?}"
);
let sonnet_req = AgentRequest {
role: Role::Auditor,
model: "claude-sonnet-4-6".into(),
system_prompt: String::new(),
user_prompt: "u".into(),
workdir: dir.path().to_path_buf(),
log_path: dir.path().join("sonnet.log"),
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let sonnet_args: Vec<String> = agent
.build_command(&sonnet_req)
.as_std()
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(
sonnet_args
.windows(2)
.any(|w| w[0] == "--permission-mode" && w[1] == "acceptEdits"),
"sonnet dispatch must default to acceptEdits: {sonnet_args:?}"
);
}
#[tokio::test]
async fn explicit_permission_mode_override_beats_per_model_default() {
let dir = tempfile::tempdir().unwrap();
let agent =
ClaudeCodeAgent::with_binary("claude").with_permission_mode("bypassPermissions");
let req = AgentRequest {
role: Role::Auditor,
model: "claude-sonnet-4-6".into(),
system_prompt: String::new(),
user_prompt: "u".into(),
workdir: dir.path().to_path_buf(),
log_path: dir.path().join("run.log"),
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let args: Vec<String> = agent
.build_command(&req)
.as_std()
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(
args.windows(2)
.any(|w| w[0] == "--permission-mode" && w[1] == "bypassPermissions"),
"explicit override must win: {args:?}"
);
assert!(
!args
.windows(2)
.any(|w| w[0] == "--permission-mode" && (w[1] == "auto" || w[1] == "acceptEdits")),
"per-model default must not also appear: {args:?}"
);
}
#[tokio::test]
async fn build_command_omits_append_system_prompt_when_empty() {
let agent = ClaudeCodeAgent::with_binary("claude");
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let req = AgentRequest {
role: Role::Implementer,
model: "claude-sonnet".into(),
system_prompt: String::new(),
user_prompt: "u".into(),
workdir: dir.path().to_path_buf(),
log_path: log,
timeout: Duration::from_secs(1),
env: std::collections::HashMap::new(),
};
let cmd = agent.build_command(&req);
let args: Vec<String> = cmd
.as_std()
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
assert!(!args.iter().any(|a| a == "--append-system-prompt"));
}
#[tokio::test]
async fn real_claude_smoke_test() {
if std::env::var("PITBOSS_REAL_AGENT_TESTS").ok().as_deref() != Some("1") {
eprintln!("skipping real_claude_smoke_test (set PITBOSS_REAL_AGENT_TESTS=1 to run)");
return;
}
let dir = tempfile::tempdir().unwrap();
let log = dir.path().join("run.log");
let agent = ClaudeCodeAgent::new();
let (tx, _rx) = mpsc::channel(64);
let cancel = CancellationToken::new();
let req = AgentRequest {
role: Role::Implementer,
model: "haiku".into(),
system_prompt: String::new(),
user_prompt: "respond with the single word OK".into(),
workdir: dir.path().to_path_buf(),
log_path: log,
timeout: Duration::from_secs(120),
env: std::collections::HashMap::new(),
};
let outcome = agent.run(req, tx, cancel).await.unwrap();
assert!(
matches!(outcome.stop_reason, StopReason::Completed),
"real claude run did not complete: {:?}",
outcome.stop_reason
);
assert_eq!(outcome.exit_code, 0);
assert!(outcome.tokens.output > 0, "no output tokens reported");
}
}