#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::time::timeout;
use crate::Error;
use crate::callback::Callback;
use crate::sandbox::{ResourceLimits, Sandbox, SandboxBuilder, state};
use crate::trace::{OutputHandler, TraceHandler};
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub max_size: usize,
pub min_idle: usize,
pub idle_timeout: Duration,
pub acquire_timeout: Duration,
pub reset_on_release: bool,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
max_size: 10,
min_idle: 1,
idle_timeout: Duration::from_secs(300),
acquire_timeout: Duration::from_secs(30),
reset_on_release: true,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum PoolError {
#[error("pool exhausted: all {0} sandboxes are in use")]
Exhausted(usize),
#[error("acquire timeout after {0:?}")]
Timeout(Duration),
#[error("sandbox creation failed: {0}")]
Creation(String),
#[error("sandbox reset failed: {0}")]
Reset(String),
#[error("pool is closed")]
Closed,
}
impl From<Error> for PoolError {
fn from(err: Error) -> Self {
PoolError::Creation(err.to_string())
}
}
#[derive(Debug, Clone)]
pub struct PoolStats {
pub total: usize,
pub available: usize,
pub in_use: usize,
pub total_acquisitions: u64,
pub total_creations: u64,
pub wait_count: u64,
pub total_wait_time: Duration,
}
impl PoolStats {
#[must_use]
pub fn average_wait_time(&self) -> Duration {
if self.wait_count == 0 {
Duration::ZERO
} else {
self.total_wait_time / self.wait_count as u32
}
}
}
struct PoolEntry {
sandbox: Sandbox,
last_used: Instant,
}
struct PoolStatsInner {
total_acquisitions: AtomicU64,
total_creations: AtomicU64,
wait_count: AtomicU64,
total_wait_nanos: AtomicU64,
}
impl PoolStatsInner {
fn new() -> Self {
Self {
total_acquisitions: AtomicU64::new(0),
total_creations: AtomicU64::new(0),
wait_count: AtomicU64::new(0),
total_wait_nanos: AtomicU64::new(0),
}
}
fn record_acquisition(&self) {
self.total_acquisitions.fetch_add(1, Ordering::Relaxed);
}
fn record_creation(&self) {
self.total_creations.fetch_add(1, Ordering::Relaxed);
}
fn record_wait(&self, duration: Duration) {
self.wait_count.fetch_add(1, Ordering::Relaxed);
self.total_wait_nanos
.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
}
}
pub struct SandboxPool {
config: PoolConfig,
builder_fn: Arc<dyn Fn() -> Result<Sandbox, Error> + Send + Sync>,
pool: Arc<StdMutex<VecDeque<PoolEntry>>>,
semaphore: Arc<Semaphore>,
stats: Arc<PoolStatsInner>,
current_size: Arc<AtomicUsize>,
closed: Arc<AtomicUsize>,
}
impl std::fmt::Debug for SandboxPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SandboxPool")
.field("config", &self.config)
.field("current_size", &self.current_size.load(Ordering::Relaxed))
.field("stats", &self.stats())
.finish_non_exhaustive()
}
}
impl SandboxPool {
pub async fn new(
_builder: SandboxBuilder<state::Has, state::Has>,
config: PoolConfig,
) -> Result<Self, PoolError> {
if config.max_size == 0 {
return Err(PoolError::Creation(
"max_size must be greater than 0".to_string(),
));
}
if config.min_idle > config.max_size {
return Err(PoolError::Creation(
"min_idle cannot exceed max_size".to_string(),
));
}
let builder_fn: Arc<dyn Fn() -> Result<Sandbox, Error> + Send + Sync> =
Arc::new(move || {
#[cfg(feature = "embedded")]
{
Sandbox::embedded().build()
}
#[cfg(not(feature = "embedded"))]
{
Err(Error::Initialization(
"Pool requires embedded feature".to_string(),
))
}
});
let pool = Arc::new(StdMutex::new(VecDeque::with_capacity(config.max_size)));
let semaphore = Arc::new(Semaphore::new(config.max_size));
let stats = Arc::new(PoolStatsInner::new());
let current_size = Arc::new(AtomicUsize::new(0));
let closed = Arc::new(AtomicUsize::new(0));
let pool_instance = Self {
config: config.clone(),
builder_fn,
pool,
semaphore,
stats,
current_size,
closed,
};
pool_instance.prewarm(config.min_idle)?;
Ok(pool_instance)
}
pub async fn with_builder<F>(builder_fn: F, config: PoolConfig) -> Result<Self, PoolError>
where
F: Fn() -> Result<Sandbox, Error> + Send + Sync + 'static,
{
if config.max_size == 0 {
return Err(PoolError::Creation(
"max_size must be greater than 0".to_string(),
));
}
if config.min_idle > config.max_size {
return Err(PoolError::Creation(
"min_idle cannot exceed max_size".to_string(),
));
}
let builder_fn: Arc<dyn Fn() -> Result<Sandbox, Error> + Send + Sync> =
Arc::new(builder_fn);
let pool = Arc::new(StdMutex::new(VecDeque::with_capacity(config.max_size)));
let semaphore = Arc::new(Semaphore::new(config.max_size));
let stats = Arc::new(PoolStatsInner::new());
let current_size = Arc::new(AtomicUsize::new(0));
let closed = Arc::new(AtomicUsize::new(0));
let pool_instance = Self {
config: config.clone(),
builder_fn,
pool,
semaphore,
stats,
current_size,
closed,
};
pool_instance.prewarm(config.min_idle)?;
Ok(pool_instance)
}
fn prewarm(&self, count: usize) -> Result<(), PoolError> {
for _ in 0..count {
let sandbox = (self.builder_fn)().map_err(PoolError::from)?;
self.stats.record_creation();
self.current_size.fetch_add(1, Ordering::Relaxed);
let mut pool = self.pool.lock().unwrap();
pool.push_back(PoolEntry {
sandbox,
last_used: Instant::now(),
});
}
Ok(())
}
pub async fn acquire(&self) -> Result<PooledSandbox, PoolError> {
self.acquire_timeout(self.config.acquire_timeout).await
}
pub async fn acquire_timeout(
&self,
acquire_timeout: Duration,
) -> Result<PooledSandbox, PoolError> {
if self.closed.load(Ordering::Relaxed) != 0 {
return Err(PoolError::Closed);
}
let start = Instant::now();
let permit = match timeout(acquire_timeout, self.semaphore.clone().acquire_owned()).await {
Ok(Ok(permit)) => permit,
Ok(Err(_)) => return Err(PoolError::Closed), Err(_) => {
return Err(PoolError::Timeout(acquire_timeout));
}
};
let wait_time = start.elapsed();
if wait_time > Duration::from_millis(1) {
self.stats.record_wait(wait_time);
}
let sandbox = {
let mut pool = self.pool.lock().unwrap();
pool.pop_front().map(|entry| entry.sandbox)
};
let sandbox = match sandbox {
Some(s) => s,
None => {
let s = (self.builder_fn)().map_err(PoolError::from)?;
self.stats.record_creation();
self.current_size.fetch_add(1, Ordering::Relaxed);
s
}
};
self.stats.record_acquisition();
Ok(PooledSandbox {
sandbox: Some(sandbox),
pool: Arc::new(PoolHandle {
pool: Arc::clone(&self.pool),
current_size: Arc::clone(&self.current_size),
closed: Arc::clone(&self.closed),
}),
_permit: permit,
})
}
pub fn try_acquire(&self) -> Result<Option<PooledSandbox>, PoolError> {
if self.closed.load(Ordering::Relaxed) != 0 {
return Err(PoolError::Closed);
}
let permit = match self.semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => return Ok(None), };
let sandbox = {
let mut pool = self.pool.lock().unwrap();
pool.pop_front().map(|entry| entry.sandbox)
};
let sandbox = match sandbox {
Some(s) => s,
None => {
let s = (self.builder_fn)().map_err(PoolError::from)?;
self.stats.record_creation();
self.current_size.fetch_add(1, Ordering::Relaxed);
s
}
};
self.stats.record_acquisition();
Ok(Some(PooledSandbox {
sandbox: Some(sandbox),
pool: Arc::new(PoolHandle {
pool: Arc::clone(&self.pool),
current_size: Arc::clone(&self.current_size),
closed: Arc::clone(&self.closed),
}),
_permit: permit,
}))
}
#[must_use]
pub fn stats(&self) -> PoolStats {
let current_size = self.current_size.load(Ordering::Relaxed);
let available = self.semaphore.available_permits();
let in_use = self.config.max_size - available;
PoolStats {
total: current_size,
available,
in_use,
total_acquisitions: self.stats.total_acquisitions.load(Ordering::Relaxed),
total_creations: self.stats.total_creations.load(Ordering::Relaxed),
wait_count: self.stats.wait_count.load(Ordering::Relaxed),
total_wait_time: Duration::from_nanos(
self.stats.total_wait_nanos.load(Ordering::Relaxed),
),
}
}
#[must_use]
pub fn config(&self) -> &PoolConfig {
&self.config
}
pub fn close(&self) {
self.closed.store(1, Ordering::Relaxed);
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed) != 0
}
pub fn evict_idle(&self) -> usize {
let mut pool = self.pool.lock().unwrap();
let now = Instant::now();
let min_idle = self.config.min_idle;
let idle_timeout = self.config.idle_timeout;
let mut evicted = 0;
while pool.len() > min_idle {
if let Some(entry) = pool.front() {
if now.duration_since(entry.last_used) > idle_timeout {
pool.pop_front();
self.current_size.fetch_sub(1, Ordering::Relaxed);
evicted += 1;
} else {
break;
}
} else {
break;
}
}
evicted
}
}
struct PoolHandle {
pool: Arc<StdMutex<VecDeque<PoolEntry>>>,
current_size: Arc<AtomicUsize>,
closed: Arc<AtomicUsize>,
}
impl PoolHandle {
fn return_sandbox(&self, sandbox: Sandbox) {
if self.closed.load(Ordering::Relaxed) != 0 {
self.current_size.fetch_sub(1, Ordering::Relaxed);
return;
}
let mut pool = self.pool.lock().unwrap();
pool.push_back(PoolEntry {
sandbox,
last_used: Instant::now(),
});
}
}
pub struct PooledSandbox {
sandbox: Option<Sandbox>,
pool: Arc<PoolHandle>,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl std::fmt::Debug for PooledSandbox {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PooledSandbox")
.field("sandbox", &self.sandbox)
.finish_non_exhaustive()
}
}
impl Deref for PooledSandbox {
type Target = Sandbox;
fn deref(&self) -> &Self::Target {
self.sandbox.as_ref().expect("sandbox taken during drop")
}
}
impl DerefMut for PooledSandbox {
fn deref_mut(&mut self) -> &mut Self::Target {
self.sandbox.as_mut().expect("sandbox taken during drop")
}
}
impl Drop for PooledSandbox {
fn drop(&mut self) {
if let Some(mut sandbox) = self.sandbox.take() {
sandbox.clear_per_request_state();
self.pool.return_sandbox(sandbox);
}
}
}
impl PooledSandbox {
#[must_use]
pub fn sandbox(&self) -> &Sandbox {
self.sandbox.as_ref().expect("sandbox taken during drop")
}
#[must_use]
pub fn sandbox_mut(&mut self) -> &mut Sandbox {
self.sandbox.as_mut().expect("sandbox taken during drop")
}
#[must_use]
pub fn detach(mut self) -> Sandbox {
self.sandbox.take().expect("sandbox already taken")
}
#[must_use]
pub fn with_callbacks(mut self, callbacks: Vec<Box<dyn Callback>>) -> Self {
self.sandbox_mut().set_callbacks(callbacks);
self
}
#[must_use]
pub fn with_trace_handler<H: TraceHandler + 'static>(mut self, handler: H) -> Self {
self.sandbox_mut().set_trace_handler(handler);
self
}
#[must_use]
pub fn with_output_handler<H: OutputHandler + 'static>(mut self, handler: H) -> Self {
self.sandbox_mut().set_output_handler(handler);
self
}
#[must_use]
pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
self.sandbox_mut().set_resource_limits(limits);
self
}
#[cfg(feature = "vfs")]
#[must_use]
pub fn with_vfs_storage(mut self, storage: std::sync::Arc<dyn eryx_vfs::VfsStorage>) -> Self {
self.sandbox_mut().set_vfs_storage(storage);
self
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn pool_config_default() {
let config = PoolConfig::default();
assert_eq!(config.max_size, 10);
assert_eq!(config.min_idle, 1);
assert_eq!(config.idle_timeout, Duration::from_secs(300));
assert_eq!(config.acquire_timeout, Duration::from_secs(30));
assert!(config.reset_on_release);
}
#[test]
fn pool_stats_average_wait_time_zero_waits() {
let stats = PoolStats {
total: 5,
available: 3,
in_use: 2,
total_acquisitions: 100,
total_creations: 5,
wait_count: 0,
total_wait_time: Duration::ZERO,
};
assert_eq!(stats.average_wait_time(), Duration::ZERO);
}
#[test]
fn pool_stats_average_wait_time_with_waits() {
let stats = PoolStats {
total: 5,
available: 3,
in_use: 2,
total_acquisitions: 100,
total_creations: 5,
wait_count: 10,
total_wait_time: Duration::from_millis(1000),
};
assert_eq!(stats.average_wait_time(), Duration::from_millis(100));
}
#[test]
fn pool_error_display() {
let err = PoolError::Exhausted(10);
assert!(err.to_string().contains("10"));
let err = PoolError::Timeout(Duration::from_secs(5));
assert!(err.to_string().contains("5s"));
let err = PoolError::Creation("test error".to_string());
assert!(err.to_string().contains("test error"));
let err = PoolError::Closed;
assert!(err.to_string().contains("closed"));
}
#[cfg(feature = "embedded")]
mod embedded_tests {
use super::*;
#[tokio::test]
async fn pool_new_with_defaults() {
let config = PoolConfig {
min_idle: 1,
max_size: 3,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config).await;
assert!(pool.is_ok(), "Failed to create pool: {:?}", pool.err());
let pool = pool.unwrap();
let stats = pool.stats();
assert_eq!(stats.total, 1); assert_eq!(stats.total_creations, 1);
}
#[tokio::test]
async fn pool_acquire_and_release() {
let config = PoolConfig {
min_idle: 1,
max_size: 3,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let sandbox = pool.acquire().await.expect("Failed to acquire sandbox");
let stats = pool.stats();
assert_eq!(stats.total_acquisitions, 1);
assert_eq!(stats.in_use, 1);
drop(sandbox);
let stats = pool.stats();
assert_eq!(stats.in_use, 0);
assert_eq!(stats.available, 3); }
#[tokio::test]
async fn pool_execute_code() {
let config = PoolConfig {
min_idle: 1,
max_size: 2,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let sandbox = pool.acquire().await.expect("Failed to acquire sandbox");
let result = sandbox.execute("print('Hello from pool!')").await;
assert!(result.is_ok(), "Execution failed: {:?}", result.err());
let output = result.unwrap();
assert_eq!(output.stdout.trim(), "Hello from pool!");
}
#[tokio::test]
async fn pool_multiple_acquisitions() {
let config = PoolConfig {
min_idle: 2,
max_size: 3,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let s1 = pool.acquire().await.expect("Failed to acquire sandbox 1");
let s2 = pool.acquire().await.expect("Failed to acquire sandbox 2");
let s3 = pool.acquire().await.expect("Failed to acquire sandbox 3");
let stats = pool.stats();
assert_eq!(stats.in_use, 3);
assert_eq!(stats.total_acquisitions, 3);
drop(s1);
drop(s2);
drop(s3);
let stats = pool.stats();
assert_eq!(stats.in_use, 0);
}
#[tokio::test]
async fn pool_timeout_on_exhaustion() {
let config = PoolConfig {
min_idle: 1,
max_size: 1,
acquire_timeout: Duration::from_millis(100),
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let _s1 = pool.acquire().await.expect("Failed to acquire sandbox");
let result = pool.acquire().await;
assert!(matches!(result, Err(PoolError::Timeout(_))));
}
#[tokio::test]
async fn pool_with_custom_builder() {
let config = PoolConfig {
min_idle: 1,
max_size: 2,
..Default::default()
};
let pool = SandboxPool::with_builder(|| Sandbox::embedded().build(), config)
.await
.expect("Failed to create pool");
let sandbox = pool.acquire().await.expect("Failed to acquire sandbox");
let result = sandbox.execute("print(2 + 2)").await;
assert!(result.is_ok());
assert_eq!(result.unwrap().stdout.trim(), "4");
}
#[tokio::test]
async fn pool_close_prevents_new_acquisitions() {
let config = PoolConfig {
min_idle: 1,
max_size: 2,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
pool.close();
assert!(pool.is_closed());
let result = pool.acquire().await;
assert!(matches!(result, Err(PoolError::Closed)));
}
#[tokio::test]
async fn pool_detach_sandbox() {
let config = PoolConfig {
min_idle: 1,
max_size: 2,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let pooled = pool.acquire().await.expect("Failed to acquire sandbox");
let sandbox = pooled.detach();
let result = sandbox.execute("print('detached')").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn pool_try_acquire() {
let config = PoolConfig {
min_idle: 1,
max_size: 1,
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
let s1 = pool.try_acquire().expect("try_acquire failed");
assert!(s1.is_some());
let _s1 = s1.unwrap();
let s2 = pool.try_acquire().expect("try_acquire failed");
assert!(s2.is_none());
}
#[tokio::test]
async fn pool_evict_idle() {
let config = PoolConfig {
min_idle: 1,
max_size: 3,
idle_timeout: Duration::from_millis(10),
..Default::default()
};
let pool = SandboxPool::new(Sandbox::embedded(), config)
.await
.expect("Failed to create pool");
{
let s1 = pool.acquire().await.unwrap();
let s2 = pool.acquire().await.unwrap();
drop(s1);
drop(s2);
}
tokio::time::sleep(Duration::from_millis(50)).await;
let evicted = pool.evict_idle();
assert!(
evicted >= 1,
"Expected at least 1 eviction, got {}",
evicted
);
}
}
}