use super::*;
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::TempDir;
use crate::ipc::Request;
use crate::policy::SandboxPolicy;
use crate::workspace::{CwdProvider, WorkspaceProvider};
/// Test double for `WorkspaceProvider` that counts how often the pool calls
/// `provision` / `release` and always hands out the same fixed directory.
/// Counters are atomic because the async pool may touch them from worker
/// threads; tests read them with `Ordering::SeqCst`.
#[derive(Debug, Default)]
struct CountingProvider {
    // Fixed directory returned by every `provision` call.
    project_root: PathBuf,
    // Number of `provision` calls observed.
    provisioned: AtomicUsize,
    // Number of `release` calls observed.
    released: AtomicUsize,
}
impl CountingProvider {
fn new(project_root: PathBuf) -> Arc<Self> {
Arc::new(Self {
project_root,
provisioned: AtomicUsize::new(0),
released: AtomicUsize::new(0),
})
}
}
#[async_trait]
impl WorkspaceProvider for CountingProvider {
    /// Records the call, then hands back the fixed project root.
    async fn provision(&self, _slot_id: &str) -> Result<PathBuf> {
        self.provisioned.fetch_add(1, Ordering::SeqCst);
        Ok(self.project_root.to_path_buf())
    }

    /// Records the call; this test double never reports a diff summary.
    async fn release(&self, _slot_id: &str, _path: &Path) -> Result<Option<String>> {
        self.released.fetch_add(1, Ordering::SeqCst);
        Ok(None)
    }
}
/// Test double whose `provision` always errors, for exercising the pool's
/// failure path.
#[derive(Debug)]
struct FailingProvider;
#[async_trait]
impl WorkspaceProvider for FailingProvider {
    /// Always fails so callers can observe error propagation from acquire.
    async fn provision(&self, _slot_id: &str) -> Result<PathBuf> {
        Err(anyhow::anyhow!("synthetic provision failure for test"))
    }

    /// Succeeds trivially; release behavior is not under test here.
    async fn release(&self, _slot_id: &str, _path: &Path) -> Result<Option<String>> {
        Ok(None)
    }
}
/// Points `KODA_FS_WORKER_BIN` at a locally built `koda-fs-worker` binary,
/// unless an explicit override or cargo-provided path is already present.
///
/// Walks up from the test executable's directory looking for a `debug` or
/// `release` target directory that contains the worker binary.
fn ensure_worker_bin() {
    let already_configured = std::env::var("KODA_FS_WORKER_BIN").is_ok()
        || std::env::var("CARGO_BIN_EXE_koda-fs-worker").is_ok();
    if already_configured {
        return;
    }
    let exe = std::env::current_exe().expect("current_exe");
    // Skip the executable itself and inspect each parent directory in turn.
    for dir in exe.ancestors().skip(1) {
        if !(dir.ends_with("debug") || dir.ends_with("release")) {
            continue;
        }
        let candidate = dir.join("koda-fs-worker");
        if candidate.exists() {
            // SAFETY: mutating the process environment; `set_var` is unsafe on
            // edition 2024 because it can race concurrent env access.
            // NOTE(review): cargo runs tests on multiple threads — presumably
            // every test funnels env access through this helper; confirm.
            unsafe {
                std::env::set_var("KODA_FS_WORKER_BIN", &candidate);
            }
            return;
        }
    }
}
fn pool_dir() -> TempDir {
tempfile::tempdir().expect("tempdir")
}
/// A zero per-bucket target is a construction-time contract violation.
#[test]
#[should_panic(expected = "target_per_bucket must be >= 1")]
fn new_with_zero_capacity_panics() {
    drop(SandboxPool::new(0));
}
/// A freshly constructed pool holds no idle workers and no buckets.
#[test]
fn new_pool_is_empty() {
    let pool = SandboxPool::new(4);
    assert_eq!((pool.idle_count(), pool.bucket_count()), (0, 0));
}
/// A cold acquire must spawn a worker, run `provision` exactly once, and hand
/// out a slot pointing at the provisioned workspace.
#[tokio::test]
async fn cold_acquire_spawns_worker_and_calls_provision() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    let slot = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "slot-cold-1".to_string(),
        )
        .await
        .expect("acquire");

    assert_eq!(provider.provisioned.load(Ordering::SeqCst), 1);
    assert_eq!(slot.slot_id(), "slot-cold-1");
    assert_eq!(slot.workspace_path(), workspace.path());
    // Slot is still live, so nothing has been returned to the idle set yet.
    assert_eq!(pool.idle_count(), 0);
}
/// Dropping a clean (never dirtied) slot hands its worker back to the pool.
#[tokio::test]
async fn drop_returns_clean_worker_to_pool() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    let slot = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "slot-return-1".to_string(),
        )
        .await
        .expect("acquire");
    drop(slot);

    assert_eq!(pool.idle_count(), 1, "clean drop must return worker");
}
/// Pre-warming fills the bucket; a later acquire consumes from it instead of
/// spawning a fresh worker.
#[tokio::test]
async fn warm_then_acquire_uses_warm_worker() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    pool.warm_bucket(workspace.path().to_path_buf(), &SandboxPolicy::default(), None, 2)
        .await
        .expect("warm");
    assert_eq!(pool.idle_count(), 2);

    let _slot = pool
        .acquire(
            provider,
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "slot-warm-1".to_string(),
        )
        .await
        .expect("acquire");

    assert_eq!(pool.idle_count(), 1, "acquire must consume from warm bucket");
}
/// Requesting more warm workers than the per-bucket target must clamp.
#[tokio::test]
async fn warm_bucket_respects_target_cap() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);

    // Ask for 5 even though the pool's cap is 2.
    pool.warm_bucket(workspace.path().to_path_buf(), &SandboxPolicy::default(), None, 5)
        .await
        .expect("warm");

    assert_eq!(
        pool.idle_count(),
        2,
        "warm_bucket must cap at target_per_bucket regardless of requested n"
    );
}
/// Warming an already-full bucket must be a no-op, not an over-spawn.
#[tokio::test]
async fn warm_bucket_is_idempotent_at_cap() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let root = workspace.path().to_path_buf();

    pool.warm_bucket(root.clone(), &SandboxPolicy::default(), None, 2)
        .await
        .expect("warm 1");
    pool.warm_bucket(root, &SandboxPolicy::default(), None, 2)
        .await
        .expect("warm 2");

    assert_eq!(
        pool.idle_count(),
        2,
        "second warm must NOT spawn extras when bucket is already at cap"
    );
}
/// With a cap of one, only the first returned worker is kept; a second return
/// to the full bucket is discarded.
#[tokio::test]
async fn return_to_full_bucket_drops_worker() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(1);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    let first = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "s1".to_string(),
        )
        .await
        .expect("a1");
    let second = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "s2".to_string(),
        )
        .await
        .expect("a2");

    drop(first);
    assert_eq!(pool.idle_count(), 1, "first drop fills the cap");

    drop(second);
    assert_eq!(
        pool.idle_count(),
        1,
        "second drop must be discarded because bucket is at cap"
    );
}
/// A slot marked dirty must kill its worker on drop instead of recycling it.
#[tokio::test]
async fn dirty_slot_does_not_return_worker_to_pool() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    let mut slot = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "dirty".to_string(),
        )
        .await
        .expect("acquire");
    slot.mark_dirty();
    drop(slot);

    assert_eq!(
        pool.idle_count(),
        0,
        "dirty drop must kill the worker, not return it"
    );
}
/// Buckets are keyed by the full policy; any field difference must separate
/// them into distinct buckets.
#[tokio::test]
async fn distinct_policies_do_not_share_bucket() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(workspace.path().to_path_buf());

    let policy_a = SandboxPolicy::default();
    // Differs from the default in exactly one field.
    let policy_b = {
        let mut p = SandboxPolicy::default();
        p.fs.allow_git_config = true;
        p
    };

    let slot_a = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &policy_a,
            None,
            "pa".to_string(),
        )
        .await
        .expect("a");
    drop(slot_a);

    let slot_b = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &policy_b,
            None,
            "pb".to_string(),
        )
        .await
        .expect("b");
    drop(slot_b);

    assert_eq!(
        pool.bucket_count(),
        2,
        "policies that differ in any field must land in distinct buckets"
    );
}
/// When `provision` fails mid-acquire, the half-built worker must be killed,
/// never cached for reuse.
#[tokio::test]
async fn provision_failure_does_not_leak_worker_into_pool() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let bad_provider: Arc<dyn WorkspaceProvider> = Arc::new(FailingProvider);

    let outcome = pool
        .acquire(
            bad_provider,
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "leaky".to_string(),
        )
        .await;

    assert!(outcome.is_err(), "acquire must propagate the provision error");
    assert_eq!(
        pool.idle_count(),
        0,
        "failed acquire must NOT return the half-built worker to the pool (we can't prove its state, so kill it)"
    );
}
/// Dropping the pool must release its shared storage so every cached worker
/// is destructed via its own `Drop`.
#[tokio::test]
async fn pool_drop_releases_warm_workers() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(3);

    pool.warm_bucket(workspace.path().to_path_buf(), &SandboxPolicy::default(), None, 3)
        .await
        .expect("warm");
    assert_eq!(pool.idle_count(), 3);

    // A weak handle lets us observe whether the last strong Arc is gone.
    let weak = Arc::downgrade(&pool);
    drop(pool);
    assert!(
        weak.upgrade().is_none(),
        "pool drop must release its Arc storage so the SandboxPool struct is destructed (which kills every cached worker via its Drop)"
    );
}
/// A slot that outlives its pool must still trigger `release` when dropped.
///
/// Fix: the original slept a fixed 50ms and then asserted, which races the
/// background release task and flakes on slow/loaded machines. We now poll the
/// counter with a generous deadline and stop as soon as it fires — same
/// assertion, no fixed-delay race, and faster on the happy path.
#[tokio::test]
async fn slot_outliving_pool_still_drops_cleanly() {
    ensure_worker_bin();
    let dir = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = CountingProvider::new(dir.path().to_path_buf());
    let slot = pool
        .acquire(
            provider.clone(),
            dir.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "orphan".to_string(),
        )
        .await
        .expect("acquire");
    drop(pool);
    drop(slot);
    // Poll up to ~2s for the background release instead of one fixed sleep.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
    while provider.released.load(Ordering::SeqCst) == 0
        && std::time::Instant::now() < deadline
    {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
    assert!(
        provider.released.load(Ordering::SeqCst) >= 1,
        "release should have been spawned even with pool gone"
    );
}
/// End-to-end IPC smoke test: an acquired worker must answer `Ping` with
/// `Pong`.
#[tokio::test]
async fn acquired_worker_responds_to_ping() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = Arc::new(CwdProvider::new(workspace.path()));

    let mut slot = pool
        .acquire(
            provider,
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "ping".to_string(),
        )
        .await
        .expect("acquire");

    let resp = slot.worker().request(&Request::Ping).await.expect("ping ok");
    assert!(matches!(resp, crate::ipc::Response::Pong));
}
/// Sanity check that the pool actually amortizes spawn cost: reusing a warm
/// worker must be at least 2x faster than a cold spawn.
///
/// NOTE(review): wall-clock assertions can flake under heavy CI load; the 2x
/// margin is the existing contract, kept as-is.
#[tokio::test]
async fn warm_acquire_is_faster_than_cold() {
    ensure_worker_bin();
    let workspace = pool_dir();
    let pool = SandboxPool::new(2);
    let provider = Arc::new(CwdProvider::new(workspace.path()));

    let cold_timer = std::time::Instant::now();
    let cold_slot = pool
        .acquire(
            provider.clone(),
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "cold".to_string(),
        )
        .await
        .expect("cold");
    let cold = cold_timer.elapsed();
    // Returning the clean slot seeds the warm bucket for the second acquire.
    drop(cold_slot);

    let warm_timer = std::time::Instant::now();
    let _warm_slot = pool
        .acquire(
            provider,
            workspace.path().to_path_buf(),
            &SandboxPolicy::default(),
            None,
            "warm".to_string(),
        )
        .await
        .expect("warm");
    let warm = warm_timer.elapsed();

    assert!(
        warm < cold / 2,
        "warm acquire ({warm:?}) must beat cold ({cold:?}) by at least 2x; pool isn't actually amortizing spawn cost"
    );
}