agent-teams 0.1.0

Generic Rust agent teams framework replicating Claude Code Agent Teams architecture with pluggable backends for Claude Code, Codex, and Gemini CLI
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
//! Gemini CLI backend -- spawns agents via the `gemini` CLI in one-shot mode.
//!
//! The Gemini CLI (`/opt/homebrew/bin/gemini` or any PATH-discovered binary) operates
//! in a **stateless, one-shot** mode: each turn spawns a fresh process with that turn's
//! input piped to stdin (the system prompt is passed separately via `-p`) and the
//! response read from stdout. There is no JSON-RPC protocol, no persistent session,
//! and no multi-turn state.
//!
//! This makes Gemini ideal for single-turn tasks like code review, analysis, and Q&A
//! where multi-turn conversation state is not needed.
//!
//! ## Process model
//!
//! ```text
//! spawn()        → first process  (init message → stdout → output_tx)
//! send_input()   → kill old proc  → new process   (input → stdout → output_tx)
//! send_input()   → kill old proc  → new process   (input → stdout → output_tx)
//! shutdown()     → kill current proc, set alive=false
//! ```
//!
//! The `output_tx` channel is created once at `spawn()` time and reused across all
//! processes, so the orchestrator's `output_rx` remains valid for the session lifetime.

use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
use crate::{Error, Result};

/// Channel buffer size for agent output events.
///
/// Used as the capacity of the `mpsc` channel created once per session in
/// `GeminiCliBackend::spawn`; that channel carries [`AgentOutput`] values from the
/// per-turn reader tasks to whoever takes the session's output receiver.
const OUTPUT_CHANNEL_SIZE: usize = 256;

// ---------------------------------------------------------------------------
// GeminiCliBackend  (factory)
// ---------------------------------------------------------------------------

/// Factory that creates Gemini CLI agent sessions by spawning the `gemini` binary.
///
/// The only state held is the path of the binary to run; every session created
/// through `spawn` receives its own copy of that path.
#[derive(Debug, Clone)]
pub struct GeminiCliBackend {
    /// Path to the `gemini` CLI binary, either resolved from `$PATH` by
    /// [`GeminiCliBackend::new`] or supplied through [`GeminiCliBackend::with_path`].
    gemini_path: PathBuf,
}

impl GeminiCliBackend {
    /// Locate the `gemini` binary on `$PATH` via `which`.
    pub fn new() -> Result<Self> {
        let path = which::which("gemini").map_err(|_| Error::CliNotFound {
            name: "gemini".into(),
        })?;
        Ok(Self { gemini_path: path })
    }

    /// Use an explicit path to the `gemini` binary.
    pub fn with_path(path: impl Into<PathBuf>) -> Self {
        Self {
            gemini_path: path.into(),
        }
    }

    /// Build the CLI arguments from a [`SpawnConfig`] and the system prompt.
    ///
    /// User input is NOT passed as an argument -- it is piped via stdin by the caller.
    fn build_args(config: &SpawnConfig, system_prompt: &str) -> Vec<String> {
        let mut args = Vec::new();

        // System prompt via `-p`
        if !system_prompt.is_empty() {
            args.push("-p".into());
            args.push(system_prompt.to_string());
        }

        // Model via `-m` (default: gemini-2.5-pro)
        let model = config.model.as_deref().unwrap_or("gemini-2.5-pro");
        args.push("-m".into());
        args.push(model.to_string());

        // Always use `-y` (auto-approve): Gemini CLI is non-interactive in pipe mode,
        // so tool-call prompts would hang without this flag.
        args.push("-y".into());

        args
    }
}

#[async_trait]
impl AgentBackend for GeminiCliBackend {
    /// Identify this backend as [`BackendType::GeminiCli`].
    fn backend_type(&self) -> BackendType {
        BackendType::GeminiCli
    }

    /// Start a new Gemini CLI session and launch its first process.
    ///
    /// `config.prompt` becomes the session's system prompt, which is passed via
    /// `-p` on every turn. The first process is fed a short fixed greeting on stdin
    /// rather than the system prompt, so the prompt is not sent twice.
    ///
    /// The output channel and liveness flag created here live for the whole
    /// session and are shared with every later per-turn process.
    ///
    /// # Errors
    ///
    /// Propagates any error from `spawn_gemini_process`.
    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
        let name = config.name.clone();
        let system_prompt = config.prompt.clone();

        info!(agent = %name, "Spawning Gemini CLI agent");

        // Session-lifetime state: one channel and one flag for all turns.
        let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_SIZE);
        let alive = Arc::new(AtomicBool::new(true));

        // First turn: a brief init message on stdin; the system prompt goes via `-p`.
        let (first_child, first_reader) = spawn_gemini_process(
            &self.gemini_path,
            &config,
            "Hello. Awaiting instructions.",
            &system_prompt,
            output_tx.clone(),
            Arc::clone(&alive),
            &name,
        )
        .await?;

        Ok(Box::new(GeminiCliSession {
            name,
            gemini_path: self.gemini_path.clone(),
            config,
            system_prompt,
            child: Some(first_child),
            reader_handle: Some(first_reader),
            output_tx,
            output_rx: Some(output_rx),
            alive,
        }))
    }
}

// ---------------------------------------------------------------------------
// GeminiCliSession
// ---------------------------------------------------------------------------

/// A running Gemini CLI agent session.
///
/// Each turn spawns a fresh `gemini` process. The `output_tx` channel is shared
/// across all process lifetimes so the orchestrator's receiver stays valid.
///
/// At most one child process and one reader task are tracked at a time:
/// `send_input` kills the tracked pair before it starts the next turn.
struct GeminiCliSession {
    /// Agent name; used in log fields and in `Error::AgentNotAlive`.
    name: String,
    /// Path to the gemini binary, reused for every per-turn process.
    gemini_path: PathBuf,
    /// Original spawn config (for cwd, env, model, etc.).
    config: SpawnConfig,
    /// System prompt injected into every turn via `-p`.
    system_prompt: String,
    /// Current child process (if any). `None` after the process has been killed.
    child: Option<Child>,
    /// Background reader task for current process. `None` after it has been aborted.
    reader_handle: Option<JoinHandle<()>>,
    /// Shared output sender (reused across process lifetimes); a clone is handed
    /// to each per-turn reader task.
    output_tx: mpsc::Sender<AgentOutput>,
    /// Output receiver (taken once by the orchestrator); `None` after it was taken.
    output_rx: Option<mpsc::Receiver<AgentOutput>>,
    /// Liveness flag. Cleared by `shutdown` / `force_kill`; read by `send_input`,
    /// `is_alive`, and the reader task's loop.
    alive: Arc<AtomicBool>,
}

#[async_trait]
impl AgentSession for GeminiCliSession {
    fn name(&self) -> &str {
        &self.name
    }

    async fn send_input(&mut self, input: &str) -> Result<()> {
        if !self.alive.load(Ordering::Relaxed) {
            return Err(Error::AgentNotAlive {
                name: self.name.clone(),
            });
        }

        // Kill the old process and reader (if still running)
        self.kill_current().await;

        // Spawn a new process for this turn
        let (child, reader_handle) = spawn_gemini_process(
            &self.gemini_path,
            &self.config,
            input,
            &self.system_prompt,
            self.output_tx.clone(),
            self.alive.clone(),
            &self.name,
        )
        .await?;

        self.child = Some(child);
        self.reader_handle = Some(reader_handle);

        Ok(())
    }

    fn output_receiver(&mut self) -> Option<mpsc::Receiver<AgentOutput>> {
        self.output_rx.take()
    }

    async fn is_alive(&self) -> bool {
        self.alive.load(Ordering::Relaxed)
    }

    async fn shutdown(&mut self) -> Result<()> {
        info!(agent = %self.name, "Shutting down Gemini CLI session");
        self.alive.store(false, Ordering::Relaxed);
        self.kill_current().await;
        Ok(())
    }

    async fn force_kill(&mut self) -> Result<()> {
        info!(agent = %self.name, "Force-killing Gemini CLI session");
        self.alive.store(false, Ordering::Relaxed);
        self.kill_current().await;
        Ok(())
    }
}

impl GeminiCliSession {
    /// Kill the current child process and abort the reader task.
    async fn kill_current(&mut self) {
        // Abort reader first so it stops reading stdout
        if let Some(handle) = self.reader_handle.take() {
            handle.abort();
            let _ = handle.await;
        }

        // Kill the child process
        if let Some(mut child) = self.child.take() {
            let _ = child.kill().await;
            let _ = child.wait().await;
        }
    }
}

impl Drop for GeminiCliSession {
    /// Abort the reader task, if one is still tracked, when the session is dropped.
    fn drop(&mut self) {
        // Only the reader needs explicit handling here: the child process is
        // killed by kill_on_drop(true), which is set during spawn.
        if let Some(reader) = &self.reader_handle {
            reader.abort();
        }
    }
}

// ---------------------------------------------------------------------------
// Process spawning helper
// ---------------------------------------------------------------------------

/// Spawn a single Gemini CLI process, pipe `input` to its stdin, and start a
/// background reader task that forwards stdout lines to `output_tx`.
///
/// Returns the child process handle and the reader task handle.
async fn spawn_gemini_process(
    gemini_path: &std::path::Path,
    config: &SpawnConfig,
    input: &str,
    system_prompt: &str,
    output_tx: mpsc::Sender<AgentOutput>,
    alive: Arc<AtomicBool>,
    agent_name: &str,
) -> Result<(Child, JoinHandle<()>)> {
    let args = GeminiCliBackend::build_args(config, system_prompt);

    let mut cmd = Command::new(gemini_path);
    cmd.args(&args);

    cmd.stdin(std::process::Stdio::piped());
    cmd.stdout(std::process::Stdio::piped());
    // Capture stderr for logging instead of discarding it -- helps debug
    // auth failures, invalid models, and other CLI errors.
    cmd.stderr(std::process::Stdio::piped());

    if let Some(ref cwd) = config.cwd {
        cmd.current_dir(cwd);
    }

    for (k, v) in &config.env {
        cmd.env(k, v);
    }

    cmd.kill_on_drop(true);

    let mut child = cmd.spawn().map_err(|e| Error::SpawnFailed {
        name: agent_name.to_string(),
        reason: format!("Failed to start gemini process: {e}"),
    })?;

    // Write input to stdin, then close it
    {
        use tokio::io::AsyncWriteExt;
        let mut stdin = child.stdin.take().ok_or_else(|| Error::SpawnFailed {
            name: agent_name.to_string(),
            reason: "Failed to capture gemini stdin".into(),
        })?;
        stdin
            .write_all(input.as_bytes())
            .await
            .map_err(|e| Error::GeminiCli {
                reason: format!("Failed to write to gemini stdin: {e}"),
            })?;
        // Drop stdin to close it -- signals EOF to the gemini process
    }

    // Take stdout for reading
    let stdout = child.stdout.take().ok_or_else(|| Error::SpawnFailed {
        name: agent_name.to_string(),
        reason: "Failed to capture gemini stdout".into(),
    })?;

    // Spawn a lightweight stderr drain task that logs any error output.
    // This runs fire-and-forget -- it will stop when the child process exits (EOF).
    if let Some(stderr) = child.stderr.take() {
        let stderr_name = agent_name.to_string();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stderr);
            let mut line_buf = String::new();
            loop {
                line_buf.clear();
                match reader.read_line(&mut line_buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        let trimmed = line_buf.trim();
                        if !trimmed.is_empty() {
                            warn!(agent = %stderr_name, stderr = %trimmed, "Gemini CLI stderr");
                        }
                    }
                }
            }
        });
    }

    // Spawn background reader task
    let reader_alive = alive.clone();
    let reader_name = agent_name.to_string();
    let reader_tx = output_tx;

    let reader_handle = tokio::spawn(async move {
        debug!(agent = %reader_name, "Gemini reader task started");
        let mut reader = BufReader::new(stdout);
        let mut line_buf = String::new();

        loop {
            if !reader_alive.load(Ordering::Relaxed) {
                break;
            }

            line_buf.clear();
            match reader.read_line(&mut line_buf).await {
                Ok(0) => {
                    // EOF -- process exited
                    debug!(agent = %reader_name, "Gemini stdout EOF");
                    let _ =
                        send_agent_output(&reader_tx, AgentOutput::TurnComplete, &reader_alive, &reader_name)
                            .await;
                    break;
                }
                Ok(_) => {
                    let text = line_buf.trim_end_matches('\n').to_string();
                    if !text.is_empty()
                        && send_agent_output(
                            &reader_tx,
                            AgentOutput::Delta(text),
                            &reader_alive,
                            &reader_name,
                        )
                        .await
                        .is_err()
                    {
                        break;
                    }
                }
                Err(e) => {
                    warn!(agent = %reader_name, error = %e, "Error reading gemini stdout");
                    let _ = send_agent_output(
                        &reader_tx,
                        AgentOutput::Error(format!("Read error: {e}")),
                        &reader_alive,
                        &reader_name,
                    )
                    .await;
                    break;
                }
            }
        }
        debug!(agent = %reader_name, "Gemini reader task stopped");
    });

    Ok((child, reader_handle))
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// True when `flag` occurs in `args` immediately followed by `value`.
    ///
    /// Plain membership checks would also pass if a flag and its value drifted
    /// apart, which would produce a broken command line.
    fn has_flag_value(args: &[String], flag: &str, value: &str) -> bool {
        args.windows(2)
            .any(|pair| pair[0] == flag && pair[1] == value)
    }

    /// True when `flag` occurs anywhere in `args`.
    fn has_flag(args: &[String], flag: &str) -> bool {
        args.iter().any(|arg| arg == flag)
    }

    #[test]
    fn test_gemini_backend_type() {
        let backend = GeminiCliBackend {
            gemini_path: PathBuf::from("/usr/bin/gemini"),
        };
        assert_eq!(backend.backend_type(), BackendType::GeminiCli);
    }

    #[test]
    fn test_spawn_config_to_args_default() {
        let config = SpawnConfig::new("test-agent", "You are a code reviewer");
        let args = GeminiCliBackend::build_args(&config, "You are a code reviewer");

        assert!(has_flag_value(&args, "-p", "You are a code reviewer"));
        assert!(has_flag_value(&args, "-m", "gemini-2.5-pro"));
        assert!(has_flag(&args, "-y"));
    }

    #[test]
    fn test_spawn_config_to_args_custom_model() {
        let mut config = SpawnConfig::new("test-agent", "system prompt");
        config.model = Some("gemini-2.5-flash".to_string());
        let args = GeminiCliBackend::build_args(&config, "system prompt");

        assert!(has_flag_value(&args, "-m", "gemini-2.5-flash"));
        assert!(!args.contains(&"gemini-2.5-pro".to_string()));
    }

    #[test]
    fn test_spawn_config_to_args_empty_system_prompt() {
        let config = SpawnConfig::new("test-agent", "");
        let args = GeminiCliBackend::build_args(&config, "");

        // Empty system prompt should not include -p
        assert!(!has_flag(&args, "-p"));
        // ...but the model and auto-approve flags are still required.
        assert!(has_flag_value(&args, "-m", "gemini-2.5-pro"));
        assert!(has_flag(&args, "-y"));
    }

    #[test]
    fn test_backend_type_display() {
        assert_eq!(BackendType::GeminiCli.to_string(), "gemini-cli");
    }
}