Skip to main content

agent_teams/backend/
gemini.rs

//! Gemini CLI backend -- spawns agents via the `gemini` CLI in one-shot mode.
//!
//! The Gemini CLI (`/opt/homebrew/bin/gemini` or any PATH-discovered binary) operates
//! in a **stateless, one-shot** mode: each turn spawns a fresh process with the prompt
//! piped to stdin and the response read from stdout. There is no JSON-RPC protocol,
//! no persistent session, and no multi-turn state.
//!
//! This makes Gemini well suited to single-turn tasks such as code review, analysis,
//! and Q&A, where multi-turn conversation state is not needed.
//!
//! ## Process model
//!
//! ```text
//! spawn()        → first process  (init greeting → stdout → output_tx)
//! send_input()   → kill old proc  → new process   (input → stdout → output_tx)
//! send_input()   → kill old proc  → new process   (input → stdout → output_tx)
//! shutdown()     → set alive=false, kill current proc
//! ```
//!
//! The `output_tx` channel is created once at `spawn()` time and reused across all
//! processes, so the orchestrator's `output_rx` remains valid for the session lifetime.

23use std::path::PathBuf;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26
27use async_trait::async_trait;
28use tokio::io::{AsyncBufReadExt, BufReader};
29use tokio::process::{Child, Command};
30use tokio::sync::mpsc;
31use tokio::task::JoinHandle;
32use tracing::{debug, info, warn};
33
34use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
35use crate::{Error, Result};
36
/// Capacity of the bounded `mpsc` channel that carries [`AgentOutput`] events
/// from the per-turn reader tasks to the orchestrator.
const OUTPUT_CHANNEL_SIZE: usize = 256;

// ---------------------------------------------------------------------------
// GeminiCliBackend  (factory)
// ---------------------------------------------------------------------------

/// Factory for Gemini CLI agent sessions.
///
/// Holds only the location of the `gemini` executable. Each session it creates
/// relaunches that executable once per turn.
#[derive(Clone, Debug)]
pub struct GeminiCliBackend {
    /// Filesystem location of the `gemini` executable that sessions launch.
    gemini_path: PathBuf,
}

51impl GeminiCliBackend {
52    /// Locate the `gemini` binary on `$PATH` via `which`.
53    pub fn new() -> Result<Self> {
54        let path = which::which("gemini").map_err(|_| Error::CliNotFound {
55            name: "gemini".into(),
56        })?;
57        Ok(Self { gemini_path: path })
58    }
59
60    /// Use an explicit path to the `gemini` binary.
61    pub fn with_path(path: impl Into<PathBuf>) -> Self {
62        Self {
63            gemini_path: path.into(),
64        }
65    }
66
67    /// Build the CLI arguments from a [`SpawnConfig`] and the system prompt.
68    ///
69    /// User input is NOT passed as an argument -- it is piped via stdin by the caller.
70    fn build_args(config: &SpawnConfig, system_prompt: &str) -> Vec<String> {
71        let mut args = Vec::new();
72
73        // System prompt via `-p`
74        if !system_prompt.is_empty() {
75            args.push("-p".into());
76            args.push(system_prompt.to_string());
77        }
78
79        // Model via `-m` (default: gemini-2.5-pro)
80        let model = config.model.as_deref().unwrap_or("gemini-2.5-pro");
81        args.push("-m".into());
82        args.push(model.to_string());
83
84        // Always use `-y` (auto-approve): Gemini CLI is non-interactive in pipe mode,
85        // so tool-call prompts would hang without this flag.
86        args.push("-y".into());
87
88        args
89    }
90}
91
92#[async_trait]
93impl AgentBackend for GeminiCliBackend {
94    fn backend_type(&self) -> BackendType {
95        BackendType::GeminiCli
96    }
97
98    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
99        let agent_name = config.name.clone();
100        // The prompt from SpawnConfig is used as the system prompt (injected via `-p`
101        // on every turn). The initial user input is a brief init message -- NOT the
102        // system prompt again -- to avoid sending it twice (once as `-p`, once as stdin).
103        let system_prompt = config.prompt.clone();
104        let initial_input = "Hello. Awaiting instructions.";
105
106        info!(agent = %agent_name, "Spawning Gemini CLI agent");
107
108        // Create the output channel (lives for the entire session)
109        let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
110        let alive = Arc::new(AtomicBool::new(true));
111
112        // Spawn the first process with a brief init message (system prompt is via -p)
113        let (child, reader_handle) = spawn_gemini_process(
114            &self.gemini_path,
115            &config,
116            initial_input,
117            &system_prompt,
118            output_tx.clone(),
119            alive.clone(),
120            &agent_name,
121        )
122        .await?;
123
124        let session = GeminiCliSession {
125            name: agent_name,
126            gemini_path: self.gemini_path.clone(),
127            config,
128            system_prompt,
129            child: Some(child),
130            reader_handle: Some(reader_handle),
131            output_tx,
132            output_rx: Some(output_rx),
133            alive,
134        };
135
136        Ok(Box::new(session))
137    }
138}
139
140// ---------------------------------------------------------------------------
141// GeminiCliSession
142// ---------------------------------------------------------------------------
143
144/// A running Gemini CLI agent session.
145///
146/// Each turn spawns a fresh `gemini` process. The `output_tx` channel is shared
147/// across all process lifetimes so the orchestrator's receiver stays valid.
148struct GeminiCliSession {
149    /// Agent name.
150    name: String,
151    /// Path to the gemini binary.
152    gemini_path: PathBuf,
153    /// Original spawn config (for cwd, env, model, etc.).
154    config: SpawnConfig,
155    /// System prompt injected into every turn via `-p`.
156    system_prompt: String,
157    /// Current child process (if any).
158    child: Option<Child>,
159    /// Background reader task for current process.
160    reader_handle: Option<JoinHandle<()>>,
161    /// Shared output sender (reused across process lifetimes).
162    output_tx: mpsc::Sender<AgentOutput>,
163    /// Output receiver (taken once by the orchestrator).
164    output_rx: Option<mpsc::Receiver<AgentOutput>>,
165    /// Liveness flag.
166    alive: Arc<AtomicBool>,
167}
168
169#[async_trait]
170impl AgentSession for GeminiCliSession {
171    fn name(&self) -> &str {
172        &self.name
173    }
174
175    async fn send_input(&mut self, input: &str) -> Result<()> {
176        if !self.alive.load(Ordering::Relaxed) {
177            return Err(Error::AgentNotAlive {
178                name: self.name.clone(),
179            });
180        }
181
182        // Kill the old process and reader (if still running)
183        self.kill_current().await;
184
185        // Spawn a new process for this turn
186        let (child, reader_handle) = spawn_gemini_process(
187            &self.gemini_path,
188            &self.config,
189            input,
190            &self.system_prompt,
191            self.output_tx.clone(),
192            self.alive.clone(),
193            &self.name,
194        )
195        .await?;
196
197        self.child = Some(child);
198        self.reader_handle = Some(reader_handle);
199
200        Ok(())
201    }
202
203    fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
204        self.output_rx.take()
205    }
206
207    async fn is_alive(&self) -> bool {
208        self.alive.load(Ordering::Relaxed)
209    }
210
211    async fn shutdown(&mut self) -> Result<()> {
212        info!(agent = %self.name, "Shutting down Gemini CLI session");
213        self.alive.store(false, Ordering::Relaxed);
214        self.kill_current().await;
215        Ok(())
216    }
217
218    async fn force_kill(&mut self) -> Result<()> {
219        info!(agent = %self.name, "Force-killing Gemini CLI session");
220        self.alive.store(false, Ordering::Relaxed);
221        self.kill_current().await;
222        Ok(())
223    }
224}
225
226impl GeminiCliSession {
227    /// Kill the current child process and abort the reader task.
228    async fn kill_current(&mut self) {
229        // Abort reader first so it stops reading stdout
230        if let Some(handle) = self.reader_handle.take() {
231            handle.abort();
232            let _ = handle.await;
233        }
234
235        // Kill the child process
236        if let Some(mut child) = self.child.take() {
237            let _ = child.kill().await;
238            let _ = child.wait().await;
239        }
240    }
241}
242
243impl Drop for GeminiCliSession {
244    fn drop(&mut self) {
245        if let Some(handle) = self.reader_handle.take() {
246            handle.abort();
247        }
248        // Child process is killed by kill_on_drop(true) set during spawn.
249    }
250}
251
252// ---------------------------------------------------------------------------
253// Process spawning helper
254// ---------------------------------------------------------------------------
255
256/// Spawn a single Gemini CLI process, pipe `input` to its stdin, and start a
257/// background reader task that forwards stdout lines to `output_tx`.
258///
259/// Returns the child process handle and the reader task handle.
260async fn spawn_gemini_process(
261    gemini_path: &std::path::Path,
262    config: &SpawnConfig,
263    input: &str,
264    system_prompt: &str,
265    output_tx: mpsc::Sender<AgentOutput>,
266    alive: Arc<AtomicBool>,
267    agent_name: &str,
268) -> Result<(Child, JoinHandle<()>)> {
269    let args = GeminiCliBackend::build_args(config, system_prompt);
270
271    let mut cmd = Command::new(gemini_path);
272    cmd.args(&args);
273
274    cmd.stdin(std::process::Stdio::piped());
275    cmd.stdout(std::process::Stdio::piped());
276    // Capture stderr for logging instead of discarding it -- helps debug
277    // auth failures, invalid models, and other CLI errors.
278    cmd.stderr(std::process::Stdio::piped());
279
280    if let Some(ref cwd) = config.cwd {
281        cmd.current_dir(cwd);
282    }
283
284    for (k, v) in &config.env {
285        cmd.env(k, v);
286    }
287
288    cmd.kill_on_drop(true);
289
290    let mut child = cmd.spawn().map_err(|e| Error::SpawnFailed {
291        name: agent_name.to_string(),
292        reason: format!("Failed to start gemini process: {e}"),
293    })?;
294
295    // Write input to stdin, then close it
296    {
297        use tokio::io::AsyncWriteExt;
298        let mut stdin = child.stdin.take().ok_or_else(|| Error::SpawnFailed {
299            name: agent_name.to_string(),
300            reason: "Failed to capture gemini stdin".into(),
301        })?;
302        stdin
303            .write_all(input.as_bytes())
304            .await
305            .map_err(|e| Error::GeminiCli {
306                reason: format!("Failed to write to gemini stdin: {e}"),
307            })?;
308        // Drop stdin to close it -- signals EOF to the gemini process
309    }
310
311    // Take stdout for reading
312    let stdout = child.stdout.take().ok_or_else(|| Error::SpawnFailed {
313        name: agent_name.to_string(),
314        reason: "Failed to capture gemini stdout".into(),
315    })?;
316
317    // Spawn a lightweight stderr drain task that logs any error output.
318    // This runs fire-and-forget -- it will stop when the child process exits (EOF).
319    if let Some(stderr) = child.stderr.take() {
320        let stderr_name = agent_name.to_string();
321        tokio::spawn(async move {
322            let mut reader = BufReader::new(stderr);
323            let mut line_buf = String::new();
324            loop {
325                line_buf.clear();
326                match reader.read_line(&mut line_buf).await {
327                    Ok(0) | Err(_) => break,
328                    Ok(_) => {
329                        let trimmed = line_buf.trim();
330                        if !trimmed.is_empty() {
331                            warn!(agent = %stderr_name, stderr = %trimmed, "Gemini CLI stderr");
332                        }
333                    }
334                }
335            }
336        });
337    }
338
339    // Spawn background reader task
340    let reader_alive = alive.clone();
341    let reader_name = agent_name.to_string();
342    let reader_tx = output_tx;
343
344    let reader_handle = tokio::spawn(async move {
345        debug!(agent = %reader_name, "Gemini reader task started");
346        let mut reader = BufReader::new(stdout);
347        let mut line_buf = String::new();
348
349        loop {
350            if !reader_alive.load(Ordering::Relaxed) {
351                break;
352            }
353
354            line_buf.clear();
355            match reader.read_line(&mut line_buf).await {
356                Ok(0) => {
357                    // EOF -- process exited
358                    debug!(agent = %reader_name, "Gemini stdout EOF");
359                    let _ =
360                        send_agent_output(&reader_tx, AgentOutput::TurnComplete, &reader_alive, &reader_name)
361                            .await;
362                    break;
363                }
364                Ok(_) => {
365                    let text = line_buf.trim_end_matches('\n').to_string();
366                    if !text.is_empty()
367                        && send_agent_output(
368                            &reader_tx,
369                            AgentOutput::Delta(text),
370                            &reader_alive,
371                            &reader_name,
372                        )
373                        .await
374                        .is_err()
375                    {
376                        break;
377                    }
378                }
379                Err(e) => {
380                    warn!(agent = %reader_name, error = %e, "Error reading gemini stdout");
381                    let _ = send_agent_output(
382                        &reader_tx,
383                        AgentOutput::Error(format!("Read error: {e}")),
384                        &reader_alive,
385                        &reader_name,
386                    )
387                    .await;
388                    break;
389                }
390            }
391        }
392        debug!(agent = %reader_name, "Gemini reader task stopped");
393    });
394
395    Ok((child, reader_handle))
396}
397
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Convert string literals into the `Vec<String>` shape `build_args` returns.
    fn strings(items: &[&str]) -> Vec<String> {
        items.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn test_gemini_backend_type() {
        let backend = GeminiCliBackend::with_path("/usr/bin/gemini");
        assert_eq!(backend.backend_type(), BackendType::GeminiCli);
    }

    #[test]
    fn test_spawn_config_to_args_default() {
        let config = SpawnConfig::new("test-agent", "You are a code reviewer");
        let args = GeminiCliBackend::build_args(&config, "You are a code reviewer");

        // Exact comparison: each flag must be immediately followed by its value.
        assert_eq!(
            args,
            strings(&["-p", "You are a code reviewer", "-m", "gemini-2.5-pro", "-y"])
        );
    }

    #[test]
    fn test_spawn_config_to_args_custom_model() {
        let mut config = SpawnConfig::new("test-agent", "system prompt");
        config.model = Some("gemini-2.5-flash".to_string());
        let args = GeminiCliBackend::build_args(&config, "system prompt");

        assert_eq!(
            args,
            strings(&["-p", "system prompt", "-m", "gemini-2.5-flash", "-y"])
        );
    }

    #[test]
    fn test_spawn_config_to_args_empty_system_prompt() {
        let config = SpawnConfig::new("test-agent", "");
        let args = GeminiCliBackend::build_args(&config, "");

        // An empty system prompt must omit the `-p` pair entirely.
        assert_eq!(args, strings(&["-m", "gemini-2.5-pro", "-y"]));
    }

    #[test]
    fn test_build_args_uses_explicit_system_prompt() {
        // `build_args` takes the system prompt as an argument; `config.prompt` is
        // not consulted.
        let config = SpawnConfig::new("test-agent", "from config");
        let args = GeminiCliBackend::build_args(&config, "explicit");

        assert_eq!(
            args,
            strings(&["-p", "explicit", "-m", "gemini-2.5-pro", "-y"])
        );
    }

    #[test]
    fn test_backend_type_display() {
        assert_eq!(BackendType::GeminiCli.to_string(), "gemini-cli");
    }
}