Skip to main content

cli_agents/adapters/
mod.rs

1mod claude;
2mod codex;
3mod gemini;
4
5pub use claude::ClaudeAdapter;
6pub use codex::CodexAdapter;
7pub use gemini::GeminiAdapter;
8
9use crate::error::{Error, Result};
10use crate::events::StreamEvent;
11use crate::types::{CliName, RunOptions, RunResult};
12use std::collections::HashMap;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::process::Command;
15use tracing::{debug, warn};
16
/// Trait implemented by each CLI adapter.
///
/// Implementors must be `Send + Sync`. Because [`CliAdapter::run`] returns
/// `impl Future`, this trait cannot be used as `dyn CliAdapter`; the
/// object-safe [`CliAdapterBoxed`] wrapper exists for dynamic dispatch.
pub trait CliAdapter: Send + Sync {
    /// Returns the [`CliName`] this adapter corresponds to.
    fn name(&self) -> CliName;

    /// Executes one run of the CLI described by `opts`.
    ///
    /// * `opts` – options for this run.
    /// * `emit` – callback that receives [`StreamEvent`] values; it must be
    ///   callable from any thread (`Send + Sync`).
    /// * `cancel` – cancellation token handed to the adapter by value.
    ///   NOTE(review): presumably the adapter aborts the run when this token
    ///   fires — confirm against the individual adapter implementations.
    ///
    /// The returned future is `Send` and resolves to the run's
    /// [`RunResult`], or to a crate error if the run fails.
    fn run(
        &self,
        opts: &RunOptions,
        emit: &(dyn Fn(StreamEvent) + Send + Sync),
        cancel: tokio_util::sync::CancellationToken,
    ) -> impl std::future::Future<Output = crate::error::Result<RunResult>> + Send;
}
28
29/// Get the adapter for a given CLI.
30pub(crate) fn get_adapter(cli: CliName) -> Box<dyn CliAdapterBoxed> {
31    match cli {
32        CliName::Claude => Box::new(ClaudeAdapter),
33        CliName::Codex => Box::new(CodexAdapter),
34        CliName::Gemini => Box::new(GeminiAdapter),
35    }
36}
37
/// Object-safe version of [`CliAdapter`] for dynamic dispatch.
///
/// Needed because `CliAdapter::run` uses RPITIT (`impl Future`), which makes
/// the trait non-object-safe. This wrapper boxes the future for `dyn` dispatch.
/// The blanket impl below bridges the two automatically.
// NOTE(review): it is not evident from this file which item would otherwise
// trigger the dead-code lint — confirm this `allow` is still required.
#[allow(dead_code)]
pub(crate) trait CliAdapterBoxed: Send + Sync {
    /// Returns the [`CliName`] this adapter corresponds to.
    fn name(&self) -> CliName;

    /// Object-safe counterpart of [`CliAdapter::run`].
    ///
    /// Takes the same arguments, but returns the run as a pinned, heap-allocated
    /// `Send` future. The future borrows `self`, `opts` and `emit` for `'a`,
    /// so all three must outlive it; `cancel` is moved into the call.
    fn run_boxed<'a>(
        &'a self,
        opts: &'a RunOptions,
        emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
        cancel: tokio_util::sync::CancellationToken,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
    >;
}
56
57impl<T: CliAdapter> CliAdapterBoxed for T {
58    fn name(&self) -> CliName {
59        CliAdapter::name(self)
60    }
61
62    fn run_boxed<'a>(
63        &'a self,
64        opts: &'a RunOptions,
65        emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
66        cancel: tokio_util::sync::CancellationToken,
67    ) -> std::pin::Pin<
68        Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
69    > {
70        Box::pin(self.run(opts, emit, cancel))
71    }
72}
73
74// ── Shared subprocess infrastructure ──
75
/// Outcome of a spawned CLI process.
///
/// This is the `Ok` payload of [`spawn_and_stream`]; spawn failures, output
/// limit violations and wait errors are reported through `Err` instead.
pub(crate) enum SpawnOutcome {
    /// Process exited normally.
    Done {
        /// Exit code of the child. [`spawn_and_stream`] substitutes `1` when
        /// the exit status carries no code.
        exit_code: i32,
        /// Text the child wrote to stderr, or `None` if it wrote nothing.
        stderr: Option<String>,
    },
    /// Process was cancelled via the cancellation token.
    Cancelled,
}
86
/// Parameters for [`spawn_and_stream`].
///
/// All fields are borrowed, so building this struct never copies the
/// argument list or the environment map.
pub(crate) struct SpawnParams<'a> {
    /// Human-readable CLI name used in log records and spawn error messages.
    pub cli_label: &'a str,
    /// Program to execute.
    pub binary: &'a str,
    /// Command-line arguments passed to `binary`.
    pub args: &'a [String],
    /// Environment variables added on top of the inherited parent environment.
    pub extra_env: &'a HashMap<String, String>,
    /// Working directory the child process is started in.
    pub cwd: &'a str,
    /// Upper bound on the cumulative number of stdout bytes read from the
    /// child; exceeding it kills the child and yields an error.
    pub max_bytes: usize,
    /// Token that, once cancelled, causes the child to be killed and
    /// [`SpawnOutcome::Cancelled`] to be returned.
    pub cancel: &'a tokio_util::sync::CancellationToken,
}
97
98/// Spawn a CLI subprocess and stream its stdout line-by-line.
99///
100/// Handles the boilerplate shared across all adapters: process spawning,
101/// stdout buffering with size limits, stderr collection, and cancellation.
102/// Does **not** clone the parent process environment — `Command` inherits it
103/// automatically; only `extra_env` entries are added.
104pub(crate) async fn spawn_and_stream(
105    params: SpawnParams<'_>,
106    mut on_line: impl FnMut(&str) + Send,
107) -> Result<SpawnOutcome> {
108    let SpawnParams {
109        cli_label,
110        binary,
111        args,
112        extra_env,
113        cwd,
114        max_bytes,
115        cancel,
116    } = params;
117    debug!(cli = cli_label, binary = %binary, args = ?args, "spawning CLI");
118
119    let mut child = Command::new(binary)
120        .args(args)
121        .envs(extra_env)
122        .current_dir(cwd)
123        .stdin(std::process::Stdio::null())
124        .stdout(std::process::Stdio::piped())
125        .stderr(std::process::Stdio::piped())
126        .kill_on_drop(true)
127        .spawn()
128        .map_err(|e| Error::Process(format!("failed to spawn {cli_label}: {e}")))?;
129
130    let stdout = child.stdout.take().expect("stdout piped");
131    let stderr = child.stderr.take().expect("stderr piped");
132
133    let stderr_handle = tokio::spawn(async move {
134        let mut reader = BufReader::new(stderr);
135        let mut buf = String::new();
136        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {}
137        buf
138    });
139
140    let mut reader = BufReader::new(stdout);
141    let mut line = String::new();
142    let mut total_bytes: usize = 0;
143
144    loop {
145        line.clear();
146        tokio::select! {
147            result = reader.read_line(&mut line) => {
148                match result {
149                    Ok(0) => break,
150                    Ok(n) => {
151                        total_bytes += n;
152                        if total_bytes > max_bytes {
153                            warn!(cli = cli_label, total_bytes, max_bytes, "output exceeded max buffer size");
154                            let _ = child.kill().await;
155                            return Err(Error::Process(format!(
156                                "output exceeded max buffer size ({max_bytes} bytes)"
157                            )));
158                        }
159                        on_line(line.trim());
160                    }
161                    Err(e) => {
162                        warn!(cli = cli_label, error = %e, "error reading stdout");
163                        break;
164                    }
165                }
166            }
167            _ = cancel.cancelled() => {
168                let _ = child.kill().await;
169                return Ok(SpawnOutcome::Cancelled);
170            }
171        }
172    }
173
174    let status = child.wait().await.map_err(Error::Io)?;
175    let exit_code = status.code().unwrap_or(1);
176    let stderr_text = stderr_handle.await.unwrap_or_default();
177
178    Ok(SpawnOutcome::Done {
179        exit_code,
180        stderr: if stderr_text.is_empty() {
181            None
182        } else {
183            Some(stderr_text)
184        },
185    })
186}