1use std::collections::{HashSet, VecDeque};
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use tokio::io::BufReader;
17use tokio::process::{Child, ChildStdin, ChildStdout};
18use tokio::sync::Mutex;
19
20use crate::error::SandboxError;
21use crate::host::{find_worker_binary, ipc_event_loop};
22use crate::ipc::{read_message, write_message, ChildMessage, ParentMessage, WorkerConfig};
23use crate::{ResourceDispatcher, StashDispatcher, ToolDispatcher};
24
/// Tunable limits for a [`WorkerPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Target number of workers: `pre_warm` spawns up to this many, and
    /// `reap_idle` will not reap a worker if doing so would leave fewer alive.
    pub min_workers: usize,
    /// Hard cap on live workers; `acquire` fails instead of spawning once the
    /// alive count has reached this value.
    pub max_workers: usize,
    /// How long a worker may sit in the idle queue before `reap_idle`
    /// considers it expired.
    pub max_idle_time: Duration,
    /// A released worker whose use count has reached this value is killed
    /// instead of being returned to the idle queue.
    pub max_uses: u32,
    /// Maximum time to wait for a worker to acknowledge a `Reset` message
    /// during a health check.
    pub health_check_timeout: Duration,
}
39
40impl Default for PoolConfig {
41 fn default() -> Self {
42 Self {
43 min_workers: 2,
44 max_workers: 8,
45 max_idle_time: Duration::from_secs(60),
46 max_uses: 50,
47 health_check_timeout: Duration::from_millis(500),
48 }
49 }
50}
51
/// Monotonic counters describing pool activity.
///
/// Every field is an independent atomic; the pool updates them with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Worker processes successfully spawned.
    pub spawned: AtomicU64,
    /// Idle workers that passed their health check and were handed out again.
    pub reused: AtomicU64,
    /// Workers killed on release because they reached `PoolConfig::max_uses`.
    pub killed_max_uses: AtomicU64,
    /// Workers killed by `reap_idle` for exceeding `PoolConfig::max_idle_time`.
    pub killed_idle: AtomicU64,
    /// Workers killed after a failed idle health check or a `Fatal` release.
    pub killed_error: AtomicU64,
}
66
67struct PoolWorker {
69 child: Child,
70 stdin: ChildStdin,
71 stdout: BufReader<ChildStdout>,
72 uses: u32,
73 idle_since: Instant,
74}
75
/// How an execution ended, as reported by the caller to `WorkerPool::release`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReleaseOutcome {
    /// The worker may be reused (subject to the pool's use limit and shutdown state).
    Ok,
    /// The worker must not be reused; the pool kills it.
    Fatal,
}
85
86pub struct AcquiredWorker {
89 worker: Option<PoolWorker>,
90}
91
92pub struct PooledExecutionContext {
94 pub dispatcher: Arc<dyn ToolDispatcher>,
96 pub resource_dispatcher: Option<Arc<dyn ResourceDispatcher>>,
98 pub stash_dispatcher: Option<Arc<dyn StashDispatcher>>,
100 pub known_servers: Option<HashSet<String>>,
102 pub known_tools: Option<Vec<(String, String)>>,
104}
105
106impl AcquiredWorker {
107 pub async fn execute(
112 &mut self,
113 code: &str,
114 config: &crate::SandboxConfig,
115 context: PooledExecutionContext,
116 ) -> Result<serde_json::Value, SandboxError> {
117 let w = self.worker.as_mut().expect("worker already consumed");
118
119 let mut worker_config = WorkerConfig::from(config);
121 worker_config.known_servers = context.known_servers;
122 worker_config.known_tools = context.known_tools;
123 let execute_msg = ParentMessage::Execute {
124 code: code.to_string(),
125 manifest: None,
126 config: worker_config,
127 };
128 write_message(&mut w.stdin, &execute_msg)
129 .await
130 .map_err(|e| {
131 SandboxError::Execution(anyhow::anyhow!(
132 "failed to send Execute to pooled worker: {}",
133 e
134 ))
135 })?;
136
137 w.uses += 1;
138
139 let timeout = config.timeout + Duration::from_secs(2);
141 let result = tokio::time::timeout(
142 timeout,
143 ipc_event_loop(
144 &mut w.stdin,
145 &mut w.stdout,
146 context.dispatcher,
147 context.resource_dispatcher,
148 context.stash_dispatcher,
149 ),
150 )
151 .await;
152
153 match result {
154 Ok(inner) => inner,
155 Err(_elapsed) => {
156 Err(SandboxError::Timeout {
158 timeout_ms: config.timeout.as_millis() as u64,
159 })
160 }
161 }
162 }
163}
164
165pub struct WorkerPool {
167 config: PoolConfig,
168 idle_workers: Mutex<VecDeque<PoolWorker>>,
169 alive_count: Mutex<usize>,
171 metrics: Arc<PoolMetrics>,
172 shutting_down: Mutex<bool>,
174}
175
176impl WorkerPool {
177 pub fn new(config: PoolConfig) -> Self {
179 Self {
180 config,
181 idle_workers: Mutex::new(VecDeque::new()),
182 alive_count: Mutex::new(0),
183 metrics: Arc::new(PoolMetrics::default()),
184 shutting_down: Mutex::new(false),
185 }
186 }
187
188 pub fn metrics(&self) -> &Arc<PoolMetrics> {
190 &self.metrics
191 }
192
193 #[tracing::instrument(skip(self, sandbox_config))]
197 pub async fn acquire(
198 &self,
199 sandbox_config: &crate::SandboxConfig,
200 ) -> Result<AcquiredWorker, SandboxError> {
201 if *self.shutting_down.lock().await {
202 return Err(SandboxError::Execution(anyhow::anyhow!(
203 "worker pool is shutting down"
204 )));
205 }
206
207 let worker_config = WorkerConfig::from(sandbox_config);
208
209 loop {
211 let mut idle = self.idle_workers.lock().await;
212 if let Some(mut w) = idle.pop_front() {
213 drop(idle); let healthy = self.health_check(&mut w, &worker_config).await;
217 if healthy {
218 self.metrics.reused.fetch_add(1, Ordering::Relaxed);
219 return Ok(AcquiredWorker { worker: Some(w) });
220 } else {
221 self.kill_worker(w).await;
223 self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
224 continue;
225 }
226 } else {
227 drop(idle);
228 break;
229 }
230 }
231
232 let mut alive = self.alive_count.lock().await;
234 if *alive >= self.config.max_workers {
235 return Err(SandboxError::Execution(anyhow::anyhow!(
236 "worker pool at capacity ({} workers)",
237 self.config.max_workers
238 )));
239 }
240
241 let worker = self.spawn_worker().await?;
242 *alive += 1;
243 drop(alive);
244
245 let mut w = worker;
247 let healthy = self.health_check(&mut w, &worker_config).await;
248 if !healthy {
249 self.kill_worker(w).await;
250 return Err(SandboxError::Execution(anyhow::anyhow!(
251 "newly spawned worker failed health check"
252 )));
253 }
254
255 Ok(AcquiredWorker { worker: Some(w) })
256 }
257
258 #[tracing::instrument(skip(self, handle), fields(outcome = ?outcome))]
263 pub async fn release(&self, mut handle: AcquiredWorker, outcome: ReleaseOutcome) {
264 let worker = match handle.worker.take() {
265 Some(w) => w,
266 None => return,
267 };
268
269 if outcome == ReleaseOutcome::Fatal {
270 self.kill_worker(worker).await;
271 self.metrics.killed_error.fetch_add(1, Ordering::Relaxed);
272 return;
273 }
274
275 if worker.uses >= self.config.max_uses {
276 self.kill_worker(worker).await;
277 self.metrics.killed_max_uses.fetch_add(1, Ordering::Relaxed);
278 return;
279 }
280
281 if *self.shutting_down.lock().await {
282 self.kill_worker(worker).await;
283 return;
284 }
285
286 let mut w = worker;
288 w.idle_since = Instant::now();
289 self.idle_workers.lock().await.push_back(w);
290 }
291
292 pub async fn shutdown(&self) {
294 *self.shutting_down.lock().await = true;
295
296 let mut idle = self.idle_workers.lock().await;
297 let workers: Vec<PoolWorker> = idle.drain(..).collect();
298 drop(idle);
299
300 for w in workers {
301 self.kill_worker(w).await;
302 }
303 }
304
305 pub async fn reap_idle(&self) {
310 let mut idle = self.idle_workers.lock().await;
311 let now = Instant::now();
312 let mut to_kill = Vec::new();
313 let mut kept = VecDeque::new();
314 let alive = *self.alive_count.lock().await;
315
316 while let Some(w) = idle.pop_front() {
317 if now.duration_since(w.idle_since) > self.config.max_idle_time {
318 let would_remain = alive - to_kill.len() - 1;
320 if would_remain >= self.config.min_workers {
321 to_kill.push(w);
322 } else {
323 kept.push_back(w);
324 }
325 } else {
326 kept.push_back(w);
327 }
328 }
329 *idle = kept;
330 drop(idle);
331
332 for w in to_kill {
333 self.kill_worker(w).await;
334 self.metrics.killed_idle.fetch_add(1, Ordering::Relaxed);
335 }
336 }
337
338 #[cfg(feature = "worker-pool")]
343 pub async fn pre_warm(&self, config: &crate::SandboxConfig) -> Result<usize, SandboxError> {
344 let worker_config = WorkerConfig::from(config);
345 let mut count = 0;
346
347 let alive = *self.alive_count.lock().await;
348 let to_spawn = self.config.min_workers.saturating_sub(alive);
349
350 for _ in 0..to_spawn {
351 if *self.alive_count.lock().await >= self.config.max_workers {
352 break;
353 }
354
355 match self.spawn_worker().await {
356 Ok(mut w) => {
357 if self.health_check(&mut w, &worker_config).await {
358 w.idle_since = Instant::now();
359 self.idle_workers.lock().await.push_back(w);
360 *self.alive_count.lock().await += 1;
361 count += 1;
362 } else {
363 self.kill_worker(w).await;
364 }
365 }
366 Err(e) => {
367 tracing::warn!(error = %e, "failed to pre-warm worker");
368 }
369 }
370 }
371
372 Ok(count)
373 }
374
375 #[cfg(feature = "worker-pool")]
379 pub fn start_reap_task(self: &Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
380 let pool = Arc::clone(self);
381 tokio::spawn(async move {
382 loop {
383 tokio::time::sleep(interval).await;
384 if *pool.shutting_down.lock().await {
385 break;
386 }
387 pool.reap_idle().await;
388 }
389 })
390 }
391
392 async fn spawn_worker(&self) -> Result<PoolWorker, SandboxError> {
394 let worker_bin = find_worker_binary()?;
395
396 let debug_mode = std::env::var("FORGE_DEBUG").is_ok();
398 let mut child = tokio::process::Command::new(&worker_bin)
399 .stdin(std::process::Stdio::piped())
400 .stdout(std::process::Stdio::piped())
401 .stderr(if debug_mode {
402 std::process::Stdio::piped()
403 } else {
404 std::process::Stdio::null()
405 })
406 .env_clear()
407 .kill_on_drop(true)
408 .spawn()
409 .map_err(|e| {
410 SandboxError::Execution(anyhow::anyhow!(
411 "failed to spawn pooled worker at {}: {}",
412 worker_bin.display(),
413 e
414 ))
415 })?;
416
417 if debug_mode {
419 if let Some(stderr) = child.stderr.take() {
420 tokio::spawn(crate::host::capture_bounded_stderr(stderr));
421 }
422 }
423
424 let stdin = child
425 .stdin
426 .take()
427 .ok_or_else(|| SandboxError::Execution(anyhow::anyhow!("no stdin on pooled worker")))?;
428 let stdout = child.stdout.take().ok_or_else(|| {
429 SandboxError::Execution(anyhow::anyhow!("no stdout on pooled worker"))
430 })?;
431
432 self.metrics.spawned.fetch_add(1, Ordering::Relaxed);
433
434 Ok(PoolWorker {
435 child,
436 stdin,
437 stdout: BufReader::new(stdout),
438 uses: 0,
439 idle_since: Instant::now(),
440 })
441 }
442
443 async fn health_check(&self, w: &mut PoolWorker, config: &WorkerConfig) -> bool {
445 let reset_msg = ParentMessage::Reset {
446 config: config.clone(),
447 };
448
449 if write_message(&mut w.stdin, &reset_msg).await.is_err() {
451 return false;
452 }
453
454 matches!(
456 tokio::time::timeout(
457 self.config.health_check_timeout,
458 read_message::<ChildMessage, _>(&mut w.stdout),
459 )
460 .await,
461 Ok(Ok(Some(ChildMessage::ResetComplete)))
462 )
463 }
464
465 async fn kill_worker(&self, mut w: PoolWorker) {
467 let _ = w.child.kill().await;
468 let mut alive = self.alive_count.lock().await;
469 *alive = alive.saturating_sub(1);
470 }
471}
472
473impl Drop for AcquiredWorker {
475 fn drop(&mut self) {
476 if let Some(mut w) = self.worker.take() {
477 let _ = w.child.start_kill();
479 }
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool with the default configuration.
    fn default_pool() -> WorkerPool {
        WorkerPool::new(PoolConfig::default())
    }

    /// The documented defaults are what `Default` produces.
    #[test]
    fn pool_config_defaults() {
        let PoolConfig {
            min_workers,
            max_workers,
            max_idle_time,
            max_uses,
            health_check_timeout,
        } = PoolConfig::default();

        assert_eq!(min_workers, 2);
        assert_eq!(max_workers, 8);
        assert_eq!(max_idle_time, Duration::from_secs(60));
        assert_eq!(max_uses, 50);
        assert_eq!(health_check_timeout, Duration::from_millis(500));
    }

    /// Every counter starts at zero.
    #[test]
    fn pool_metrics_default_zero() {
        let metrics = PoolMetrics::default();
        let counters = [
            &metrics.spawned,
            &metrics.reused,
            &metrics.killed_max_uses,
            &metrics.killed_idle,
            &metrics.killed_error,
        ];
        for counter in counters {
            assert_eq!(counter.load(Ordering::Relaxed), 0);
        }
    }

    /// Equality distinguishes the two outcomes.
    #[test]
    fn release_outcome_eq() {
        let (ok, fatal) = (ReleaseOutcome::Ok, ReleaseOutcome::Fatal);
        assert_eq!(ok, ReleaseOutcome::Ok);
        assert_eq!(fatal, ReleaseOutcome::Fatal);
        assert_ne!(ok, fatal);
    }

    /// A new pool has no idle workers and an alive count of zero.
    #[tokio::test]
    async fn pool_new_starts_empty() {
        let pool = default_pool();
        let idle = pool.idle_workers.lock().await;
        assert_eq!(idle.len(), 0);
        assert_eq!(*pool.alive_count.lock().await, 0);
    }

    /// `shutdown` flips the shutting-down flag.
    #[tokio::test]
    async fn pool_shutdown_sets_flag() {
        let pool = default_pool();
        let before = *pool.shutting_down.lock().await;
        assert!(!before);

        pool.shutdown().await;

        let after = *pool.shutting_down.lock().await;
        assert!(after);
    }

    /// Reaping an empty pool leaves it empty.
    #[tokio::test]
    async fn pool_reap_empty_is_noop() {
        let pool = default_pool();
        pool.reap_idle().await;
        let remaining = pool.idle_workers.lock().await.len();
        assert_eq!(remaining, 0);
    }

    /// A config with `min_workers == 0` can be constructed and read back.
    #[test]
    fn pool_cc15_pool_config_validation() {
        let config = PoolConfig {
            min_workers: 0,
            max_workers: 1,
            max_idle_time: Duration::from_secs(1),
            max_uses: 1,
            health_check_timeout: Duration::from_millis(100),
        };
        assert_eq!(
            (config.min_workers, config.max_workers, config.max_uses),
            (0, 1, 1)
        );
    }

    /// After `shutdown`, `acquire` fails with a "shutting down" error.
    #[tokio::test]
    async fn pool_shutdown_rejects_new_acquires() {
        let pool = default_pool();
        pool.shutdown().await;

        let config = crate::SandboxConfig::default();
        match pool.acquire(&config).await {
            Ok(_) => panic!("should reject after shutdown"),
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("shutting down"),
                    "should mention shutting down: {msg}"
                );
            }
        }
    }

    /// The idle queue is empty after `shutdown`.
    #[tokio::test]
    async fn pool_shutdown_kills_all_idle() {
        let pool = default_pool();
        pool.shutdown().await;
        let remaining = pool.idle_workers.lock().await.len();
        assert_eq!(remaining, 0);
    }

    /// Reaping with a zero idle limit on an empty pool leaves the queue empty.
    #[tokio::test]
    async fn pool_reap_preserves_min_workers_count() {
        let pool = WorkerPool::new(PoolConfig {
            min_workers: 2,
            max_workers: 4,
            max_idle_time: Duration::from_secs(0),
            max_uses: 50,
            health_check_timeout: Duration::from_millis(500),
        });
        pool.reap_idle().await;
        let remaining = pool.idle_workers.lock().await.len();
        assert_eq!(remaining, 0);
    }

    /// `spawned` accumulates across increments.
    #[test]
    fn pool_metrics_spawned_increments() {
        let metrics = PoolMetrics::default();
        for expected in 1..=2u64 {
            metrics.spawned.fetch_add(1, Ordering::Relaxed);
            assert_eq!(metrics.spawned.load(Ordering::Relaxed), expected);
        }
    }

    /// `reused` reflects a single increment.
    #[test]
    fn pool_metrics_reused_increments() {
        let metrics = PoolMetrics::default();
        metrics.reused.fetch_add(1, Ordering::Relaxed);
        let observed = metrics.reused.load(Ordering::Relaxed);
        assert_eq!(observed, 1);
    }

    /// `killed_idle` reflects an increment larger than one.
    #[test]
    fn pool_metrics_killed_idle_increments() {
        let metrics = PoolMetrics::default();
        metrics.killed_idle.fetch_add(3, Ordering::Relaxed);
        let observed = metrics.killed_idle.load(Ordering::Relaxed);
        assert_eq!(observed, 3);
    }

    /// The `Debug` output names the variant.
    #[test]
    fn pool_release_outcome_debug() {
        let rendered_ok = format!("{:?}", ReleaseOutcome::Ok);
        assert!(rendered_ok.contains("Ok"));

        let rendered_fatal = format!("{:?}", ReleaseOutcome::Fatal);
        assert!(rendered_fatal.contains("Fatal"));
    }

    /// Calling `shutdown` twice is safe and leaves the flag set.
    #[tokio::test]
    async fn pool_multiple_shutdowns_safe() {
        let pool = default_pool();
        for _ in 0..2 {
            pool.shutdown().await;
        }
        assert!(*pool.shutting_down.lock().await);
    }

    /// With the feature enabled, the reap task can be started and aborted.
    #[cfg(feature = "worker-pool")]
    #[tokio::test]
    async fn pool_pw_feature_compiles() {
        let pool = Arc::new(default_pool());
        let reaper = pool.start_reap_task(Duration::from_secs(3600));
        reaper.abort();
    }

    /// Cloning a config preserves its worker limits.
    #[test]
    fn pool_config_clone() {
        let original = PoolConfig::default();
        let copy = original.clone();
        assert_eq!(original.min_workers, copy.min_workers);
        assert_eq!(original.max_workers, copy.max_workers);
    }

    /// The core types are constructible without any feature flag.
    #[test]
    fn pool_cc22_worker_pool_feature_gate() {
        let _config = PoolConfig::default();
        let _pool = default_pool();
    }
}