use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, Weak};
use anyhow::Result;
use tracing::{debug, warn};
use crate::policy::SandboxPolicy;
use crate::proxy::ProxyHandle;
use crate::worker_client::WorkerClient;
use crate::workspace::WorkspaceProvider;
/// Key under which idle workers are grouped inside the pool.
///
/// A key is built from the writable root, the sandbox policy and the proxy
/// port (if any) that a worker is spawned with; an idle worker is only handed
/// out for a request that produces an equal key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct BucketKey {
/// Writable root the workers in this bucket were spawned with.
writable_root: PathBuf,
/// Sandbox policy the workers in this bucket were spawned with.
policy: SandboxPolicy,
/// Port of the proxy supplied at spawn time, or `None` when no proxy was given.
proxy_port: Option<u16>,
}
/// Pool of idle sandbox workers, grouped into buckets by `BucketKey`.
pub struct SandboxPool {
/// Maximum number of idle workers kept per bucket; `new` asserts it is at least 1.
target_per_bucket: usize,
/// Idle workers per bucket, guarded by a standard-library (blocking) mutex.
free: Mutex<HashMap<BucketKey, Vec<WorkerClient>>>,
}
/// Debug output lists the per-bucket cap and the number of buckets; the
/// workers themselves are never printed.
impl std::fmt::Debug for SandboxPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned mutex is reported as zero buckets so formatting never panics.
        let bucket_count = match self.free.lock() {
            Ok(buckets) => buckets.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("SandboxPool");
        out.field("target_per_bucket", &self.target_per_bucket);
        out.field("buckets", &bucket_count);
        out.finish()
    }
}
impl SandboxPool {
/// Creates an empty pool that keeps at most `target_per_bucket` idle workers
/// in each bucket.
///
/// # Panics
///
/// Panics if `target_per_bucket` is zero.
pub fn new(target_per_bucket: usize) -> Arc<Self> {
    assert!(target_per_bucket >= 1, "target_per_bucket must be >= 1");
    let pool = Self {
        target_per_bucket,
        free: Mutex::new(HashMap::new()),
    };
    Arc::new(pool)
}
/// Pre-spawns idle workers for one bucket.
///
/// The bucket is identified by `writable_root`, `policy` and the port of
/// `proxy` (if any). The number of workers wanted is `n`, capped at the
/// pool's `target_per_bucket`; workers already idle in the bucket count
/// towards it, so only the difference is spawned.
///
/// If the bucket turns out to be full when a freshly spawned worker is about
/// to be stored (something else filled it in the meantime), that worker is
/// discarded and warming stops early.
///
/// # Errors
///
/// Returns the first spawn error. Workers stored before the failure stay in
/// the pool.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
pub async fn warm_bucket(
    self: &Arc<Self>,
    writable_root: PathBuf,
    policy: &SandboxPolicy,
    proxy: Option<&ProxyHandle>,
    n: usize,
) -> Result<()> {
    let key = BucketKey {
        writable_root: writable_root.clone(),
        policy: policy.clone(),
        proxy_port: proxy.map(|p| p.port),
    };
    // Snapshot only: the lock is not held while spawning, so the bucket may
    // change before each push and is re-checked below.
    let already = {
        let map = self.free.lock().expect("pool mutex poisoned");
        map.get(&key).map(|v| v.len()).unwrap_or(0)
    };
    let want = n.min(self.target_per_bucket).saturating_sub(already);
    debug!(
        "warm_bucket: target={n}, cap={}, already={already}, spawning={want}",
        self.target_per_bucket
    );
    for _ in 0..want {
        let client =
            WorkerClient::spawn_with_policy_and_proxy(writable_root.clone(), policy, proxy)
                .await?;
        // Decide under the lock, but hand a rejected worker back out of the
        // critical section so its teardown can neither stall other pool users
        // nor poison the mutex if it panics.
        let surplus = {
            let mut map = self.free.lock().expect("pool mutex poisoned");
            let bucket = map.entry(key.clone()).or_default();
            if bucket.len() < self.target_per_bucket {
                bucket.push(client);
                None
            } else {
                Some(client)
            }
        };
        if surplus.is_some() {
            // The bucket reached its cap behind our back; spawning the
            // remaining workers would only create processes to throw away.
            // `surplus` is dropped here, after the guard has been released.
            break;
        }
    }
    Ok(())
}
/// Checks out a sandbox slot for `slot_id`.
///
/// An idle worker from the bucket matching `writable_root`, `policy` and the
/// port of `proxy` is reused when one is available; otherwise a new worker is
/// spawned. `provider` is then asked to provision a workspace for `slot_id`.
/// On success an info event with target `sandbox.acquire` records the elapsed
/// time in microseconds and whether the worker came from the pool.
///
/// # Errors
///
/// Returns the spawn error if a new worker could not be started, or the
/// provisioning error, in which case the worker is dropped first.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
pub async fn acquire(
    self: &Arc<Self>,
    provider: Arc<dyn WorkspaceProvider>,
    writable_root: PathBuf,
    policy: &SandboxPolicy,
    proxy: Option<&ProxyHandle>,
    slot_id: String,
) -> Result<SandboxSlot> {
    let acquire_start = std::time::Instant::now();
    let key = BucketKey {
        writable_root: writable_root.clone(),
        policy: policy.clone(),
        proxy_port: proxy.map(|p| p.port),
    };

    // The guard is a temporary of this statement, so the lock is released
    // before any `.await` below.
    let pooled = self
        .free
        .lock()
        .expect("pool mutex poisoned")
        .get_mut(&key)
        .and_then(Vec::pop);
    let was_warm = pooled.is_some();

    let worker = match pooled {
        Some(idle) => {
            debug!("acquire: warm hit for slot_id={slot_id}");
            idle
        }
        None => {
            debug!("acquire: cold spawn for slot_id={slot_id}");
            WorkerClient::spawn_with_policy_and_proxy(writable_root, policy, proxy).await?
        }
    };

    let workspace_path = match provider.provision(&slot_id).await {
        Err(e) => {
            // No workspace, no slot: discard the worker before reporting.
            drop(worker);
            return Err(e);
        }
        Ok(path) => path,
    };

    let latency_us = acquire_start.elapsed().as_micros() as u64;
    tracing::info!(
        target: "sandbox.acquire",
        slot_id = %slot_id,
        latency_us,
        warm = was_warm,
        "sandbox slot acquired",
    );

    let slot = SandboxSlot {
        worker: Some(worker),
        pool: Arc::downgrade(self),
        key,
        provider: Some(provider),
        slot_id,
        workspace_path,
        dirty: false,
    };
    Ok(slot)
}
/// Returns the total number of idle workers across all buckets.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
pub fn idle_count(&self) -> usize {
    let buckets = self.free.lock().expect("pool mutex poisoned");
    let mut total = 0;
    for bucket in buckets.values() {
        total += bucket.len();
    }
    total
}
/// Returns the number of buckets currently present in the pool's map,
/// including buckets that hold no idle worker at the moment.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
pub fn bucket_count(&self) -> usize {
    let buckets = self.free.lock().expect("pool mutex poisoned");
    buckets.len()
}
/// Puts `worker` back into the bucket for `key`, or discards it when that
/// bucket already holds `target_per_bucket` idle workers.
///
/// # Panics
///
/// Panics if the pool mutex is poisoned.
fn return_worker(&self, key: BucketKey, worker: WorkerClient) {
    // Only the bookkeeping happens under the lock. A worker that does not fit
    // is handed back out of the critical section so that tearing it down can
    // neither block other pool users nor poison the mutex if it panics.
    let surplus = {
        let mut map = self.free.lock().expect("pool mutex poisoned");
        let bucket = map.entry(key).or_default();
        if bucket.len() < self.target_per_bucket {
            bucket.push(worker);
            None
        } else {
            Some(worker)
        }
    };
    // The guard has been released by now.
    drop(surplus);
}
}
/// Empties every bucket when the pool goes away, which drops all idle workers.
impl Drop for SandboxPool {
    fn drop(&mut self) {
        // `&mut self` proves exclusive access, so the map can be reached
        // without locking. A poisoned mutex is left alone here.
        if let Ok(buckets) = self.free.get_mut() {
            buckets.clear();
        }
    }
}
/// A checked-out sandbox: one worker plus the workspace provisioned for it.
///
/// Finish with `close()` to release the workspace and observe the result. If
/// the slot is simply dropped, the release is spawned on the current Tokio
/// runtime, or skipped with a warning when there is none.
pub struct SandboxSlot {
/// The worker; `Some` until `close()` or `Drop` takes it.
worker: Option<WorkerClient>,
/// Weak handle to the originating pool, so an outstanding slot does not keep the pool alive.
pool: Weak<SandboxPool>,
/// Bucket the worker is returned to when the slot is not dirty.
key: BucketKey,
/// Workspace provider; `Some` until `close()` or `Drop` takes it to release the workspace.
provider: Option<Arc<dyn WorkspaceProvider>>,
/// Identifier passed to the provider when provisioning and releasing.
slot_id: String,
/// Path returned by the provider when the workspace was provisioned.
workspace_path: PathBuf,
/// Set by `mark_dirty()`; a dirty slot's worker is dropped instead of being returned to the pool.
dirty: bool,
}
/// Debug output shows the slot id, the workspace path and the dirty flag;
/// the worker, pool and provider are left out.
impl std::fmt::Debug for SandboxSlot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SandboxSlot");
        out.field("slot_id", &self.slot_id);
        out.field("workspace_path", &self.workspace_path);
        out.field("dirty", &self.dirty);
        out.finish()
    }
}
impl SandboxSlot {
/// Returns mutable access to the slot's worker.
///
/// # Panics
///
/// Panics if the worker has already been taken out of the slot.
pub fn worker(&mut self) -> &mut WorkerClient {
    match self.worker.as_mut() {
        Some(worker) => worker,
        None => panic!("slot worker accessed after drop"),
    }
}
/// Returns the path of the workspace provisioned for this slot.
pub fn workspace_path(&self) -> &std::path::Path {
    self.workspace_path.as_path()
}
/// Returns the identifier this slot was acquired with.
pub fn slot_id(&self) -> &str {
    self.slot_id.as_str()
}
/// Flags the slot as dirty, so that its worker is dropped rather than
/// returned to the pool when the slot is closed or dropped.
pub fn mark_dirty(&mut self) {
self.dirty = true;
}
/// Closes the slot and waits for its workspace to be released.
///
/// The worker goes back to the pool when the slot is not dirty and the pool
/// still exists; otherwise it is dropped. The provider is then asked to
/// release the workspace. Because both the worker and the provider are taken
/// out of the slot here, the `Drop` impl that runs when this function returns
/// finds nothing left to do.
///
/// # Errors
///
/// Returns the provider's error if releasing the workspace fails.
///
/// # Panics
///
/// Panics if the worker or the provider has already been taken.
pub async fn close(mut self) -> Result<()> {
    let worker = self.worker.take().expect("worker present in close()");
    let provider = self.provider.take().expect("provider present in close()");

    {
        // A dirty slot never consults the pool.
        let pool = if self.dirty { None } else { self.pool.upgrade() };
        match pool {
            Some(pool) => {
                // `SandboxSlot` implements `Drop`, so the key cannot be moved
                // out directly; swap in a throwaway key instead of cloning.
                let placeholder = BucketKey {
                    writable_root: PathBuf::new(),
                    policy: SandboxPolicy::default(),
                    proxy_port: None,
                };
                let key = std::mem::replace(&mut self.key, placeholder);
                pool.return_worker(key, worker);
            }
            None => drop(worker),
        }
    }

    let released = provider.release(&self.slot_id, &self.workspace_path).await;
    match released {
        Err(e) => {
            warn!("slot {} release failed: {e}", self.slot_id);
            Err(e)
        }
        Ok(maybe_hint) => {
            if let Some(hint) = maybe_hint {
                debug!("slot {} released with hint: {hint}", self.slot_id);
            }
            Ok(())
        }
    }
}
}
/// Best-effort cleanup for a slot that was dropped without `close()`.
///
/// The worker is returned to the pool when the slot is not dirty and the pool
/// still exists, and dropped otherwise. The workspace release is spawned on
/// the current Tokio runtime; without a runtime it is skipped with a warning.
impl Drop for SandboxSlot {
    fn drop(&mut self) {
        if let Some(worker) = self.worker.take() {
            // A dirty slot never consults the pool.
            let pool = if self.dirty { None } else { self.pool.upgrade() };
            match pool {
                Some(pool) => pool.return_worker(self.key.clone(), worker),
                None => drop(worker),
            }
        }

        // After `close()` the provider is already gone and nothing is left to release.
        let Some(provider) = self.provider.take() else {
            return;
        };
        let slot_id = std::mem::take(&mut self.slot_id);
        let workspace_path = std::mem::take(&mut self.workspace_path);

        let Ok(handle) = tokio::runtime::Handle::try_current() else {
            warn!(
                "slot {slot_id} dropped outside tokio runtime; \
                 workspace release skipped — call close() for \
                 deterministic cleanup"
            );
            return;
        };

        // `drop` cannot await, so the release runs as a detached task.
        handle.spawn(async move {
            if let Err(e) = provider.release(&slot_id, &workspace_path).await {
                warn!("slot {slot_id} async release failed: {e}");
            }
        });
    }
}
#[cfg(test)]
mod tests;