unified-agent-api-claude-code 0.2.2

Async wrapper around the Claude Code CLI for non-interactive prompting
Documentation
use std::{
    collections::BTreeMap,
    io::{self, Write},
    path::Path,
    process::ExitStatus,
    time::Duration,
};

use tokio::{
    io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
    process::Command,
    task, time,
};

use crate::ClaudeCodeError;

const CLEANUP_GRACE: Duration = Duration::from_secs(2);

async fn join_or_abort<T>(mut handle: tokio::task::JoinHandle<T>, grace: Duration) -> Option<T> {
    if grace.is_zero() {
        handle.abort();
        let _ = handle.await;
        return None;
    }

    tokio::select! {
        output = &mut handle => output.ok(),
        _ = time::sleep(grace) => {
            handle.abort();
            let _ = handle.await;
            None
        }
    }
}

#[derive(Clone, Copy)]
pub(crate) enum ConsoleTarget {
    Stdout,
    Stderr,
}

#[derive(Debug, Clone)]
pub struct CommandOutput {
    pub status: ExitStatus,
    pub stdout: Vec<u8>,
    pub stderr: Vec<u8>,
}

pub(crate) async fn tee_stream<R>(
    mut reader: R,
    target: ConsoleTarget,
    mirror_console: bool,
) -> Result<Vec<u8>, io::Error>
where
    R: AsyncRead + Unpin,
{
    let mut buffer = Vec::new();
    let mut chunk = [0u8; 4096];
    loop {
        let n = reader.read(&mut chunk).await?;
        if n == 0 {
            break;
        }
        if mirror_console {
            task::block_in_place(|| match target {
                ConsoleTarget::Stdout => {
                    let mut out = io::stdout();
                    out.write_all(&chunk[..n])?;
                    out.flush()
                }
                ConsoleTarget::Stderr => {
                    let mut out = io::stderr();
                    out.write_all(&chunk[..n])?;
                    out.flush()
                }
            })?;
        }
        buffer.extend_from_slice(&chunk[..n]);
    }
    Ok(buffer)
}

pub(crate) fn spawn_with_retry(
    command: &mut Command,
    binary: &Path,
) -> Result<tokio::process::Child, ClaudeCodeError> {
    let mut backoff = Duration::from_millis(2);
    for attempt in 0..5 {
        match command.spawn() {
            Ok(child) => return Ok(child),
            Err(source) => {
                let is_busy = matches!(source.kind(), std::io::ErrorKind::ExecutableFileBusy)
                    || source.raw_os_error() == Some(26);
                if is_busy && attempt < 4 {
                    std::thread::sleep(backoff);
                    backoff = std::cmp::min(backoff * 2, Duration::from_millis(50));
                    continue;
                }
                return Err(ClaudeCodeError::Spawn {
                    binary: binary.to_path_buf(),
                    source,
                });
            }
        }
    }

    unreachable!("spawn_with_retry should return before exhausting retries")
}

pub(crate) async fn run_command(
    mut command: Command,
    binary: &Path,
    stdin_bytes: Option<&[u8]>,
    timeout: Option<Duration>,
    mirror_stdout: bool,
    mirror_stderr: bool,
) -> Result<CommandOutput, ClaudeCodeError> {
    command.stdin(if stdin_bytes.is_some() {
        std::process::Stdio::piped()
    } else {
        std::process::Stdio::null()
    });
    command.stdout(std::process::Stdio::piped());
    command.stderr(std::process::Stdio::piped());
    command.kill_on_drop(true);

    let mut child = spawn_with_retry(&mut command, binary)?;

    if let Some(bytes) = stdin_bytes {
        if let Some(mut stdin) = child.stdin.take() {
            stdin
                .write_all(bytes)
                .await
                .map_err(ClaudeCodeError::StdinWrite)?;
        }
    }

    let stdout = child.stdout.take().ok_or(ClaudeCodeError::MissingStdout)?;
    let stderr = child.stderr.take().ok_or(ClaudeCodeError::MissingStderr)?;

    let stdout_task = tokio::spawn(tee_stream(stdout, ConsoleTarget::Stdout, mirror_stdout));
    let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, mirror_stderr));

    let status = if let Some(dur) = timeout {
        match time::timeout(dur, child.wait()).await {
            Ok(Ok(status)) => status,
            Ok(Err(source)) => {
                let _ = child.start_kill();
                let deadline = time::Instant::now() + CLEANUP_GRACE;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = time::timeout(remaining, child.wait()).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stdout_task, remaining).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stderr_task, remaining).await;
                return Err(ClaudeCodeError::Wait(source));
            }
            Err(_) => {
                let _ = child.start_kill();
                let deadline = time::Instant::now() + CLEANUP_GRACE;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = time::timeout(remaining, child.wait()).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stdout_task, remaining).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stderr_task, remaining).await;
                return Err(ClaudeCodeError::Timeout { timeout: dur });
            }
        }
    } else {
        match child.wait().await {
            Ok(status) => status,
            Err(source) => {
                let _ = child.start_kill();
                let deadline = time::Instant::now() + CLEANUP_GRACE;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = time::timeout(remaining, child.wait()).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stdout_task, remaining).await;
                let remaining = deadline.saturating_duration_since(time::Instant::now());
                let _ = join_or_abort(stderr_task, remaining).await;
                return Err(ClaudeCodeError::Wait(source));
            }
        }
    };

    let stdout = stdout_task
        .await
        .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
        .map_err(ClaudeCodeError::StdoutRead)?;
    let stderr = stderr_task
        .await
        .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
        .map_err(ClaudeCodeError::StderrRead)?;

    Ok(CommandOutput {
        status,
        stdout,
        stderr,
    })
}

pub(crate) fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
    for (k, v) in env {
        command.env(k, v);
    }
}