use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use microvm_runtime::model::{SnapshotRef, VmSpec};
use microvm_runtime::provider::{VmProvider, VmQuery};
use crate::stack_key::StackKey;
#[derive(Debug, Clone)]
pub struct WarmPoolConfig {
pub min_depth: usize,
pub max_depth: usize,
pub refill_interval: Duration,
pub entry_max_age: Duration,
}
impl Default for WarmPoolConfig {
fn default() -> Self {
Self {
min_depth: 1,
max_depth: 4,
refill_interval: Duration::from_secs(5),
entry_max_age: Duration::from_secs(600),
}
}
}
#[derive(Debug, Clone)]
pub enum ValidationResult {
Healthy,
Unhealthy(String),
}
pub trait EntryValidator: Send + Sync + 'static {
fn validate(&self, vm_id: &str) -> ValidationResult;
}
#[derive(Debug, Default, Clone)]
pub struct WarmPoolMetrics {
pub created_total: u64,
pub acquired_total: u64,
pub evicted_total: u64,
pub refill_runs_total: u64,
pub validation_failures_total: u64,
}
#[derive(Debug, Clone)]
pub struct WarmPoolHandle {
pub source_vm_id: String,
pub stack: StackKey,
pub source_snapshot: SnapshotRef,
}
#[derive(Debug)]
struct PoolEntry {
vm_id: String,
created_at: Instant,
}
#[derive(Debug)]
struct BucketState {
source_snapshot: SnapshotRef,
entries: Vec<PoolEntry>,
next_seed: u64,
}
#[derive(Debug, Default)]
struct Counters {
created: AtomicU64,
acquired: AtomicU64,
evicted: AtomicU64,
refill_runs: AtomicU64,
validation_failures: AtomicU64,
}
impl Counters {
fn snapshot(&self) -> WarmPoolMetrics {
WarmPoolMetrics {
created_total: self.created.load(Ordering::Relaxed),
acquired_total: self.acquired.load(Ordering::Relaxed),
evicted_total: self.evicted.load(Ordering::Relaxed),
refill_runs_total: self.refill_runs.load(Ordering::Relaxed),
validation_failures_total: self.validation_failures.load(Ordering::Relaxed),
}
}
}
struct PoolState {
buckets: Mutex<HashMap<StackKey, BucketState>>,
counters: Counters,
}
pub struct WarmPool<P: VmProvider + VmQuery + 'static> {
provider: P,
config: WarmPoolConfig,
validator: Arc<dyn EntryValidator>,
state: Arc<PoolState>,
shutdown_flag: Arc<AtomicBool>,
refill_handle: Option<JoinHandle<()>>,
}
impl<P: VmProvider + VmQuery + 'static> fmt::Debug for WarmPool<P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let depths: HashMap<StackKey, usize> = match self.state.buckets.lock() {
Ok(guard) => guard
.iter()
.map(|(k, b)| (k.clone(), b.entries.len()))
.collect(),
Err(poisoned) => poisoned
.into_inner()
.iter()
.map(|(k, b)| (k.clone(), b.entries.len()))
.collect(),
};
f.debug_struct("WarmPool")
.field("config", &self.config)
.field("depths", &depths)
.field(
"shutdown", &self.shutdown_flag.load(Ordering::Relaxed),
)
.finish()
}
}
impl<P: VmProvider + VmQuery + Clone + 'static> WarmPool<P> {
pub fn start(provider: P, config: WarmPoolConfig, validator: Arc<dyn EntryValidator>) -> Self {
let state = Arc::new(PoolState {
buckets: Mutex::new(HashMap::new()),
counters: Counters::default(),
});
let shutdown_flag = Arc::new(AtomicBool::new(false));
let refill_handle = spawn_refill_thread(
provider.clone(),
config.clone(),
Arc::clone(&validator),
Arc::clone(&state),
Arc::clone(&shutdown_flag),
);
Self {
provider,
config,
validator,
state,
shutdown_flag,
refill_handle: Some(refill_handle),
}
}
pub fn register(&self, stack: StackKey, source_snapshot: SnapshotRef) {
let mut guard = lock_buckets(&self.state.buckets);
guard
.entry(stack)
.and_modify(|b| b.source_snapshot = source_snapshot.clone())
.or_insert_with(|| BucketState {
source_snapshot,
entries: Vec::new(),
next_seed: 0,
});
}
pub fn unregister(&self, stack: &StackKey) {
let drained: Vec<String> = {
let mut guard = lock_buckets(&self.state.buckets);
match guard.remove(stack) {
Some(bucket) => bucket.entries.into_iter().map(|e| e.vm_id).collect(),
None => return,
}
};
for vm_id in drained {
let _ = self.provider.destroy_vm(&vm_id);
self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
}
pub fn acquire(&self, stack: &StackKey) -> Option<WarmPoolHandle> {
loop {
let (vm_id, source_snapshot) = {
let mut guard = lock_buckets(&self.state.buckets);
let bucket = guard.get_mut(stack)?;
let entry = bucket.entries.pop()?;
(entry.vm_id, bucket.source_snapshot.clone())
};
match self.validator.validate(&vm_id) {
ValidationResult::Healthy => {
self.state.counters.acquired.fetch_add(1, Ordering::Relaxed);
return Some(WarmPoolHandle {
source_vm_id: vm_id,
stack: stack.clone(),
source_snapshot,
});
}
ValidationResult::Unhealthy(_reason) => {
self.state
.counters
.validation_failures
.fetch_add(1, Ordering::Relaxed);
let _ = self.provider.destroy_vm(&vm_id);
self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
}
}
}
pub fn metrics(&self) -> WarmPoolMetrics {
self.state.counters.snapshot()
}
pub fn shutdown(&mut self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
if let Some(handle) = self.refill_handle.take() {
join_with_timeout(handle, Duration::from_millis(200));
}
let drained: Vec<String> = {
let mut guard = lock_buckets(&self.state.buckets);
let mut all = Vec::new();
for bucket in guard.values_mut() {
all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
}
all
};
for vm_id in drained {
let _ = self.provider.destroy_vm(&vm_id);
self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
}
}
impl<P: VmProvider + VmQuery + 'static> Drop for WarmPool<P> {
fn drop(&mut self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
if let Some(handle) = self.refill_handle.take() {
join_with_timeout(handle, Duration::from_millis(200));
}
let drained: Vec<String> = match self.state.buckets.lock() {
Ok(mut guard) => {
let mut all = Vec::new();
for bucket in guard.values_mut() {
all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
}
all
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
let mut all = Vec::new();
for bucket in guard.values_mut() {
all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
}
all
}
};
for vm_id in drained {
let _ = self.provider.destroy_vm(&vm_id);
}
}
}
fn spawn_refill_thread<P: VmProvider + VmQuery + Clone + 'static>(
provider: P,
config: WarmPoolConfig,
validator: Arc<dyn EntryValidator>,
state: Arc<PoolState>,
shutdown_flag: Arc<AtomicBool>,
) -> JoinHandle<()> {
match thread::Builder::new()
.name("microvm-warm-pool-refill".into())
.spawn(move || refill_loop(provider, config, validator, state, shutdown_flag))
{
Ok(handle) => handle,
Err(_) => thread::spawn(|| {}),
}
}
fn refill_loop<P: VmProvider + VmQuery + Clone + 'static>(
provider: P,
config: WarmPoolConfig,
validator: Arc<dyn EntryValidator>,
state: Arc<PoolState>,
shutdown_flag: Arc<AtomicBool>,
) {
loop {
if shutdown_flag.load(Ordering::SeqCst) {
return;
}
run_refill_tick(&provider, &config, validator.as_ref(), &state);
state.counters.refill_runs.fetch_add(1, Ordering::Relaxed);
let deadline = Instant::now() + config.refill_interval;
while Instant::now() < deadline {
if shutdown_flag.load(Ordering::SeqCst) {
return;
}
let remaining = deadline.saturating_duration_since(Instant::now());
let slice = remaining.min(Duration::from_millis(25));
if slice.is_zero() {
break;
}
thread::sleep(slice);
}
}
}
fn run_refill_tick<P: VmProvider + VmQuery + 'static>(
provider: &P,
config: &WarmPoolConfig,
validator: &dyn EntryValidator,
state: &Arc<PoolState>,
) {
let now = Instant::now();
let to_evict: Vec<String> = {
let mut guard = lock_buckets(&state.buckets);
let mut victims = Vec::new();
for bucket in guard.values_mut() {
bucket.entries.retain(|entry| {
let alive = now.duration_since(entry.created_at) <= config.entry_max_age;
if !alive {
victims.push(entry.vm_id.clone());
}
alive
});
}
victims
};
for vm_id in to_evict {
let _ = provider.destroy_vm(&vm_id);
state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
struct RefillJob {
stack: StackKey,
deficit: usize,
snapshot: SnapshotRef,
next_seed: u64,
}
let jobs: Vec<RefillJob> = {
let mut guard = lock_buckets(&state.buckets);
let mut planned = Vec::new();
for (stack, bucket) in guard.iter_mut() {
if bucket.entries.len() < config.min_depth {
let deficit = config.max_depth.saturating_sub(bucket.entries.len());
if deficit == 0 {
continue;
}
let start = bucket.next_seed;
bucket.next_seed = start.saturating_add(deficit as u64);
planned.push(RefillJob {
stack: stack.clone(),
deficit,
snapshot: bucket.source_snapshot.clone(),
next_seed: start,
});
}
}
planned
};
for job in jobs {
for i in 0..job.deficit {
let seed = job.next_seed.saturating_add(i as u64);
let vm_id = format!(
"warm-{stack}-{ver}-{seed}",
stack = sanitize(&job.stack.stack_name),
ver = sanitize(&job.stack.version),
seed = seed,
);
let spec = VmSpec {
vcpu_count: Some(job.stack.vcpu_count),
mem_size_mib: Some(job.stack.mem_size_mib),
restore_from: Some(job.snapshot.clone()),
..VmSpec::default()
};
if let Err(_e) = provider.create_vm_with_spec(&vm_id, &spec) {
break;
}
state.counters.created.fetch_add(1, Ordering::Relaxed);
match validator.validate(&vm_id) {
ValidationResult::Healthy => {
let mut guard = lock_buckets(&state.buckets);
match guard.get_mut(&job.stack) {
Some(bucket) => {
bucket.entries.push(PoolEntry {
vm_id,
created_at: Instant::now(),
});
}
None => {
drop(guard);
let _ = provider.destroy_vm(&vm_id);
state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
}
}
ValidationResult::Unhealthy(_reason) => {
state
.counters
.validation_failures
.fetch_add(1, Ordering::Relaxed);
let _ = provider.destroy_vm(&vm_id);
state.counters.evicted.fetch_add(1, Ordering::Relaxed);
}
}
}
}
}
fn sanitize(input: &str) -> String {
input
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
.collect()
}
fn lock_buckets(
m: &Mutex<HashMap<StackKey, BucketState>>,
) -> MutexGuard<'_, HashMap<StackKey, BucketState>> {
match m.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
}
}
fn join_with_timeout(handle: JoinHandle<()>, timeout: Duration) {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if handle.is_finished() {
let _ = handle.join();
return;
}
thread::sleep(Duration::from_millis(5));
}
drop(handle);
}
#[cfg(test)]
mod tests {
use super::*;
use microvm_runtime::InMemoryVmProvider;
use microvm_runtime::model::{VmStatus, VmView};
use std::sync::Mutex as StdMutex;
fn stack_alpha() -> StackKey {
StackKey {
stack_name: "alpha".into(),
version: "1.0.0".into(),
vcpu_count: 1,
mem_size_mib: 128,
}
}
fn stack_beta() -> StackKey {
StackKey {
stack_name: "beta".into(),
version: "2.0.0".into(),
vcpu_count: 2,
mem_size_mib: 256,
}
}
fn snapshot_for(stack: &StackKey) -> SnapshotRef {
SnapshotRef {
vm_id: format!("tpl-{}", stack.stack_name),
snapshot_id: format!("snap-{}-{}", stack.stack_name, stack.version),
resume_immediately: true,
network_overrides: Vec::new(),
}
}
fn fast_config(refill_ms: u64) -> WarmPoolConfig {
WarmPoolConfig {
min_depth: 2,
max_depth: 3,
refill_interval: Duration::from_millis(refill_ms),
entry_max_age: Duration::from_secs(600),
}
}
struct AlwaysHealthy;
impl EntryValidator for AlwaysHealthy {
fn validate(&self, _vm_id: &str) -> ValidationResult {
ValidationResult::Healthy
}
}
fn wait_until<F: FnMut() -> bool>(timeout: Duration, mut f: F) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if f() {
return true;
}
thread::sleep(Duration::from_millis(10));
}
false
}
fn depth<P>(pool: &WarmPool<P>, stack: &StackKey) -> usize
where
P: VmProvider + VmQuery + Clone + 'static,
{
let guard = lock_buckets(&pool.state.buckets);
guard.get(stack).map(|b| b.entries.len()).unwrap_or(0)
}
#[test]
fn refill_brings_bucket_to_max_depth() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
let reached = wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3);
assert!(
reached,
"expected depth == max_depth (3), got {}",
depth(&pool, &stack)
);
let m = pool.metrics();
assert_eq!(m.created_total, 3, "should have created exactly 3 entries");
assert_eq!(m.acquired_total, 0);
assert_eq!(m.validation_failures_total, 0);
thread::sleep(Duration::from_millis(150));
assert_eq!(depth(&pool, &stack), 3);
assert_eq!(pool.metrics().created_total, 3);
}
#[test]
fn acquire_returns_none_when_bucket_empty() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(
provider,
WarmPoolConfig {
refill_interval: Duration::from_secs(60),
..fast_config(60)
},
Arc::new(AlwaysHealthy),
);
let stack = stack_alpha();
assert!(pool.acquire(&stack).is_none(), "no bucket registered");
pool.register(stack.clone(), snapshot_for(&stack));
assert!(
pool.acquire(&stack).is_none(),
"registered bucket should still be empty"
);
assert_eq!(pool.metrics().acquired_total, 0);
}
#[test]
fn acquire_pops_healthy_entry_with_correct_snapshot() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
let stack = stack_alpha();
let snap = snapshot_for(&stack);
pool.register(stack.clone(), snap.clone());
assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) >= 2));
let handle = pool.acquire(&stack).expect("warm entry available");
assert_eq!(handle.stack, stack);
assert_eq!(handle.source_snapshot.vm_id, snap.vm_id);
assert_eq!(handle.source_snapshot.snapshot_id, snap.snapshot_id);
assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));
let m = pool.metrics();
assert_eq!(m.acquired_total, 1);
assert_eq!(m.validation_failures_total, 0);
}
#[test]
fn acquire_evicts_unhealthy_entries_and_returns_next_healthy() {
let provider = InMemoryVmProvider::default();
struct Phased {
phase: StdMutex<u8>, unhealthy_left: StdMutex<u32>,
}
impl EntryValidator for Phased {
fn validate(&self, _vm_id: &str) -> ValidationResult {
let phase = *self.phase.lock().expect("phase");
if phase == 0 {
return ValidationResult::Healthy;
}
let mut left = self.unhealthy_left.lock().expect("uh");
if *left > 0 {
*left -= 1;
ValidationResult::Unhealthy("scripted bad".into())
} else {
ValidationResult::Healthy
}
}
}
let validator = Arc::new(Phased {
phase: StdMutex::new(0),
unhealthy_left: StdMutex::new(2),
});
let validator_for_pool: Arc<dyn EntryValidator> = validator.clone();
let pool = WarmPool::start(provider.clone(), fast_config(20), validator_for_pool);
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
*validator.phase.lock().expect("phase") = 1;
let handle = pool.acquire(&stack).expect("eventually returns healthy");
assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));
let m = pool.metrics();
assert_eq!(m.validation_failures_total, 2, "two entries rejected");
assert_eq!(m.acquired_total, 1, "one entry served");
assert_eq!(m.evicted_total, 2);
let vms = provider.list_vms().expect("list");
let destroyed = vms
.iter()
.filter(|v| v.status == VmStatus::Destroyed)
.count();
assert_eq!(destroyed, 2, "two rejected entries should be destroyed");
}
#[test]
fn acquire_returns_none_after_all_entries_rejected() {
struct AlwaysBad;
impl EntryValidator for AlwaysBad {
fn validate(&self, _vm_id: &str) -> ValidationResult {
ValidationResult::Unhealthy("never trust this VM".into())
}
}
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysBad));
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(2), || {
pool.metrics().validation_failures_total >= 3
}));
assert_eq!(depth(&pool, &stack), 0);
assert!(pool.acquire(&stack).is_none());
}
#[test]
fn entry_max_age_eviction() {
let provider = InMemoryVmProvider::default();
let config = WarmPoolConfig {
min_depth: 2,
max_depth: 3,
refill_interval: Duration::from_millis(30),
entry_max_age: Duration::from_millis(80),
};
let pool = WarmPool::start(provider.clone(), config, Arc::new(AlwaysHealthy));
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
let after_fill = pool.metrics().created_total;
assert_eq!(after_fill, 3);
assert!(wait_until(Duration::from_secs(3), || {
let m = pool.metrics();
m.evicted_total >= 3 && m.created_total >= 6
}));
}
#[test]
fn register_and_unregister() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
let alpha = stack_alpha();
let beta = stack_beta();
pool.register(alpha.clone(), snapshot_for(&alpha));
pool.register(beta.clone(), snapshot_for(&beta));
assert!(wait_until(Duration::from_secs(2), || {
depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
}));
let evictions_before = pool.metrics().evicted_total;
pool.unregister(&alpha);
{
let guard = lock_buckets(&pool.state.buckets);
assert!(!guard.contains_key(&alpha));
assert_eq!(guard.get(&beta).map(|b| b.entries.len()), Some(3));
}
let m = pool.metrics();
assert_eq!(m.evicted_total - evictions_before, 3);
}
#[test]
fn register_replaces_snapshot_for_existing_bucket() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(
provider,
WarmPoolConfig {
refill_interval: Duration::from_secs(60),
..fast_config(60)
},
Arc::new(AlwaysHealthy),
);
let stack = stack_alpha();
let snap_v1 = SnapshotRef {
vm_id: "tpl-v1".into(),
snapshot_id: "snap-v1".into(),
resume_immediately: true,
network_overrides: Vec::new(),
};
let snap_v2 = SnapshotRef {
vm_id: "tpl-v2".into(),
snapshot_id: "snap-v2".into(),
resume_immediately: true,
network_overrides: Vec::new(),
};
pool.register(stack.clone(), snap_v1);
pool.register(stack.clone(), snap_v2.clone());
let guard = lock_buckets(&pool.state.buckets);
let bucket = guard.get(&stack).expect("bucket");
assert_eq!(bucket.source_snapshot.snapshot_id, snap_v2.snapshot_id);
}
#[test]
fn metrics_increment_correctly() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
let _h1 = pool.acquire(&stack).expect("h1");
let _h2 = pool.acquire(&stack).expect("h2");
let m = pool.metrics();
assert!(m.created_total >= 3);
assert_eq!(m.acquired_total, 2);
assert!(m.refill_runs_total >= 1);
}
#[test]
fn shutdown_stops_refill_within_budget() {
let provider = InMemoryVmProvider::default();
let config = WarmPoolConfig {
min_depth: 1,
max_depth: 2,
refill_interval: Duration::from_secs(1),
entry_max_age: Duration::from_secs(600),
};
let mut pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
thread::sleep(Duration::from_millis(50));
let start = Instant::now();
pool.shutdown();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(500),
"shutdown took {elapsed:?}, expected < 500ms"
);
let guard = lock_buckets(&pool.state.buckets);
for (_k, bucket) in guard.iter() {
assert!(bucket.entries.is_empty(), "all entries should be drained");
}
}
#[test]
fn drop_destroys_remaining_entries() {
let provider = InMemoryVmProvider::default();
let stack = stack_alpha();
{
let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
}
let vms = provider.list_vms().expect("list");
let alpha_vms: Vec<&VmView> = vms
.iter()
.filter(|v| v.vm_id.starts_with("warm-alpha-"))
.collect();
assert_eq!(alpha_vms.len(), 3);
for v in alpha_vms {
assert_eq!(
v.status,
VmStatus::Destroyed,
"vm {} not destroyed",
v.vm_id
);
}
}
#[test]
fn refill_interval_is_honored_under_short_budget() {
let provider = InMemoryVmProvider::default();
let config = WarmPoolConfig {
min_depth: 1,
max_depth: 1,
refill_interval: Duration::from_millis(50),
entry_max_age: Duration::from_secs(600),
};
let pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
thread::sleep(Duration::from_millis(350));
let runs = pool.metrics().refill_runs_total;
assert!(
runs >= 3,
"expected refill thread to tick at least 3 times in 350ms, got {runs}"
);
}
#[test]
fn validator_failure_reason_is_logged_via_metrics_counter() {
struct Counting {
calls: AtomicU64,
}
impl EntryValidator for Counting {
fn validate(&self, _vm_id: &str) -> ValidationResult {
let n = self.calls.fetch_add(1, Ordering::SeqCst);
if n.is_multiple_of(2) {
ValidationResult::Healthy
} else {
ValidationResult::Unhealthy(format!("scripted-fail-{n}"))
}
}
}
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(
provider,
fast_config(20),
Arc::new(Counting {
calls: AtomicU64::new(0),
}),
);
let stack = stack_alpha();
pool.register(stack.clone(), snapshot_for(&stack));
assert!(wait_until(Duration::from_secs(3), || {
pool.metrics().validation_failures_total >= 1
}));
assert!(pool.metrics().created_total >= 2);
}
#[test]
fn separate_buckets_do_not_interfere() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
let alpha = stack_alpha();
let beta = stack_beta();
pool.register(alpha.clone(), snapshot_for(&alpha));
pool.register(beta.clone(), snapshot_for(&beta));
assert!(wait_until(Duration::from_secs(2), || {
depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
}));
for _ in 0..3 {
assert!(pool.acquire(&alpha).is_some());
}
assert_eq!(depth(&pool, &beta), 3);
}
#[test]
fn unregister_unknown_key_is_noop() {
let provider = InMemoryVmProvider::default();
let pool = WarmPool::start(provider, fast_config(60), Arc::new(AlwaysHealthy));
let stack = stack_alpha();
pool.unregister(&stack);
assert_eq!(pool.metrics().evicted_total, 0);
}
#[test]
fn sanitize_handles_special_characters() {
assert_eq!(sanitize("alpha"), "alpha");
assert_eq!(sanitize("1.0.0"), "1_0_0");
assert_eq!(sanitize("a/b c"), "a_b_c");
assert_eq!(sanitize("ALPHA-9"), "ALPHA_9");
}
}