Skip to main content

forge_sandbox/
pool.rs

1//! Worker pool for reusing sandbox child processes across executions.
2//!
3//! Instead of spawning a new `forgemax-worker` for every `execute()` call (~50ms),
4//! the pool keeps warm workers alive and reuses them. Each reuse sends a
5//! [`Reset`](crate::ipc::ParentMessage::Reset) message that causes the worker to
6//! drop its V8 runtime and create a fresh one (~5-10ms).
7//!
8//! **Security invariant**: Every execution gets a completely fresh V8 Isolate +
9//! Context. There is no state leakage between executions.
10
11use std::collections::{HashSet, VecDeque};
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::io::BufReader;
17use tokio::process::{Child, ChildStdin, ChildStdout};
18use tokio::sync::Mutex;
19
20use crate::error::SandboxError;
21use crate::host::{find_worker_binary, ipc_event_loop};
22use crate::ipc::{read_message, write_message, ChildMessage, ParentMessage, WorkerConfig};
23use crate::{ResourceDispatcher, StashDispatcher, ToolDispatcher};
24
/// Tunables that govern how the worker pool grows, shrinks and recycles
/// its child processes.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Lower bound of live workers that idle reaping will not go below.
    pub min_workers: usize,
    /// Upper bound on the number of live workers (idle + checked out).
    pub max_workers: usize,
    /// An idle worker older than this becomes eligible for reaping.
    pub max_idle_time: Duration,
    /// Number of executions after which a worker is killed on release
    /// instead of being returned to the idle queue.
    pub max_uses: u32,
    /// How long to wait for the reply to a health-check Reset.
    pub health_check_timeout: Duration,
}

impl Default for PoolConfig {
    /// Defaults: 2..=8 workers, 60 s idle limit, 50 uses per worker and a
    /// 500 ms health-check timeout.
    fn default() -> Self {
        let max_idle_time = Duration::from_secs(60);
        let health_check_timeout = Duration::from_millis(500);

        Self {
            min_workers: 2,
            max_workers: 8,
            max_idle_time,
            max_uses: 50,
            health_check_timeout,
        }
    }
}
51
/// Atomic counters for pool observability.
///
/// All counters start at zero (via `Default`) and are only ever incremented
/// by the pool; they are exposed as public fields so callers can `load` them
/// directly.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Total workers spawned (bumped once per successful `spawn_worker`).
    pub spawned: AtomicU64,
    /// Total workers reused (acquired from idle pool).
    pub reused: AtomicU64,
    /// Workers killed because they hit max_uses.
    pub killed_max_uses: AtomicU64,
    /// Workers killed because they were idle too long.
    pub killed_idle: AtomicU64,
    /// Workers killed due to errors (crash, health-check failure).
    pub killed_error: AtomicU64,
}
66
/// A warm worker process that can be reused.
///
/// Bundles the child process handle with the two pipe ends the pool uses to
/// exchange IPC messages with it.
struct PoolWorker {
    /// Handle to the spawned child process (used to kill it).
    child: Child,
    /// Write end of the child's stdin; parent messages are sent here.
    stdin: ChildStdin,
    /// Buffered read end of the child's stdout; child messages arrive here.
    stdout: BufReader<ChildStdout>,
    /// Number of Execute messages sent to this worker so far.
    uses: u32,
    /// When the worker was created or last placed in the idle queue.
    idle_since: Instant,
}
75
/// Outcome of using a worker, reported back to the pool on release.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReleaseOutcome {
    /// Execution completed normally — worker can be reused.
    Ok,
    /// Execution failed fatally — worker must be killed (timeout, heap OOM, crash).
    Fatal,
}
85
/// A checked-out worker handle. The pool retains no reference to this worker
/// while it is in use; the caller owns it.
///
/// Hand the handle back with [`WorkerPool::release`]; if it is dropped while
/// still holding a worker, the `Drop` impl kills the process.
pub struct AcquiredWorker {
    /// `Some` while the worker is checked out; taken (left `None`) on release
    /// or drop.
    worker: Option<PoolWorker>,
}
91
/// Context needed to route a single pooled worker execution.
///
/// Consumed by value by [`AcquiredWorker::execute`]: the dispatchers are
/// handed to the IPC event loop and the known server/tool lists are copied
/// into the worker configuration sent with the Execute message.
pub struct PooledExecutionContext {
    /// Tool calls made by the worker.
    pub dispatcher: Arc<dyn ToolDispatcher>,
    /// Optional resource reads made by the worker.
    pub resource_dispatcher: Option<Arc<dyn ResourceDispatcher>>,
    /// Optional stash calls made by the worker.
    pub stash_dispatcher: Option<Arc<dyn StashDispatcher>>,
    /// Known server names for validation and fuzzy matching.
    pub known_servers: Option<HashSet<String>>,
    /// Known tool names for validation and fuzzy matching.
    // NOTE(review): presumably (server, tool) pairs — confirm against WorkerConfig.
    pub known_tools: Option<Vec<(String, String)>>,
}
105
106impl AcquiredWorker {
107    /// Execute code on this worker, routing IPC through the given dispatchers.
108    ///
109    /// Returns the execution result. On completion, call
110    /// [`WorkerPool::release`] with the appropriate outcome.
111    pub async fn execute(
112        &mut self,
113        code: &str,
114        config: &crate::SandboxConfig,
115        context: PooledExecutionContext,
116    ) -> Result<serde_json::Value, SandboxError> {
117        let w = self.worker.as_mut().expect("worker already consumed");
118
119        // Send Execute message
120        let mut worker_config = WorkerConfig::from(config);
121        worker_config.known_servers = context.known_servers;
122        worker_config.known_tools = context.known_tools;
123        let execute_msg = ParentMessage::Execute {
124            code: code.to_string(),
125            manifest: None,
126            config: worker_config,
127        };
128        write_message(&mut w.stdin, &execute_msg)
129            .await
130            .map_err(|e| {
131                SandboxError::Execution(anyhow::anyhow!(
132                    "failed to send Execute to pooled worker: {}",
133                    e
134                ))
135            })?;
136
137        w.uses += 1;
138
139        // Run IPC event loop with timeout
140        let timeout = config.timeout + Duration::from_secs(2);
141        let result = tokio::time::timeout(
142            timeout,
143            ipc_event_loop(
144                &mut w.stdin,
145                &mut w.stdout,
146                context.dispatcher,
147                context.resource_dispatcher,
148                context.stash_dispatcher,
149            ),
150        )
151        .await;
152
153        match result {
154            Ok(inner) => inner,
155            Err(_elapsed) => {
156                // Timeout — the caller should release with Fatal
157                Err(SandboxError::Timeout {
158                    timeout_ms: config.timeout.as_millis() as u64,
159                })
160            }
161        }
162    }
163}
164
/// A pool of warm worker processes.
///
/// Each piece of shared state sits behind its own async mutex, so the pool
/// can be used through a shared reference (`&self`).
pub struct WorkerPool {
    /// Sizing, recycling and timeout settings fixed at construction.
    config: PoolConfig,
    /// Workers waiting to be handed out; taken from the front, returned to
    /// the back.
    idle_workers: Mutex<VecDeque<PoolWorker>>,
    /// Total workers currently alive (idle + checked out).
    alive_count: Mutex<usize>,
    /// Observability counters, shared via [`WorkerPool::metrics`].
    metrics: Arc<PoolMetrics>,
    /// Flag to prevent new acquisitions during shutdown.
    shutting_down: Mutex<bool>,
}
175
176impl WorkerPool {
177    /// Create a new worker pool with the given configuration.
178    pub fn new(config: PoolConfig) -> Self {
179        Self {
180            config,
181            idle_workers: Mutex::new(VecDeque::new()),
182            alive_count: Mutex::new(0),
183            metrics: Arc::new(PoolMetrics::default()),
184            shutting_down: Mutex::new(false),
185        }
186    }
187
    /// Get a reference to the pool metrics.
    ///
    /// The counters live behind an `Arc`; clone the returned handle to keep
    /// reading them independently of the pool borrow.
    pub fn metrics(&self) -> &Arc<PoolMetrics> {
        &self.metrics
    }
192
193    /// Acquire a worker from the pool, spawning a new one if necessary.
194    ///
195    /// Returns `None` if the pool is shutting down or at capacity.
196    #[tracing::instrument(skip(self, sandbox_config))]
197    pub async fn acquire(
198        &self,
199        sandbox_config: &crate::SandboxConfig,
200    ) -> Result<AcquiredWorker, SandboxError> {
201        if *self.shutting_down.lock().await {
202            return Err(SandboxError::Execution(anyhow::anyhow!(
203                "worker pool is shutting down"
204            )));
205        }
206
207        let worker_config = WorkerConfig::from(sandbox_config);
208
209        // Try to get an idle worker
210        loop {
211            let mut idle = self.idle_workers.lock().await;
212            if let Some(mut w) = idle.pop_front() {
213                drop(idle); // Release lock before I/O
214
215                // Health check: send Reset and wait for ResetComplete
216                let healthy = self.health_check(&mut w, &worker_config).await;
217                if healthy {
218                    self.metrics.reused.fetch_add(1, Ordering::Relaxed);
219                    return Ok(AcquiredWorker { worker: Some(w) });
220                } else {
221                    // Kill unhealthy worker, try next
222                    self.kill_worker(w).await;
223                    self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
224                    continue;
225                }
226            } else {
227                drop(idle);
228                break;
229            }
230        }
231
232        // No idle workers — spawn a new one if under capacity
233        let mut alive = self.alive_count.lock().await;
234        if *alive >= self.config.max_workers {
235            return Err(SandboxError::Execution(anyhow::anyhow!(
236                "worker pool at capacity ({} workers)",
237                self.config.max_workers
238            )));
239        }
240
241        let worker = self.spawn_worker().await?;
242        *alive += 1;
243        drop(alive);
244
245        // Send Reset to initialize for this execution's config
246        let mut w = worker;
247        let healthy = self.health_check(&mut w, &worker_config).await;
248        if !healthy {
249            self.kill_worker(w).await;
250            return Err(SandboxError::Execution(anyhow::anyhow!(
251                "newly spawned worker failed health check"
252            )));
253        }
254
255        Ok(AcquiredWorker { worker: Some(w) })
256    }
257
258    /// Return a worker to the pool after use.
259    ///
260    /// If the outcome is [`ReleaseOutcome::Fatal`] or the worker has exceeded
261    /// `max_uses`, the worker is killed instead of returned to the idle pool.
262    #[tracing::instrument(skip(self, handle), fields(outcome = ?outcome))]
263    pub async fn release(&self, mut handle: AcquiredWorker, outcome: ReleaseOutcome) {
264        let worker = match handle.worker.take() {
265            Some(w) => w,
266            None => return,
267        };
268
269        if outcome == ReleaseOutcome::Fatal {
270            self.kill_worker(worker).await;
271            self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
272            return;
273        }
274
275        if worker.uses >= self.config.max_uses {
276            self.kill_worker(worker).await;
277            self.metrics.killed_max_uses.fetch_add(1, Ordering::Relaxed);
278            return;
279        }
280
281        if *self.shutting_down.lock().await {
282            self.kill_worker(worker).await;
283            return;
284        }
285
286        // Return to idle pool
287        let mut w = worker;
288        w.idle_since = Instant::now();
289        self.idle_workers.lock().await.push_back(w);
290    }
291
292    /// Shut down the pool, killing all idle workers.
293    pub async fn shutdown(&self) {
294        *self.shutting_down.lock().await = true;
295
296        let mut idle = self.idle_workers.lock().await;
297        let workers: Vec<PoolWorker> = idle.drain(..).collect();
298        drop(idle);
299
300        for w in workers {
301            self.kill_worker(w).await;
302        }
303    }
304
305    /// Reap idle workers that have exceeded `max_idle_time`.
306    ///
307    /// Call this periodically (e.g., every 10 seconds) from a background task.
308    /// Preserves `min_workers` to avoid repeated cold starts.
309    pub async fn reap_idle(&self) {
310        let mut idle = self.idle_workers.lock().await;
311        let now = Instant::now();
312        let mut to_kill = Vec::new();
313        let mut kept = VecDeque::new();
314        let alive = *self.alive_count.lock().await;
315
316        while let Some(w) = idle.pop_front() {
317            if now.duration_since(w.idle_since) > self.config.max_idle_time {
318                // Preserve min_workers: only reap if we'd still have enough alive
319                let would_remain = alive - to_kill.len() - 1;
320                if would_remain >= self.config.min_workers {
321                    to_kill.push(w);
322                } else {
323                    kept.push_back(w);
324                }
325            } else {
326                kept.push_back(w);
327            }
328        }
329        *idle = kept;
330        drop(idle);
331
332        for w in to_kill {
333            self.kill_worker(w).await;
334            self.metrics.killed_idle.fetch_add(1, Ordering::Relaxed);
335        }
336    }
337
338    /// Pre-warm the pool by spawning workers up to `min_workers`.
339    ///
340    /// Each worker is spawned and given a Reset health check. Returns the
341    /// number of workers successfully pre-warmed.
342    #[cfg(feature = "worker-pool")]
343    pub async fn pre_warm(&self, config: &crate::SandboxConfig) -> Result<usize, SandboxError> {
344        let worker_config = WorkerConfig::from(config);
345        let mut count = 0;
346
347        let alive = *self.alive_count.lock().await;
348        let to_spawn = self.config.min_workers.saturating_sub(alive);
349
350        for _ in 0..to_spawn {
351            if *self.alive_count.lock().await >= self.config.max_workers {
352                break;
353            }
354
355            match self.spawn_worker().await {
356                Ok(mut w) => {
357                    if self.health_check(&mut w, &worker_config).await {
358                        w.idle_since = Instant::now();
359                        self.idle_workers.lock().await.push_back(w);
360                        *self.alive_count.lock().await += 1;
361                        count += 1;
362                    } else {
363                        self.kill_worker(w).await;
364                    }
365                }
366                Err(e) => {
367                    tracing::warn!(error = %e, "failed to pre-warm worker");
368                }
369            }
370        }
371
372        Ok(count)
373    }
374
375    /// Start a background task that periodically reaps idle workers.
376    ///
377    /// The task runs until the returned `JoinHandle` is aborted or the pool shuts down.
378    #[cfg(feature = "worker-pool")]
379    pub fn start_reap_task(self: &Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
380        let pool = Arc::clone(self);
381        tokio::spawn(async move {
382            loop {
383                tokio::time::sleep(interval).await;
384                if *pool.shutting_down.lock().await {
385                    break;
386                }
387                pool.reap_idle().await;
388            }
389        })
390    }
391
392    /// Spawn a fresh worker process.
393    async fn spawn_worker(&self) -> Result<PoolWorker, SandboxError> {
394        let worker_bin = find_worker_binary()?;
395
396        // stderr is always piped (debug) or null (non-debug) — never inherit.
397        let debug_mode = std::env::var("FORGE_DEBUG").is_ok();
398        let mut child = tokio::process::Command::new(&worker_bin)
399            .stdin(std::process::Stdio::piped())
400            .stdout(std::process::Stdio::piped())
401            .stderr(if debug_mode {
402                std::process::Stdio::piped()
403            } else {
404                std::process::Stdio::null()
405            })
406            .env_clear()
407            .kill_on_drop(true)
408            .spawn()
409            .map_err(|e| {
410                SandboxError::Execution(anyhow::anyhow!(
411                    "failed to spawn pooled worker at {}: {}",
412                    worker_bin.display(),
413                    e
414                ))
415            })?;
416
417        // Bounded stderr capture in debug mode (max 4KB, logged via tracing)
418        if debug_mode {
419            if let Some(stderr) = child.stderr.take() {
420                tokio::spawn(crate::host::capture_bounded_stderr(stderr));
421            }
422        }
423
424        let stdin = child
425            .stdin
426            .take()
427            .ok_or_else(|| SandboxError::Execution(anyhow::anyhow!("no stdin on pooled worker")))?;
428        let stdout = child.stdout.take().ok_or_else(|| {
429            SandboxError::Execution(anyhow::anyhow!("no stdout on pooled worker"))
430        })?;
431
432        self.metrics.spawned.fetch_add(1, Ordering::Relaxed);
433
434        Ok(PoolWorker {
435            child,
436            stdin,
437            stdout: BufReader::new(stdout),
438            uses: 0,
439            idle_since: Instant::now(),
440        })
441    }
442
443    /// Send a Reset message and wait for ResetComplete within the health check timeout.
444    async fn health_check(&self, w: &mut PoolWorker, config: &WorkerConfig) -> bool {
445        let reset_msg = ParentMessage::Reset {
446            config: config.clone(),
447        };
448
449        // Send Reset
450        if write_message(&mut w.stdin, &reset_msg).await.is_err() {
451            return false;
452        }
453
454        // Wait for ResetComplete
455        matches!(
456            tokio::time::timeout(
457                self.config.health_check_timeout,
458                read_message::<ChildMessage, _>(&mut w.stdout),
459            )
460            .await,
461            Ok(Ok(Some(ChildMessage::ResetComplete)))
462        )
463    }
464
465    /// Kill a worker process and decrement the alive counter.
466    async fn kill_worker(&self, mut w: PoolWorker) {
467        let _ = w.child.kill().await;
468        let mut alive = self.alive_count.lock().await;
469        *alive = alive.saturating_sub(1);
470    }
471}
472
473// Drop the worker handle — if it wasn't released properly, kill the worker.
474impl Drop for AcquiredWorker {
475    fn drop(&mut self) {
476        if let Some(mut w) = self.worker.take() {
477            // Best-effort kill — can't async in Drop, but kill_on_drop handles it
478            let _ = w.child.start_kill();
479        }
480    }
481}
482
// Unit tests for the pool. None of them spawns a real worker process, so
// they cover configuration, metrics and the empty-pool paths only.
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the documented default values of PoolConfig.
    #[test]
    fn pool_config_defaults() {
        let config = PoolConfig::default();
        assert_eq!(config.min_workers, 2);
        assert_eq!(config.max_workers, 8);
        assert_eq!(config.max_idle_time, Duration::from_secs(60));
        assert_eq!(config.max_uses, 50);
        assert_eq!(config.health_check_timeout, Duration::from_millis(500));
    }

    // Every counter starts at zero.
    #[test]
    fn pool_metrics_default_zero() {
        let m = PoolMetrics::default();
        assert_eq!(m.spawned.load(Ordering::Relaxed), 0);
        assert_eq!(m.reused.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_max_uses.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 0);
        assert_eq!(m.killed_error.load(Ordering::Relaxed), 0);
    }

    // ReleaseOutcome variants compare by value.
    #[test]
    fn release_outcome_eq() {
        assert_eq!(ReleaseOutcome::Ok, ReleaseOutcome::Ok);
        assert_eq!(ReleaseOutcome::Fatal, ReleaseOutcome::Fatal);
        assert_ne!(ReleaseOutcome::Ok, ReleaseOutcome::Fatal);
    }

    // A new pool has no idle workers and an alive count of zero.
    #[tokio::test]
    async fn pool_new_starts_empty() {
        let pool = WorkerPool::new(PoolConfig::default());
        let idle = pool.idle_workers.lock().await;
        assert_eq!(idle.len(), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
    }

    // shutdown() flips the shutting_down flag.
    #[tokio::test]
    async fn pool_shutdown_sets_flag() {
        let pool = WorkerPool::new(PoolConfig::default());
        assert!(!*pool.shutting_down.lock().await);
        pool.shutdown().await;
        assert!(*pool.shutting_down.lock().await);
    }

    // Reaping an empty pool must not panic and leaves it empty.
    #[tokio::test]
    async fn pool_reap_empty_is_noop() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    // --- Phase 5: Pool maturation unit tests ---

    // PoolConfig performs no validation; edge values are stored as given.
    #[test]
    fn pool_cc15_pool_config_validation() {
        let config = PoolConfig {
            min_workers: 0,
            max_workers: 1,
            max_idle_time: Duration::from_secs(1),
            max_uses: 1,
            health_check_timeout: Duration::from_millis(100),
        };
        // Config should accept edge values
        assert_eq!(config.min_workers, 0);
        assert_eq!(config.max_workers, 1);
        assert_eq!(config.max_uses, 1);
    }

    // After shutdown, acquire() fails with a "shutting down" error before
    // any worker is spawned.
    #[tokio::test]
    async fn pool_shutdown_rejects_new_acquires() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;

        let config = crate::SandboxConfig::default();
        let result = pool.acquire(&config).await;
        match result {
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("shutting down"),
                    "should mention shutting down: {msg}"
                );
            }
            Ok(_) => panic!("should reject after shutdown"),
        }
    }

    // Only exercises the empty-pool path: no worker is ever added, so this
    // checks that shutdown leaves the idle queue empty.
    #[tokio::test]
    async fn pool_shutdown_kills_all_idle() {
        let pool = WorkerPool::new(PoolConfig::default());
        // After shutdown, idle pool should be empty
        pool.shutdown().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    // Despite the name, this cannot observe min_workers being preserved:
    // the pool is empty, so it only checks that reap_idle copes with a zero
    // max_idle_time and a non-zero min_workers without panicking.
    #[tokio::test]
    async fn pool_reap_preserves_min_workers_count() {
        // The reap logic should not drop below min_workers
        // This is a unit test of the logic — we verify via the kept count
        let config = PoolConfig {
            min_workers: 2,
            max_workers: 4,
            max_idle_time: Duration::from_secs(0), // everything is "expired"
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        };
        let pool = WorkerPool::new(config);
        // We can't add real workers without spawning, but we verify the
        // reap_idle logic handles empty pool + min_workers correctly
        pool.reap_idle().await;
        assert_eq!(pool.idle_workers.lock().await.len(), 0);
    }

    // The spawned counter accumulates across increments.
    #[test]
    fn pool_metrics_spawned_increments() {
        let m = PoolMetrics::default();
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 1);
        m.spawned.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.spawned.load(Ordering::Relaxed), 2);
    }

    // The reused counter can be incremented and read back.
    #[test]
    fn pool_metrics_reused_increments() {
        let m = PoolMetrics::default();
        m.reused.fetch_add(1, Ordering::Relaxed);
        assert_eq!(m.reused.load(Ordering::Relaxed), 1);
    }

    // The killed_idle counter accepts increments larger than one.
    #[test]
    fn pool_metrics_killed_idle_increments() {
        let m = PoolMetrics::default();
        m.killed_idle.fetch_add(3, Ordering::Relaxed);
        assert_eq!(m.killed_idle.load(Ordering::Relaxed), 3);
    }

    // Debug output names the variant.
    #[test]
    fn pool_release_outcome_debug() {
        // Verify Debug impl works
        let ok = format!("{:?}", ReleaseOutcome::Ok);
        let fatal = format!("{:?}", ReleaseOutcome::Fatal);
        assert!(ok.contains("Ok"));
        assert!(fatal.contains("Fatal"));
    }

    // shutdown() may be called repeatedly.
    #[tokio::test]
    async fn pool_multiple_shutdowns_safe() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        pool.shutdown().await; // Should not panic
        assert!(*pool.shutting_down.lock().await);
    }

    // Smoke test for the feature-gated reap task: start it with a long
    // interval and abort it straight away.
    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pw_feature_compiles() {
        // Verify worker-pool feature gates compile correctly
        let pool = Arc::new(WorkerPool::new(PoolConfig::default()));
        let handle = pool.start_reap_task(Duration::from_secs(3600));
        handle.abort();
        // Just verify it compiles and runs
    }

    // Clone copies the worker bounds.
    #[test]
    fn pool_config_clone() {
        let config = PoolConfig::default();
        let cloned = config.clone();
        assert_eq!(config.min_workers, cloned.min_workers);
        assert_eq!(config.max_workers, cloned.max_workers);
    }

    // Construction works regardless of the worker-pool feature.
    #[test]
    fn pool_cc22_worker_pool_feature_gate() {
        // This test verifies the crate compiles both with and without the worker-pool feature.
        // The feature only gates pre_warm and start_reap_task — core pool functionality is always available.
        let _config = PoolConfig::default();
        let _pool = WorkerPool::new(PoolConfig::default());
    }
}