use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::BufReader;
use tokio::process::{Child, ChildStdin, ChildStdout};
use tokio::sync::Mutex;
use crate::error::SandboxError;
use crate::host::{find_worker_binary, ipc_event_loop};
use crate::ipc::{read_message, write_message, ChildMessage, ParentMessage, WorkerConfig};
use crate::{ResourceDispatcher, StashDispatcher, ToolDispatcher};
/// Tuning knobs for a [`WorkerPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Floor below which `reap_idle` will not shrink the pool; also the number
    /// of workers `pre_warm` tries to have alive.
    pub min_workers: usize,
    /// Ceiling on live workers; `acquire` fails once it is reached and no idle
    /// worker is available.
    pub max_workers: usize,
    /// How long a worker may sit in the idle queue before `reap_idle` may kill it.
    pub max_idle_time: Duration,
    /// Number of executions after which a released worker is retired instead of reused.
    pub max_uses: u32,
    /// Time budget for a worker to answer the `Reset` health check.
    pub health_check_timeout: Duration,
}

impl Default for PoolConfig {
    /// Two warm workers, at most eight; workers are retired after 50 uses or
    /// 60 s idle, and get 500 ms to answer a health check.
    fn default() -> Self {
        let max_idle_time = Duration::from_secs(60);
        let health_check_timeout = Duration::from_millis(500);
        PoolConfig {
            min_workers: 2,
            max_workers: 8,
            max_uses: 50,
            max_idle_time,
            health_check_timeout,
        }
    }
}
/// Lifetime counters for a [`WorkerPool`], readable while the pool is running.
///
/// The pool only ever increments these, using `Ordering::Relaxed`.
#[derive(Default, Debug)]
pub struct PoolMetrics {
    /// Worker processes spawned with both IPC pipes available.
    pub spawned: AtomicU64,
    /// Acquisitions satisfied by a healthy idle worker rather than a fresh spawn.
    pub reused: AtomicU64,
    /// Workers retired by `release` after reaching `PoolConfig::max_uses`.
    pub killed_max_uses: AtomicU64,
    /// Workers killed by `reap_idle` after exceeding `PoolConfig::max_idle_time`.
    pub killed_idle: AtomicU64,
    /// Workers killed after failing a health check or being released as `Fatal`.
    pub killed_error: AtomicU64,
}
/// A spawned worker process together with its IPC pipes and usage bookkeeping.
// NOTE(review): fields drop in declaration order, so `child` (spawned with
// `kill_on_drop(true)`) is dropped before its pipes; keep that in mind before
// reordering these fields.
struct PoolWorker {
    // The worker process; `WorkerPool::kill_worker` kills it explicitly.
    child: Child,
    // Parent -> worker side of the IPC channel (`write_message` target).
    stdin: ChildStdin,
    // Worker -> parent side of the IPC channel, buffered for `read_message`.
    stdout: BufReader<ChildStdout>,
    // Count of `execute` calls whose `Execute` message was written successfully.
    uses: u32,
    // Set at spawn and each time the worker is parked idle; read by `reap_idle`.
    idle_since: Instant,
}
/// How the caller finished with an [`AcquiredWorker`]; passed to `WorkerPool::release`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReleaseOutcome {
    /// The worker is still usable and may go back to the idle queue, unless it
    /// has hit `max_uses` or the pool is shutting down.
    Ok,
    /// The worker must not be reused; `release` kills it and bumps
    /// `PoolMetrics::killed_error`.
    Fatal,
}
/// Exclusive handle to a pooled worker, obtained from `WorkerPool::acquire`.
///
/// Hand it back with `WorkerPool::release`; dropping it instead only signals
/// the worker process to be killed.
pub struct AcquiredWorker {
    // `Some` while the handle owns a worker; taken by `release` or by `Drop`.
    worker: Option<PoolWorker>,
}
impl AcquiredWorker {
    /// Runs `code` on the checked-out worker and services its IPC traffic until it finishes.
    ///
    /// Sends a `ParentMessage::Execute` built from `config`. The worker's use
    /// counter is bumped once that send succeeds. The function then drives
    /// `ipc_event_loop` with the supplied dispatchers, allowing `config.timeout`
    /// plus a 2 s grace period.
    ///
    /// # Errors
    ///
    /// - `SandboxError::Execution` if the `Execute` message cannot be written.
    /// - `SandboxError::Timeout` if the event loop overruns the padded deadline.
    ///   The reported `timeout_ms` is `config.timeout`, not the padded value.
    /// - Any error returned by `ipc_event_loop` itself.
    ///
    /// # Panics
    ///
    /// Panics with "worker already consumed" if the handle no longer holds a worker.
    pub async fn execute(
        &mut self,
        code: &str,
        config: &crate::SandboxConfig,
        dispatcher: Arc<dyn ToolDispatcher>,
        resource_dispatcher: Option<Arc<dyn ResourceDispatcher>>,
        stash_dispatcher: Option<Arc<dyn StashDispatcher>>,
    ) -> Result<serde_json::Value, SandboxError> {
        let worker = self.worker.as_mut().expect("worker already consumed");
        let wire_config = WorkerConfig::from(config);
        let request = ParentMessage::Execute {
            code: code.to_owned(),
            manifest: None,
            config: wire_config,
        };

        if let Err(e) = write_message(&mut worker.stdin, &request).await {
            return Err(SandboxError::Execution(anyhow::anyhow!(
                "failed to send Execute to pooled worker: {}",
                e
            )));
        }
        // Only count the use once the request has actually reached the worker.
        worker.uses += 1;

        // The extra 2 s gives the worker room to enforce `config.timeout` itself
        // and report back before the host gives up on it.
        let deadline = config.timeout + Duration::from_secs(2);
        let event_loop = ipc_event_loop(
            &mut worker.stdin,
            &mut worker.stdout,
            dispatcher,
            resource_dispatcher,
            stash_dispatcher,
        );
        tokio::time::timeout(deadline, event_loop)
            .await
            .unwrap_or_else(|_elapsed| {
                Err(SandboxError::Timeout {
                    timeout_ms: config.timeout.as_millis() as u64,
                })
            })
    }
}
/// Pool of reusable sandbox worker processes.
///
/// Workers are spawned on demand by `acquire` (or ahead of time by `pre_warm`),
/// handed out as `AcquiredWorker`s, and parked in `idle_workers` by `release`.
/// A parked worker stays there until it is reused, reaped for idleness, or
/// killed on shutdown. `release` retires workers that reach `max_uses` instead
/// of parking them.
pub struct WorkerPool {
    // Limits and timeouts governing spawning, reuse and reaping.
    config: PoolConfig,
    // Parked workers; `acquire` pops from the front, `release` pushes to the back.
    idle_workers: Mutex<VecDeque<PoolWorker>>,
    // Live workers (idle and handed out) counted against `config.max_workers`;
    // `kill_worker` decrements it.
    // NOTE(review): `AcquiredWorker`'s `Drop` kills the child without decrementing
    // this, so a handle dropped without `release` permanently consumes a slot.
    alive_count: Mutex<usize>,
    // Counters shared with callers through `metrics()`.
    metrics: Arc<PoolMetrics>,
    // Set by `shutdown`; once true, `acquire` fails and released workers are killed.
    shutting_down: Mutex<bool>,
}
impl WorkerPool {
    /// Creates an empty pool governed by `config`.
    ///
    /// No processes are started here. Workers are spawned lazily by `acquire`,
    /// or up front by `pre_warm`.
    pub fn new(config: PoolConfig) -> Self {
        Self {
            config,
            idle_workers: Mutex::new(VecDeque::new()),
            alive_count: Mutex::new(0),
            metrics: Arc::new(PoolMetrics::default()),
            shutting_down: Mutex::new(false),
        }
    }

    /// Returns the pool's shared metrics counters.
    pub fn metrics(&self) -> &Arc<PoolMetrics> {
        &self.metrics
    }

    /// Checks out a worker that has been reset for `sandbox_config`.
    ///
    /// Idle workers are tried first, oldest parked first. Each one is
    /// health-checked with a `Reset`; unhealthy ones are killed and the next is
    /// tried. If none is usable, a new worker is spawned, provided the pool is
    /// below `max_workers`.
    ///
    /// # Errors
    ///
    /// Returns `SandboxError::Execution` in any of these cases:
    /// - the pool is shutting down;
    /// - the pool is at capacity;
    /// - the worker cannot be spawned;
    /// - the freshly spawned worker fails its health check.
    #[tracing::instrument(skip(self, sandbox_config))]
    pub async fn acquire(
        &self,
        sandbox_config: &crate::SandboxConfig,
    ) -> Result<AcquiredWorker, SandboxError> {
        if *self.shutting_down.lock().await {
            return Err(SandboxError::Execution(anyhow::anyhow!(
                "worker pool is shutting down"
            )));
        }
        let worker_config = WorkerConfig::from(sandbox_config);

        // Prefer idle workers. The idle lock is held only for the pop, never
        // across the health-check round trip.
        loop {
            let candidate = self.idle_workers.lock().await.pop_front();
            let mut w = match candidate {
                Some(w) => w,
                None => break,
            };
            if self.health_check(&mut w, &worker_config).await {
                self.metrics.reused.fetch_add(1, Ordering::Relaxed);
                return Ok(AcquiredWorker { worker: Some(w) });
            }
            self.kill_worker(w).await;
            self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
        }

        // No idle worker: claim a capacity slot before spawning so concurrent
        // callers cannot overshoot `max_workers`, without holding the counter
        // lock across the process spawn.
        if !self.try_reserve_slot().await {
            return Err(SandboxError::Execution(anyhow::anyhow!(
                "worker pool at capacity ({} workers)",
                self.config.max_workers
            )));
        }
        let mut w = match self.spawn_worker().await {
            Ok(w) => w,
            Err(e) => {
                self.release_slot().await;
                return Err(e);
            }
        };
        if !self.health_check(&mut w, &worker_config).await {
            // `kill_worker` also returns the slot reserved above.
            self.kill_worker(w).await;
            self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
            return Err(SandboxError::Execution(anyhow::anyhow!(
                "newly spawned worker failed health check"
            )));
        }
        Ok(AcquiredWorker { worker: Some(w) })
    }

    /// Returns a worker to the pool, or kills it.
    ///
    /// The worker is killed in any of these cases:
    /// - `outcome` is `Fatal`;
    /// - it has reached `max_uses`;
    /// - the pool is shutting down.
    ///
    /// Otherwise it is parked at the back of the idle queue. An empty handle is
    /// ignored.
    #[tracing::instrument(skip(self, handle), fields(outcome = ?outcome))]
    pub async fn release(&self, mut handle: AcquiredWorker, outcome: ReleaseOutcome) {
        let mut worker = match handle.worker.take() {
            Some(w) => w,
            None => return,
        };
        if outcome == ReleaseOutcome::Fatal {
            self.kill_worker(worker).await;
            self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
            return;
        }
        if worker.uses >= self.config.max_uses {
            self.kill_worker(worker).await;
            self.metrics.killed_max_uses.fetch_add(1, Ordering::Relaxed);
            return;
        }
        // Read the shutdown flag while holding the idle lock. `shutdown` sets the
        // flag before draining the queue, so there are only two outcomes:
        // - we see the flag and kill the worker ourselves;
        // - our push lands before the drain, and the drain kills the worker.
        // Either way the worker cannot be stranded in the queue after shutdown.
        let mut idle = self.idle_workers.lock().await;
        if *self.shutting_down.lock().await {
            drop(idle);
            self.kill_worker(worker).await;
            return;
        }
        worker.idle_since = Instant::now();
        idle.push_back(worker);
    }

    /// Marks the pool as shutting down and kills every idle worker.
    ///
    /// Workers currently checked out are killed when they are released.
    /// Calling this more than once is harmless.
    pub async fn shutdown(&self) {
        *self.shutting_down.lock().await = true;
        let drained: Vec<PoolWorker> = self.idle_workers.lock().await.drain(..).collect();
        for w in drained {
            self.kill_worker(w).await;
        }
    }

    /// Kills idle workers that have been parked longer than `max_idle_time`.
    ///
    /// Workers are not reaped if doing so would drop the live-worker count
    /// below `min_workers`. The survivors keep their queue order.
    pub async fn reap_idle(&self) {
        let mut idle = self.idle_workers.lock().await;
        let now = Instant::now();
        // Snapshot under the idle lock; it counts checked-out workers too.
        let alive = *self.alive_count.lock().await;
        let mut expired = Vec::new();
        let mut kept = VecDeque::with_capacity(idle.len());
        for w in idle.drain(..) {
            let timed_out = now.duration_since(w.idle_since) > self.config.max_idle_time;
            // Saturating so a counter that lags the queue can never underflow.
            let would_remain = alive.saturating_sub(expired.len() + 1);
            if timed_out && would_remain >= self.config.min_workers {
                expired.push(w);
            } else {
                kept.push_back(w);
            }
        }
        *idle = kept;
        drop(idle);
        for w in expired {
            self.kill_worker(w).await;
            self.metrics.killed_idle.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Spawns and health-checks workers until `min_workers` are alive, parking them idle.
    ///
    /// Stops early if the pool reaches `max_workers` or starts shutting down.
    /// Spawn failures are logged and skipped. Returns how many workers were
    /// added to the idle queue.
    #[cfg(feature = "worker-pool")]
    pub async fn pre_warm(&self, config: &crate::SandboxConfig) -> Result<usize, SandboxError> {
        let worker_config = WorkerConfig::from(config);
        let alive = *self.alive_count.lock().await;
        let to_spawn = self.config.min_workers.saturating_sub(alive);
        let mut count = 0;
        for _ in 0..to_spawn {
            // Count the worker before it exists. This keeps `alive_count` from
            // lagging the idle queue (which `reap_idle` relies on), and lets
            // `kill_worker` below return exactly the slot it was given.
            if !self.try_reserve_slot().await {
                break;
            }
            let mut w = match self.spawn_worker().await {
                Ok(w) => w,
                Err(e) => {
                    self.release_slot().await;
                    tracing::warn!(error = %e, "failed to pre-warm worker");
                    continue;
                }
            };
            if !self.health_check(&mut w, &worker_config).await {
                self.kill_worker(w).await;
                continue;
            }
            w.idle_since = Instant::now();
            // Same shutdown handshake as `release`: check the flag under the idle lock.
            let mut idle = self.idle_workers.lock().await;
            if *self.shutting_down.lock().await {
                drop(idle);
                self.kill_worker(w).await;
                break;
            }
            idle.push_back(w);
            count += 1;
        }
        Ok(count)
    }

    /// Spawns a background task that calls `reap_idle` every `interval`.
    ///
    /// The task exits after it observes shutdown; abort the returned handle to
    /// stop it sooner.
    #[cfg(feature = "worker-pool")]
    pub fn start_reap_task(self: &Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
        let pool = Arc::clone(self);
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(interval).await;
                if *pool.shutting_down.lock().await {
                    break;
                }
                pool.reap_idle().await;
            }
        })
    }

    /// Claims one `max_workers` slot.
    ///
    /// Returns `false` without changing anything when the pool is already full.
    async fn try_reserve_slot(&self) -> bool {
        let mut alive = self.alive_count.lock().await;
        if *alive >= self.config.max_workers {
            return false;
        }
        *alive += 1;
        true
    }

    /// Returns one slot to the capacity counter, never going below zero.
    async fn release_slot(&self) {
        let mut alive = self.alive_count.lock().await;
        *alive = alive.saturating_sub(1);
    }

    /// Starts a fresh worker process with piped stdin/stdout and an empty environment.
    ///
    /// If `FORGE_DEBUG` is set, the worker's stderr is piped to
    /// `capture_bounded_stderr` on a background task; otherwise it is discarded.
    /// Does not touch `alive_count`; callers reserve capacity themselves.
    async fn spawn_worker(&self) -> Result<PoolWorker, SandboxError> {
        let worker_bin = find_worker_binary()?;
        let debug_mode = std::env::var("FORGE_DEBUG").is_ok();
        let mut child = tokio::process::Command::new(&worker_bin)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(if debug_mode {
                std::process::Stdio::piped()
            } else {
                std::process::Stdio::null()
            })
            .env_clear()
            .kill_on_drop(true)
            .spawn()
            .map_err(|e| {
                SandboxError::Execution(anyhow::anyhow!(
                    "failed to spawn pooled worker at {}: {}",
                    worker_bin.display(),
                    e
                ))
            })?;
        if debug_mode {
            if let Some(stderr) = child.stderr.take() {
                tokio::spawn(crate::host::capture_bounded_stderr(stderr));
            }
        }
        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| SandboxError::Execution(anyhow::anyhow!("no stdin on pooled worker")))?;
        let stdout = child.stdout.take().ok_or_else(|| {
            SandboxError::Execution(anyhow::anyhow!("no stdout on pooled worker"))
        })?;
        self.metrics.spawned.fetch_add(1, Ordering::Relaxed);
        Ok(PoolWorker {
            child,
            stdin,
            stdout: BufReader::new(stdout),
            uses: 0,
            idle_since: Instant::now(),
        })
    }

    /// Sends `Reset { config }` and reports whether the worker answers `ResetComplete`.
    ///
    /// The whole round trip, the write included, is bounded by
    /// `health_check_timeout`. A worker that stops draining its stdin therefore
    /// cannot stall the caller.
    async fn health_check(&self, w: &mut PoolWorker, config: &WorkerConfig) -> bool {
        let reset_msg = ParentMessage::Reset {
            config: config.clone(),
        };
        let round_trip = async {
            if write_message(&mut w.stdin, &reset_msg).await.is_err() {
                return None;
            }
            read_message::<ChildMessage, _>(&mut w.stdout)
                .await
                .ok()
                .flatten()
        };
        matches!(
            tokio::time::timeout(self.config.health_check_timeout, round_trip).await,
            Ok(Some(ChildMessage::ResetComplete))
        )
    }

    /// Kills the worker process (best effort) and returns its capacity slot.
    async fn kill_worker(&self, mut w: PoolWorker) {
        // The process may already have exited; there is nothing useful to do on error.
        let _ = w.child.kill().await;
        self.release_slot().await;
    }
}
impl Drop for AcquiredWorker {
    /// Best-effort cleanup for a handle that never went back through `WorkerPool::release`.
    ///
    /// `Drop` cannot await, so this only signals the child to die via
    /// `start_kill`, rather than awaiting `kill`.
    ///
    /// NOTE(review): this path does not decrement the pool's `alive_count`, so
    /// every handle dropped without `release` permanently consumes one
    /// `max_workers` slot.
    fn drop(&mut self) {
        let orphan = self.worker.take();
        if let Some(mut abandoned) = orphan {
            let _ = abandoned.child.start_kill();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pool_config_defaults() {
        let config = PoolConfig::default();
        assert_eq!(config.min_workers, 2);
        assert_eq!(config.max_workers, 8);
        assert_eq!(config.max_idle_time, Duration::from_secs(60));
        assert_eq!(config.max_uses, 50);
        assert_eq!(config.health_check_timeout, Duration::from_millis(500));
    }

    #[test]
    fn pool_metrics_default_zero() {
        let m = PoolMetrics::default();
        assert_eq!(m.spawned.load(Ordering::Relaxed), 0);
        assert_eq!(m.reused.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_max_uses.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_error.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn release_outcome_eq() {
        assert_eq!(ReleaseOutcome::Ok, ReleaseOutcome::Ok);
        assert_eq!(ReleaseOutcome::Fatal, ReleaseOutcome::Fatal);
        assert_ne!(ReleaseOutcome::Ok, ReleaseOutcome::Fatal);
    }

    #[tokio::test]
    async fn pool_new_starts_empty() {
        let pool = WorkerPool::new(PoolConfig::default());
        let idle = pool.idle_workers.lock().await;
        assert_eq!(idle.len(), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
    }

    #[tokio::test]
    async fn pool_shutdown_sets_flag() {
        let pool = WorkerPool::new(PoolConfig::default());
        assert!(!*pool.shutting_down.lock().await);
        pool.shutdown().await;
        assert!(*pool.shutting_down.lock().await);
    }

    #[tokio::test]
    async fn pool_reap_empty_is_noop() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    /// Only exercises construction of a minimal, hand-built config.
    #[test]
    fn pool_cc15_pool_config_validation() {
        let config = PoolConfig {
            min_workers: 0,
            max_workers: 1,
            max_idle_time: Duration::from_secs(1),
            max_uses: 1,
            health_check_timeout: Duration::from_millis(100),
        };
        assert_eq!(config.min_workers, 0);
        assert_eq!(config.max_workers, 1);
        assert_eq!(config.max_uses, 1);
    }

    #[tokio::test]
    async fn pool_shutdown_rejects_new_acquires() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        let config = crate::SandboxConfig::default();
        let result = pool.acquire(&config).await;
        match result {
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("shutting down"),
                    "should mention shutting down: {msg}"
                );
            }
            Ok(_) => panic!("should reject after shutdown"),
        }
    }

    #[tokio::test]
    async fn pool_shutdown_kills_all_idle() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    #[tokio::test]
    async fn pool_reap_preserves_min_workers_count() {
        let config = PoolConfig {
            min_workers: 2,
            max_workers: 4,
            max_idle_time: Duration::from_secs(0),
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        };
        let pool = WorkerPool::new(config);
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
        // Reaping an empty pool must neither panic nor disturb the live count.
        assert_eq!(*pool.alive_count.lock().await, 0);
        assert_eq!(pool.metrics().killed_idle.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn pool_acquire_rejects_when_at_capacity() {
        let pool = WorkerPool::new(PoolConfig {
            min_workers: 0,
            max_workers: 0,
            ..PoolConfig::default()
        });
        let config = crate::SandboxConfig::default();
        match pool.acquire(&config).await {
            Err(e) => {
                let msg = e.to_string();
                assert!(msg.contains("at capacity"), "should mention capacity: {msg}");
            }
            Ok(_) => panic!("a zero-capacity pool must not hand out workers"),
        }
        // A rejected acquire must not leak a capacity slot.
        assert_eq!(*pool.alive_count.lock().await, 0);
        assert_eq!(pool.metrics().spawned.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn pool_release_empty_handle_is_noop() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.release(AcquiredWorker { worker: None }, ReleaseOutcome::Ok)
            .await;
        pool.release(AcquiredWorker { worker: None }, ReleaseOutcome::Fatal)
            .await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
        assert_eq!(pool.metrics().killed_error.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn pool_metrics_spawned_increments() {
        let m = PoolMetrics::default();
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 1);
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn pool_metrics_reused_increments() {
        let m = PoolMetrics::default();
        m.reused.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.reused.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn pool_metrics_killed_idle_increments() {
        let m = PoolMetrics::default();
        m.killed_idle.fetch_add(3, Ordering::Relaxed);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn pool_release_outcome_debug() {
        let ok = format!("{:?}", ReleaseOutcome::Ok);
        let fatal = format!("{:?}", ReleaseOutcome::Fatal);
        assert!(ok.contains("Ok"));
        assert!(fatal.contains("Fatal"));
    }

    #[tokio::test]
    async fn pool_multiple_shutdowns_safe() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        pool.shutdown().await;
        assert!(*pool.shutting_down.lock().await);
    }

    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pw_feature_compiles() {
        let pool = Arc::new(WorkerPool::new(PoolConfig::default()));
        let handle = pool.start_reap_task(Duration::from_secs(3600));
        handle.abort();
    }

    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pre_warm_with_zero_min_spawns_nothing() {
        let pool = WorkerPool::new(PoolConfig {
            min_workers: 0,
            ..PoolConfig::default()
        });
        let config = crate::SandboxConfig::default();
        assert!(matches!(pool.pre_warm(&config).await, Ok(0)));
        assert_eq!(pool.metrics().spawned.load(Ordering::Relaxed), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
    }

    #[test]
    fn pool_config_clone() {
        let config = PoolConfig::default();
        let cloned = config.clone();
        assert_eq!(config.min_workers, cloned.min_workers);
        assert_eq!(config.max_workers, cloned.max_workers);
    }

    #[test]
    fn pool_cc22_worker_pool_feature_gate() {
        let _config = PoolConfig::default();
        let _pool = WorkerPool::new(PoolConfig::default());
    }
}