cli_agents/adapters/
mod.rs1mod claude;
2mod codex;
3mod gemini;
4
5pub use claude::ClaudeAdapter;
6pub use codex::CodexAdapter;
7pub use gemini::GeminiAdapter;
8
9use crate::error::{Error, Result};
10use crate::events::StreamEvent;
11use crate::types::{CliName, RunOptions, RunResult};
12use std::collections::HashMap;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::process::Command;
15use tracing::{debug, warn};
16
17pub trait CliAdapter: Send + Sync {
19 fn name(&self) -> CliName;
20
21 fn run(
22 &self,
23 opts: &RunOptions,
24 emit: &(dyn Fn(StreamEvent) + Send + Sync),
25 cancel: tokio_util::sync::CancellationToken,
26 ) -> impl std::future::Future<Output = crate::error::Result<RunResult>> + Send;
27}
28
29pub(crate) fn get_adapter(cli: CliName) -> Box<dyn CliAdapterBoxed> {
31 match cli {
32 CliName::Claude => Box::new(ClaudeAdapter),
33 CliName::Codex => Box::new(CodexAdapter),
34 CliName::Gemini => Box::new(GeminiAdapter),
35 }
36}
37
38#[allow(dead_code)]
44pub(crate) trait CliAdapterBoxed: Send + Sync {
45 fn name(&self) -> CliName;
46
47 fn run_boxed<'a>(
48 &'a self,
49 opts: &'a RunOptions,
50 emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
51 cancel: tokio_util::sync::CancellationToken,
52 ) -> std::pin::Pin<
53 Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
54 >;
55}
56
57impl<T: CliAdapter> CliAdapterBoxed for T {
58 fn name(&self) -> CliName {
59 CliAdapter::name(self)
60 }
61
62 fn run_boxed<'a>(
63 &'a self,
64 opts: &'a RunOptions,
65 emit: &'a (dyn Fn(StreamEvent) + Send + Sync),
66 cancel: tokio_util::sync::CancellationToken,
67 ) -> std::pin::Pin<
68 Box<dyn std::future::Future<Output = crate::error::Result<RunResult>> + Send + 'a>,
69 > {
70 Box::pin(self.run(opts, emit, cancel))
71 }
72}
73
74pub(crate) enum SpawnOutcome {
78 Done {
80 exit_code: i32,
81 stderr: Option<String>,
82 },
83 Cancelled,
85}
86
87pub(crate) struct SpawnParams<'a> {
89 pub cli_label: &'a str,
90 pub binary: &'a str,
91 pub args: &'a [String],
92 pub extra_env: &'a HashMap<String, String>,
93 pub cwd: &'a str,
94 pub max_bytes: usize,
95 pub cancel: &'a tokio_util::sync::CancellationToken,
96}
97
98pub(crate) async fn spawn_and_stream(
105 params: SpawnParams<'_>,
106 mut on_line: impl FnMut(&str) + Send,
107) -> Result<SpawnOutcome> {
108 let SpawnParams {
109 cli_label,
110 binary,
111 args,
112 extra_env,
113 cwd,
114 max_bytes,
115 cancel,
116 } = params;
117 debug!(cli = cli_label, binary = %binary, args = ?args, "spawning CLI");
118
119 let mut cmd = Command::new(binary);
120 cmd.args(args)
121 .envs(extra_env)
122 .current_dir(cwd)
123 .stdin(std::process::Stdio::null())
124 .stdout(std::process::Stdio::piped())
125 .stderr(std::process::Stdio::piped())
126 .kill_on_drop(true);
127
128 #[cfg(unix)]
129 {
130 unsafe {
131 cmd.pre_exec(|| {
132 if libc::setpgid(0, 0) != 0 {
133 return Err(std::io::Error::last_os_error());
134 }
135 Ok(())
136 });
137 }
138 }
139
140 let mut child = cmd
141 .spawn()
142 .map_err(|e| Error::Process(format!("failed to spawn {cli_label}: {e}")))?;
143
144 let child_pid = child.id();
145
146 let stdout = child.stdout.take().expect("stdout piped");
147 let stderr = child.stderr.take().expect("stderr piped");
148
149 let stderr_handle = tokio::spawn(async move {
150 let mut reader = BufReader::new(stderr);
151 let mut buf = String::new();
152 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {}
153 buf
154 });
155
156 let mut reader = BufReader::new(stdout);
157 let mut line = String::new();
158 let mut total_bytes: usize = 0;
159
160 loop {
161 line.clear();
162 tokio::select! {
163 result = reader.read_line(&mut line) => {
164 match result {
165 Ok(0) => break,
166 Ok(n) => {
167 total_bytes += n;
168 if total_bytes > max_bytes {
169 warn!(cli = cli_label, total_bytes, max_bytes, "output exceeded max buffer size");
170 kill_process_group(&mut child, child_pid).await;
171 return Err(Error::Process(format!(
172 "output exceeded max buffer size ({max_bytes} bytes)"
173 )));
174 }
175 on_line(line.trim());
176 }
177 Err(e) => {
178 warn!(cli = cli_label, error = %e, "error reading stdout");
179 break;
180 }
181 }
182 }
183 _ = cancel.cancelled() => {
184 kill_process_group(&mut child, child_pid).await;
185 return Ok(SpawnOutcome::Cancelled);
186 }
187 }
188 }
189
190 let status = child.wait().await.map_err(Error::Io)?;
191 let exit_code = status.code().unwrap_or(1);
192 let stderr_text = stderr_handle.await.unwrap_or_default();
193
194 Ok(SpawnOutcome::Done {
195 exit_code,
196 stderr: if stderr_text.is_empty() {
197 None
198 } else {
199 Some(stderr_text)
200 },
201 })
202}
203
204async fn kill_process_group(child: &mut tokio::process::Child, pid: Option<u32>) {
205 #[cfg(unix)]
206 {
207 if let Some(pid) = pid {
208 unsafe {
209 libc::killpg(pid as libc::pid_t, libc::SIGKILL);
210 }
211 }
212 }
213 let _ = child.kill().await;
214}