agent_teams/backend/mod.rs
//! Backend abstraction layer for spawning and managing agent processes.
//!
//! Provides [`AgentBackend`] (factory trait) and [`AgentSession`] (per-agent handle trait)
//! with concrete implementations for Claude Code (`cc-sdk`), Codex (JSON-RPC subprocess),
//! and Gemini (one-shot CLI subprocess).
5
6pub mod claude_code;
7pub mod codex;
8pub mod codex_protocol;
9pub mod delegation;
10pub mod gemini;
11pub mod router;
12
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use tracing::{debug, warn};
20
21use crate::Result;
22
23// ---------------------------------------------------------------------------
24// BackendType
25// ---------------------------------------------------------------------------
26
/// The agent runtime that a session is hosted on.
///
/// The type is a plain, field-less tag: it is `Copy`, comparable, and hashable,
/// so it can be used directly as a map key or passed around by value.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum BackendType {
    /// Claude Code, driven through the `cc-sdk` interactive client.
    ClaudeCode,
    /// OpenAI Codex, driven as a JSON-RPC 2.0 subprocess.
    Codex,
    /// Google Gemini, driven as a one-shot CLI subprocess.
    GeminiCli,
}
37
38impl std::fmt::Display for BackendType {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 BackendType::ClaudeCode => write!(f, "claude-code"),
42 BackendType::Codex => write!(f, "codex"),
43 BackendType::GeminiCli => write!(f, "gemini-cli"),
44 }
45 }
46}
47
48impl std::str::FromStr for BackendType {
49 type Err = String;
50
51 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
52 match s {
53 "claude-code" => Ok(BackendType::ClaudeCode),
54 "codex" => Ok(BackendType::Codex),
55 "gemini-cli" => Ok(BackendType::GeminiCli),
56 other => Err(format!("unknown backend type: {other}")),
57 }
58 }
59}
60
61// ---------------------------------------------------------------------------
62// SpawnConfig
63// ---------------------------------------------------------------------------
64
65/// Configuration passed to [`AgentBackend::spawn`] when creating a new agent session.
66#[derive(Debug, Clone)]
67pub struct SpawnConfig {
68 /// Human-readable agent name (used for logging and routing).
69 pub name: String,
70 /// Initial prompt / system instruction to send to the agent.
71 pub prompt: String,
72 /// Model override (backend-specific; `None` = use default).
73 pub model: Option<String>,
74 /// Working directory for the agent process.
75 pub cwd: Option<PathBuf>,
76 /// Maximum conversation turns before the agent auto-stops.
77 pub max_turns: Option<i32>,
78 /// Tools the agent is allowed to use (auto-approval list).
79 pub allowed_tools: Vec<String>,
80 /// Permission mode string (e.g. `"default"`, `"plan"`, `"bypassPermissions"`).
81 pub permission_mode: Option<String>,
82 /// Reasoning effort level for the model (Codex: `"low"`, `"medium"`, `"high"`, `"xhigh"`).
83 /// When `None`, the backend's default / global config is used.
84 pub reasoning_effort: Option<String>,
85 /// Extra environment variables passed to the child process.
86 pub env: HashMap<String, String>,
87 /// Memory configuration for cross-turn context injection.
88 pub memory_config: Option<crate::memory::MemoryConfig>,
89 /// CLI tools this agent should delegate to via Bash.
90 pub delegations: Vec<delegation::CliDelegation>,
91}
92
93impl SpawnConfig {
94 /// Create a minimal spawn config with just a name and prompt.
95 pub fn new(name: impl Into<String>, prompt: impl Into<String>) -> Self {
96 Self {
97 name: name.into(),
98 prompt: prompt.into(),
99 model: None,
100 cwd: None,
101 max_turns: None,
102 allowed_tools: Vec::new(),
103 permission_mode: None,
104 reasoning_effort: None,
105 env: HashMap::new(),
106 memory_config: None,
107 delegations: Vec::new(),
108 }
109 }
110
111 /// Start building a spawn config with required fields.
112 ///
113 /// ```rust
114 /// use agent_teams::SpawnConfig;
115 ///
116 /// let config = SpawnConfig::builder("reviewer", "You are a code reviewer")
117 /// .model("gemini-2.5-flash")
118 /// .max_turns(5)
119 /// .build();
120 /// ```
121 pub fn builder(name: impl Into<String>, prompt: impl Into<String>) -> SpawnConfigBuilder {
122 SpawnConfigBuilder {
123 name: name.into(),
124 prompt: prompt.into(),
125 model: None,
126 cwd: None,
127 max_turns: None,
128 allowed_tools: Vec::new(),
129 permission_mode: None,
130 reasoning_effort: None,
131 env: HashMap::new(),
132 memory_config: None,
133 delegations: Vec::new(),
134 }
135 }
136}
137
138/// Builder for [`SpawnConfig`] with fluent setter methods for optional fields.
139#[derive(Debug)]
140pub struct SpawnConfigBuilder {
141 name: String,
142 prompt: String,
143 model: Option<String>,
144 cwd: Option<PathBuf>,
145 max_turns: Option<i32>,
146 allowed_tools: Vec<String>,
147 permission_mode: Option<String>,
148 reasoning_effort: Option<String>,
149 env: HashMap<String, String>,
150 memory_config: Option<crate::memory::MemoryConfig>,
151 delegations: Vec<delegation::CliDelegation>,
152}
153
154impl SpawnConfigBuilder {
155 /// Set the model override.
156 pub fn model(mut self, model: impl Into<String>) -> Self {
157 self.model = Some(model.into());
158 self
159 }
160
161 /// Set the working directory for the agent process.
162 pub fn cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
163 self.cwd = Some(cwd.into());
164 self
165 }
166
167 /// Set the maximum conversation turns.
168 pub fn max_turns(mut self, turns: i32) -> Self {
169 self.max_turns = Some(turns);
170 self
171 }
172
173 /// Set the tools the agent is allowed to use.
174 pub fn allowed_tools(mut self, tools: Vec<String>) -> Self {
175 self.allowed_tools = tools;
176 self
177 }
178
179 /// Set the permission mode (e.g., `"default"`, `"plan"`, `"bypassPermissions"`).
180 pub fn permission_mode(mut self, mode: impl Into<String>) -> Self {
181 self.permission_mode = Some(mode.into());
182 self
183 }
184
185 /// Set the reasoning effort level.
186 pub fn reasoning_effort(mut self, effort: impl Into<String>) -> Self {
187 self.reasoning_effort = Some(effort.into());
188 self
189 }
190
191 /// Add a single environment variable.
192 pub fn env_var(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
193 self.env.insert(key.into(), value.into());
194 self
195 }
196
197 /// Set all environment variables.
198 pub fn env(mut self, env: HashMap<String, String>) -> Self {
199 self.env = env;
200 self
201 }
202
203 /// Set memory configuration for cross-turn context injection.
204 pub fn memory(mut self, config: crate::memory::MemoryConfig) -> Self {
205 self.memory_config = Some(config);
206 self
207 }
208
209 /// Add a CLI delegation for this agent.
210 ///
211 /// The agent's prompt will be augmented with instructions on how and when
212 /// to invoke the delegated CLI tool via Bash.
213 ///
214 /// ```rust
215 /// use agent_teams::backend::delegation::CliDelegation;
216 /// use agent_teams::SpawnConfig;
217 ///
218 /// let config = SpawnConfig::builder("coder", "You write Rust code.")
219 /// .delegate(CliDelegation::codex())
220 /// .delegate(CliDelegation::gemini("gemini-2.5-pro"))
221 /// .build();
222 ///
223 /// assert_eq!(config.delegations.len(), 2);
224 /// ```
225 pub fn delegate(mut self, delegation: delegation::CliDelegation) -> Self {
226 self.delegations.push(delegation);
227 self
228 }
229
230 /// Build the [`SpawnConfig`].
231 pub fn build(self) -> SpawnConfig {
232 SpawnConfig {
233 name: self.name,
234 prompt: self.prompt,
235 model: self.model,
236 cwd: self.cwd,
237 max_turns: self.max_turns,
238 allowed_tools: self.allowed_tools,
239 permission_mode: self.permission_mode,
240 reasoning_effort: self.reasoning_effort,
241 env: self.env,
242 memory_config: self.memory_config,
243 delegations: self.delegations,
244 }
245 }
246}
247
248// ---------------------------------------------------------------------------
249// AgentOutput
250// ---------------------------------------------------------------------------
251
/// Events emitted by a running agent session, delivered via an mpsc channel.
///
/// `Message` and `Delta` carry text; `TurnComplete`, `Idle`, and `Error` signal
/// state changes of the session.
///
/// The type implements `PartialEq`/`Eq`, so events can be compared directly
/// (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AgentOutput {
    /// A complete text message from the agent.
    Message(String),
    /// A streaming text delta (partial token).
    Delta(String),
    /// The agent finished a turn and is ready for the next input.
    TurnComplete,
    /// The agent is idle / waiting for work.
    Idle,
    /// An error occurred inside the agent; the payload is its description.
    Error(String),
}
266
267// ---------------------------------------------------------------------------
268// AgentBackend (factory)
269// ---------------------------------------------------------------------------
270
271/// Factory trait: creates [`AgentSession`] instances for a specific backend.
272///
273/// Implementations are expected to be cheaply cloneable (or wrapped in `Arc`).
274#[async_trait]
275pub trait AgentBackend: Send + Sync {
276 /// Which backend this factory produces.
277 fn backend_type(&self) -> BackendType;
278
279 /// Spawn a new agent and return a session handle.
280 async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>>;
281}
282
283// ---------------------------------------------------------------------------
284// AgentSession (per-agent handle)
285// ---------------------------------------------------------------------------
286
287/// A running agent session that can receive input and emit output.
288#[async_trait]
289pub trait AgentSession: Send + Sync {
290 /// The agent's name (matches `SpawnConfig::name`).
291 fn name(&self) -> &str;
292
293 /// Send a follow-up user message to the agent (starts a new turn).
294 async fn send_input(&mut self, input: &str) -> Result<()>;
295
296 /// Take the output receiver.
297 ///
298 /// Returns `Some` on the first call, `None` thereafter (the receiver is moved
299 /// out so that the caller owns it).
300 fn output_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<AgentOutput>>;
301
302 /// Check whether the underlying process / connection is still alive.
303 ///
304 /// **Performance contract:** Implementations MUST return promptly with no I/O.
305 /// This method may be called while the orchestrator holds internal locks (e.g.,
306 /// inside [`TeamOrchestrator::are_alive`](crate::orchestrator::TeamOrchestrator::are_alive)).
307 /// A typical implementation is a single [`AtomicBool::load`](std::sync::atomic::AtomicBool::load).
308 async fn is_alive(&self) -> bool;
309
310 /// Gracefully shut down the agent.
311 async fn shutdown(&mut self) -> Result<()>;
312
313 /// Forcefully kill the agent process.
314 async fn force_kill(&mut self) -> Result<()>;
315}
316
317// ---------------------------------------------------------------------------
318// Shared helpers
319// ---------------------------------------------------------------------------
320
321// ---------------------------------------------------------------------------
322// AgentOutputStream (streaming adapter)
323// ---------------------------------------------------------------------------
324
325/// A stream of [`AgentOutput`] events from an agent session.
326///
327/// This wraps `tokio::sync::mpsc::Receiver<AgentOutput>` in a `Stream` for
328/// ergonomic use with `StreamExt` combinators:
329///
330/// ```ignore
331/// use tokio_stream::StreamExt;
332///
333/// let mut stream = orch.take_output_stream("team", "agent").await?
334/// .expect("receiver not yet taken");
335///
336/// while let Some(event) = stream.next().await {
337/// match event {
338/// AgentOutput::Delta(text) => print!("{text}"),
339/// AgentOutput::TurnComplete => break,
340/// _ => {}
341/// }
342/// }
343/// ```
344pub type AgentOutputStream = tokio_stream::wrappers::ReceiverStream<AgentOutput>;
345
346// ---------------------------------------------------------------------------
347// Shared helpers
348// ---------------------------------------------------------------------------
349
/// How long delivery of a control event may take before the session is given up on.
///
/// Control events are `TurnComplete`, `Error`, and `Idle`. If one of them cannot
/// be delivered within this duration, the session is marked dead. This bounds
/// how long a sender can block when the consumer has stopped draining the channel.
const CONTROL_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
356
357/// Send an output event from a backend reader/session task.
358///
359/// Control events (`TurnComplete`, `Error`, `Idle`) use `send().await` with a timeout
360/// to guarantee delivery -- dropping these would violate liveness (callers may hang
361/// forever). If delivery times out, the session is marked dead.
362/// Data events (`Delta`, `Message`) use `try_send` -- dropping text is acceptable
363/// under backpressure.
364///
365/// Returns `Err(())` if the channel is closed or control delivery timed out.
366pub(crate) async fn send_agent_output(
367 tx: &tokio::sync::mpsc::Sender<AgentOutput>,
368 output: AgentOutput,
369 alive: &Arc<AtomicBool>,
370 agent_name: &str,
371) -> std::result::Result<(), ()> {
372 let is_control = matches!(
373 output,
374 AgentOutput::TurnComplete | AgentOutput::Error(_) | AgentOutput::Idle
375 );
376
377 if is_control {
378 match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(output)).await {
379 Ok(Ok(())) => Ok(()),
380 Ok(Err(_)) => {
381 debug!(agent = %agent_name, "Output channel closed");
382 alive.store(false, Ordering::Relaxed);
383 Err(())
384 }
385 Err(_) => {
386 warn!(agent = %agent_name, "Control event delivery timed out, marking session dead");
387 alive.store(false, Ordering::Relaxed);
388 Err(())
389 }
390 }
391 } else {
392 match tx.try_send(output) {
393 Ok(()) => Ok(()),
394 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
395 warn!(agent = %agent_name, "Output channel full, dropping data message");
396 Ok(())
397 }
398 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
399 debug!(agent = %agent_name, "Output channel closed");
400 alive.store(false, Ordering::Relaxed);
401 Err(())
402 }
403 }
404 }
405}
406
407// ---------------------------------------------------------------------------
408// Tests
409// ---------------------------------------------------------------------------
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Ensures `Display` and `FromStr` stay in sync for all `BackendType` variants.
    /// If a new variant is added and either impl is not updated, this test fails.
    #[test]
    fn backend_type_display_fromstr_round_trip() {
        let variants = [BackendType::ClaudeCode, BackendType::Codex, BackendType::GeminiCli];
        for variant in &variants {
            let s = variant.to_string();
            let parsed: BackendType = s.parse().unwrap_or_else(|e| {
                panic!("FromStr failed for Display output \"{s}\": {e}");
            });
            assert_eq!(*variant, parsed, "Round-trip failed for {s}");
        }
    }

    /// Pins the exact backend names. The round-trip test alone would still pass
    /// if `Display` and `FromStr` were both changed to the same new spelling.
    #[test]
    fn backend_type_display_uses_expected_names() {
        assert_eq!(BackendType::ClaudeCode.to_string(), "claude-code");
        assert_eq!(BackendType::Codex.to_string(), "codex");
        assert_eq!(BackendType::GeminiCli.to_string(), "gemini-cli");
    }

    #[test]
    fn backend_type_fromstr_rejects_unknown() {
        assert!("unknown".parse::<BackendType>().is_err());
        assert!("Claude-Code".parse::<BackendType>().is_err()); // case-sensitive
        assert_eq!(
            "nope".parse::<BackendType>(),
            Err("unknown backend type: nope".to_string())
        );
    }

    /// Exercises the setters whose arguments can be built from plain values.
    /// `memory` and `delegate` are not covered here because their argument
    /// types are defined outside this file.
    #[test]
    fn spawn_config_builder_all_fields() {
        let config = SpawnConfig::builder("test-agent", "system prompt")
            .model("gpt-4.1")
            .cwd("/tmp")
            .max_turns(5)
            .allowed_tools(vec!["Read".into(), "Write".into()])
            .permission_mode("plan")
            .reasoning_effort("high")
            .env_var("API_KEY", "secret")
            .env_var("DEBUG", "1")
            .build();

        assert_eq!(config.name, "test-agent");
        assert_eq!(config.prompt, "system prompt");
        assert_eq!(config.model.as_deref(), Some("gpt-4.1"));
        assert_eq!(config.cwd.as_ref().unwrap().to_str().unwrap(), "/tmp");
        assert_eq!(config.max_turns, Some(5));
        assert_eq!(config.allowed_tools, vec!["Read", "Write"]);
        assert_eq!(config.permission_mode.as_deref(), Some("plan"));
        assert_eq!(config.reasoning_effort.as_deref(), Some("high"));
        assert_eq!(config.env.len(), 2);
        assert_eq!(config.env["API_KEY"], "secret");
        assert_eq!(config.env["DEBUG"], "1");
    }

    #[test]
    fn spawn_config_builder_minimal() {
        let config = SpawnConfig::builder("agent", "prompt").build();
        assert_eq!(config.name, "agent");
        assert_eq!(config.prompt, "prompt");
        assert!(config.model.is_none());
        assert!(config.cwd.is_none());
        assert!(config.max_turns.is_none());
        assert!(config.allowed_tools.is_empty());
        assert!(config.permission_mode.is_none());
        assert!(config.reasoning_effort.is_none());
        assert!(config.env.is_empty());
        assert!(config.memory_config.is_none());
        assert!(config.delegations.is_empty());
    }

    /// `SpawnConfig::new` must start from the same defaults as an untouched builder.
    #[test]
    fn spawn_config_new_uses_defaults() {
        let config = SpawnConfig::new("agent", "prompt");
        assert_eq!(config.name, "agent");
        assert_eq!(config.prompt, "prompt");
        assert!(config.model.is_none());
        assert!(config.cwd.is_none());
        assert!(config.max_turns.is_none());
        assert!(config.allowed_tools.is_empty());
        assert!(config.permission_mode.is_none());
        assert!(config.reasoning_effort.is_none());
        assert!(config.env.is_empty());
        assert!(config.memory_config.is_none());
        assert!(config.delegations.is_empty());
    }

    /// `env()` replaces the whole map, dropping variables added before it.
    #[test]
    fn spawn_config_builder_env_replaces_previous_vars() {
        let mut replacement = std::collections::HashMap::new();
        replacement.insert("ONLY".to_string(), "1".to_string());

        let config = SpawnConfig::builder("agent", "prompt")
            .env_var("OLD", "0")
            .env(replacement)
            .build();

        assert_eq!(config.env.len(), 1);
        assert_eq!(config.env["ONLY"], "1");
        assert!(!config.env.contains_key("OLD"));
    }

    #[test]
    fn spawn_config_builder_is_debug() {
        // Verify Debug derive compiles and produces output
        let builder = SpawnConfig::builder("a", "b").model("gpt-4.1");
        let debug_str = format!("{builder:?}");
        assert!(debug_str.contains("SpawnConfigBuilder"));
    }
}
479}