// forge_sandbox/pool.rs

1//! Worker pool for reusing sandbox child processes across executions.
2//!
3//! Instead of spawning a new `forgemax-worker` for every `execute()` call (~50ms),
4//! the pool keeps warm workers alive and reuses them. Each reuse sends a
5//! [`Reset`](crate::ipc::ParentMessage::Reset) message that causes the worker to
6//! drop its V8 runtime and create a fresh one (~5-10ms).
7//!
8//! **Security invariant**: Every execution gets a completely fresh V8 Isolate +
9//! Context. There is no state leakage between executions.
10
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::io::BufReader;
17use tokio::process::{Child, ChildStdin, ChildStdout};
18use tokio::sync::Mutex;
19
20use crate::error::SandboxError;
21use crate::host::{find_worker_binary, ipc_event_loop};
22use crate::ipc::{read_message, write_message, ChildMessage, ParentMessage, WorkerConfig};
23use crate::{ResourceDispatcher, StashDispatcher, ToolDispatcher};
24
/// Tunables for the worker pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Minimum warm workers to keep ready.
    pub min_workers: usize,
    /// Maximum workers in the pool.
    pub max_workers: usize,
    /// Kill idle workers after this duration.
    pub max_idle_time: Duration,
    /// Recycle a worker after this many executions.
    pub max_uses: u32,
    /// Timeout for a health-check Reset round-trip.
    pub health_check_timeout: Duration,
}

impl Default for PoolConfig {
    /// Defaults: 2 warm workers, capacity 8, 60s idle cutoff, 50 uses per
    /// worker, 500ms health-check budget.
    fn default() -> Self {
        PoolConfig {
            min_workers: 2,
            max_workers: 8,
            max_idle_time: Duration::from_secs(60),
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        }
    }
}
51
/// Counters exposing pool behavior for observability.
///
/// Every counter only ever increases; all updates in this module use
/// `Ordering::Relaxed`, so the values are informational rather than
/// synchronizing.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Total worker processes ever spawned.
    pub spawned: AtomicU64,
    /// Total acquisitions served from the idle pool (worker reuse).
    pub reused: AtomicU64,
    /// Workers recycled for reaching the `max_uses` threshold.
    pub killed_max_uses: AtomicU64,
    /// Workers reaped for sitting idle past `max_idle_time`.
    pub killed_idle: AtomicU64,
    /// Workers killed after a crash, fatal execution, or failed health check.
    pub killed_error: AtomicU64,
}
66
/// A warm worker process that can be reused.
struct PoolWorker {
    /// Handle to the child process (spawned with `kill_on_drop(true)`).
    child: Child,
    /// Worker's stdin — parent→child IPC messages are written here.
    stdin: ChildStdin,
    /// Buffered worker stdout — child→parent IPC messages are read here.
    stdout: BufReader<ChildStdout>,
    /// Executions served so far; compared against `PoolConfig::max_uses`.
    uses: u32,
    /// Instant the worker last became idle; compared against `max_idle_time`.
    idle_since: Instant,
}
75
/// How an execution went, reported to [`WorkerPool::release`] so the pool can
/// decide whether a worker is safe to reuse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReleaseOutcome {
    /// The execution finished cleanly; the worker may return to the idle pool.
    Ok,
    /// The execution failed in a way that poisons the process (timeout, heap
    /// OOM, crash); the worker must be killed.
    Fatal,
}
85
/// A checked-out worker handle. The pool retains no reference to this worker
/// while it is in use; the caller owns it.
///
/// If the handle is dropped without [`WorkerPool::release`], its `Drop` impl
/// kills the child process (best-effort `start_kill`).
pub struct AcquiredWorker {
    /// `Some` while checked out; taken exactly once by `release` (or `Drop`).
    worker: Option<PoolWorker>,
}
91
impl AcquiredWorker {
    /// Execute code on this worker, routing IPC through the given dispatchers.
    ///
    /// The pool health-checked this worker with a Reset during acquire, so the
    /// run starts on a fresh V8 runtime. Returns the execution result. On
    /// completion, call [`WorkerPool::release`] with the appropriate outcome —
    /// use [`ReleaseOutcome::Fatal`] after a timeout or fatal error so the
    /// possibly-desynced process is killed rather than reused.
    ///
    /// # Panics
    /// Panics if the internal worker was already taken — an invariant
    /// violation, since `release`/`Drop` consume the worker exactly once.
    pub async fn execute(
        &mut self,
        code: &str,
        config: &crate::SandboxConfig,
        dispatcher: Arc<dyn ToolDispatcher>,
        resource_dispatcher: Option<Arc<dyn ResourceDispatcher>>,
        stash_dispatcher: Option<Arc<dyn StashDispatcher>>,
    ) -> Result<serde_json::Value, SandboxError> {
        let w = self.worker.as_mut().expect("worker already consumed");

        // Send Execute message
        let worker_config = WorkerConfig::from(config);
        let execute_msg = ParentMessage::Execute {
            code: code.to_string(),
            manifest: None,
            config: worker_config,
        };
        write_message(&mut w.stdin, &execute_msg)
            .await
            .map_err(|e| {
                SandboxError::Execution(anyhow::anyhow!(
                    "failed to send Execute to pooled worker: {}",
                    e
                ))
            })?;

        // Count the attempt up front so max_uses recycling also covers runs
        // that fail or time out below.
        w.uses += 1;

        // Run IPC event loop with timeout. The 2s pad over config.timeout
        // gives the worker a chance to report its own timeout error first —
        // NOTE(review): assumes the worker enforces config.timeout
        // internally; confirm against the worker implementation.
        let timeout = config.timeout + Duration::from_secs(2);
        let result = tokio::time::timeout(
            timeout,
            ipc_event_loop(
                &mut w.stdin,
                &mut w.stdout,
                dispatcher,
                resource_dispatcher,
                stash_dispatcher,
            ),
        )
        .await;

        match result {
            Ok(inner) => inner,
            Err(_elapsed) => {
                // Timeout — the caller should release with Fatal. The error
                // reports the configured timeout, not the padded deadline.
                Err(SandboxError::Timeout {
                    timeout_ms: config.timeout.as_millis() as u64,
                })
            }
        }
    }
}
150
/// A pool of warm worker processes.
///
/// Fields use `tokio::sync::Mutex` (not `std`) because guards may be held
/// across `.await` points — e.g. `acquire` holds the `alive_count` lock
/// across the worker spawn.
pub struct WorkerPool {
    /// Pool tunables (capacity, idle cutoff, recycle threshold, ...).
    config: PoolConfig,
    /// Workers that are alive and waiting to be acquired.
    idle_workers: Mutex<VecDeque<PoolWorker>>,
    /// Total workers currently alive (idle + checked out).
    alive_count: Mutex<usize>,
    /// Observability counters; `Arc` so callers can hold them independently.
    metrics: Arc<PoolMetrics>,
    /// Flag to prevent new acquisitions during shutdown.
    shutting_down: Mutex<bool>,
}
161
162impl WorkerPool {
163    /// Create a new worker pool with the given configuration.
164    pub fn new(config: PoolConfig) -> Self {
165        Self {
166            config,
167            idle_workers: Mutex::new(VecDeque::new()),
168            alive_count: Mutex::new(0),
169            metrics: Arc::new(PoolMetrics::default()),
170            shutting_down: Mutex::new(false),
171        }
172    }
173
174    /// Get a reference to the pool metrics.
175    pub fn metrics(&self) -> &Arc<PoolMetrics> {
176        &self.metrics
177    }
178
179    /// Acquire a worker from the pool, spawning a new one if necessary.
180    ///
181    /// Returns `None` if the pool is shutting down or at capacity.
182    #[tracing::instrument(skip(self, sandbox_config))]
183    pub async fn acquire(
184        &self,
185        sandbox_config: &crate::SandboxConfig,
186    ) -> Result<AcquiredWorker, SandboxError> {
187        if *self.shutting_down.lock().await {
188            return Err(SandboxError::Execution(anyhow::anyhow!(
189                "worker pool is shutting down"
190            )));
191        }
192
193        let worker_config = WorkerConfig::from(sandbox_config);
194
195        // Try to get an idle worker
196        loop {
197            let mut idle = self.idle_workers.lock().await;
198            if let Some(mut w) = idle.pop_front() {
199                drop(idle); // Release lock before I/O
200
201                // Health check: send Reset and wait for ResetComplete
202                let healthy = self.health_check(&mut w, &worker_config).await;
203                if healthy {
204                    self.metrics.reused.fetch_add(1, Ordering::Relaxed);
205                    return Ok(AcquiredWorker { worker: Some(w) });
206                } else {
207                    // Kill unhealthy worker, try next
208                    self.kill_worker(w).await;
209                    self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
210                    continue;
211                }
212            } else {
213                drop(idle);
214                break;
215            }
216        }
217
218        // No idle workers — spawn a new one if under capacity
219        let mut alive = self.alive_count.lock().await;
220        if *alive >= self.config.max_workers {
221            return Err(SandboxError::Execution(anyhow::anyhow!(
222                "worker pool at capacity ({} workers)",
223                self.config.max_workers
224            )));
225        }
226
227        let worker = self.spawn_worker().await?;
228        *alive += 1;
229        drop(alive);
230
231        // Send Reset to initialize for this execution's config
232        let mut w = worker;
233        let healthy = self.health_check(&mut w, &worker_config).await;
234        if !healthy {
235            self.kill_worker(w).await;
236            return Err(SandboxError::Execution(anyhow::anyhow!(
237                "newly spawned worker failed health check"
238            )));
239        }
240
241        Ok(AcquiredWorker { worker: Some(w) })
242    }
243
244    /// Return a worker to the pool after use.
245    ///
246    /// If the outcome is [`ReleaseOutcome::Fatal`] or the worker has exceeded
247    /// `max_uses`, the worker is killed instead of returned to the idle pool.
248    #[tracing::instrument(skip(self, handle), fields(outcome = ?outcome))]
249    pub async fn release(&self, mut handle: AcquiredWorker, outcome: ReleaseOutcome) {
250        let worker = match handle.worker.take() {
251            Some(w) => w,
252            None => return,
253        };
254
255        if outcome == ReleaseOutcome::Fatal {
256            self.kill_worker(worker).await;
257            self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
258            return;
259        }
260
261        if worker.uses >= self.config.max_uses {
262            self.kill_worker(worker).await;
263            self.metrics.killed_max_uses.fetch_add(1, Ordering::Relaxed);
264            return;
265        }
266
267        if *self.shutting_down.lock().await {
268            self.kill_worker(worker).await;
269            return;
270        }
271
272        // Return to idle pool
273        let mut w = worker;
274        w.idle_since = Instant::now();
275        self.idle_workers.lock().await.push_back(w);
276    }
277
278    /// Shut down the pool, killing all idle workers.
279    pub async fn shutdown(&self) {
280        *self.shutting_down.lock().await = true;
281
282        let mut idle = self.idle_workers.lock().await;
283        let workers: Vec<PoolWorker> = idle.drain(..).collect();
284        drop(idle);
285
286        for w in workers {
287            self.kill_worker(w).await;
288        }
289    }
290
291    /// Reap idle workers that have exceeded `max_idle_time`.
292    ///
293    /// Call this periodically (e.g., every 10 seconds) from a background task.
294    /// Preserves `min_workers` to avoid repeated cold starts.
295    pub async fn reap_idle(&self) {
296        let mut idle = self.idle_workers.lock().await;
297        let now = Instant::now();
298        let mut to_kill = Vec::new();
299        let mut kept = VecDeque::new();
300        let alive = *self.alive_count.lock().await;
301
302        while let Some(w) = idle.pop_front() {
303            if now.duration_since(w.idle_since) > self.config.max_idle_time {
304                // Preserve min_workers: only reap if we'd still have enough alive
305                let would_remain = alive - to_kill.len() - 1;
306                if would_remain >= self.config.min_workers {
307                    to_kill.push(w);
308                } else {
309                    kept.push_back(w);
310                }
311            } else {
312                kept.push_back(w);
313            }
314        }
315        *idle = kept;
316        drop(idle);
317
318        for w in to_kill {
319            self.kill_worker(w).await;
320            self.metrics.killed_idle.fetch_add(1, Ordering::Relaxed);
321        }
322    }
323
324    /// Pre-warm the pool by spawning workers up to `min_workers`.
325    ///
326    /// Each worker is spawned and given a Reset health check. Returns the
327    /// number of workers successfully pre-warmed.
328    #[cfg(feature = "worker-pool")]
329    pub async fn pre_warm(&self, config: &crate::SandboxConfig) -> Result<usize, SandboxError> {
330        let worker_config = WorkerConfig::from(config);
331        let mut count = 0;
332
333        let alive = *self.alive_count.lock().await;
334        let to_spawn = self.config.min_workers.saturating_sub(alive);
335
336        for _ in 0..to_spawn {
337            if *self.alive_count.lock().await >= self.config.max_workers {
338                break;
339            }
340
341            match self.spawn_worker().await {
342                Ok(mut w) => {
343                    if self.health_check(&mut w, &worker_config).await {
344                        w.idle_since = Instant::now();
345                        self.idle_workers.lock().await.push_back(w);
346                        *self.alive_count.lock().await += 1;
347                        count += 1;
348                    } else {
349                        self.kill_worker(w).await;
350                    }
351                }
352                Err(e) => {
353                    tracing::warn!(error = %e, "failed to pre-warm worker");
354                }
355            }
356        }
357
358        Ok(count)
359    }
360
361    /// Start a background task that periodically reaps idle workers.
362    ///
363    /// The task runs until the returned `JoinHandle` is aborted or the pool shuts down.
364    #[cfg(feature = "worker-pool")]
365    pub fn start_reap_task(self: &Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
366        let pool = Arc::clone(self);
367        tokio::spawn(async move {
368            loop {
369                tokio::time::sleep(interval).await;
370                if *pool.shutting_down.lock().await {
371                    break;
372                }
373                pool.reap_idle().await;
374            }
375        })
376    }
377
378    /// Spawn a fresh worker process.
379    async fn spawn_worker(&self) -> Result<PoolWorker, SandboxError> {
380        let worker_bin = find_worker_binary()?;
381
382        // stderr is always piped (debug) or null (non-debug) — never inherit.
383        let debug_mode = std::env::var("FORGE_DEBUG").is_ok();
384        let mut child = tokio::process::Command::new(&worker_bin)
385            .stdin(std::process::Stdio::piped())
386            .stdout(std::process::Stdio::piped())
387            .stderr(if debug_mode {
388                std::process::Stdio::piped()
389            } else {
390                std::process::Stdio::null()
391            })
392            .env_clear()
393            .kill_on_drop(true)
394            .spawn()
395            .map_err(|e| {
396                SandboxError::Execution(anyhow::anyhow!(
397                    "failed to spawn pooled worker at {}: {}",
398                    worker_bin.display(),
399                    e
400                ))
401            })?;
402
403        // Bounded stderr capture in debug mode (max 4KB, logged via tracing)
404        if debug_mode {
405            if let Some(stderr) = child.stderr.take() {
406                tokio::spawn(crate::host::capture_bounded_stderr(stderr));
407            }
408        }
409
410        let stdin = child
411            .stdin
412            .take()
413            .ok_or_else(|| SandboxError::Execution(anyhow::anyhow!("no stdin on pooled worker")))?;
414        let stdout = child.stdout.take().ok_or_else(|| {
415            SandboxError::Execution(anyhow::anyhow!("no stdout on pooled worker"))
416        })?;
417
418        self.metrics.spawned.fetch_add(1, Ordering::Relaxed);
419
420        Ok(PoolWorker {
421            child,
422            stdin,
423            stdout: BufReader::new(stdout),
424            uses: 0,
425            idle_since: Instant::now(),
426        })
427    }
428
429    /// Send a Reset message and wait for ResetComplete within the health check timeout.
430    async fn health_check(&self, w: &mut PoolWorker, config: &WorkerConfig) -> bool {
431        let reset_msg = ParentMessage::Reset {
432            config: config.clone(),
433        };
434
435        // Send Reset
436        if write_message(&mut w.stdin, &reset_msg).await.is_err() {
437            return false;
438        }
439
440        // Wait for ResetComplete
441        matches!(
442            tokio::time::timeout(
443                self.config.health_check_timeout,
444                read_message::<ChildMessage, _>(&mut w.stdout),
445            )
446            .await,
447            Ok(Ok(Some(ChildMessage::ResetComplete)))
448        )
449    }
450
451    /// Kill a worker process and decrement the alive counter.
452    async fn kill_worker(&self, mut w: PoolWorker) {
453        let _ = w.child.kill().await;
454        let mut alive = self.alive_count.lock().await;
455        *alive = alive.saturating_sub(1);
456    }
457}
458
// Last-resort cleanup: if the handle is dropped without being returned via
// WorkerPool::release, kill the worker so no orphaned process lingers.
//
// NOTE(review): this path cannot decrement the pool's alive_count (Drop holds
// no pool reference), so a handle dropped without release() permanently
// consumes one slot of pool capacity — confirm callers always release.
impl Drop for AcquiredWorker {
    fn drop(&mut self) {
        if let Some(mut w) = self.worker.take() {
            // Best-effort kill — can't async in Drop, but kill_on_drop handles it
            let _ = w.child.start_kill();
        }
    }
}
468
#[cfg(test)]
mod tests {
    use super::*;

    // Tests below exercise only the parts of the pool that need no real
    // worker binary: config/metrics types, shutdown flagging, and reaping an
    // empty pool. Acquire/release with live workers needs integration tests.

    #[test]
    fn pool_config_defaults() {
        let config = PoolConfig::default();
        assert_eq!(config.min_workers, 2);
        assert_eq!(config.max_workers, 8);
        assert_eq!(config.max_idle_time, Duration::from_secs(60));
        assert_eq!(config.max_uses, 50);
        assert_eq!(config.health_check_timeout, Duration::from_millis(500));
    }

    #[test]
    fn pool_metrics_default_zero() {
        let m = PoolMetrics::default();
        assert_eq!(m.spawned.load(Ordering::Relaxed), 0);
        assert_eq!(m.reused.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_max_uses.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_error.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn release_outcome_eq() {
        assert_eq!(ReleaseOutcome::Ok, ReleaseOutcome::Ok);
        assert_eq!(ReleaseOutcome::Fatal, ReleaseOutcome::Fatal);
        assert_ne!(ReleaseOutcome::Ok, ReleaseOutcome::Fatal);
    }

    #[tokio::test]
    async fn pool_new_starts_empty() {
        let pool = WorkerPool::new(PoolConfig::default());
        let idle = pool.idle_workers.lock().await;
        assert_eq!(idle.len(), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
    }

    #[tokio::test]
    async fn pool_shutdown_sets_flag() {
        let pool = WorkerPool::new(PoolConfig::default());
        assert!(!*pool.shutting_down.lock().await);
        pool.shutdown().await;
        assert!(*pool.shutting_down.lock().await);
    }

    #[tokio::test]
    async fn pool_reap_empty_is_noop() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    // --- Phase 5: Pool maturation unit tests ---

    #[test]
    fn pool_cc15_pool_config_validation() {
        let config = PoolConfig {
            min_workers: 0,
            max_workers: 1,
            max_idle_time: Duration::from_secs(1),
            max_uses: 1,
            health_check_timeout: Duration::from_millis(100),
        };
        // Config should accept edge values
        assert_eq!(config.min_workers, 0);
        assert_eq!(config.max_workers, 1);
        assert_eq!(config.max_uses, 1);
    }

    #[tokio::test]
    async fn pool_shutdown_rejects_new_acquires() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;

        let config = crate::SandboxConfig::default();
        let result = pool.acquire(&config).await;
        match result {
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("shutting down"),
                    "should mention shutting down: {msg}"
                );
            }
            Ok(_) => panic!("should reject after shutdown"),
        }
    }

    #[tokio::test]
    async fn pool_shutdown_kills_all_idle() {
        let pool = WorkerPool::new(PoolConfig::default());
        // After shutdown, idle pool should be empty
        pool.shutdown().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    #[tokio::test]
    async fn pool_reap_preserves_min_workers_count() {
        // The reap logic should not drop below min_workers
        // This is a unit test of the logic — we verify via the kept count
        let config = PoolConfig {
            min_workers: 2,
            max_workers: 4,
            max_idle_time: Duration::from_secs(0), // everything is "expired"
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        };
        let pool = WorkerPool::new(config);
        // We can't add real workers without spawning, but we verify the
        // reap_idle logic handles empty pool + min_workers correctly
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    #[test]
    fn pool_metrics_spawned_increments() {
        let m = PoolMetrics::default();
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 1);
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn pool_metrics_reused_increments() {
        let m = PoolMetrics::default();
        m.reused.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.reused.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn pool_metrics_killed_idle_increments() {
        let m = PoolMetrics::default();
        m.killed_idle.fetch_add(3, Ordering::Relaxed);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn pool_release_outcome_debug() {
        // Verify Debug impl works
        let ok = format!("{:?}", ReleaseOutcome::Ok);
        let fatal = format!("{:?}", ReleaseOutcome::Fatal);
        assert!(ok.contains("Ok"));
        assert!(fatal.contains("Fatal"));
    }

    #[tokio::test]
    async fn pool_multiple_shutdowns_safe() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        pool.shutdown().await; // Should not panic
        assert!(*pool.shutting_down.lock().await);
    }

    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pw_feature_compiles() {
        // Verify worker-pool feature gates compile correctly
        let pool = Arc::new(WorkerPool::new(PoolConfig::default()));
        let handle = pool.start_reap_task(Duration::from_secs(3600));
        handle.abort();
        // Just verify it compiles and runs
    }

    #[test]
    fn pool_config_clone() {
        let config = PoolConfig::default();
        let cloned = config.clone();
        assert_eq!(config.min_workers, cloned.min_workers);
        assert_eq!(config.max_workers, cloned.max_workers);
    }

    #[test]
    fn pool_cc22_worker_pool_feature_gate() {
        // This test verifies the crate compiles both with and without the worker-pool feature.
        // The feature only gates pre_warm and start_reap_task — core pool functionality is always available.
        let _config = PoolConfig::default();
        let _pool = WorkerPool::new(PoolConfig::default());
    }
}