Skip to main content

microvm_warm_pool/
pool.rs

1//! Warm pool harness.
2//!
3//! Keeps a configurable number of pre-restored microVMs alive per [`StackKey`],
4//! refilled by a single background thread. The pool itself never speaks the
5//! Firecracker HTTP API directly — every state change is delegated to a
6//! [`VmProvider`] implementation (typically
7//! [`microvm_runtime::FirecrackerVmProvider`] in production, or
8//! [`microvm_runtime::InMemoryVmProvider`] in tests).
9//!
10//! # Shutdown semantics
11//!
12//! [`WarmPool::shutdown`] sets an atomic flag observed by the refill thread,
13//! joins with a 200 ms budget, then best-effort destroys every live entry.
//! Mirrors the shutdown pattern used by `microvm_runtime::metrics` and
//! `microvm_runtime::console`. [`Drop`] performs the same teardown
//! automatically (without updating the eviction counter).
16
17use std::collections::HashMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::{Arc, Mutex, MutexGuard};
21use std::thread::{self, JoinHandle};
22use std::time::{Duration, Instant};
23
24use microvm_runtime::model::{SnapshotRef, VmSpec};
25use microvm_runtime::provider::{VmProvider, VmQuery};
26
27use crate::stack_key::StackKey;
28
/// Tunables for the pool's refill scheduler and eviction policy.
///
/// One config is shared by every registered [`StackKey`]; the refill sweep
/// applies the same depths and age limit to each bucket.
#[derive(Debug, Clone)]
pub struct WarmPoolConfig {
    /// Low-water mark per registered key. A refill sweep only tops a bucket up
    /// when its depth is strictly below this value.
    pub min_depth: usize,
    /// High-water mark per registered key. Once a sweep decides to refill a
    /// bucket it plans enough spawns to reach this depth, so a value that is
    /// not larger than the current depth results in no spawns.
    pub max_depth: usize,
    /// Refill scheduler interval. The thread sleeps this long between sweeps
    /// (in short slices, so shutdown is still noticed promptly).
    /// Lower values trade CPU + provider load for shorter recovery after a
    /// burst of acquires.
    pub refill_interval: Duration,
    /// Maximum age of a pool entry. Entries older than this are removed and
    /// destroyed at the start of a refill sweep; they are replaced only if the
    /// bucket then falls below `min_depth`. Defends against stale snapshots
    /// (TAP leaks, kernel-side timer drift, sidecar caches gone stale, etc.).
    pub entry_max_age: Duration,
}
46
47impl Default for WarmPoolConfig {
48    fn default() -> Self {
49        Self {
50            min_depth: 1,
51            max_depth: 4,
52            refill_interval: Duration::from_secs(5),
53            entry_max_age: Duration::from_secs(600),
54        }
55    }
56}
57
/// Verdict returned by [`EntryValidator::validate`].
///
/// The pool keeps or hands off `Healthy` entries and destroys `Unhealthy`
/// ones through the provider.
#[derive(Debug, Clone)]
pub enum ValidationResult {
    /// Entry is safe to hand off.
    Healthy,
    /// Entry is rotten — destroy it and try the next one. The string carries a
    /// human-readable reason so operators can distinguish failure modes.
    ///
    /// NOTE(review): the pool code in this file binds the reason to `_reason`
    /// and never logs it; confirm whether logging is expected elsewhere.
    Unhealthy(String),
}
67
/// Per-entry health probe, run synchronously on every [`WarmPool::acquire`]
/// and after every successful refill spawn.
///
/// The same validator instance is therefore invoked from both the caller's
/// thread and the refill thread, which is why `Send + Sync` is required. The
/// pool does not hold its bucket lock while a probe runs.
///
/// Implementations typically do a sidecar `GET /health` over the entry's
/// vsock socket or a TCP probe to a known guest port. Anything that signals
/// "this VM is actually serving requests right now," not just "Firecracker
/// thinks it's alive."
pub trait EntryValidator: Send + Sync + 'static {
    /// Probe the health of the VM identified by `vm_id`. Called
    /// synchronously — return quickly, or the acquire path (and the refill
    /// sweep) stalls.
    fn validate(&self, vm_id: &str) -> ValidationResult;
}
80
/// Snapshot of counters intended for Prometheus-style scraping. Cloned cheaply
/// via [`WarmPool::metrics`]; no metrics library dependency by design.
///
/// Each field is read independently, so a snapshot taken while the pool is
/// busy is not guaranteed to be mutually consistent across fields.
#[derive(Debug, Default, Clone)]
pub struct WarmPoolMetrics {
    /// Total number of pool entries the refill thread has spawned. Counted as
    /// soon as the provider call succeeds, i.e. before validation, so it
    /// includes entries that were rejected immediately afterwards.
    pub created_total: u64,
    /// Total number of successful acquires.
    pub acquired_total: u64,
    /// Total number of entries the pool destroyed instead of handing off
    /// (age, validation failure, `unregister`, `shutdown`).
    pub evicted_total: u64,
    /// Total number of refill scheduler ticks the thread has executed.
    pub refill_runs_total: u64,
    /// Total number of `Unhealthy` verdicts, whether returned while draining a
    /// bucket in `acquire` or while vetting a freshly spawned entry on the
    /// refill thread. Indicates pool quality, not pool throughput.
    pub validation_failures_total: u64,
}
97
/// A pre-restored VM checked out of the pool, ready for the caller to bring up
/// under a new tenant identity.
///
/// The caller takes the [`SnapshotRef`] out of the handle, attaches their
/// per-tenant `network_overrides`, and feeds it to
/// `provider.create_vm_with_spec(new_vm_id, &VmSpec { restore_from: Some(...), ..VmSpec::default() })`.
/// Firecracker 1.10+ honors the override on restore — the heavy kernel/init
/// path is already done, so the new tenant only pays the restore cost.
///
/// `source_vm_id` is informational: it identifies the pool entry that was
/// drained for this acquire. The pool has already removed it from its tracking
/// map by the time the handle is returned, so the caller doesn't have to
/// worry about double-handoff.
#[derive(Debug, Clone)]
pub struct WarmPoolHandle {
    /// VM ID the pool was tracking before this acquire. Pool no longer holds
    /// any reference to it; the caller owns it.
    pub source_vm_id: String,
    /// Bucket key this entry was drained from.
    pub stack: StackKey,
    /// Snapshot registered for this bucket, cloned at the moment the entry was
    /// popped. Because `register` may replace a bucket's snapshot while older
    /// entries stay queued, this is the most recently registered snapshot and
    /// not necessarily the one the entry itself was restored from. Pass to
    /// [`VmProvider::create_vm_with_spec`] after setting
    /// [`SnapshotRef::network_overrides`] for the new tenant.
    pub source_snapshot: SnapshotRef,
}
123
/// One live entry in a bucket.
#[derive(Debug)]
struct PoolEntry {
    /// Provider-side identifier of the warm VM.
    vm_id: String,
    /// When the entry was inserted into its bucket (after passing validation).
    /// Compared against `entry_max_age` by the refill sweep.
    created_at: Instant,
}
130
/// Per-bucket state.
#[derive(Debug)]
struct BucketState {
    /// Snapshot the refill thread restores from for new entries. Replaced in
    /// place when the key is registered again.
    source_snapshot: SnapshotRef,
    /// Live, validated-on-spawn entries ready for handoff. `acquire` pops from
    /// the back, so the most recently inserted entry is served first.
    entries: Vec<PoolEntry>,
    /// Monotonic counter for unique seed IDs. The refill sweep advances it by
    /// the number of spawns it plans; it starts at 0 when the bucket is first
    /// created.
    next_seed: u64,
}
141
/// Atomic counters mirrored into [`WarmPoolMetrics`].
///
/// Each field backs the `*_total` field of the same name and is updated with
/// relaxed ordering from both the owner's thread and the refill thread.
#[derive(Debug, Default)]
struct Counters {
    /// Entries spawned by the refill thread.
    created: AtomicU64,
    /// Entries handed off by `acquire`.
    acquired: AtomicU64,
    /// Entries destroyed by the pool.
    evicted: AtomicU64,
    /// Completed refill sweeps.
    refill_runs: AtomicU64,
    /// `Unhealthy` verdicts seen on either the acquire or the refill path.
    validation_failures: AtomicU64,
}
151
152impl Counters {
153    fn snapshot(&self) -> WarmPoolMetrics {
154        WarmPoolMetrics {
155            created_total: self.created.load(Ordering::Relaxed),
156            acquired_total: self.acquired.load(Ordering::Relaxed),
157            evicted_total: self.evicted.load(Ordering::Relaxed),
158            refill_runs_total: self.refill_runs.load(Ordering::Relaxed),
159            validation_failures_total: self.validation_failures.load(Ordering::Relaxed),
160        }
161    }
162}
163
/// Shared state between owner-thread API calls and the refill thread.
struct PoolState {
    /// Every registered bucket, keyed by stack. All reads and writes go
    /// through this one mutex.
    buckets: Mutex<HashMap<StackKey, BucketState>>,
    /// Lock-free counters; updated without taking `buckets`.
    counters: Counters,
}
169
/// Warm pool of pre-restored microVMs.
///
/// Construct with [`WarmPool::start`]; register snapshots to keep warm with
/// [`WarmPool::register`]; check out entries with [`WarmPool::acquire`].
///
/// NOTE(review): the pool exposes no inner `Arc` of its own, and whether it is
/// `Send`/`Sync` depends on `P` and on the key/snapshot types; to share it
/// across threads, callers presumably wrap the whole pool in an `Arc` —
/// confirm against intended usage.
pub struct WarmPool<P: VmProvider + VmQuery + 'static> {
    /// Provider handle used by owner-thread calls (destroying entries).
    provider: P,
    /// Tunables the pool was started with; the refill thread has its own copy.
    config: WarmPoolConfig,
    /// Health probe shared with the refill thread.
    validator: Arc<dyn EntryValidator>,
    /// Buckets and counters shared with the refill thread.
    state: Arc<PoolState>,
    /// Set to `true` to ask the refill thread to exit.
    shutdown_flag: Arc<AtomicBool>,
    /// Join handle of the refill thread; `None` once teardown has taken it.
    refill_handle: Option<JoinHandle<()>>,
}
185
186impl<P: VmProvider + VmQuery + 'static> fmt::Debug for WarmPool<P> {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        let depths: HashMap<StackKey, usize> = match self.state.buckets.lock() {
189            Ok(guard) => guard
190                .iter()
191                .map(|(k, b)| (k.clone(), b.entries.len()))
192                .collect(),
193            Err(poisoned) => poisoned
194                .into_inner()
195                .iter()
196                .map(|(k, b)| (k.clone(), b.entries.len()))
197                .collect(),
198        };
199        f.debug_struct("WarmPool")
200            .field("config", &self.config)
201            .field("depths", &depths)
202            .field(
203                "shutdown", // current flag value, not a static field
204                &self.shutdown_flag.load(Ordering::Relaxed),
205            )
206            .finish()
207    }
208}
209
210impl<P: VmProvider + VmQuery + Clone + 'static> WarmPool<P> {
211    /// Start a pool with a fresh refill thread.
212    ///
213    /// The pool holds a clone of `provider`; another clone runs inside the
214    /// refill thread. Providers from `microvm-runtime` are cheap to clone
215    /// (`Arc`-backed).
216    pub fn start(provider: P, config: WarmPoolConfig, validator: Arc<dyn EntryValidator>) -> Self {
217        let state = Arc::new(PoolState {
218            buckets: Mutex::new(HashMap::new()),
219            counters: Counters::default(),
220        });
221        let shutdown_flag = Arc::new(AtomicBool::new(false));
222
223        let refill_handle = spawn_refill_thread(
224            provider.clone(),
225            config.clone(),
226            Arc::clone(&validator),
227            Arc::clone(&state),
228            Arc::clone(&shutdown_flag),
229        );
230
231        Self {
232            provider,
233            config,
234            validator,
235            state,
236            shutdown_flag,
237            refill_handle: Some(refill_handle),
238        }
239    }
240
241    /// Tell the pool which snapshot to keep warm for `stack`. The refill
242    /// thread will bring this bucket up to `min_depth` on its next tick.
243    ///
244    /// Calling `register` again for the same key replaces the snapshot but
245    /// keeps existing entries — they continue to be served until evicted by
246    /// age or validation. Operators wanting hard-cutover should pair this
247    /// with [`unregister`](Self::unregister).
248    pub fn register(&self, stack: StackKey, source_snapshot: SnapshotRef) {
249        let mut guard = lock_buckets(&self.state.buckets);
250        guard
251            .entry(stack)
252            .and_modify(|b| b.source_snapshot = source_snapshot.clone())
253            .or_insert_with(|| BucketState {
254                source_snapshot,
255                entries: Vec::new(),
256                next_seed: 0,
257            });
258    }
259
260    /// Stop tracking `stack`. Existing entries are destroyed immediately
261    /// (best-effort) so the pool doesn't keep paying for them.
262    pub fn unregister(&self, stack: &StackKey) {
263        let drained: Vec<String> = {
264            let mut guard = lock_buckets(&self.state.buckets);
265            match guard.remove(stack) {
266                Some(bucket) => bucket.entries.into_iter().map(|e| e.vm_id).collect(),
267                None => return,
268            }
269        };
270        for vm_id in drained {
271            let _ = self.provider.destroy_vm(&vm_id);
272            self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
273        }
274    }
275
276    /// Hand off a pre-restored VM. Pops an entry from `stack`'s bucket and
277    /// validates it; on `Healthy`, returns a [`WarmPoolHandle`]. On
278    /// `Unhealthy`, the entry is destroyed (validation_failures counter
279    /// increments) and the next entry is tried. Returns `None` if the bucket
280    /// is empty or every remaining entry failed validation.
281    ///
282    /// The returned [`WarmPoolHandle::source_snapshot`] is what the caller
283    /// passes back to [`VmProvider::create_vm_with_spec`] after applying its
284    /// own [`SnapshotRef::network_overrides`].
285    pub fn acquire(&self, stack: &StackKey) -> Option<WarmPoolHandle> {
286        loop {
287            let (vm_id, source_snapshot) = {
288                let mut guard = lock_buckets(&self.state.buckets);
289                let bucket = guard.get_mut(stack)?;
290                let entry = bucket.entries.pop()?;
291                (entry.vm_id, bucket.source_snapshot.clone())
292            };
293
294            match self.validator.validate(&vm_id) {
295                ValidationResult::Healthy => {
296                    self.state.counters.acquired.fetch_add(1, Ordering::Relaxed);
297                    return Some(WarmPoolHandle {
298                        source_vm_id: vm_id,
299                        stack: stack.clone(),
300                        source_snapshot,
301                    });
302                }
303                ValidationResult::Unhealthy(_reason) => {
304                    self.state
305                        .counters
306                        .validation_failures
307                        .fetch_add(1, Ordering::Relaxed);
308                    let _ = self.provider.destroy_vm(&vm_id);
309                    self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
310                    // loop and try the next entry
311                }
312            }
313        }
314    }
315
316    /// Snapshot of counters for observability scraping.
317    pub fn metrics(&self) -> WarmPoolMetrics {
318        self.state.counters.snapshot()
319    }
320
321    /// Stop the refill thread and best-effort destroy all live entries.
322    ///
323    /// Idempotent. [`Drop`] also calls this.
324    pub fn shutdown(&mut self) {
325        self.shutdown_flag.store(true, Ordering::SeqCst);
326        if let Some(handle) = self.refill_handle.take() {
327            join_with_timeout(handle, Duration::from_millis(200));
328        }
329
330        let drained: Vec<String> = {
331            let mut guard = lock_buckets(&self.state.buckets);
332            let mut all = Vec::new();
333            for bucket in guard.values_mut() {
334                all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
335            }
336            all
337        };
338        for vm_id in drained {
339            let _ = self.provider.destroy_vm(&vm_id);
340            self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
341        }
342    }
343}
344
345impl<P: VmProvider + VmQuery + 'static> Drop for WarmPool<P> {
346    fn drop(&mut self) {
347        // Best-effort cleanup. `shutdown` is idempotent so calling it on an
348        // already-shut-down pool is safe; the refill_handle is `None` and
349        // the bucket map is already empty.
350        self.shutdown_flag.store(true, Ordering::SeqCst);
351        if let Some(handle) = self.refill_handle.take() {
352            join_with_timeout(handle, Duration::from_millis(200));
353        }
354        let drained: Vec<String> = match self.state.buckets.lock() {
355            Ok(mut guard) => {
356                let mut all = Vec::new();
357                for bucket in guard.values_mut() {
358                    all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
359                }
360                all
361            }
362            Err(poisoned) => {
363                let mut guard = poisoned.into_inner();
364                let mut all = Vec::new();
365                for bucket in guard.values_mut() {
366                    all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
367                }
368                all
369            }
370        };
371        for vm_id in drained {
372            let _ = self.provider.destroy_vm(&vm_id);
373        }
374    }
375}
376
377/// Spawn the single refill thread that services every registered bucket.
378fn spawn_refill_thread<P: VmProvider + VmQuery + Clone + 'static>(
379    provider: P,
380    config: WarmPoolConfig,
381    validator: Arc<dyn EntryValidator>,
382    state: Arc<PoolState>,
383    shutdown_flag: Arc<AtomicBool>,
384) -> JoinHandle<()> {
385    // Builder::name() unwraps are reserved for "this can never fail in
386    // practice and a failure means the OS is out of TIDs," but to stay in line
387    // with the "no unwrap outside tests" rule we fall back to spawn() if the
388    // named build fails for any reason.
389    match thread::Builder::new()
390        .name("microvm-warm-pool-refill".into())
391        .spawn(move || refill_loop(provider, config, validator, state, shutdown_flag))
392    {
393        Ok(handle) => handle,
394        Err(_) => thread::spawn(|| {}),
395    }
396}
397
398fn refill_loop<P: VmProvider + VmQuery + Clone + 'static>(
399    provider: P,
400    config: WarmPoolConfig,
401    validator: Arc<dyn EntryValidator>,
402    state: Arc<PoolState>,
403    shutdown_flag: Arc<AtomicBool>,
404) {
405    loop {
406        if shutdown_flag.load(Ordering::SeqCst) {
407            return;
408        }
409        run_refill_tick(&provider, &config, validator.as_ref(), &state);
410        state.counters.refill_runs.fetch_add(1, Ordering::Relaxed);
411
412        // Sleep in small slices so shutdown takes effect promptly even with a
413        // long refill_interval.
414        let deadline = Instant::now() + config.refill_interval;
415        while Instant::now() < deadline {
416            if shutdown_flag.load(Ordering::SeqCst) {
417                return;
418            }
419            let remaining = deadline.saturating_duration_since(Instant::now());
420            let slice = remaining.min(Duration::from_millis(25));
421            if slice.is_zero() {
422                break;
423            }
424            thread::sleep(slice);
425        }
426    }
427}
428
429/// One pass of the refill scheduler:
430///
431/// 1. Evict any entry older than `entry_max_age`.
432/// 2. For each registered key below `min_depth`, top up toward `max_depth` by
433///    spawning + validating new entries.
434fn run_refill_tick<P: VmProvider + VmQuery + 'static>(
435    provider: &P,
436    config: &WarmPoolConfig,
437    validator: &dyn EntryValidator,
438    state: &Arc<PoolState>,
439) {
440    // --- 1. Age-based eviction ----------------------------------------------
441    let now = Instant::now();
442    let to_evict: Vec<String> = {
443        let mut guard = lock_buckets(&state.buckets);
444        let mut victims = Vec::new();
445        for bucket in guard.values_mut() {
446            bucket.entries.retain(|entry| {
447                let alive = now.duration_since(entry.created_at) <= config.entry_max_age;
448                if !alive {
449                    victims.push(entry.vm_id.clone());
450                }
451                alive
452            });
453        }
454        victims
455    };
456    for vm_id in to_evict {
457        let _ = provider.destroy_vm(&vm_id);
458        state.counters.evicted.fetch_add(1, Ordering::Relaxed);
459    }
460
461    // --- 2. Plan refills (under-lock snapshot of work to do) ----------------
462    struct RefillJob {
463        stack: StackKey,
464        deficit: usize,
465        snapshot: SnapshotRef,
466        next_seed: u64,
467    }
468    let jobs: Vec<RefillJob> = {
469        let mut guard = lock_buckets(&state.buckets);
470        let mut planned = Vec::new();
471        for (stack, bucket) in guard.iter_mut() {
472            if bucket.entries.len() < config.min_depth {
473                let deficit = config.max_depth.saturating_sub(bucket.entries.len());
474                if deficit == 0 {
475                    continue;
476                }
477                let start = bucket.next_seed;
478                // Reserve enough seed numbers up front so that two concurrent
479                // refill iterations (today: only one thread, but defensive)
480                // can never collide on a name.
481                bucket.next_seed = start.saturating_add(deficit as u64);
482                planned.push(RefillJob {
483                    stack: stack.clone(),
484                    deficit,
485                    snapshot: bucket.source_snapshot.clone(),
486                    next_seed: start,
487                });
488            }
489        }
490        planned
491    };
492
493    // --- 3. Spawn each planned entry without holding the lock ---------------
494    for job in jobs {
495        for i in 0..job.deficit {
496            let seed = job.next_seed.saturating_add(i as u64);
497            let vm_id = format!(
498                "warm-{stack}-{ver}-{seed}",
499                stack = sanitize(&job.stack.stack_name),
500                ver = sanitize(&job.stack.version),
501                seed = seed,
502            );
503
504            let spec = VmSpec {
505                vcpu_count: Some(job.stack.vcpu_count),
506                mem_size_mib: Some(job.stack.mem_size_mib),
507                restore_from: Some(job.snapshot.clone()),
508                ..VmSpec::default()
509            };
510
511            if let Err(_e) = provider.create_vm_with_spec(&vm_id, &spec) {
512                // Don't keep retrying inside one tick — the next scheduler
513                // tick will revisit the deficit. Avoids hot-spinning on a
514                // broken provider.
515                break;
516            }
517            state.counters.created.fetch_add(1, Ordering::Relaxed);
518
519            match validator.validate(&vm_id) {
520                ValidationResult::Healthy => {
521                    let mut guard = lock_buckets(&state.buckets);
522                    // Bucket might have been unregistered between plan and
523                    // insert; in that case, destroy the orphan instead of
524                    // leaking it.
525                    match guard.get_mut(&job.stack) {
526                        Some(bucket) => {
527                            bucket.entries.push(PoolEntry {
528                                vm_id,
529                                created_at: Instant::now(),
530                            });
531                        }
532                        None => {
533                            drop(guard);
534                            let _ = provider.destroy_vm(&vm_id);
535                            state.counters.evicted.fetch_add(1, Ordering::Relaxed);
536                        }
537                    }
538                }
539                ValidationResult::Unhealthy(_reason) => {
540                    state
541                        .counters
542                        .validation_failures
543                        .fetch_add(1, Ordering::Relaxed);
544                    let _ = provider.destroy_vm(&vm_id);
545                    state.counters.evicted.fetch_add(1, Ordering::Relaxed);
546                }
547            }
548        }
549    }
550}
551
/// Make a stack name or version safe to embed in a VM ID.
///
/// Every character that is not an ASCII letter or digit is replaced by a
/// single `_`, so the result only contains `[A-Za-z0-9_]` — the conservative
/// intersection of what `VmProvider` implementations accept (filesystem-safe,
/// URL-safe, etc.).
fn sanitize(input: &str) -> String {
    input.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
}
563
564fn lock_buckets(
565    m: &Mutex<HashMap<StackKey, BucketState>>,
566) -> MutexGuard<'_, HashMap<StackKey, BucketState>> {
567    // Recover from a poisoned mutex by taking the inner guard. Matches the
568    // `into_inner` convention used elsewhere in microvm-runtime.
569    match m.lock() {
570        Ok(guard) => guard,
571        Err(poisoned) => poisoned.into_inner(),
572    }
573}
574
/// Wait up to `timeout` for `handle`'s thread to finish, polling every 5 ms.
///
/// If the thread finishes in time it is joined (its result is discarded).
/// Otherwise the handle is dropped, which detaches the thread; it is expected
/// to exit on its own once it observes the shutdown flag.
fn join_with_timeout(handle: JoinHandle<()>, timeout: Duration) {
    const POLL_EVERY: Duration = Duration::from_millis(5);

    let give_up_at = Instant::now() + timeout;
    loop {
        if Instant::now() >= give_up_at {
            // Timed out: let go of the handle rather than block the caller.
            drop(handle);
            return;
        }
        if handle.is_finished() {
            let _ = handle.join();
            return;
        }
        thread::sleep(POLL_EVERY);
    }
}
588
589#[cfg(test)]
590mod tests {
591    use super::*;
592    use microvm_runtime::InMemoryVmProvider;
593    use microvm_runtime::model::{VmStatus, VmView};
594    use std::sync::Mutex as StdMutex;
595
    /// Small fixture key: stack `alpha` at version `1.0.0`, 1 vCPU, 128 MiB.
    fn stack_alpha() -> StackKey {
        StackKey {
            stack_name: "alpha".into(),
            version: "1.0.0".into(),
            vcpu_count: 1,
            mem_size_mib: 128,
        }
    }
604
    /// Second fixture key, differing from `stack_alpha` in every field:
    /// stack `beta` at version `2.0.0`, 2 vCPUs, 256 MiB.
    fn stack_beta() -> StackKey {
        StackKey {
            stack_name: "beta".into(),
            version: "2.0.0".into(),
            vcpu_count: 2,
            mem_size_mib: 256,
        }
    }
613
614    fn snapshot_for(stack: &StackKey) -> SnapshotRef {
615        SnapshotRef {
616            vm_id: format!("tpl-{}", stack.stack_name),
617            snapshot_id: format!("snap-{}-{}", stack.stack_name, stack.version),
618            resume_immediately: true,
619            network_overrides: Vec::new(),
620        }
621    }
622
623    fn fast_config(refill_ms: u64) -> WarmPoolConfig {
624        WarmPoolConfig {
625            min_depth: 2,
626            max_depth: 3,
627            refill_interval: Duration::from_millis(refill_ms),
628            entry_max_age: Duration::from_secs(600),
629        }
630    }
631
    /// Validator that always reports `Healthy`, regardless of the VM ID.
    /// Used by tests that only care about refill and acquire mechanics.
    struct AlwaysHealthy;
    impl EntryValidator for AlwaysHealthy {
        fn validate(&self, _vm_id: &str) -> ValidationResult {
            ValidationResult::Healthy
        }
    }
639
640    fn wait_until<F: FnMut() -> bool>(timeout: Duration, mut f: F) -> bool {
641        let deadline = Instant::now() + timeout;
642        while Instant::now() < deadline {
643            if f() {
644                return true;
645            }
646            thread::sleep(Duration::from_millis(10));
647        }
648        false
649    }
650
651    fn depth<P>(pool: &WarmPool<P>, stack: &StackKey) -> usize
652    where
653        P: VmProvider + VmQuery + Clone + 'static,
654    {
655        let guard = lock_buckets(&pool.state.buckets);
656        guard.get(stack).map(|b| b.entries.len()).unwrap_or(0)
657    }
658
659    #[test]
660    fn refill_brings_bucket_to_max_depth() {
661        let provider = InMemoryVmProvider::default();
662        let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
663        let stack = stack_alpha();
664        pool.register(stack.clone(), snapshot_for(&stack));
665
666        let reached = wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3);
667        assert!(
668            reached,
669            "expected depth == max_depth (3), got {}",
670            depth(&pool, &stack)
671        );
672
673        let m = pool.metrics();
674        assert_eq!(m.created_total, 3, "should have created exactly 3 entries");
675        assert_eq!(m.acquired_total, 0);
676        assert_eq!(m.validation_failures_total, 0);
677
678        // Refilling stops at max_depth — give the scheduler a few more ticks
679        // and confirm we don't blow past 3.
680        thread::sleep(Duration::from_millis(150));
681        assert_eq!(depth(&pool, &stack), 3);
682        assert_eq!(pool.metrics().created_total, 3);
683    }
684
685    #[test]
686    fn acquire_returns_none_when_bucket_empty() {
687        let provider = InMemoryVmProvider::default();
688        let pool = WarmPool::start(
689            provider,
690            WarmPoolConfig {
691                refill_interval: Duration::from_secs(60),
692                ..fast_config(60)
693            },
694            Arc::new(AlwaysHealthy),
695        );
696        let stack = stack_alpha();
697        assert!(pool.acquire(&stack).is_none(), "no bucket registered");
698
699        // Register but don't wait for refill — bucket exists but is empty.
700        pool.register(stack.clone(), snapshot_for(&stack));
701        assert!(
702            pool.acquire(&stack).is_none(),
703            "registered bucket should still be empty"
704        );
705        assert_eq!(pool.metrics().acquired_total, 0);
706    }
707
708    #[test]
709    fn acquire_pops_healthy_entry_with_correct_snapshot() {
710        let provider = InMemoryVmProvider::default();
711        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
712        let stack = stack_alpha();
713        let snap = snapshot_for(&stack);
714        pool.register(stack.clone(), snap.clone());
715
716        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) >= 2));
717
718        let handle = pool.acquire(&stack).expect("warm entry available");
719        assert_eq!(handle.stack, stack);
720        assert_eq!(handle.source_snapshot.vm_id, snap.vm_id);
721        assert_eq!(handle.source_snapshot.snapshot_id, snap.snapshot_id);
722        assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));
723
724        let m = pool.metrics();
725        assert_eq!(m.acquired_total, 1);
726        assert_eq!(m.validation_failures_total, 0);
727    }
728
    /// `acquire` destroys entries the validator rejects and returns the first
    /// one it accepts.
    #[test]
    fn acquire_evicts_unhealthy_entries_and_returns_next_healthy() {
        let provider = InMemoryVmProvider::default();
        // The pool shares one validator between the refill thread and
        // `acquire`, so the test uses a two-phase validator:
        //
        // * phase 0 — every verdict is Healthy, letting the refill thread
        //   fill the bucket;
        // * phase 1 — the next `unhealthy_left` verdicts are Unhealthy, then
        //   Healthy again.
        //
        // The test flips to phase 1 only after the bucket is full, so the
        // scripted rejections are seen by `acquire`.
        struct Phased {
            phase: StdMutex<u8>, // 0 = refill (Healthy), 1 = acquire window (Unhealthy x2 then Healthy)
            unhealthy_left: StdMutex<u32>,
        }
        impl EntryValidator for Phased {
            fn validate(&self, _vm_id: &str) -> ValidationResult {
                let phase = *self.phase.lock().expect("phase");
                if phase == 0 {
                    return ValidationResult::Healthy;
                }
                // Phase 1: burn one scripted rejection per call until none remain.
                let mut left = self.unhealthy_left.lock().expect("uh");
                if *left > 0 {
                    *left -= 1;
                    ValidationResult::Unhealthy("scripted bad".into())
                } else {
                    ValidationResult::Healthy
                }
            }
        }
        let validator = Arc::new(Phased {
            phase: StdMutex::new(0),
            unhealthy_left: StdMutex::new(2),
        });
        // Keep the concrete handle for flipping the phase; the pool gets the
        // trait-object view of the same validator.
        let validator_for_pool: Arc<dyn EntryValidator> = validator.clone();

        let pool = WarmPool::start(provider.clone(), fast_config(20), validator_for_pool);
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));

        // Flip into acquire phase: validator will now reject 2 entries.
        *validator.phase.lock().expect("phase") = 1;

        let handle = pool.acquire(&stack).expect("eventually returns healthy");
        assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));

        let m = pool.metrics();
        assert_eq!(m.validation_failures_total, 2, "two entries rejected");
        assert_eq!(m.acquired_total, 1, "one entry served");
        // 2 rejected + 0 aged = 2 evictions (acquired entry is not an
        // eviction; it was handed off).
        assert_eq!(m.evicted_total, 2);

        // The destroyed VMs should be Destroyed in the underlying provider.
        let vms = provider.list_vms().expect("list");
        let destroyed = vms
            .iter()
            .filter(|v| v.status == VmStatus::Destroyed)
            .count();
        assert_eq!(destroyed, 2, "two rejected entries should be destroyed");
    }
796
797    #[test]
798    fn acquire_returns_none_after_all_entries_rejected() {
799        struct AlwaysBad;
800        impl EntryValidator for AlwaysBad {
801            fn validate(&self, _vm_id: &str) -> ValidationResult {
802                ValidationResult::Unhealthy("never trust this VM".into())
803            }
804        }
805        let provider = InMemoryVmProvider::default();
806        // Refill itself will reject every entry it spawns, so the bucket
807        // stays empty.
808        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysBad));
809        let stack = stack_alpha();
810        pool.register(stack.clone(), snapshot_for(&stack));
811
812        // Give refill at least a few ticks to spawn and reject entries.
813        assert!(wait_until(Duration::from_secs(2), || {
814            pool.metrics().validation_failures_total >= 3
815        }));
816
817        assert_eq!(depth(&pool, &stack), 0);
818        assert!(pool.acquire(&stack).is_none());
819    }
820
821    #[test]
822    fn entry_max_age_eviction() {
823        let provider = InMemoryVmProvider::default();
824        let config = WarmPoolConfig {
825            min_depth: 2,
826            max_depth: 3,
827            refill_interval: Duration::from_millis(30),
828            entry_max_age: Duration::from_millis(80),
829        };
830        let pool = WarmPool::start(provider.clone(), config, Arc::new(AlwaysHealthy));
831        let stack = stack_alpha();
832        pool.register(stack.clone(), snapshot_for(&stack));
833
834        // Wait for initial fill.
835        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
836        let after_fill = pool.metrics().created_total;
837        assert_eq!(after_fill, 3);
838
839        // Now wait for entries to age out + refresh.
840        assert!(wait_until(Duration::from_secs(3), || {
841            let m = pool.metrics();
842            // Both: entries got evicted AND new ones got created beyond the
843            // initial 3.
844            m.evicted_total >= 3 && m.created_total >= 6
845        }));
846    }
847
848    #[test]
849    fn register_and_unregister() {
850        let provider = InMemoryVmProvider::default();
851        let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
852        let alpha = stack_alpha();
853        let beta = stack_beta();
854
855        pool.register(alpha.clone(), snapshot_for(&alpha));
856        pool.register(beta.clone(), snapshot_for(&beta));
857
858        assert!(wait_until(Duration::from_secs(2), || {
859            depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
860        }));
861
862        let evictions_before = pool.metrics().evicted_total;
863        pool.unregister(&alpha);
864
865        // alpha bucket gone, beta untouched.
866        {
867            let guard = lock_buckets(&pool.state.buckets);
868            assert!(!guard.contains_key(&alpha));
869            assert_eq!(guard.get(&beta).map(|b| b.entries.len()), Some(3));
870        }
871
872        // 3 alpha entries should have been destroyed.
873        let m = pool.metrics();
874        assert_eq!(m.evicted_total - evictions_before, 3);
875    }
876
877    #[test]
878    fn register_replaces_snapshot_for_existing_bucket() {
879        let provider = InMemoryVmProvider::default();
880        let pool = WarmPool::start(
881            provider,
882            WarmPoolConfig {
883                refill_interval: Duration::from_secs(60),
884                ..fast_config(60)
885            },
886            Arc::new(AlwaysHealthy),
887        );
888        let stack = stack_alpha();
889        let snap_v1 = SnapshotRef {
890            vm_id: "tpl-v1".into(),
891            snapshot_id: "snap-v1".into(),
892            resume_immediately: true,
893            network_overrides: Vec::new(),
894        };
895        let snap_v2 = SnapshotRef {
896            vm_id: "tpl-v2".into(),
897            snapshot_id: "snap-v2".into(),
898            resume_immediately: true,
899            network_overrides: Vec::new(),
900        };
901        pool.register(stack.clone(), snap_v1);
902        pool.register(stack.clone(), snap_v2.clone());
903
904        let guard = lock_buckets(&pool.state.buckets);
905        let bucket = guard.get(&stack).expect("bucket");
906        assert_eq!(bucket.source_snapshot.snapshot_id, snap_v2.snapshot_id);
907    }
908
909    #[test]
910    fn metrics_increment_correctly() {
911        let provider = InMemoryVmProvider::default();
912        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
913        let stack = stack_alpha();
914        pool.register(stack.clone(), snapshot_for(&stack));
915
916        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
917        // Acquire two entries.
918        let _h1 = pool.acquire(&stack).expect("h1");
919        let _h2 = pool.acquire(&stack).expect("h2");
920
921        let m = pool.metrics();
922        assert!(m.created_total >= 3);
923        assert_eq!(m.acquired_total, 2);
924        assert!(m.refill_runs_total >= 1);
925    }
926
927    #[test]
928    fn shutdown_stops_refill_within_budget() {
929        let provider = InMemoryVmProvider::default();
930        // refill_interval = 1s; shutdown should still complete in <500ms.
931        let config = WarmPoolConfig {
932            min_depth: 1,
933            max_depth: 2,
934            refill_interval: Duration::from_secs(1),
935            entry_max_age: Duration::from_secs(600),
936        };
937        let mut pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
938        let stack = stack_alpha();
939        pool.register(stack.clone(), snapshot_for(&stack));
940        // Give the first refill tick time to add at least one entry.
941        thread::sleep(Duration::from_millis(50));
942
943        let start = Instant::now();
944        pool.shutdown();
945        let elapsed = start.elapsed();
946        assert!(
947            elapsed < Duration::from_millis(500),
948            "shutdown took {elapsed:?}, expected < 500ms"
949        );
950
951        // After shutdown the bucket map should be drained.
952        let guard = lock_buckets(&pool.state.buckets);
953        for (_k, bucket) in guard.iter() {
954            assert!(bucket.entries.is_empty(), "all entries should be drained");
955        }
956    }
957
958    #[test]
959    fn drop_destroys_remaining_entries() {
960        let provider = InMemoryVmProvider::default();
961        let stack = stack_alpha();
962        {
963            let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
964            pool.register(stack.clone(), snapshot_for(&stack));
965            assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
966        } // drop here
967
968        // All warm-alpha-* VMs should now be in Destroyed state.
969        let vms = provider.list_vms().expect("list");
970        let alpha_vms: Vec<&VmView> = vms
971            .iter()
972            .filter(|v| v.vm_id.starts_with("warm-alpha-"))
973            .collect();
974        assert_eq!(alpha_vms.len(), 3);
975        for v in alpha_vms {
976            assert_eq!(
977                v.status,
978                VmStatus::Destroyed,
979                "vm {} not destroyed",
980                v.vm_id
981            );
982        }
983    }
984
985    #[test]
986    fn refill_interval_is_honored_under_short_budget() {
987        // Use a 50ms refill interval; verify the refill_runs_total counter
988        // ticks up multiple times in a short window. Validates the scheduler
989        // wakes up at roughly the configured cadence.
990        let provider = InMemoryVmProvider::default();
991        let config = WarmPoolConfig {
992            min_depth: 1,
993            max_depth: 1,
994            refill_interval: Duration::from_millis(50),
995            entry_max_age: Duration::from_secs(600),
996        };
997        let pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
998        // No buckets registered → ticks still run, just no-op.
999        thread::sleep(Duration::from_millis(350));
1000        let runs = pool.metrics().refill_runs_total;
1001        // 350ms / 50ms = 7 expected ticks; allow generous slack for CI
1002        // jitter.
1003        assert!(
1004            runs >= 3,
1005            "expected refill thread to tick at least 3 times in 350ms, got {runs}"
1006        );
1007    }
1008
1009    #[test]
1010    fn validator_failure_reason_is_logged_via_metrics_counter() {
1011        // The reason string itself isn't observable from outside (logged
1012        // only), but the counter increment is.
1013        struct Counting {
1014            calls: AtomicU64,
1015        }
1016        impl EntryValidator for Counting {
1017            fn validate(&self, _vm_id: &str) -> ValidationResult {
1018                let n = self.calls.fetch_add(1, Ordering::SeqCst);
1019                if n.is_multiple_of(2) {
1020                    ValidationResult::Healthy
1021                } else {
1022                    ValidationResult::Unhealthy(format!("scripted-fail-{n}"))
1023                }
1024            }
1025        }
1026        let provider = InMemoryVmProvider::default();
1027        let pool = WarmPool::start(
1028            provider,
1029            fast_config(20),
1030            Arc::new(Counting {
1031                calls: AtomicU64::new(0),
1032            }),
1033        );
1034        let stack = stack_alpha();
1035        pool.register(stack.clone(), snapshot_for(&stack));
1036
1037        // Half of refill spawns will fail. The pool only refills when
1038        // depth < min_depth (2), so steady state with a 50% failure rate is
1039        // depth=2 after each tick. What we verify is that the validation
1040        // failure counter ticks up — i.e. unhealthy entries were destroyed
1041        // rather than landing in the bucket.
1042        assert!(wait_until(Duration::from_secs(3), || {
1043            pool.metrics().validation_failures_total >= 1
1044        }));
1045        assert!(pool.metrics().created_total >= 2);
1046    }
1047
1048    #[test]
1049    fn separate_buckets_do_not_interfere() {
1050        let provider = InMemoryVmProvider::default();
1051        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
1052        let alpha = stack_alpha();
1053        let beta = stack_beta();
1054        pool.register(alpha.clone(), snapshot_for(&alpha));
1055        pool.register(beta.clone(), snapshot_for(&beta));
1056        assert!(wait_until(Duration::from_secs(2), || {
1057            depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
1058        }));
1059
1060        // Drain alpha. Beta should remain at depth 3.
1061        for _ in 0..3 {
1062            assert!(pool.acquire(&alpha).is_some());
1063        }
1064        assert_eq!(depth(&pool, &beta), 3);
1065    }
1066
1067    #[test]
1068    fn unregister_unknown_key_is_noop() {
1069        let provider = InMemoryVmProvider::default();
1070        let pool = WarmPool::start(provider, fast_config(60), Arc::new(AlwaysHealthy));
1071        let stack = stack_alpha();
1072        pool.unregister(&stack);
1073        assert_eq!(pool.metrics().evicted_total, 0);
1074    }
1075
1076    #[test]
1077    fn sanitize_handles_special_characters() {
1078        assert_eq!(sanitize("alpha"), "alpha");
1079        assert_eq!(sanitize("1.0.0"), "1_0_0");
1080        assert_eq!(sanitize("a/b c"), "a_b_c");
1081        assert_eq!(sanitize("ALPHA-9"), "ALPHA_9");
1082    }
1083}