#![allow(dead_code)]
use anyhow::Result;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{Duration, interval};
use crate::backend::{BackendType, ExecResult, Sandbox, SandboxConfig, create_sandbox};
const DEFAULT_POOL_SIZE: usize = 5;
const DEFAULT_MAX_POOL_SIZE: usize = 20;
const GC_INTERVAL_MS: u64 = 1000;
const GC_BATCH_SIZE: usize = 5;
pub struct PooledSandbox {
pub id: String,
sandbox: Box<dyn Sandbox>,
backend_type: BackendType,
}
impl std::fmt::Debug for PooledSandbox {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PooledSandbox")
.field("id", &self.id)
.field("backend_type", &self.backend_type)
.field("is_running", &self.sandbox.is_running())
.finish()
}
}
impl PooledSandbox {
pub async fn exec(&mut self, cmd: &[&str]) -> Result<ExecResult> {
self.sandbox.exec(cmd).await
}
pub fn is_running(&self) -> bool {
self.sandbox.is_running()
}
pub fn backend_type(&self) -> BackendType {
self.backend_type
}
}
pub struct SandboxPool {
warm_pool: Arc<Mutex<VecDeque<PooledSandbox>>>,
cleanup_queue: Arc<Mutex<VecDeque<Box<dyn Sandbox>>>>,
start_semaphore: Arc<Semaphore>,
name_counter: AtomicUsize,
backend_type: BackendType,
config: SandboxConfig,
target_size: usize,
max_size: usize,
running: Arc<std::sync::atomic::AtomicBool>,
}
impl SandboxPool {
pub fn new(backend_type: BackendType) -> Result<Self> {
Self::with_config(
backend_type,
SandboxConfig::default(),
DEFAULT_POOL_SIZE,
DEFAULT_MAX_POOL_SIZE,
)
}
pub fn with_config(
backend_type: BackendType,
config: SandboxConfig,
target_size: usize,
max_size: usize,
) -> Result<Self> {
Ok(Self {
warm_pool: Arc::new(Mutex::new(VecDeque::new())),
cleanup_queue: Arc::new(Mutex::new(VecDeque::new())),
start_semaphore: Arc::new(Semaphore::new(5)), name_counter: AtomicUsize::new(0),
backend_type,
config,
target_size,
max_size,
running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
})
}
pub async fn start(&self) -> Result<()> {
self.running.store(true, Ordering::SeqCst);
self.warm_pool_to_target().await?;
self.spawn_gc_task();
Ok(())
}
pub async fn stop(&self) -> Result<()> {
self.running.store(false, Ordering::SeqCst);
{
let mut warm = self.warm_pool.lock().await;
let mut cleanup = self.cleanup_queue.lock().await;
while let Some(pooled) = warm.pop_front() {
cleanup.push_back(pooled.sandbox);
}
}
self.gc_all().await;
Ok(())
}
pub async fn acquire(&self) -> Result<PooledSandbox> {
{
let mut pool = self.warm_pool.lock().await;
while let Some(sandbox) = pool.pop_front() {
if sandbox.is_running() {
self.spawn_refill_task();
return Ok(sandbox);
}
}
}
self.create_sandbox().await
}
pub async fn release(&self, sandbox: PooledSandbox) {
if !sandbox.is_running() {
let mut cleanup = self.cleanup_queue.lock().await;
cleanup.push_back(sandbox.sandbox);
return;
}
let pool_size = {
let pool = self.warm_pool.lock().await;
pool.len()
};
if pool_size < self.max_size {
let mut pool = self.warm_pool.lock().await;
pool.push_back(sandbox);
} else {
let mut cleanup = self.cleanup_queue.lock().await;
cleanup.push_back(sandbox.sandbox);
}
}
pub async fn stats(&self) -> SandboxPoolStats {
let warm = self.warm_pool.lock().await;
let cleanup = self.cleanup_queue.lock().await;
SandboxPoolStats {
warm_count: warm.len(),
cleanup_pending: cleanup.len(),
target_size: self.target_size,
max_size: self.max_size,
backend_type: self.backend_type,
}
}
async fn create_sandbox(&self) -> Result<PooledSandbox> {
let _permit = self.start_semaphore.acquire().await?;
let id = self.name_counter.fetch_add(1, Ordering::SeqCst);
let name = format!("pool-{}-{}", self.backend_type, id);
let mut sandbox = create_sandbox(self.backend_type, &name)?;
sandbox.start(&self.config).await?;
Ok(PooledSandbox {
id: name,
sandbox,
backend_type: self.backend_type,
})
}
async fn warm_pool_to_target(&self) -> Result<()> {
let current_size = {
let pool = self.warm_pool.lock().await;
pool.len()
};
let needed = self.target_size.saturating_sub(current_size);
if needed == 0 {
return Ok(());
}
eprintln!(
"Warming {} pool: creating {} sandboxes...",
self.backend_type, needed
);
let mut created = 0;
for _ in 0..needed {
match self.create_sandbox().await {
Ok(sandbox) => {
let mut pool = self.warm_pool.lock().await;
pool.push_back(sandbox);
created += 1;
}
Err(e) => {
eprintln!("Warning: Failed to create sandbox: {}", e);
}
}
}
eprintln!(
"{} pool warmed: {} sandboxes ready",
self.backend_type, created
);
Ok(())
}
fn spawn_refill_task(&self) {
let warm_pool = Arc::clone(&self.warm_pool);
let start_semaphore = Arc::clone(&self.start_semaphore);
let running = Arc::clone(&self.running);
let backend_type = self.backend_type;
let config = self.config.clone();
let target_size = self.target_size;
let name_counter = self.name_counter.fetch_add(1, Ordering::SeqCst);
tokio::spawn(async move {
if !running.load(Ordering::SeqCst) {
return;
}
let current_size = {
let pool = warm_pool.lock().await;
pool.len()
};
if current_size >= target_size {
return;
}
if let Ok(_permit) = start_semaphore.try_acquire() {
let name = format!("pool-{}-{}", backend_type, name_counter);
if let Ok(mut sandbox) = create_sandbox(backend_type, &name)
&& sandbox.start(&config).await.is_ok()
{
let pooled = PooledSandbox {
id: name,
sandbox,
backend_type,
};
let mut pool = warm_pool.lock().await;
pool.push_back(pooled);
}
}
});
}
fn spawn_gc_task(&self) {
let cleanup_queue = Arc::clone(&self.cleanup_queue);
let running = Arc::clone(&self.running);
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(GC_INTERVAL_MS));
while running.load(Ordering::SeqCst) {
interval.tick().await;
let to_cleanup: Vec<Box<dyn Sandbox>> = {
let mut queue = cleanup_queue.lock().await;
let mut batch = Vec::new();
for _ in 0..GC_BATCH_SIZE {
if let Some(sandbox) = queue.pop_front() {
batch.push(sandbox);
} else {
break;
}
}
batch
};
for mut sandbox in to_cleanup {
let _ = sandbox.stop().await;
}
}
});
}
async fn gc_all(&self) {
loop {
let to_cleanup: Vec<Box<dyn Sandbox>> = {
let mut queue = self.cleanup_queue.lock().await;
let mut batch = Vec::new();
for _ in 0..GC_BATCH_SIZE {
if let Some(sandbox) = queue.pop_front() {
batch.push(sandbox);
} else {
break;
}
}
batch
};
if to_cleanup.is_empty() {
break;
}
for mut sandbox in to_cleanup {
let _ = sandbox.stop().await;
}
}
}
}
#[derive(Debug, Clone)]
pub struct SandboxPoolStats {
pub warm_count: usize,
pub cleanup_pending: usize,
pub target_size: usize,
pub max_size: usize,
pub backend_type: BackendType,
}
impl std::fmt::Display for SandboxPoolStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} pool: {}/{} warm, {} pending cleanup",
self.backend_type, self.warm_count, self.target_size, self.cleanup_pending
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sandbox_pool_stats_display() {
let stats = SandboxPoolStats {
warm_count: 3,
cleanup_pending: 1,
target_size: 5,
max_size: 20,
backend_type: BackendType::Docker,
};
let display = format!("{}", stats);
assert!(display.contains("docker"));
assert!(display.contains("3/5 warm"));
assert!(display.contains("1 pending cleanup"));
}
#[test]
fn test_sandbox_pool_stats_display_apple() {
let stats = SandboxPoolStats {
warm_count: 2,
cleanup_pending: 0,
target_size: 3,
max_size: 10,
backend_type: BackendType::Apple,
};
let display = format!("{}", stats);
assert!(display.contains("apple"));
assert!(display.contains("2/3 warm"));
}
#[test]
fn test_sandbox_pool_stats_debug() {
let stats = SandboxPoolStats {
warm_count: 5,
cleanup_pending: 2,
target_size: 10,
max_size: 50,
backend_type: BackendType::Firecracker,
};
let debug = format!("{:?}", stats);
assert!(debug.contains("warm_count: 5"));
assert!(debug.contains("cleanup_pending: 2"));
assert!(debug.contains("target_size: 10"));
assert!(debug.contains("max_size: 50"));
assert!(debug.contains("Firecracker"));
}
#[test]
fn test_sandbox_pool_stats_clone() {
let stats = SandboxPoolStats {
warm_count: 3,
cleanup_pending: 1,
target_size: 5,
max_size: 20,
backend_type: BackendType::Docker,
};
let cloned = stats.clone();
assert_eq!(cloned.warm_count, 3);
assert_eq!(cloned.cleanup_pending, 1);
assert_eq!(cloned.target_size, 5);
assert_eq!(cloned.max_size, 20);
assert_eq!(cloned.backend_type, BackendType::Docker);
}
#[test]
fn test_default_pool_constants() {
assert_eq!(DEFAULT_POOL_SIZE, 5);
assert_eq!(DEFAULT_MAX_POOL_SIZE, 20);
assert_eq!(GC_INTERVAL_MS, 1000);
assert_eq!(GC_BATCH_SIZE, 5);
}
#[test]
fn test_sandbox_pool_with_config_values() {
let config = SandboxConfig::with_image("python:3.12-alpine")
.with_resources(2, 1024)
.with_network(false);
let pool = SandboxPool::with_config(BackendType::Docker, config.clone(), 3, 15).unwrap();
assert_eq!(pool.target_size, 3);
assert_eq!(pool.max_size, 15);
assert_eq!(pool.backend_type, BackendType::Docker);
assert_eq!(pool.config.image, "python:3.12-alpine");
assert_eq!(pool.config.vcpus, 2);
assert_eq!(pool.config.memory_mb, 1024);
assert!(!pool.config.network);
}
#[test]
fn test_sandbox_pool_new_default_values() {
let pool = SandboxPool::new(BackendType::Podman).unwrap();
assert_eq!(pool.target_size, DEFAULT_POOL_SIZE);
assert_eq!(pool.max_size, DEFAULT_MAX_POOL_SIZE);
assert_eq!(pool.backend_type, BackendType::Podman);
}
#[test]
fn test_sandbox_pool_different_backends() {
let backends = [
BackendType::Docker,
BackendType::Podman,
BackendType::Apple,
BackendType::Firecracker,
BackendType::Hyperlight,
];
for backend in backends {
let pool = SandboxPool::new(backend).unwrap();
assert_eq!(pool.backend_type, backend);
}
}
#[test]
fn test_sandbox_pool_initial_state() {
let pool = SandboxPool::new(BackendType::Docker).unwrap();
assert!(!pool.running.load(Ordering::SeqCst));
assert_eq!(pool.name_counter.load(Ordering::SeqCst), 0);
}
#[tokio::test]
#[ignore] async fn test_sandbox_pool_basic() {
let backend = crate::backend::detect_best_backend().expect("No backend available");
let config = SandboxConfig::with_image("alpine:3.20");
let pool = SandboxPool::with_config(backend, config, 2, 5).unwrap();
pool.start().await.unwrap();
let mut sandbox = pool.acquire().await.unwrap();
assert!(sandbox.is_running());
let result = sandbox.exec(&["echo", "hello"]).await.unwrap();
assert!(result.is_success());
assert!(result.stdout.contains("hello"));
pool.release(sandbox).await;
let stats = pool.stats().await;
assert!(stats.warm_count >= 1);
pool.stop().await.unwrap();
}
#[tokio::test]
#[ignore] async fn test_sandbox_pool_stats_lifecycle() {
let backend = crate::backend::detect_best_backend().expect("No backend available");
let config = SandboxConfig::with_image("alpine:3.20");
let pool = SandboxPool::with_config(backend, config, 2, 5).unwrap();
let before = pool.stats().await;
assert_eq!(before.warm_count, 0);
pool.start().await.unwrap();
let after_start = pool.stats().await;
assert!(after_start.warm_count > 0);
pool.stop().await.unwrap();
}
}