// agent_teams/backend/mod.rs

1//! Backend abstraction layer for spawning and managing agent processes.
2//!
3//! Provides [`AgentBackend`] (factory trait) and [`AgentSession`] (per-agent handle trait)
4//! with concrete implementations for Claude Code (`cc-sdk`) and Codex (JSON-RPC subprocess).
5
6pub mod claude_code;
7pub mod codex;
8pub mod codex_protocol;
9pub mod delegation;
10pub mod gemini;
11pub mod router;
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use tracing::{debug, warn};
20
21use crate::Result;
22
// ---------------------------------------------------------------------------
// BackendType
// ---------------------------------------------------------------------------

/// Identifies which backend an agent is running on.
///
/// The `Display` and `FromStr` impls use the same identifiers (`"claude-code"`,
/// `"codex"`, `"gemini-cli"`), so `value.to_string().parse()` round-trips for
/// every variant. Parsing is an exact, case-sensitive match.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackendType {
    /// Claude Code via `cc-sdk` interactive client.
    ClaudeCode,
    /// OpenAI Codex via JSON-RPC 2.0 subprocess.
    Codex,
    /// Google Gemini via one-shot CLI subprocess.
    GeminiCli,
}

impl std::fmt::Display for BackendType {
    /// Writes the backend's identifier (e.g. `claude-code`).
    ///
    /// Width, fill and alignment in the format spec are honoured, so
    /// `format!("{:<12}", backend)` produces aligned columns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id = match self {
            BackendType::ClaudeCode => "claude-code",
            BackendType::Codex => "codex",
            BackendType::GeminiCli => "gemini-cli",
        };
        // `pad` applies the caller's formatting flags; a bare `write!` of a
        // literal would silently ignore them.
        f.pad(id)
    }
}

impl std::str::FromStr for BackendType {
    type Err = String;

    /// Parses an identifier produced by `Display` back into a [`BackendType`].
    ///
    /// # Errors
    ///
    /// Returns `unknown backend type: <input>` for any string that is not
    /// exactly one of the known identifiers.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "claude-code" => Ok(BackendType::ClaudeCode),
            "codex" => Ok(BackendType::Codex),
            "gemini-cli" => Ok(BackendType::GeminiCli),
            other => Err(format!("unknown backend type: {other}")),
        }
    }
}

61// ---------------------------------------------------------------------------
62// SpawnConfig
63// ---------------------------------------------------------------------------
64
65/// Configuration passed to [`AgentBackend::spawn`] when creating a new agent session.
66#[derive(Debug, Clone)]
67pub struct SpawnConfig {
68    /// Human-readable agent name (used for logging and routing).
69    pub name: String,
70    /// Initial prompt / system instruction to send to the agent.
71    pub prompt: String,
72    /// Model override (backend-specific; `None` = use default).
73    pub model: Option<String>,
74    /// Working directory for the agent process.
75    pub cwd: Option<PathBuf>,
76    /// Maximum conversation turns before the agent auto-stops.
77    pub max_turns: Option<i32>,
78    /// Tools the agent is allowed to use (auto-approval list).
79    pub allowed_tools: Vec<String>,
80    /// Permission mode string (e.g. `"default"`, `"plan"`, `"bypassPermissions"`).
81    pub permission_mode: Option<String>,
82    /// Reasoning effort level for the model (Codex: `"low"`, `"medium"`, `"high"`, `"xhigh"`).
83    /// When `None`, the backend's default / global config is used.
84    pub reasoning_effort: Option<String>,
85    /// Extra environment variables passed to the child process.
86    pub env: HashMap<String, String>,
87    /// Memory configuration for cross-turn context injection.
88    pub memory_config: Option<crate::memory::MemoryConfig>,
89    /// CLI tools this agent should delegate to via Bash.
90    pub delegations: Vec<delegation::CliDelegation>,
91}
92
93impl SpawnConfig {
94    /// Create a minimal spawn config with just a name and prompt.
95    pub fn new(name: impl Into<String>, prompt: impl Into<String>) -> Self {
96        Self {
97            name: name.into(),
98            prompt: prompt.into(),
99            model: None,
100            cwd: None,
101            max_turns: None,
102            allowed_tools: Vec::new(),
103            permission_mode: None,
104            reasoning_effort: None,
105            env: HashMap::new(),
106            memory_config: None,
107            delegations: Vec::new(),
108        }
109    }
110
111    /// Start building a spawn config with required fields.
112    ///
113    /// ```rust
114    /// use agent_teams::SpawnConfig;
115    ///
116    /// let config = SpawnConfig::builder("reviewer", "You are a code reviewer")
117    ///     .model("gemini-2.5-flash")
118    ///     .max_turns(5)
119    ///     .build();
120    /// ```
121    pub fn builder(name: impl Into<String>, prompt: impl Into<String>) -> SpawnConfigBuilder {
122        SpawnConfigBuilder {
123            name: name.into(),
124            prompt: prompt.into(),
125            model: None,
126            cwd: None,
127            max_turns: None,
128            allowed_tools: Vec::new(),
129            permission_mode: None,
130            reasoning_effort: None,
131            env: HashMap::new(),
132            memory_config: None,
133            delegations: Vec::new(),
134        }
135    }
136}
137
138/// Builder for [`SpawnConfig`] with fluent setter methods for optional fields.
139#[derive(Debug)]
140pub struct SpawnConfigBuilder {
141    name: String,
142    prompt: String,
143    model: Option<String>,
144    cwd: Option<PathBuf>,
145    max_turns: Option<i32>,
146    allowed_tools: Vec<String>,
147    permission_mode: Option<String>,
148    reasoning_effort: Option<String>,
149    env: HashMap<String, String>,
150    memory_config: Option<crate::memory::MemoryConfig>,
151    delegations: Vec<delegation::CliDelegation>,
152}
153
154impl SpawnConfigBuilder {
155    /// Set the model override.
156    pub fn model(mut self, model: impl Into<String>) -> Self {
157        self.model = Some(model.into());
158        self
159    }
160
161    /// Set the working directory for the agent process.
162    pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
163        self.cwd = Some(cwd.into());
164        self
165    }
166
167    /// Set the maximum conversation turns.
168    pub fn max_turns(mut self, turns: i32) -> Self {
169        self.max_turns = Some(turns);
170        self
171    }
172
173    /// Set the tools the agent is allowed to use.
174    pub fn allowed_tools(mut self, tools: Vec<String>) -> Self {
175        self.allowed_tools = tools;
176        self
177    }
178
179    /// Set the permission mode (e.g., `"default"`, `"plan"`, `"bypassPermissions"`).
180    pub fn permission_mode(mut self, mode: impl Into<String>) -> Self {
181        self.permission_mode = Some(mode.into());
182        self
183    }
184
185    /// Set the reasoning effort level.
186    pub fn reasoning_effort(mut self, effort: impl Into<String>) -> Self {
187        self.reasoning_effort = Some(effort.into());
188        self
189    }
190
191    /// Add a single environment variable.
192    pub fn env_var(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
193        self.env.insert(key.into(), value.into());
194        self
195    }
196
197    /// Set all environment variables.
198    pub fn env(mut self, env: HashMap<String, String>) -> Self {
199        self.env = env;
200        self
201    }
202
203    /// Set memory configuration for cross-turn context injection.
204    pub fn memory(mut self, config: crate::memory::MemoryConfig) -> Self {
205        self.memory_config = Some(config);
206        self
207    }
208
209    /// Add a CLI delegation for this agent.
210    ///
211    /// The agent's prompt will be augmented with instructions on how and when
212    /// to invoke the delegated CLI tool via Bash.
213    ///
214    /// ```rust
215    /// use agent_teams::backend::delegation::CliDelegation;
216    /// use agent_teams::SpawnConfig;
217    ///
218    /// let config = SpawnConfig::builder("coder", "You write Rust code.")
219    ///     .delegate(CliDelegation::codex())
220    ///     .delegate(CliDelegation::gemini("gemini-2.5-pro"))
221    ///     .build();
222    ///
223    /// assert_eq!(config.delegations.len(), 2);
224    /// ```
225    pub fn delegate(mut self, delegation: delegation::CliDelegation) -> Self {
226        self.delegations.push(delegation);
227        self
228    }
229
230    /// Build the [`SpawnConfig`].
231    pub fn build(self) -> SpawnConfig {
232        SpawnConfig {
233            name: self.name,
234            prompt: self.prompt,
235            model: self.model,
236            cwd: self.cwd,
237            max_turns: self.max_turns,
238            allowed_tools: self.allowed_tools,
239            permission_mode: self.permission_mode,
240            reasoning_effort: self.reasoning_effort,
241            env: self.env,
242            memory_config: self.memory_config,
243            delegations: self.delegations,
244        }
245    }
246}
247
// ---------------------------------------------------------------------------
// AgentOutput
// ---------------------------------------------------------------------------

/// Events emitted by a running agent session, delivered via an mpsc channel.
///
/// When sent through this module's `send_agent_output` helper, the variants fall
/// into two groups: *control* events (`TurnComplete`, `Idle`, `Error`) are awaited
/// with a timeout, while *data* events (`Message`, `Delta`) may be dropped if the
/// channel is full.
#[derive(Debug, Clone)]
pub enum AgentOutput {
    /// A complete text message from the agent.
    Message(String),
    /// A streaming text delta (partial token).
    Delta(String),
    /// The agent finished a turn (ready for next input).
    TurnComplete,
    /// The agent is idle / waiting for work.
    Idle,
    /// An error occurred inside the agent.
    Error(String),
}

267// ---------------------------------------------------------------------------
268// AgentBackend  (factory)
269// ---------------------------------------------------------------------------
270
271/// Factory trait: creates [`AgentSession`] instances for a specific backend.
272///
273/// Implementations are expected to be cheaply cloneable (or wrapped in `Arc`).
274#[async_trait]
275pub trait AgentBackend: Send + Sync {
276    /// Which backend this factory produces.
277    fn backend_type(&self) -> BackendType;
278
279    /// Spawn a new agent and return a session handle.
280    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>>;
281}
282
283// ---------------------------------------------------------------------------
284// AgentSession  (per-agent handle)
285// ---------------------------------------------------------------------------
286
287/// A running agent session that can receive input and emit output.
288#[async_trait]
289pub trait AgentSession: Send + Sync {
290    /// The agent's name (matches `SpawnConfig::name`).
291    fn name(&self) -> &str;
292
293    /// Send a follow-up user message to the agent (starts a new turn).
294    async fn send_input(&mut self, input: &str) -> Result<()>;
295
296    /// Take the output receiver.
297    ///
298    /// Returns `Some` on the first call, `None` thereafter (the receiver is moved
299    /// out so that the caller owns it).
300    fn output_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<AgentOutput>>;
301
302    /// Check whether the underlying process / connection is still alive.
303    ///
304    /// **Performance contract:** Implementations MUST return promptly with no I/O.
305    /// This method may be called while the orchestrator holds internal locks (e.g.,
306    /// inside [`TeamOrchestrator::are_alive`](crate::orchestrator::TeamOrchestrator::are_alive)).
307    /// A typical implementation is a single [`AtomicBool::load`](std::sync::atomic::AtomicBool::load).
308    async fn is_alive(&self) -> bool;
309
310    /// Gracefully shut down the agent.
311    async fn shutdown(&mut self) -> Result<()>;
312
313    /// Forcefully kill the agent process.
314    async fn force_kill(&mut self) -> Result<()>;
315}
316
317// ---------------------------------------------------------------------------
318// Shared helpers
319// ---------------------------------------------------------------------------
320
321// ---------------------------------------------------------------------------
322// AgentOutputStream  (streaming adapter)
323// ---------------------------------------------------------------------------
324
325/// A stream of [`AgentOutput`] events from an agent session.
326///
327/// This wraps `tokio::sync::mpsc::Receiver<AgentOutput>` in a `Stream` for
328/// ergonomic use with `StreamExt` combinators:
329///
330/// ```ignore
331/// use tokio_stream::StreamExt;
332///
333/// let mut stream = orch.take_output_stream("team", "agent").await?
334///     .expect("receiver not yet taken");
335///
336/// while let Some(event) = stream.next().await {
337///     match event {
338///         AgentOutput::Delta(text) => print!("{text}"),
339///         AgentOutput::TurnComplete => break,
340///         _ => {}
341///     }
342/// }
343/// ```
344pub type AgentOutputStream = tokio_stream::wrappers::ReceiverStream<AgentOutput>;
345
// ---------------------------------------------------------------------------
// Shared helpers
// ---------------------------------------------------------------------------

/// Timeout for control event delivery (5 seconds).
///
/// If a control event (`TurnComplete`, `Error`, `Idle`) cannot be delivered within
/// this duration, the session is marked dead. This prevents indefinite blocking when
/// the consumer stops draining the channel.
const CONTROL_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

357/// Send an output event from a backend reader/session task.
358///
359/// Control events (`TurnComplete`, `Error`, `Idle`) use `send().await` with a timeout
360/// to guarantee delivery -- dropping these would violate liveness (callers may hang
361/// forever). If delivery times out, the session is marked dead.
362/// Data events (`Delta`, `Message`) use `try_send` -- dropping text is acceptable
363/// under backpressure.
364///
365/// Returns `Err(())` if the channel is closed or control delivery timed out.
366pub(crate) async fn send_agent_output(
367    tx: &tokio::sync::mpsc::Sender<AgentOutput>,
368    output: AgentOutput,
369    alive: &Arc<AtomicBool>,
370    agent_name: &str,
371) -> std::result::Result<(), ()> {
372    let is_control = matches!(
373        output,
374        AgentOutput::TurnComplete | AgentOutput::Error(_) | AgentOutput::Idle
375    );
376
377    if is_control {
378        match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(output)).await {
379            Ok(Ok(())) => Ok(()),
380            Ok(Err(_)) => {
381                debug!(agent = %agent_name, "Output channel closed");
382                alive.store(false, Ordering::Relaxed);
383                Err(())
384            }
385            Err(_) => {
386                warn!(agent = %agent_name, "Control event delivery timed out, marking session dead");
387                alive.store(false, Ordering::Relaxed);
388                Err(())
389            }
390        }
391    } else {
392        match tx.try_send(output) {
393            Ok(()) => Ok(()),
394            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
395                warn!(agent = %agent_name, "Output channel full, dropping data message");
396                Ok(())
397            }
398            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
399                debug!(agent = %agent_name, "Output channel closed");
400                alive.store(false, Ordering::Relaxed);
401                Err(())
402            }
403        }
404    }
405}
406
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Ensures `Display` and `FromStr` stay in sync for all `BackendType` variants.
    /// If a new variant is added and either impl is not updated, this test fails.
    #[test]
    fn backend_type_display_fromstr_round_trip() {
        let variants = [BackendType::ClaudeCode, BackendType::Codex, BackendType::GeminiCli];
        for variant in &variants {
            let s = variant.to_string();
            let parsed: BackendType = s.parse().unwrap_or_else(|e| {
                panic!("FromStr failed for Display output \"{s}\": {e}");
            });
            assert_eq!(*variant, parsed, "Round-trip failed for {s}");
        }
    }

    #[test]
    fn backend_type_fromstr_rejects_unknown() {
        assert!("unknown".parse::<BackendType>().is_err());
        assert!("Claude-Code".parse::<BackendType>().is_err()); // case-sensitive
    }

    #[test]
    fn spawn_config_builder_all_fields() {
        let config = SpawnConfig::builder("test-agent", "system prompt")
            .model("gpt-4.1")
            .cwd("/tmp")
            .max_turns(5)
            .allowed_tools(vec!["Read".into(), "Write".into()])
            .permission_mode("plan")
            .reasoning_effort("high")
            .env_var("API_KEY", "secret")
            .env_var("DEBUG", "1")
            .build();

        assert_eq!(config.name, "test-agent");
        assert_eq!(config.prompt, "system prompt");
        assert_eq!(config.model.as_deref(), Some("gpt-4.1"));
        assert_eq!(config.cwd.as_ref().unwrap().to_str().unwrap(), "/tmp");
        assert_eq!(config.max_turns, Some(5));
        assert_eq!(config.allowed_tools, vec!["Read", "Write"]);
        assert_eq!(config.permission_mode.as_deref(), Some("plan"));
        assert_eq!(config.reasoning_effort.as_deref(), Some("high"));
        assert_eq!(config.env.len(), 2);
        assert_eq!(config.env["API_KEY"], "secret");
        assert_eq!(config.env["DEBUG"], "1");
    }

    #[test]
    fn spawn_config_builder_minimal() {
        let config = SpawnConfig::builder("agent", "prompt").build();
        assert_eq!(config.name, "agent");
        assert_eq!(config.prompt, "prompt");
        assert!(config.model.is_none());
        assert!(config.cwd.is_none());
        assert!(config.max_turns.is_none());
        assert!(config.allowed_tools.is_empty());
        assert!(config.permission_mode.is_none());
        assert!(config.reasoning_effort.is_none());
        assert!(config.env.is_empty());
        assert!(config.memory_config.is_none());
        assert!(config.delegations.is_empty());
    }

    /// `SpawnConfig::new` and an untouched builder must yield the same defaults.
    #[test]
    fn spawn_config_new_matches_builder_defaults() {
        let direct = SpawnConfig::new("agent", "prompt");
        let built = SpawnConfig::builder("agent", "prompt").build();
        // `SpawnConfig` has no `PartialEq`; compare the derived `Debug` output.
        assert_eq!(format!("{direct:?}"), format!("{built:?}"));
    }

    #[test]
    fn spawn_config_builder_is_debug() {
        // Verify Debug derive compiles and produces output
        let builder = SpawnConfig::builder("a", "b").model("gpt-4.1");
        let debug_str = format!("{builder:?}");
        assert!(debug_str.contains("SpawnConfigBuilder"));
    }
}