use crate::error::{Result, SdkError};
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use tracing::{debug, warn};
pub struct Transport {
child: Child,
stdin: Option<ChildStdin>,
stdout_reader: BufReader<ChildStdout>,
line_buf: String,
stderr_task: Option<tokio::task::JoinHandle<String>>,
}
impl Transport {
pub fn spawn(
cli_path: &str,
extra_args: &[String],
working_dir: Option<&std::path::Path>,
env_vars: &[(String, String)],
) -> Result<Self> {
let mut cmd = Command::new(cli_path);
cmd.args([
"--print",
"--output-format",
"stream-json",
"--input-format",
"stream-json",
"--verbose",
]);
cmd.args(extra_args);
cmd.env_remove("CLAUDECODE");
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
if let Some(dir) = working_dir {
cmd.current_dir(dir);
}
for (key, value) in env_vars {
cmd.env(key, value);
}
cmd.stdin(std::process::Stdio::piped());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
#[cfg(unix)]
{
unsafe {
cmd.pre_exec(|| {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
Ok(())
});
}
}
let mut child = cmd.spawn().map_err(SdkError::ProcessSpawn)?;
let stdin = child.stdin.take();
let stdout = child
.stdout
.take()
.expect("stdout was configured as piped but is None");
let stderr = child.stderr.take();
let stdout_reader = BufReader::new(stdout);
let stderr_task = stderr.map(|se| tokio::spawn(drain_stderr(se)));
Ok(Self {
child,
stdin,
stdout_reader,
line_buf: String::with_capacity(4096),
stderr_task,
})
}
pub async fn send(&mut self, msg: &impl Serialize) -> Result<()> {
let stdin = self.stdin.as_mut().ok_or(SdkError::NotConnected)?;
let json = serde_json::to_string(msg)
.map_err(|e| SdkError::ProtocolError(format!("failed to serialize message: {e}")))?;
debug!(json = %json, "-> stdin");
stdin.write_all(json.as_bytes()).await?;
stdin.write_all(b"\n").await?;
stdin.flush().await?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Option<serde_json::Value>> {
loop {
self.line_buf.clear();
let n = self.stdout_reader.read_line(&mut self.line_buf).await?;
if n == 0 {
return Ok(None); }
let line = self.line_buf.trim();
if line.is_empty() {
continue;
}
debug!(line = %line, "stdout <-");
return serde_json::from_str(line)
.map(Some)
.map_err(|e| SdkError::InvalidJson {
message: e.to_string(),
line: line.to_owned(),
source: e,
});
}
}
pub async fn recv_message(&mut self) -> Result<Option<crate::types::Message>> {
let Some(value) = self.recv().await? else {
return Ok(None);
};
serde_json::from_value(value.clone())
.map(Some)
.map_err(|e| SdkError::InvalidJson {
message: format!("failed to parse as Message: {e}"),
line: value.to_string(),
source: e,
})
}
pub fn close_stdin(&mut self) {
self.stdin.take();
}
pub async fn kill(&mut self) -> Result<()> {
self.close_stdin();
self.child.kill().await.map_err(SdkError::Io)
}
pub async fn wait_with_stderr(&mut self) -> Result<(Option<i32>, Option<String>)> {
let status = self.child.wait().await?;
let stderr = if let Some(task) = self.stderr_task.take() {
task.await.ok()
} else {
None
};
Ok((status.code(), stderr))
}
pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
self.child.try_wait().map_err(SdkError::Io)
}
#[cfg(unix)]
pub fn interrupt(&self) -> Result<()> {
if let Some(pid) = self.child.id() {
let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGINT) };
if ret != 0 {
return Err(SdkError::Io(std::io::Error::last_os_error()));
}
}
Ok(())
}
#[cfg(not(unix))]
pub fn interrupt(&self) -> Result<()> {
Err(SdkError::ProtocolError(
"interrupt is not supported on this platform".to_owned(),
))
}
}
async fn drain_stderr(stderr: ChildStderr) -> String {
let mut reader = BufReader::new(stderr);
let mut buf = String::new();
let mut accumulated = String::new();
loop {
buf.clear();
match reader.read_line(&mut buf).await {
Ok(0) => break, Ok(_) => {
let line = buf.trim_end();
if !line.is_empty() {
eprintln!("[claude stderr] {}", line);
warn!(target: "claude_stderr", "{}", line);
accumulated.push_str(line);
accumulated.push('\n');
}
}
Err(e) => {
warn!(target: "claude_stderr", "error reading stderr: {}", e);
break;
}
}
}
accumulated
}