1use std::collections::VecDeque;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::io::BufReader;
17use tokio::process::{Child, ChildStdin, ChildStdout};
18use tokio::sync::Mutex;
19
20use crate::error::SandboxError;
21use crate::host::{find_worker_binary, ipc_event_loop};
22use crate::ipc::{read_message, write_message, ChildMessage, ParentMessage, WorkerConfig};
23use crate::{ResourceDispatcher, StashDispatcher, ToolDispatcher};
24
/// Tunable limits for a [`WorkerPool`].
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Floor on the number of live workers: idle reaping stops killing once
    /// another kill would drop the alive count below this value, and
    /// pre-warming spawns up to this many.
    pub min_workers: usize,
    /// Ceiling on the number of live workers; the pool refuses to spawn more.
    pub max_workers: usize,
    /// How long a worker may sit in the idle queue before it becomes a
    /// candidate for reaping.
    pub max_idle_time: Duration,
    /// Number of executions after which a released worker is killed instead
    /// of being returned to the idle queue.
    pub max_uses: u32,
    /// How long a worker gets to acknowledge the health-check reset.
    pub health_check_timeout: Duration,
}
39
40impl Default for PoolConfig {
41 fn default() -> Self {
42 Self {
43 min_workers: 2,
44 max_workers: 8,
45 max_idle_time: Duration::from_secs(60),
46 max_uses: 50,
47 health_check_timeout: Duration::from_millis(500),
48 }
49 }
50}
51
/// Lifetime counters for a worker pool. All counters start at zero.
#[derive(Default, Debug)]
pub struct PoolMetrics {
    /// Worker processes successfully spawned.
    pub spawned: AtomicU64,
    /// Acquisitions served by an idle worker that passed its health check.
    pub reused: AtomicU64,
    /// Workers killed on release because they reached the use limit.
    pub killed_max_uses: AtomicU64,
    /// Workers killed by idle reaping.
    pub killed_idle: AtomicU64,
    /// Workers killed after a failed health check or a fatal release.
    pub killed_error: AtomicU64,
}
66
67struct PoolWorker {
69 child: Child,
70 stdin: ChildStdin,
71 stdout: BufReader<ChildStdout>,
72 uses: u32,
73 idle_since: Instant,
74}
75
/// Tells the pool what to do with a worker that is being released.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReleaseOutcome {
    /// The worker may be reused (subject to the use limit and shutdown).
    Ok,
    /// The worker must be killed and never reused.
    Fatal,
}
85
86pub struct AcquiredWorker {
89 worker: Option<PoolWorker>,
90}
91
92impl AcquiredWorker {
93 pub async fn execute(
98 &mut self,
99 code: &str,
100 config: &crate::SandboxConfig,
101 dispatcher: Arc<dyn ToolDispatcher>,
102 resource_dispatcher: Option<Arc<dyn ResourceDispatcher>>,
103 stash_dispatcher: Option<Arc<dyn StashDispatcher>>,
104 ) -> Result<serde_json::Value, SandboxError> {
105 let w = self.worker.as_mut().expect("worker already consumed");
106
107 let worker_config = WorkerConfig::from(config);
109 let execute_msg = ParentMessage::Execute {
110 code: code.to_string(),
111 manifest: None,
112 config: worker_config,
113 };
114 write_message(&mut w.stdin, &execute_msg)
115 .await
116 .map_err(|e| {
117 SandboxError::Execution(anyhow::anyhow!(
118 "failed to send Execute to pooled worker: {}",
119 e
120 ))
121 })?;
122
123 w.uses += 1;
124
125 let timeout = config.timeout + Duration::from_secs(2);
127 let result = tokio::time::timeout(
128 timeout,
129 ipc_event_loop(
130 &mut w.stdin,
131 &mut w.stdout,
132 dispatcher,
133 resource_dispatcher,
134 stash_dispatcher,
135 ),
136 )
137 .await;
138
139 match result {
140 Ok(inner) => inner,
141 Err(_elapsed) => {
142 Err(SandboxError::Timeout {
144 timeout_ms: config.timeout.as_millis() as u64,
145 })
146 }
147 }
148 }
149}
150
151pub struct WorkerPool {
153 config: PoolConfig,
154 idle_workers: Mutex<VecDeque<PoolWorker>>,
155 alive_count: Mutex<usize>,
157 metrics: Arc<PoolMetrics>,
158 shutting_down: Mutex<bool>,
160}
161
162impl WorkerPool {
163 pub fn new(config: PoolConfig) -> Self {
165 Self {
166 config,
167 idle_workers: Mutex::new(VecDeque::new()),
168 alive_count: Mutex::new(0),
169 metrics: Arc::new(PoolMetrics::default()),
170 shutting_down: Mutex::new(false),
171 }
172 }
173
174 pub fn metrics(&self) -> &Arc<PoolMetrics> {
176 &self.metrics
177 }
178
179 #[tracing::instrument(skip(self, sandbox_config))]
183 pub async fn acquire(
184 &self,
185 sandbox_config: &crate::SandboxConfig,
186 ) -> Result<AcquiredWorker, SandboxError> {
187 if *self.shutting_down.lock().await {
188 return Err(SandboxError::Execution(anyhow::anyhow!(
189 "worker pool is shutting down"
190 )));
191 }
192
193 let worker_config = WorkerConfig::from(sandbox_config);
194
195 loop {
197 let mut idle = self.idle_workers.lock().await;
198 if let Some(mut w) = idle.pop_front() {
199 drop(idle); let healthy = self.health_check(&mut w, &worker_config).await;
203 if healthy {
204 self.metrics.reused.fetch_add(1, Ordering::Relaxed);
205 return Ok(AcquiredWorker { worker: Some(w) });
206 } else {
207 self.kill_worker(w).await;
209 self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
210 continue;
211 }
212 } else {
213 drop(idle);
214 break;
215 }
216 }
217
218 let mut alive = self.alive_count.lock().await;
220 if *alive >= self.config.max_workers {
221 return Err(SandboxError::Execution(anyhow::anyhow!(
222 "worker pool at capacity ({} workers)",
223 self.config.max_workers
224 )));
225 }
226
227 let worker = self.spawn_worker().await?;
228 *alive += 1;
229 drop(alive);
230
231 let mut w = worker;
233 let healthy = self.health_check(&mut w, &worker_config).await;
234 if !healthy {
235 self.kill_worker(w).await;
236 return Err(SandboxError::Execution(anyhow::anyhow!(
237 "newly spawned worker failed health check"
238 )));
239 }
240
241 Ok(AcquiredWorker { worker: Some(w) })
242 }
243
244 #[tracing::instrument(skip(self, handle), fields(outcome = ?outcome))]
249 pub async fn release(&self, mut handle: AcquiredWorker, outcome: ReleaseOutcome) {
250 let worker = match handle.worker.take() {
251 Some(w) => w,
252 None => return,
253 };
254
255 if outcome == ReleaseOutcome::Fatal {
256 self.kill_worker(worker).await;
257 self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
258 return;
259 }
260
261 if worker.uses >= self.config.max_uses {
262 self.kill_worker(worker).await;
263 self.metrics.killed_max_uses.fetch_add(1, Ordering::Relaxed);
264 return;
265 }
266
267 if *self.shutting_down.lock().await {
268 self.kill_worker(worker).await;
269 return;
270 }
271
272 let mut w = worker;
274 w.idle_since = Instant::now();
275 self.idle_workers.lock().await.push_back(w);
276 }
277
278 pub async fn shutdown(&self) {
280 *self.shutting_down.lock().await = true;
281
282 let mut idle = self.idle_workers.lock().await;
283 let workers: Vec<PoolWorker> = idle.drain(..).collect();
284 drop(idle);
285
286 for w in workers {
287 self.kill_worker(w).await;
288 }
289 }
290
291 pub async fn reap_idle(&self) {
296 let mut idle = self.idle_workers.lock().await;
297 let now = Instant::now();
298 let mut to_kill = Vec::new();
299 let mut kept = VecDeque::new();
300 let alive = *self.alive_count.lock().await;
301
302 while let Some(w) = idle.pop_front() {
303 if now.duration_since(w.idle_since) > self.config.max_idle_time {
304 let would_remain = alive - to_kill.len() - 1;
306 if would_remain >= self.config.min_workers {
307 to_kill.push(w);
308 } else {
309 kept.push_back(w);
310 }
311 } else {
312 kept.push_back(w);
313 }
314 }
315 *idle = kept;
316 drop(idle);
317
318 for w in to_kill {
319 self.kill_worker(w).await;
320 self.metrics.killed_idle.fetch_add(1, Ordering::Relaxed);
321 }
322 }
323
324 #[cfg(feature = "worker-pool")]
329 pub async fn pre_warm(&self, config: &crate::SandboxConfig) -> Result<usize, SandboxError> {
330 let worker_config = WorkerConfig::from(config);
331 let mut count = 0;
332
333 let alive = *self.alive_count.lock().await;
334 let to_spawn = self.config.min_workers.saturating_sub(alive);
335
336 for _ in 0..to_spawn {
337 if *self.alive_count.lock().await >= self.config.max_workers {
338 break;
339 }
340
341 match self.spawn_worker().await {
342 Ok(mut w) => {
343 if self.health_check(&mut w, &worker_config).await {
344 w.idle_since = Instant::now();
345 self.idle_workers.lock().await.push_back(w);
346 *self.alive_count.lock().await += 1;
347 count += 1;
348 } else {
349 self.kill_worker(w).await;
350 }
351 }
352 Err(e) => {
353 tracing::warn!(error = %e, "failed to pre-warm worker");
354 }
355 }
356 }
357
358 Ok(count)
359 }
360
361 #[cfg(feature = "worker-pool")]
365 pub fn start_reap_task(self: &Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
366 let pool = Arc::clone(self);
367 tokio::spawn(async move {
368 loop {
369 tokio::time::sleep(interval).await;
370 if *pool.shutting_down.lock().await {
371 break;
372 }
373 pool.reap_idle().await;
374 }
375 })
376 }
377
378 async fn spawn_worker(&self) -> Result<PoolWorker, SandboxError> {
380 let worker_bin = find_worker_binary()?;
381
382 let debug_mode = std::env::var("FORGE_DEBUG").is_ok();
384 let mut child = tokio::process::Command::new(&worker_bin)
385 .stdin(std::process::Stdio::piped())
386 .stdout(std::process::Stdio::piped())
387 .stderr(if debug_mode {
388 std::process::Stdio::piped()
389 } else {
390 std::process::Stdio::null()
391 })
392 .env_clear()
393 .kill_on_drop(true)
394 .spawn()
395 .map_err(|e| {
396 SandboxError::Execution(anyhow::anyhow!(
397 "failed to spawn pooled worker at {}: {}",
398 worker_bin.display(),
399 e
400 ))
401 })?;
402
403 if debug_mode {
405 if let Some(stderr) = child.stderr.take() {
406 tokio::spawn(crate::host::capture_bounded_stderr(stderr));
407 }
408 }
409
410 let stdin = child
411 .stdin
412 .take()
413 .ok_or_else(|| SandboxError::Execution(anyhow::anyhow!("no stdin on pooled worker")))?;
414 let stdout = child.stdout.take().ok_or_else(|| {
415 SandboxError::Execution(anyhow::anyhow!("no stdout on pooled worker"))
416 })?;
417
418 self.metrics.spawned.fetch_add(1, Ordering::Relaxed);
419
420 Ok(PoolWorker {
421 child,
422 stdin,
423 stdout: BufReader::new(stdout),
424 uses: 0,
425 idle_since: Instant::now(),
426 })
427 }
428
429 async fn health_check(&self, w: &mut PoolWorker, config: &WorkerConfig) -> bool {
431 let reset_msg = ParentMessage::Reset {
432 config: config.clone(),
433 };
434
435 if write_message(&mut w.stdin, &reset_msg).await.is_err() {
437 return false;
438 }
439
440 matches!(
442 tokio::time::timeout(
443 self.config.health_check_timeout,
444 read_message::<ChildMessage, _>(&mut w.stdout),
445 )
446 .await,
447 Ok(Ok(Some(ChildMessage::ResetComplete)))
448 )
449 }
450
451 async fn kill_worker(&self, mut w: PoolWorker) {
453 let _ = w.child.kill().await;
454 let mut alive = self.alive_count.lock().await;
455 *alive = alive.saturating_sub(1);
456 }
457}
458
459impl Drop for AcquiredWorker {
461 fn drop(&mut self) {
462 if let Some(mut w) = self.worker.take() {
463 let _ = w.child.start_kill();
465 }
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration values must not drift silently.
    #[test]
    fn pool_config_defaults() {
        let PoolConfig {
            min_workers,
            max_workers,
            max_idle_time,
            max_uses,
            health_check_timeout,
        } = PoolConfig::default();

        assert_eq!(min_workers, 2);
        assert_eq!(max_workers, 8);
        assert_eq!(max_idle_time, Duration::from_secs(60));
        assert_eq!(max_uses, 50);
        assert_eq!(health_check_timeout, Duration::from_millis(500));
    }

    /// Every counter starts at zero.
    #[test]
    fn pool_metrics_default_zero() {
        let m = PoolMetrics::default();
        let counters = [
            &m.spawned,
            &m.reused,
            &m.killed_max_uses,
            &m.killed_idle,
            &m.killed_error,
        ];
        for counter in counters {
            assert_eq!(counter.load(Ordering::Relaxed), 0);
        }
    }

    /// `ReleaseOutcome` compares by variant.
    #[test]
    fn release_outcome_eq() {
        let (ok, fatal) = (ReleaseOutcome::Ok, ReleaseOutcome::Fatal);
        assert_eq!(ok, ReleaseOutcome::Ok);
        assert_eq!(fatal, ReleaseOutcome::Fatal);
        assert_ne!(ok, fatal);
    }

    /// A new pool has no idle workers and counts none as alive.
    #[tokio::test]
    async fn pool_new_starts_empty() {
        let pool = WorkerPool::new(PoolConfig::default());
        let idle = pool.idle_workers.lock().await;
        assert_eq!(idle.len(), 0);
        let alive = *pool.alive_count.lock().await;
        assert_eq!(alive, 0);
    }

    /// `shutdown` flips the shutting-down flag.
    #[tokio::test]
    async fn pool_shutdown_sets_flag() {
        let pool = WorkerPool::new(PoolConfig::default());
        let before = *pool.shutting_down.lock().await;
        assert!(!before);

        pool.shutdown().await;

        let after = *pool.shutting_down.lock().await;
        assert!(after);
    }

    /// Reaping an empty pool changes nothing.
    #[tokio::test]
    async fn pool_reap_empty_is_noop() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.reap_idle().await;
        let idle_len = pool.idle_workers.lock().await.len();
        assert_eq!(idle_len, 0);
    }

    /// Edge-case limits can be expressed in a `PoolConfig`.
    #[test]
    fn pool_cc15_pool_config_validation() {
        let config = PoolConfig {
            min_workers: 0,
            max_workers: 1,
            max_idle_time: Duration::from_secs(1),
            max_uses: 1,
            health_check_timeout: Duration::from_millis(100),
        };
        assert_eq!(config.min_workers, 0);
        assert_eq!(config.max_workers, 1);
        assert_eq!(config.max_uses, 1);
    }

    /// After `shutdown`, `acquire` fails with a "shutting down" error.
    #[tokio::test]
    async fn pool_shutdown_rejects_new_acquires() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;

        let config = crate::SandboxConfig::default();
        let msg = match pool.acquire(&config).await {
            Ok(_) => panic!("should reject after shutdown"),
            Err(e) => e.to_string(),
        };
        assert!(
            msg.contains("shutting down"),
            "should mention shutting down: {msg}"
        );
    }

    /// The idle queue is empty once `shutdown` returns.
    #[tokio::test]
    async fn pool_shutdown_kills_all_idle() {
        let pool = WorkerPool::new(PoolConfig::default());
        pool.shutdown().await;
        let idle_len = pool.idle_workers.lock().await.len();
        assert_eq!(idle_len, 0);
    }

    /// Reaping with a zero idle limit on an empty pool leaves it empty.
    #[tokio::test]
    async fn pool_reap_preserves_min_workers_count() {
        let pool = WorkerPool::new(PoolConfig {
            min_workers: 2,
            max_workers: 4,
            max_idle_time: Duration::from_secs(0),
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        });
        pool.reap_idle().await;
        let idle_len = pool.idle_workers.lock().await.len();
        assert_eq!(idle_len, 0);
    }

    /// `spawned` accumulates one increment at a time.
    #[test]
    fn pool_metrics_spawned_increments() {
        let m = PoolMetrics::default();
        for expected in 1..=2u64 {
            m.spawned.fetch_add(1, Ordering::Relaxed);
            assert_eq!(m.spawned.load(Ordering::Relaxed), expected);
        }
    }

    /// `reused` records a single increment.
    #[test]
    fn pool_metrics_reused_increments() {
        let m = PoolMetrics::default();
        m.reused.fetch_add(1, Ordering::Relaxed);
        let seen = m.reused.load(Ordering::Relaxed);
        assert_eq!(seen, 1);
    }

    /// `killed_idle` records a multi-step increment.
    #[test]
    fn pool_metrics_killed_idle_increments() {
        let m = PoolMetrics::default();
        m.killed_idle.fetch_add(3, Ordering::Relaxed);
        let seen = m.killed_idle.load(Ordering::Relaxed);
        assert_eq!(seen, 3);
    }

    /// The `Debug` output names the variant.
    #[test]
    fn pool_release_outcome_debug() {
        let ok = format!("{:?}", ReleaseOutcome::Ok);
        let fatal = format!("{:?}", ReleaseOutcome::Fatal);
        assert!(ok.contains("Ok"));
        assert!(fatal.contains("Fatal"));
    }

    /// Calling `shutdown` twice is harmless.
    #[tokio::test]
    async fn pool_multiple_shutdowns_safe() {
        let pool = WorkerPool::new(PoolConfig::default());
        for _ in 0..2 {
            pool.shutdown().await;
        }
        let flag = *pool.shutting_down.lock().await;
        assert!(flag);
    }

    /// The feature-gated reap task can be started and aborted.
    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pw_feature_compiles() {
        let pool = Arc::new(WorkerPool::new(PoolConfig::default()));
        let handle = pool.start_reap_task(Duration::from_secs(3600));
        handle.abort();
    }

    /// Cloning a config preserves its limits.
    #[test]
    fn pool_config_clone() {
        let config = PoolConfig::default();
        let cloned = config.clone();
        assert_eq!(config.min_workers, cloned.min_workers);
        assert_eq!(config.max_workers, cloned.max_workers);
    }

    /// The core types are constructible without the `worker-pool` feature.
    #[test]
    fn pool_cc22_worker_pool_feature_gate() {
        let _config = PoolConfig::default();
        let _pool = WorkerPool::new(PoolConfig::default());
    }
}