Skip to main content

cli_agents/adapters/
mod.rs

1mod claude;
2mod codex;
3mod gemini;
4
5pub use claude::ClaudeAdapter;
6pub use codex::CodexAdapter;
7pub use gemini::GeminiAdapter;
8
9use crate::error::{Error, Result};
10use crate::events::StreamEvent;
11use crate::types::{CliName, RunOptions, RunResult};
12use std::collections::HashMap;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::process::Command;
15use tracing::{debug, warn};
16
17/// Trait implemented by each CLI adapter.
18pub trait CliAdapter: Send + Sync {
19    fn name(&self) -> CliName;
20
21    fn run(
22        &self,
23        opts: &RunOptions,
24        emit: &(dyn Fn(StreamEvent) + Send + Sync),
25        cancel: tokio_util::sync::CancellationToken,
26    ) -> impl std::future::Future<Output = crate::error::Result<RunResult>> + Send;
27}
28
29/// Get the adapter for a given CLI.
30pub(crate) fn get_adapter(cli: CliName) -> Box<dyn CliAdapterBoxed> {
31    match cli {
32        CliName::Claude => Box::new(ClaudeAdapter),
33        CliName::Codex => Box::new(CodexAdapter),
34        CliName::Gemini => Box::new(GeminiAdapter),
35    }
36}
37
38/// Object-safe version of [`CliAdapter`] for dynamic dispatch.
39///
40/// Needed because `CliAdapter::run` uses RPITIT (`impl Future`), which makes
41/// the trait non-object-safe. This wrapper boxes the future for `dyn` dispatch.
42/// The blanket impl below bridges the two automatically.
43#[allow(dead_code)]
44pub(crate) trait CliAdapterBoxed: Send + Sync {
45    fn name(&self) -> CliName;
46
47    fn run_boxed<'a>(
48        &'a self,
49        opts: &'a RunOptions,
50        emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
51        cancel: tokio_util::sync::CancellationToken,
52    ) -> std::pin::Pin<
53        Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
54    >;
55}
56
57impl<T: CliAdapter> CliAdapterBoxed for T {
58    fn name(&self) -> CliName {
59        CliAdapter::name(self)
60    }
61
62    fn run_boxed<'a>(
63        &'a self,
64        opts: &'a RunOptions,
65        emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
66        cancel: tokio_util::sync::CancellationToken,
67    ) -> std::pin::Pin<
68        Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
69    > {
70        Box::pin(self.run(opts, emit, cancel))
71    }
72}
73
74// ── Shared subprocess infrastructure ──
75
/// How a subprocess started by [`spawn_and_stream`] finished.
pub(crate) enum SpawnOutcome {
    /// The process ran until it exited by itself.
    Done {
        /// Exit code reported by the process; `spawn_and_stream` substitutes
        /// `1` when the exit status carries no code.
        exit_code: i32,
        /// Everything the process wrote to stderr, or `None` if it wrote
        /// nothing.
        stderr: Option<String>,
    },
    /// The cancellation token fired and the process was killed.
    Cancelled,
}
86
87/// Parameters for [`spawn_and_stream`].
88pub(crate) struct SpawnParams<'a> {
89    pub cli_label: &'a str,
90    pub binary: &'a str,
91    pub args: &'a [String],
92    pub extra_env: &'a HashMap<String, String>,
93    pub cwd: &'a str,
94    pub max_bytes: usize,
95    pub cancel: &'a tokio_util::sync::CancellationToken,
96}
97
98/// Spawn a CLI subprocess and stream its stdout line-by-line.
99///
100/// Handles the boilerplate shared across all adapters: process spawning,
101/// stdout buffering with size limits, stderr collection, and cancellation.
102/// Does **not** clone the parent process environment — `Command` inherits it
103/// automatically; only `extra_env` entries are added.
104pub(crate) async fn spawn_and_stream(
105    params: SpawnParams<'_>,
106    mut on_line: impl FnMut(&str) + Send,
107) -> Result<SpawnOutcome> {
108    let SpawnParams {
109        cli_label,
110        binary,
111        args,
112        extra_env,
113        cwd,
114        max_bytes,
115        cancel,
116    } = params;
117    debug!(cli = cli_label, binary = %binary, args = ?args, "spawning CLI");
118
119    let mut cmd = Command::new(binary);
120    cmd.args(args)
121        .envs(extra_env)
122        .current_dir(cwd)
123        .stdin(std::process::Stdio::null())
124        .stdout(std::process::Stdio::piped())
125        .stderr(std::process::Stdio::piped())
126        .kill_on_drop(true);
127
128    #[cfg(unix)]
129    {
130        unsafe {
131            cmd.pre_exec(|| {
132                if libc::setpgid(0, 0) != 0 {
133                    return Err(std::io::Error::last_os_error());
134                }
135                Ok(())
136            });
137        }
138    }
139
140    let mut child = cmd
141        .spawn()
142        .map_err(|e| Error::Process(format!("failed to spawn {cli_label}: {e}")))?;
143
144    let child_pid = child.id();
145
146    let stdout = child.stdout.take().expect("stdout piped");
147    let stderr = child.stderr.take().expect("stderr piped");
148
149    let stderr_handle = tokio::spawn(async move {
150        let mut reader = BufReader::new(stderr);
151        let mut buf = String::new();
152        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {}
153        buf
154    });
155
156    let mut reader = BufReader::new(stdout);
157    let mut line = String::new();
158    let mut total_bytes: usize = 0;
159
160    loop {
161        line.clear();
162        tokio::select! {
163            result = reader.read_line(&mut line) => {
164                match result {
165                    Ok(0) => break,
166                    Ok(n) => {
167                        total_bytes += n;
168                        if total_bytes > max_bytes {
169                            warn!(cli = cli_label, total_bytes, max_bytes, "output exceeded max buffer size");
170                            kill_process_group(&mut child, child_pid).await;
171                            return Err(Error::Process(format!(
172                                "output exceeded max buffer size ({max_bytes} bytes)"
173                            )));
174                        }
175                        on_line(line.trim());
176                    }
177                    Err(e) => {
178                        warn!(cli = cli_label, error = %e, "error reading stdout");
179                        break;
180                    }
181                }
182            }
183            _ = cancel.cancelled() => {
184                kill_process_group(&mut child, child_pid).await;
185                return Ok(SpawnOutcome::Cancelled);
186            }
187        }
188    }
189
190    let status = child.wait().await.map_err(Error::Io)?;
191    let exit_code = status.code().unwrap_or(1);
192    let stderr_text = stderr_handle.await.unwrap_or_default();
193
194    Ok(SpawnOutcome::Done {
195        exit_code,
196        stderr: if stderr_text.is_empty() {
197            None
198        } else {
199            Some(stderr_text)
200        },
201    })
202}
203
204async fn kill_process_group(child: &mut tokio::process::Child, pid: Option<u32>) {
205    #[cfg(unix)]
206    {
207        if let Some(pid) = pid {
208            unsafe {
209                libc::killpg(pid as libc::pid_t, libc::SIGKILL);
210            }
211        }
212    }
213    let _ = child.kill().await;
214}