use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::BufReader;
use tokio::process::{Child, Command};
use tokio::time::timeout;
use terraphim_types::capability::{ProcessId, Provider};
pub mod audit;
pub mod config;
pub mod health;
pub mod mention;
pub mod output;
pub use audit::AuditEvent;
pub use config::{AgentConfig, AgentValidator, ResourceLimits, ValidationError};
pub use health::{
CircuitBreaker, CircuitBreakerConfig, CircuitState, HealthChecker, HealthHistory, HealthStatus,
};
pub use mention::{MentionEvent, MentionRouter};
pub use output::{OutputCapture, OutputEvent};
/// Errors produced while spawning, supervising, or shutting down agent processes.
#[derive(thiserror::Error, Debug)]
pub enum SpawnerError {
    /// Agent validation failed; the payload is a human-readable reason.
    /// Not constructed within this file. It is distinct from `ConfigValidation`,
    /// which wraps `config::ValidationError`.
    #[error("Agent validation failed: {0}")]
    ValidationError(String),
    /// The agent could not be started or wired up. In this file this covers
    /// missing stdout/stderr pipes and a failed write of the prompt to stdin.
    #[error("Failed to spawn agent: {0}")]
    SpawnError(String),
    /// Waiting on the agent process failed (e.g. in `AgentHandle::shutdown`).
    #[error("Agent process exited unexpectedly: {0}")]
    ProcessExit(String),
    /// A health check failure (not constructed within this file).
    #[error("Health check failed: {0}")]
    HealthCheckFailed(String),
    /// Underlying I/O error; produced automatically by `?` via `#[from]`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Agent configuration error; produced automatically by `?` via `#[from]`.
    /// NOTE(review): presumably raised by `AgentConfig::from_provider` /
    /// `AgentValidator::validate` — confirm.
    #[error("Config validation error: {0}")]
    ConfigValidation(#[from] ValidationError),
}
/// Description of an agent spawn, with an optional fallback provider/model that
/// `AgentSpawner::spawn_with_fallback` tries when the primary spawn fails.
#[derive(Debug, Clone)]
pub struct SpawnRequest {
    /// Provider that is tried first.
    pub primary_provider: Provider,
    /// Optional model override for the primary provider.
    pub primary_model: Option<String>,
    /// Provider tried only if the primary spawn fails.
    pub fallback_provider: Option<Provider>,
    /// Optional model override used with the fallback provider.
    pub fallback_model: Option<String>,
    /// Prompt/task text handed to the agent.
    pub task: String,
    /// When true, the task is delivered on stdin instead of as a CLI argument.
    pub use_stdin: bool,
    /// Resource limits applied to the spawned process.
    pub resource_limits: ResourceLimits,
}

impl SpawnRequest {
    /// Builds a request for `primary_provider` with no model overrides, no
    /// fallback, argument-based task delivery, and default resource limits.
    pub fn new(primary_provider: Provider, task: impl Into<String>) -> Self {
        let task = task.into();
        Self {
            primary_provider,
            primary_model: None,
            fallback_provider: None,
            fallback_model: None,
            task,
            use_stdin: false,
            resource_limits: ResourceLimits::default(),
        }
    }

    /// Overrides the model used with the primary provider.
    pub fn with_primary_model(self, model: impl Into<String>) -> Self {
        Self {
            primary_model: Some(model.into()),
            ..self
        }
    }

    /// Sets the provider to try if the primary spawn fails.
    pub fn with_fallback_provider(self, provider: Provider) -> Self {
        Self {
            fallback_provider: Some(provider),
            ..self
        }
    }

    /// Overrides the model used with the fallback provider.
    pub fn with_fallback_model(self, model: impl Into<String>) -> Self {
        Self {
            fallback_model: Some(model.into()),
            ..self
        }
    }

    /// Delivers the task on the child's stdin rather than as an argument.
    pub fn with_stdin(self) -> Self {
        Self {
            use_stdin: true,
            ..self
        }
    }

    /// Replaces the resource limits applied to the spawned process.
    pub fn with_resource_limits(self, limits: ResourceLimits) -> Self {
        Self {
            resource_limits: limits,
            ..self
        }
    }
}
/// Handle to a spawned agent process, bundling the OS child with its health
/// tracking and captured output.
#[derive(Debug)]
pub struct AgentHandle {
    /// Identifier generated for this agent at spawn time.
    pub process_id: ProcessId,
    /// Provider this agent was spawned for.
    pub provider: Provider,
    child: Child,
    health_checker: HealthChecker,
    output_capture: OutputCapture,
}

impl AgentHandle {
    /// Returns the identifier assigned to this agent at spawn time.
    pub fn process_id(&self) -> ProcessId {
        self.process_id
    }

    /// Asks the health checker whether the agent is currently healthy.
    pub async fn is_healthy(&self) -> bool {
        self.health_checker.is_healthy().await
    }

    /// Returns the health checker's current status.
    pub fn health_status(&self) -> HealthStatus {
        self.health_checker.status()
    }

    /// Borrows the stdout/stderr capture attached to this agent.
    pub fn output_capture(&self) -> &OutputCapture {
        &self.output_capture
    }

    /// Returns a new receiver for the agent's output events.
    pub fn subscribe_output(&self) -> tokio::sync::broadcast::Receiver<OutputEvent> {
        self.output_capture.subscribe()
    }

    /// Stops the agent, first politely and then forcefully.
    ///
    /// On Unix a SIGTERM is sent first (on other platforms no signal is sent
    /// before waiting). The process then gets up to `grace_period` to exit, after
    /// which it is force-killed. The handle is marked terminated in every
    /// non-error outcome.
    ///
    /// Returns `Ok(true)` if the process exited within the grace period (or had
    /// already been reaped), and `Ok(false)` if it had to be force-killed.
    ///
    /// # Errors
    /// * [`SpawnerError::ProcessExit`] if waiting on the process fails.
    /// * [`SpawnerError::Io`] if the force-kill fails.
    pub async fn shutdown(&mut self, grace_period: Duration) -> Result<bool, SpawnerError> {
        // `id()` is None once the child has been reaped: nothing left to stop.
        let Some(pid) = self.child.id() else {
            self.health_checker.mark_terminated();
            return Ok(true);
        };

        #[cfg(unix)]
        {
            use nix::sys::signal::{kill, Signal};
            use nix::unistd::Pid;
            // OS pids fit comfortably in i32 (pid_t).
            let nix_pid = Pid::from_raw(pid as i32);
            if let Err(e) = kill(nix_pid, Signal::SIGTERM) {
                tracing::warn!(pid = pid, error = %e, "Failed to send SIGTERM");
            } else {
                tracing::info!(pid = pid, process_id = %self.process_id, "Sent SIGTERM");
            }
        }

        match timeout(grace_period, self.child.wait()).await {
            Ok(Ok(status)) => {
                tracing::info!(
                    process_id = %self.process_id,
                    status = %status,
                    "Process exited gracefully"
                );
                tracing::info!(
                    target: "terraphim_spawner::audit",
                    event = %AuditEvent::AgentTerminated {
                        process_id: self.process_id,
                        graceful: true,
                    },
                    "Agent terminated gracefully"
                );
                self.health_checker.mark_terminated();
                Ok(true)
            }
            Ok(Err(e)) => {
                self.health_checker.mark_terminated();
                Err(SpawnerError::ProcessExit(format!(
                    "Wait failed for {}: {}",
                    self.process_id, e
                )))
            }
            Err(_) => {
                tracing::warn!(
                    process_id = %self.process_id,
                    grace_period_ms = grace_period.as_millis() as u64,
                    "Process did not exit within grace period, sending SIGKILL"
                );
                self.child.kill().await?;
                tracing::info!(
                    target: "terraphim_spawner::audit",
                    event = %AuditEvent::AgentTerminated {
                        process_id: self.process_id,
                        graceful: false,
                    },
                    "Agent force-killed"
                );
                self.health_checker.mark_terminated();
                Ok(false)
            }
        }
    }

    /// Force-kills the agent immediately, consuming the handle.
    ///
    /// Uses tokio's `Child::kill`, which force-kills the process and then waits
    /// for it. The handle is marked terminated before the kill is attempted.
    /// On success an `AgentTerminated { graceful: false }` audit event is
    /// emitted, the same event as the force-kill path of
    /// [`AgentHandle::shutdown`], so explicit kills show up in the audit trail.
    ///
    /// # Errors
    /// Returns [`SpawnerError::Io`] if the kill fails.
    pub async fn kill(mut self) -> Result<(), SpawnerError> {
        self.health_checker.mark_terminated();
        self.child.kill().await?;
        tracing::info!(
            target: "terraphim_spawner::audit",
            event = %AuditEvent::AgentTerminated {
                process_id: self.process_id,
                graceful: false,
            },
            "Agent force-killed"
        );
        Ok(())
    }

    /// Checks, without blocking, whether the process has exited.
    ///
    /// Returns `Ok(Some(status))` and marks the handle terminated if it has
    /// exited, and `Ok(None)` if it is still running.
    ///
    /// # Errors
    /// Returns [`SpawnerError::Io`] if the status query fails.
    pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>, SpawnerError> {
        let status = self.child.try_wait()?;
        if status.is_some() {
            self.health_checker.mark_terminated();
        }
        Ok(status)
    }

    /// Waits for the process to exit, marks the handle terminated, and returns
    /// the exit status.
    ///
    /// # Errors
    /// Returns [`SpawnerError::Io`] if waiting fails. In that case the handle is
    /// not marked terminated.
    pub async fn wait(&mut self) -> Result<std::process::ExitStatus, SpawnerError> {
        let status = self.child.wait().await?;
        self.health_checker.mark_terminated();
        Ok(status)
    }
}
/// Per-provider pool of idle agents kept around for reuse.
///
/// Each provider id retains at most `max_idle_per_provider` idle handles. When
/// a release would exceed that, the oldest idle handle is evicted and shut down
/// in the background.
pub struct AgentPool {
    /// Idle handles keyed by provider id; the most recently released is last.
    idle: HashMap<String, Vec<AgentHandle>>,
    /// Maximum number of idle handles retained per provider id.
    max_idle_per_provider: usize,
    /// Grace period passed to `AgentHandle::shutdown` for evicted/drained agents.
    shutdown_grace: Duration,
}

impl AgentPool {
    /// Creates an empty pool that keeps at most `max_idle_per_provider` idle
    /// agents per provider, with a 5-second shutdown grace period.
    ///
    /// A capacity of 0 is allowed: released agents are then shut down instead
    /// of being kept.
    pub fn new(max_idle_per_provider: usize) -> Self {
        Self {
            idle: HashMap::new(),
            max_idle_per_provider,
            shutdown_grace: Duration::from_secs(5),
        }
    }

    /// Overrides the grace period used when shutting down evicted or drained agents.
    pub fn with_shutdown_grace(mut self, grace: Duration) -> Self {
        self.shutdown_grace = grace;
        self
    }

    /// Takes the most recently released idle agent for `provider_id`, if any.
    pub fn checkout(&mut self, provider_id: &str) -> Option<AgentHandle> {
        self.idle.get_mut(provider_id)?.pop()
    }

    /// Returns `handle` to the pool under its provider id.
    ///
    /// If the provider already holds `max_idle_per_provider` idle agents, the
    /// oldest one is evicted and shut down in the background. With a capacity
    /// of 0, `handle` itself is shut down. (Previously this case panicked on
    /// `remove(0)` of an empty `Vec`.)
    ///
    /// # Panics
    /// Background shutdown uses `tokio::spawn`, so this panics if an agent must
    /// be shut down while no Tokio runtime is active.
    pub fn release(&mut self, handle: AgentHandle) {
        let grace = self.shutdown_grace;
        let capacity = self.max_idle_per_provider;

        if capacity == 0 {
            Self::shutdown_in_background(handle, grace);
            return;
        }

        let agents = self.idle.entry(handle.provider.id.clone()).or_default();
        if agents.len() >= capacity {
            let evicted = agents.remove(0);
            Self::shutdown_in_background(evicted, grace);
        }
        agents.push(handle);
    }

    /// Shuts `handle` down on a detached Tokio task.
    fn shutdown_in_background(mut handle: AgentHandle, grace: Duration) {
        tokio::spawn(async move {
            // Best-effort: the handle is being discarded and there is no caller
            // left to report a shutdown failure to.
            let _ = handle.shutdown(grace).await;
        });
    }

    /// Number of idle agents held for `provider_id`.
    pub fn idle_count(&self, provider_id: &str) -> usize {
        self.idle.get(provider_id).map_or(0, Vec::len)
    }

    /// Total number of idle agents across all providers.
    pub fn total_idle(&self) -> usize {
        self.idle.values().map(Vec::len).sum()
    }

    /// Empties the pool, shutting every idle agent down one after another.
    ///
    /// Each shutdown may take up to the configured grace period.
    pub async fn drain(&mut self) {
        let grace = self.shutdown_grace;
        for mut handle in self.idle.drain().flat_map(|(_, agents)| agents) {
            // Best-effort: a failed shutdown of a discarded agent must not stop
            // the remaining agents from being shut down.
            let _ = handle.shutdown(grace).await;
        }
    }
}

impl std::fmt::Debug for AgentPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AgentPool")
            .field("max_idle_per_provider", &self.max_idle_per_provider)
            .field("shutdown_grace", &self.shutdown_grace)
            .field("total_idle", &self.total_idle())
            .finish()
    }
}
#[derive(Debug, Clone)]
pub struct AgentSpawner {
default_working_dir: PathBuf,
env_vars: HashMap<String, String>,
auto_restart: bool,
max_restarts: u32,
}
impl AgentSpawner {
pub fn new() -> Self {
Self {
default_working_dir: PathBuf::from("/tmp"),
env_vars: HashMap::new(),
auto_restart: true,
max_restarts: 3,
}
}
pub fn with_working_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.default_working_dir = dir.into();
self
}
pub fn with_env_vars(mut self, vars: HashMap<String, String>) -> Self {
self.env_vars = vars;
self
}
pub fn with_auto_restart(mut self, enabled: bool) -> Self {
self.auto_restart = enabled;
self
}
pub fn with_max_restarts(mut self, max: u32) -> Self {
self.max_restarts = max;
self
}
pub fn auto_restart(&self) -> bool {
self.auto_restart
}
pub fn max_restarts(&self) -> u32 {
self.max_restarts
}
pub async fn spawn_with_model(
&self,
provider: &Provider,
task: &str,
model: Option<&str>,
) -> Result<AgentHandle, SpawnerError> {
let config = AgentConfig::from_provider(provider)?;
let config = match model {
Some(m) => config.with_model(m),
None => config,
};
self.spawn_config(provider, &config, task, false).await
}
pub async fn spawn_with_model_stdin(
&self,
provider: &Provider,
task: &str,
model: Option<&str>,
) -> Result<AgentHandle, SpawnerError> {
let config = AgentConfig::from_provider(provider)?;
let config = match model {
Some(m) => config.with_model(m),
None => config,
};
self.spawn_config(provider, &config, task, true).await
}
async fn spawn_with_options(
&self,
provider: &Provider,
task: &str,
model: Option<&str>,
use_stdin: bool,
resource_limits: ResourceLimits,
) -> Result<AgentHandle, SpawnerError> {
let config = AgentConfig::from_provider(provider)?;
let config = config.with_resource_limits(resource_limits);
let config = match model {
Some(m) => config.with_model(m),
None => config,
};
self.spawn_config(provider, &config, task, use_stdin).await
}
pub async fn spawn(
&self,
provider: &Provider,
task: &str,
) -> Result<AgentHandle, SpawnerError> {
let config = AgentConfig::from_provider(provider)?;
self.spawn_config(provider, &config, task, false).await
}
pub async fn spawn_with_fallback(
&self,
request: &SpawnRequest,
) -> Result<AgentHandle, SpawnerError> {
let primary_result = self
.spawn_with_options(
&request.primary_provider,
&request.task,
request.primary_model.as_deref(),
request.use_stdin,
request.resource_limits.clone(),
)
.await;
match primary_result {
Ok(handle) => Ok(handle),
Err(primary_err) => {
tracing::warn!(
primary_provider = %request.primary_provider.id,
error = %primary_err,
"Primary spawn failed, attempting fallback"
);
if let Some(ref fallback) = request.fallback_provider {
tracing::info!(
fallback_provider = %fallback.id,
"Attempting fallback spawn"
);
let fallback_result = self
.spawn_with_options(
fallback,
&request.task,
request.fallback_model.as_deref(),
request.use_stdin,
request.resource_limits.clone(),
)
.await;
match fallback_result {
Ok(handle) => {
tracing::info!(
fallback_provider = %fallback.id,
"Fallback spawn succeeded"
);
Ok(handle)
}
Err(fallback_err) => {
tracing::error!(
fallback_provider = %fallback.id,
error = %fallback_err,
"Fallback spawn also failed"
);
Err(primary_err)
}
}
} else {
Err(primary_err)
}
}
}
}
async fn spawn_config(
&self,
provider: &Provider,
config: &AgentConfig,
task: &str,
use_stdin: bool,
) -> Result<AgentHandle, SpawnerError> {
let _span = tracing::info_span!(
"spawner.spawn",
provider_id = provider.id.as_str(),
task_len = task.len(),
)
.entered();
let validator = AgentValidator::new(config);
validator.validate().await?;
let process_id = ProcessId::new();
let mut child = self.spawn_process(config, task, use_stdin).await?;
let health_checker = HealthChecker::new(process_id, Duration::from_secs(30));
let stdout = child
.stdout
.take()
.ok_or_else(|| SpawnerError::SpawnError("Failed to capture stdout".to_string()))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| SpawnerError::SpawnError("Failed to capture stderr".to_string()))?;
let output_capture =
OutputCapture::new(process_id, BufReader::new(stdout), BufReader::new(stderr));
tracing::info!(
target: "terraphim_spawner::audit",
event = %AuditEvent::AgentSpawned {
process_id,
provider_id: provider.id.clone(),
},
"Agent spawned"
);
Ok(AgentHandle {
process_id,
provider: provider.clone(),
child,
health_checker,
output_capture,
})
}
async fn spawn_process(
&self,
config: &AgentConfig,
task: &str,
use_stdin: bool,
) -> Result<Child, SpawnerError> {
let working_dir = config
.working_dir
.as_ref()
.unwrap_or(&self.default_working_dir);
let mut cmd = Command::new(&config.cli_command);
cmd.current_dir(working_dir).args(&config.args);
if use_stdin {
cmd.stdin(Stdio::piped());
} else {
cmd.arg(task);
cmd.stdin(Stdio::null());
}
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
for (key, value) in &self.env_vars {
cmd.env(key, value);
}
for (key, value) in &config.env_vars {
cmd.env(key, value);
}
let cli_name = std::path::Path::new(&config.cli_command)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
if cli_name == "claude" || cli_name == "claude-code" {
cmd.env_remove("ANTHROPIC_API_KEY");
}
#[cfg(unix)]
{
let limits = config.resource_limits.clone();
unsafe {
cmd.pre_exec(move || {
Self::apply_resource_limits(&limits)?;
Ok(())
});
}
}
let mut child = cmd.spawn()?;
if use_stdin {
if let Some(mut stdin) = child.stdin.take() {
use tokio::io::AsyncWriteExt;
stdin.write_all(task.as_bytes()).await.map_err(|e| {
SpawnerError::SpawnError(format!("failed to write prompt to stdin: {}", e))
})?;
}
}
Ok(child)
}
#[cfg(unix)]
fn apply_resource_limits(limits: &config::ResourceLimits) -> Result<(), std::io::Error> {
use nix::sys::resource::{setrlimit, Resource};
if let Some(max_mem) = limits.max_memory_bytes {
setrlimit(Resource::RLIMIT_AS, max_mem, max_mem)
.map_err(|e| std::io::Error::other(format!("RLIMIT_AS: {}", e)))?;
}
if let Some(max_cpu) = limits.max_cpu_seconds {
setrlimit(Resource::RLIMIT_CPU, max_cpu, max_cpu)
.map_err(|e| std::io::Error::other(format!("RLIMIT_CPU: {}", e)))?;
}
if let Some(max_fsize) = limits.max_file_size_bytes {
setrlimit(Resource::RLIMIT_FSIZE, max_fsize, max_fsize)
.map_err(|e| std::io::Error::other(format!("RLIMIT_FSIZE: {}", e)))?;
}
if let Some(max_files) = limits.max_open_files {
setrlimit(Resource::RLIMIT_NOFILE, max_files, max_files)
.map_err(|e| std::io::Error::other(format!("RLIMIT_NOFILE: {}", e)))?;
}
Ok(())
}
}
impl Default for AgentSpawner {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use terraphim_types::capability::{Capability, ProviderType};

    fn create_test_agent_provider() -> Provider {
        Provider::new(
            "@test-agent",
            "Test Agent",
            ProviderType::Agent {
                agent_id: "@test".to_string(),
                cli_command: "echo".to_string(),
                working_dir: PathBuf::from("/tmp"),
            },
            vec![Capability::CodeGeneration],
        )
    }

    fn create_sleep_agent_provider() -> Provider {
        Provider::new(
            "@sleep-agent",
            "Sleep Agent",
            ProviderType::Agent {
                agent_id: "@sleep".to_string(),
                cli_command: "sleep".to_string(),
                working_dir: PathBuf::from("/tmp"),
            },
            vec![Capability::CodeGeneration],
        )
    }

    #[test]
    fn test_spawner_creation() {
        let spawner = AgentSpawner::new()
            .with_auto_restart(false)
            .with_working_dir("/workspace")
            .with_max_restarts(5);
        assert!(!spawner.auto_restart());
        assert_eq!(spawner.max_restarts(), 5);
        assert_eq!(spawner.default_working_dir, PathBuf::from("/workspace"));
    }

    #[tokio::test]
    async fn test_spawn_echo_agent() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let handle = spawner.spawn(&provider, "Hello World").await;
        assert!(handle.is_ok());
        let handle = handle.unwrap();
        assert_eq!(handle.provider.id, "@test-agent");
    }

    #[tokio::test]
    async fn test_try_wait_completed() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let mut handle = spawner.spawn(&provider, "done").await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let status = handle.try_wait().unwrap();
        assert!(status.is_some());
        assert_eq!(handle.health_status(), HealthStatus::Terminated);
    }

    #[tokio::test]
    async fn test_graceful_shutdown() {
        let spawner = AgentSpawner::new();
        let provider = create_sleep_agent_provider();
        let mut handle = spawner.spawn(&provider, "60").await.unwrap();
        let result = handle.shutdown(Duration::from_secs(2)).await;
        assert!(result.is_ok());
        let _graceful = result.unwrap();
        assert_eq!(handle.health_status(), HealthStatus::Terminated);
    }

    #[test]
    fn test_agent_pool_checkout_empty() {
        let mut pool = AgentPool::new(5);
        assert!(pool.checkout("nonexistent").is_none());
        assert_eq!(pool.total_idle(), 0);
    }

    #[tokio::test]
    async fn test_agent_pool_release_and_checkout() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let handle = spawner.spawn(&provider, "hello").await.unwrap();
        let mut pool = AgentPool::new(5);
        pool.release(handle);
        assert_eq!(pool.idle_count("@test-agent"), 1);
        assert_eq!(pool.total_idle(), 1);
        let checked_out = pool.checkout("@test-agent");
        assert!(checked_out.is_some());
        assert_eq!(pool.idle_count("@test-agent"), 0);
    }

    #[tokio::test]
    async fn test_subscribe_output_receives_events() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let handle = spawner.spawn(&provider, "broadcast test").await.unwrap();
        let mut receiver = handle.subscribe_output();
        tokio::time::sleep(Duration::from_millis(200)).await;
        match receiver.try_recv() {
            Ok(OutputEvent::Stdout { line, .. }) => {
                assert!(line.contains("broadcast"));
            }
            Ok(OutputEvent::Mention { .. }) => {}
            Ok(_) => {}
            Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {}
            Err(e) => panic!("Unexpected broadcast error: {:?}", e),
        }
    }

    /// Spawns with real rlimits so the `pre_exec` hook actually runs, then
    /// checks the limited child still execs and exits successfully.
    #[tokio::test]
    async fn test_spawn_with_resource_limits() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let limits = ResourceLimits {
            max_cpu_seconds: Some(60),
            max_open_files: Some(256),
            ..Default::default()
        };
        let mut handle = spawner
            .spawn_with_options(&provider, "resource-limited", None, false, limits)
            .await
            .expect("spawn with resource limits should succeed");
        let status = handle.wait().await.expect("waiting on limited child failed");
        assert!(status.success());
    }

    #[tokio::test]
    async fn test_agent_pool_drain() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let handle = spawner.spawn(&provider, "hello").await.unwrap();
        let mut pool = AgentPool::new(5);
        pool.release(handle);
        assert_eq!(pool.total_idle(), 1);
        pool.drain().await;
        assert_eq!(pool.total_idle(), 0);
    }

    fn create_cat_agent_provider() -> Provider {
        Provider::new(
            "@cat-agent",
            "Cat Agent",
            ProviderType::Agent {
                agent_id: "@cat".to_string(),
                cli_command: "cat".to_string(),
                working_dir: PathBuf::from("/tmp"),
            },
            vec![Capability::CodeGeneration],
        )
    }

    #[tokio::test]
    async fn test_spawn_process_stdin_echo() {
        let spawner = AgentSpawner::new();
        let provider = create_cat_agent_provider();
        let handle = spawner
            .spawn_with_model_stdin(&provider, "hello from stdin", None)
            .await;
        assert!(handle.is_ok());
        let handle = handle.unwrap();
        assert_eq!(handle.provider.id, "@cat-agent");
        tokio::time::sleep(Duration::from_millis(100)).await;
        let mut receiver = handle.subscribe_output();
        tokio::time::sleep(Duration::from_millis(200)).await;
        match receiver.try_recv() {
            Ok(OutputEvent::Stdout { line, .. }) => {
                assert!(line.contains("hello from stdin"));
            }
            Ok(_) => {}
            Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {}
            Err(e) => panic!("Unexpected broadcast error: {:?}", e),
        }
    }

    #[tokio::test]
    async fn test_spawn_process_arg_fallback() {
        let spawner = AgentSpawner::new();
        let provider = create_test_agent_provider();
        let handle = spawner.spawn(&provider, "arg test").await;
        assert!(handle.is_ok());
        let handle = handle.unwrap();
        assert_eq!(handle.provider.id, "@test-agent");
    }

    #[test]
    fn test_stdin_threshold_applied() {
        const STDIN_THRESHOLD: usize = 32_768;
        let small_prompt = "small task".to_string();
        let use_stdin = small_prompt.len() > STDIN_THRESHOLD;
        assert!(!use_stdin, "small prompt should not trigger stdin");
        let large_prompt = "x".repeat(STDIN_THRESHOLD + 1);
        let use_stdin = large_prompt.len() > STDIN_THRESHOLD;
        assert!(use_stdin, "large prompt should trigger stdin");
    }

    #[tokio::test]
    async fn test_stdin_write_completes() {
        let spawner = AgentSpawner::new();
        let provider = create_cat_agent_provider();
        let large_prompt = "x".repeat(100 * 1024);
        let handle = spawner
            .spawn_with_model_stdin(&provider, &large_prompt, None)
            .await;
        assert!(
            handle.is_ok(),
            "large prompt should be written to stdin without error"
        );
        tokio::time::sleep(Duration::from_millis(300)).await;
    }

    #[tokio::test]
    async fn test_spawn_with_model_stdin() {
        let spawner = AgentSpawner::new();
        let provider = Provider::new(
            "@model-cat-agent",
            "Model Cat Agent",
            ProviderType::Agent {
                agent_id: "@model-cat".to_string(),
                cli_command: "cat".to_string(),
                working_dir: PathBuf::from("/tmp"),
            },
            vec![Capability::CodeGeneration],
        );
        let handle = spawner
            .spawn_with_model_stdin(&provider, "model test via stdin", Some("test-model"))
            .await;
        assert!(handle.is_ok());
        let handle = handle.unwrap();
        assert_eq!(handle.provider.id, "@model-cat-agent");
    }

    #[test]
    fn test_spawn_request_with_resource_limits() {
        let provider = create_test_agent_provider();
        let limits = ResourceLimits {
            max_cpu_seconds: Some(3600),
            max_memory_bytes: Some(2_147_483_648),
            ..Default::default()
        };
        let request = SpawnRequest::new(provider, "test").with_resource_limits(limits.clone());
        assert_eq!(request.resource_limits.max_cpu_seconds, Some(3600));
        assert_eq!(
            request.resource_limits.max_memory_bytes,
            Some(2_147_483_648)
        );
    }
}