use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
use crate::{Error, Result};
const OUTPUT_CHANNEL_SIZE: usize = 256;
#[derive(Debug, Clone)]
pub struct GeminiCliBackend {
gemini_path: PathBuf,
}
impl GeminiCliBackend {
pub fn new() -> Result<Self> {
let path = which::which("gemini").map_err(|_| Error::CliNotFound {
name: "gemini".into(),
})?;
Ok(Self { gemini_path: path })
}
pub fn with_path(path: impl Into<PathBuf>) -> Self {
Self {
gemini_path: path.into(),
}
}
fn build_args(config: &SpawnConfig, system_prompt: &str) -> Vec<String> {
let mut args = Vec::new();
if !system_prompt.is_empty() {
args.push("-p".into());
args.push(system_prompt.to_string());
}
let model = config.model.as_deref().unwrap_or("gemini-2.5-pro");
args.push("-m".into());
args.push(model.to_string());
args.push("-y".into());
args
}
}
#[async_trait]
impl AgentBackend for GeminiCliBackend {
fn backend_type(&self) -> BackendType {
BackendType::GeminiCli
}
async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
let agent_name = config.name.clone();
let system_prompt = config.prompt.clone();
let initial_input = "Hello. Awaiting instructions.";
info!(agent = %agent_name, "Spawning Gemini CLI agent");
let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
let alive = Arc::new(AtomicBool::new(true));
let (child, reader_handle) = spawn_gemini_process(
&self.gemini_path,
&config,
initial_input,
&system_prompt,
output_tx.clone(),
alive.clone(),
&agent_name,
)
.await?;
let session = GeminiCliSession {
name: agent_name,
gemini_path: self.gemini_path.clone(),
config,
system_prompt,
child: Some(child),
reader_handle: Some(reader_handle),
output_tx,
output_rx: Some(output_rx),
alive,
};
Ok(Box::new(session))
}
}
struct GeminiCliSession {
name: String,
gemini_path: PathBuf,
config: SpawnConfig,
system_prompt: String,
child: Option<Child>,
reader_handle: Option<JoinHandle<()>>,
output_tx: mpsc::Sender<AgentOutput>,
output_rx: Option<mpsc::Receiver<AgentOutput>>,
alive: Arc<AtomicBool>,
}
#[async_trait]
impl AgentSession for GeminiCliSession {
fn name(&self) -> &str {
&self.name
}
async fn send_input(&mut self, input: &str) -> Result<()> {
if !self.alive.load(Ordering::Relaxed) {
return Err(Error::AgentNotAlive {
name: self.name.clone(),
});
}
self.kill_current().await;
let (child, reader_handle) = spawn_gemini_process(
&self.gemini_path,
&self.config,
input,
&self.system_prompt,
self.output_tx.clone(),
self.alive.clone(),
&self.name,
)
.await?;
self.child = Some(child);
self.reader_handle = Some(reader_handle);
Ok(())
}
fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
self.output_rx.take()
}
async fn is_alive(&self) -> bool {
self.alive.load(Ordering::Relaxed)
}
async fn shutdown(&mut self) -> Result<()> {
info!(agent = %self.name, "Shutting down Gemini CLI session");
self.alive.store(false, Ordering::Relaxed);
self.kill_current().await;
Ok(())
}
async fn force_kill(&mut self) -> Result<()> {
info!(agent = %self.name, "Force-killing Gemini CLI session");
self.alive.store(false, Ordering::Relaxed);
self.kill_current().await;
Ok(())
}
}
impl GeminiCliSession {
async fn kill_current(&mut self) {
if let Some(handle) = self.reader_handle.take() {
handle.abort();
let _ = handle.await;
}
if let Some(mut child) = self.child.take() {
let _ = child.kill().await;
let _ = child.wait().await;
}
}
}
impl Drop for GeminiCliSession {
fn drop(&mut self) {
if let Some(handle) = self.reader_handle.take() {
handle.abort();
}
}
}
async fn spawn_gemini_process(
gemini_path: &std::path::Path,
config: &SpawnConfig,
input: &str,
system_prompt: &str,
output_tx: mpsc::Sender<AgentOutput>,
alive: Arc<AtomicBool>,
agent_name: &str,
) -> Result<(Child, JoinHandle<()>)> {
let args = GeminiCliBackend::build_args(config, system_prompt);
let mut cmd = Command::new(gemini_path);
cmd.args(&args);
cmd.stdin(std::process::Stdio::piped());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
if let Some(ref cwd) = config.cwd {
cmd.current_dir(cwd);
}
for (k, v) in &config.env {
cmd.env(k, v);
}
cmd.kill_on_drop(true);
let mut child = cmd.spawn().map_err(|e| Error::SpawnFailed {
name: agent_name.to_string(),
reason: format!("Failed to start gemini process: {e}"),
})?;
{
use tokio::io::AsyncWriteExt;
let mut stdin = child.stdin.take().ok_or_else(|| Error::SpawnFailed {
name: agent_name.to_string(),
reason: "Failed to capture gemini stdin".into(),
})?;
stdin
.write_all(input.as_bytes())
.await
.map_err(|e| Error::GeminiCli {
reason: format!("Failed to write to gemini stdin: {e}"),
})?;
}
let stdout = child.stdout.take().ok_or_else(|| Error::SpawnFailed {
name: agent_name.to_string(),
reason: "Failed to capture gemini stdout".into(),
})?;
if let Some(stderr) = child.stderr.take() {
let stderr_name = agent_name.to_string();
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line_buf = String::new();
loop {
line_buf.clear();
match reader.read_line(&mut line_buf).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let trimmed = line_buf.trim();
if !trimmed.is_empty() {
warn!(agent = %stderr_name, stderr = %trimmed, "Gemini CLI stderr");
}
}
}
}
});
}
let reader_alive = alive.clone();
let reader_name = agent_name.to_string();
let reader_tx = output_tx;
let reader_handle = tokio::spawn(async move {
debug!(agent = %reader_name, "Gemini reader task started");
let mut reader = BufReader::new(stdout);
let mut line_buf = String::new();
loop {
if !reader_alive.load(Ordering::Relaxed) {
break;
}
line_buf.clear();
match reader.read_line(&mut line_buf).await {
Ok(0) => {
debug!(agent = %reader_name, "Gemini stdout EOF");
let _ =
send_agent_output(&reader_tx, AgentOutput::TurnComplete, &reader_alive, &reader_name)
.await;
break;
}
Ok(_) => {
let text = line_buf.trim_end_matches('\n').to_string();
if !text.is_empty()
&& send_agent_output(
&reader_tx,
AgentOutput::Delta(text),
&reader_alive,
&reader_name,
)
.await
.is_err()
{
break;
}
}
Err(e) => {
warn!(agent = %reader_name, error = %e, "Error reading gemini stdout");
let _ = send_agent_output(
&reader_tx,
AgentOutput::Error(format!("Read error: {e}")),
&reader_alive,
&reader_name,
)
.await;
break;
}
}
}
debug!(agent = %reader_name, "Gemini reader task stopped");
});
Ok((child, reader_handle))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_gemini_backend_type() {
let backend = GeminiCliBackend {
gemini_path: PathBuf::from("/usr/bin/gemini"),
};
assert_eq!(backend.backend_type(), BackendType::GeminiCli);
}
#[test]
fn test_spawn_config_to_args_default() {
let config = SpawnConfig::new("test-agent", "You are a code reviewer");
let args = GeminiCliBackend::build_args(&config, "You are a code reviewer");
assert!(args.contains(&"-p".to_string()));
assert!(args.contains(&"You are a code reviewer".to_string()));
assert!(args.contains(&"-m".to_string()));
assert!(args.contains(&"gemini-2.5-pro".to_string()));
assert!(args.contains(&"-y".to_string()));
}
#[test]
fn test_spawn_config_to_args_custom_model() {
let mut config = SpawnConfig::new("test-agent", "system prompt");
config.model = Some("gemini-2.5-flash".to_string());
let args = GeminiCliBackend::build_args(&config, "system prompt");
assert!(args.contains(&"-m".to_string()));
assert!(args.contains(&"gemini-2.5-flash".to_string()));
assert!(!args.contains(&"gemini-2.5-pro".to_string()));
}
#[test]
fn test_spawn_config_to_args_empty_system_prompt() {
let config = SpawnConfig::new("test-agent", "");
let args = GeminiCliBackend::build_args(&config, "");
assert!(!args.contains(&"-p".to_string()));
}
#[test]
fn test_backend_type_display() {
assert_eq!(BackendType::GeminiCli.to_string(), "gemini-cli");
}
}