1use std::collections::HashMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::{Arc, Mutex, MutexGuard};
21use std::thread::{self, JoinHandle};
22use std::time::{Duration, Instant};
23
24use microvm_runtime::model::{SnapshotRef, VmSpec};
25use microvm_runtime::provider::{VmProvider, VmQuery};
26
27use crate::stack_key::StackKey;
28
/// Tunables controlling how each warm-pool bucket is sized, refilled and aged.
#[derive(Debug, Clone)]
pub struct WarmPoolConfig {
    /// Refill trigger: a bucket is topped up only once it holds fewer than
    /// this many entries.
    pub min_depth: usize,
    /// Refill target: a triggered refill tries to bring the bucket up to this
    /// many entries.
    pub max_depth: usize,
    /// Pause between two consecutive refill ticks of the background thread.
    pub refill_interval: Duration,
    /// Entries that have been pooled for longer than this are removed and
    /// destroyed at the start of the next refill tick.
    pub entry_max_age: Duration,
}
46
47impl Default for WarmPoolConfig {
48 fn default() -> Self {
49 Self {
50 min_depth: 1,
51 max_depth: 4,
52 refill_interval: Duration::from_secs(5),
53 entry_max_age: Duration::from_secs(600),
54 }
55 }
56}
57
/// Verdict returned by an [`EntryValidator`] for a single VM.
#[derive(Debug, Clone)]
pub enum ValidationResult {
    /// The VM may be pooled or handed out.
    Healthy,
    /// The VM must not be used; the pool destroys it. The string carries the
    /// validator's reason (the pool code in this file does not read it).
    Unhealthy(String),
}
67
/// Health check applied to warm VMs, both right after the refill thread
/// creates them and again when they are acquired.
///
/// The `Send + Sync + 'static` bounds allow one validator to be shared between
/// the pool and its background refill thread.
pub trait EntryValidator: Send + Sync + 'static {
    /// Reports whether the VM identified by `vm_id` is fit for use.
    fn validate(&self, vm_id: &str) -> ValidationResult;
}
80
/// Point-in-time copy of the pool's monotonically increasing counters.
#[derive(Debug, Default, Clone)]
pub struct WarmPoolMetrics {
    /// VMs the refill thread created successfully (counted before validation).
    pub created_total: u64,
    /// Handles returned by `acquire`.
    pub acquired_total: u64,
    /// Destroy requests issued for pooled or freshly created VMs (aging,
    /// failed validation, unregister, shutdown), whether or not the provider
    /// reported success.
    pub evicted_total: u64,
    /// Completed refill ticks.
    pub refill_runs_total: u64,
    /// `Unhealthy` verdicts received from the validator.
    pub validation_failures_total: u64,
}
97
/// A warm VM handed out by `WarmPool::acquire`; the pool no longer tracks it.
#[derive(Debug, Clone)]
pub struct WarmPoolHandle {
    /// Identifier of the pre-created VM that was taken from the pool.
    pub source_vm_id: String,
    /// Stack the VM was acquired for.
    pub stack: StackKey,
    /// Snapshot registered for the bucket at the moment of acquisition.
    pub source_snapshot: SnapshotRef,
}
123
/// One pooled VM waiting to be acquired.
#[derive(Debug)]
struct PoolEntry {
    /// Provider-level identifier of the VM.
    vm_id: String,
    /// When the entry was added to its bucket; compared against
    /// `entry_max_age` during refill ticks.
    created_at: Instant,
}
130
/// Per-stack pool state.
#[derive(Debug)]
struct BucketState {
    /// Snapshot new warm VMs for this stack are restored from.
    source_snapshot: SnapshotRef,
    /// Pooled VMs; `acquire` pops from the back.
    entries: Vec<PoolEntry>,
    /// Next unused numeric suffix for generated VM ids; advanced when refill
    /// work is planned.
    next_seed: u64,
}
141
/// Lock-free counters backing [`WarmPoolMetrics`]; each field mirrors the
/// `*_total` field of the same name.
#[derive(Debug, Default)]
struct Counters {
    created: AtomicU64,
    acquired: AtomicU64,
    evicted: AtomicU64,
    refill_runs: AtomicU64,
    validation_failures: AtomicU64,
}
151
152impl Counters {
153 fn snapshot(&self) -> WarmPoolMetrics {
154 WarmPoolMetrics {
155 created_total: self.created.load(Ordering::Relaxed),
156 acquired_total: self.acquired.load(Ordering::Relaxed),
157 evicted_total: self.evicted.load(Ordering::Relaxed),
158 refill_runs_total: self.refill_runs.load(Ordering::Relaxed),
159 validation_failures_total: self.validation_failures.load(Ordering::Relaxed),
160 }
161 }
162}
163
/// State shared between a `WarmPool` and its refill thread.
struct PoolState {
    /// One bucket per registered stack, guarded by a single mutex.
    buckets: Mutex<HashMap<StackKey, BucketState>>,
    /// Metrics counters; updated without taking the bucket lock.
    counters: Counters,
}
169
/// Pool of pre-created ("warm") VMs, kept topped up per stack by a background
/// refill thread.
pub struct WarmPool<P: VmProvider + VmQuery + 'static> {
    /// Provider used by the pool's own methods to destroy VMs; the refill
    /// thread works on its own clone.
    provider: P,
    /// Copy of the configuration the refill thread was started with.
    config: WarmPoolConfig,
    /// Validator consulted by `acquire`; shared with the refill thread.
    validator: Arc<dyn EntryValidator>,
    /// Buckets and counters shared with the refill thread.
    state: Arc<PoolState>,
    /// Set on shutdown/drop and polled by the refill thread.
    shutdown_flag: Arc<AtomicBool>,
    /// Handle of the refill thread; `None` once shutdown or drop has taken it.
    refill_handle: Option<JoinHandle<()>>,
}
185
186impl<P: VmProvider + VmQuery + 'static> fmt::Debug for WarmPool<P> {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 let depths: HashMap<StackKey, usize> = match self.state.buckets.lock() {
189 Ok(guard) => guard
190 .iter()
191 .map(|(k, b)| (k.clone(), b.entries.len()))
192 .collect(),
193 Err(poisoned) => poisoned
194 .into_inner()
195 .iter()
196 .map(|(k, b)| (k.clone(), b.entries.len()))
197 .collect(),
198 };
199 f.debug_struct("WarmPool")
200 .field("config", &self.config)
201 .field("depths", &depths)
202 .field(
203 "shutdown", &self.shutdown_flag.load(Ordering::Relaxed),
205 )
206 .finish()
207 }
208}
209
210impl<P: VmProvider + VmQuery + Clone + 'static> WarmPool<P> {
211 pub fn start(provider: P, config: WarmPoolConfig, validator: Arc<dyn EntryValidator>) -> Self {
217 let state = Arc::new(PoolState {
218 buckets: Mutex::new(HashMap::new()),
219 counters: Counters::default(),
220 });
221 let shutdown_flag = Arc::new(AtomicBool::new(false));
222
223 let refill_handle = spawn_refill_thread(
224 provider.clone(),
225 config.clone(),
226 Arc::clone(&validator),
227 Arc::clone(&state),
228 Arc::clone(&shutdown_flag),
229 );
230
231 Self {
232 provider,
233 config,
234 validator,
235 state,
236 shutdown_flag,
237 refill_handle: Some(refill_handle),
238 }
239 }
240
241 pub fn register(&self, stack: StackKey, source_snapshot: SnapshotRef) {
249 let mut guard = lock_buckets(&self.state.buckets);
250 guard
251 .entry(stack)
252 .and_modify(|b| b.source_snapshot = source_snapshot.clone())
253 .or_insert_with(|| BucketState {
254 source_snapshot,
255 entries: Vec::new(),
256 next_seed: 0,
257 });
258 }
259
260 pub fn unregister(&self, stack: &StackKey) {
263 let drained: Vec<String> = {
264 let mut guard = lock_buckets(&self.state.buckets);
265 match guard.remove(stack) {
266 Some(bucket) => bucket.entries.into_iter().map(|e| e.vm_id).collect(),
267 None => return,
268 }
269 };
270 for vm_id in drained {
271 let _ = self.provider.destroy_vm(&vm_id);
272 self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
273 }
274 }
275
276 pub fn acquire(&self, stack: &StackKey) -> Option<WarmPoolHandle> {
286 loop {
287 let (vm_id, source_snapshot) = {
288 let mut guard = lock_buckets(&self.state.buckets);
289 let bucket = guard.get_mut(stack)?;
290 let entry = bucket.entries.pop()?;
291 (entry.vm_id, bucket.source_snapshot.clone())
292 };
293
294 match self.validator.validate(&vm_id) {
295 ValidationResult::Healthy => {
296 self.state.counters.acquired.fetch_add(1, Ordering::Relaxed);
297 return Some(WarmPoolHandle {
298 source_vm_id: vm_id,
299 stack: stack.clone(),
300 source_snapshot,
301 });
302 }
303 ValidationResult::Unhealthy(_reason) => {
304 self.state
305 .counters
306 .validation_failures
307 .fetch_add(1, Ordering::Relaxed);
308 let _ = self.provider.destroy_vm(&vm_id);
309 self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
310 }
312 }
313 }
314 }
315
316 pub fn metrics(&self) -> WarmPoolMetrics {
318 self.state.counters.snapshot()
319 }
320
321 pub fn shutdown(&mut self) {
325 self.shutdown_flag.store(true, Ordering::SeqCst);
326 if let Some(handle) = self.refill_handle.take() {
327 join_with_timeout(handle, Duration::from_millis(200));
328 }
329
330 let drained: Vec<String> = {
331 let mut guard = lock_buckets(&self.state.buckets);
332 let mut all = Vec::new();
333 for bucket in guard.values_mut() {
334 all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
335 }
336 all
337 };
338 for vm_id in drained {
339 let _ = self.provider.destroy_vm(&vm_id);
340 self.state.counters.evicted.fetch_add(1, Ordering::Relaxed);
341 }
342 }
343}
344
345impl<P: VmProvider + VmQuery + 'static> Drop for WarmPool<P> {
346 fn drop(&mut self) {
347 self.shutdown_flag.store(true, Ordering::SeqCst);
351 if let Some(handle) = self.refill_handle.take() {
352 join_with_timeout(handle, Duration::from_millis(200));
353 }
354 let drained: Vec<String> = match self.state.buckets.lock() {
355 Ok(mut guard) => {
356 let mut all = Vec::new();
357 for bucket in guard.values_mut() {
358 all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
359 }
360 all
361 }
362 Err(poisoned) => {
363 let mut guard = poisoned.into_inner();
364 let mut all = Vec::new();
365 for bucket in guard.values_mut() {
366 all.extend(bucket.entries.drain(..).map(|e| e.vm_id));
367 }
368 all
369 }
370 };
371 for vm_id in drained {
372 let _ = self.provider.destroy_vm(&vm_id);
373 }
374 }
375}
376
377fn spawn_refill_thread<P: VmProvider + VmQuery + Clone + 'static>(
379 provider: P,
380 config: WarmPoolConfig,
381 validator: Arc<dyn EntryValidator>,
382 state: Arc<PoolState>,
383 shutdown_flag: Arc<AtomicBool>,
384) -> JoinHandle<()> {
385 match thread::Builder::new()
390 .name("microvm-warm-pool-refill".into())
391 .spawn(move || refill_loop(provider, config, validator, state, shutdown_flag))
392 {
393 Ok(handle) => handle,
394 Err(_) => thread::spawn(|| {}),
395 }
396}
397
398fn refill_loop<P: VmProvider + VmQuery + Clone + 'static>(
399 provider: P,
400 config: WarmPoolConfig,
401 validator: Arc<dyn EntryValidator>,
402 state: Arc<PoolState>,
403 shutdown_flag: Arc<AtomicBool>,
404) {
405 loop {
406 if shutdown_flag.load(Ordering::SeqCst) {
407 return;
408 }
409 run_refill_tick(&provider, &config, validator.as_ref(), &state);
410 state.counters.refill_runs.fetch_add(1, Ordering::Relaxed);
411
412 let deadline = Instant::now() + config.refill_interval;
415 while Instant::now() < deadline {
416 if shutdown_flag.load(Ordering::SeqCst) {
417 return;
418 }
419 let remaining = deadline.saturating_duration_since(Instant::now());
420 let slice = remaining.min(Duration::from_millis(25));
421 if slice.is_zero() {
422 break;
423 }
424 thread::sleep(slice);
425 }
426 }
427}
428
/// Runs one maintenance pass over all buckets.
///
/// The pass has three phases:
/// 1. remove entries older than `config.entry_max_age` and destroy them;
/// 2. under the lock, plan a refill job for every bucket holding fewer than
///    `config.min_depth` entries, sized to reach `config.max_depth`;
/// 3. outside the lock, create and validate the planned VMs one by one,
///    re-taking the lock only to push each healthy VM into its bucket.
///
/// Provider and validator calls are never made while the bucket lock is held.
fn run_refill_tick<P: VmProvider + VmQuery + 'static>(
    provider: &P,
    config: &WarmPoolConfig,
    validator: &dyn EntryValidator,
    state: &Arc<PoolState>,
) {
    // Phase 1: age-based eviction. Ids are collected under the lock and
    // destroyed after it is released.
    let now = Instant::now();
    let to_evict: Vec<String> = {
        let mut guard = lock_buckets(&state.buckets);
        let mut victims = Vec::new();
        for bucket in guard.values_mut() {
            bucket.entries.retain(|entry| {
                let alive = now.duration_since(entry.created_at) <= config.entry_max_age;
                if !alive {
                    victims.push(entry.vm_id.clone());
                }
                alive
            });
        }
        victims
    };
    for vm_id in to_evict {
        // Best effort: counted as evicted even if the provider call fails.
        let _ = provider.destroy_vm(&vm_id);
        state.counters.evicted.fetch_add(1, Ordering::Relaxed);
    }

    // Work item planned under the lock and executed outside of it.
    struct RefillJob {
        stack: StackKey,
        // Number of VMs to create.
        deficit: usize,
        snapshot: SnapshotRef,
        // First id suffix reserved for this job.
        next_seed: u64,
    }
    // Phase 2: planning. Only buckets below `min_depth` are refilled, but a
    // triggered refill aims for `max_depth`.
    let jobs: Vec<RefillJob> = {
        let mut guard = lock_buckets(&state.buckets);
        let mut planned = Vec::new();
        for (stack, bucket) in guard.iter_mut() {
            if bucket.entries.len() < config.min_depth {
                let deficit = config.max_depth.saturating_sub(bucket.entries.len());
                if deficit == 0 {
                    continue;
                }
                // Reserve the id range up front so the suffixes stay unique
                // per bucket even when creation fails part-way.
                let start = bucket.next_seed;
                bucket.next_seed = start.saturating_add(deficit as u64);
                planned.push(RefillJob {
                    stack: stack.clone(),
                    deficit,
                    snapshot: bucket.source_snapshot.clone(),
                    next_seed: start,
                });
            }
        }
        planned
    };

    // Phase 3: create, validate and pool.
    for job in jobs {
        for i in 0..job.deficit {
            let seed = job.next_seed.saturating_add(i as u64);
            // NOTE(review): the id uses only the sanitized stack name and
            // version. Two registered keys that differ only in
            // `vcpu_count`/`mem_size_mib`, or only in characters `sanitize`
            // maps to `_`, would generate identical ids — confirm whether
            // that can happen and how the provider reacts.
            let vm_id = format!(
                "warm-{stack}-{ver}-{seed}",
                stack = sanitize(&job.stack.stack_name),
                ver = sanitize(&job.stack.version),
                seed = seed,
            );

            let spec = VmSpec {
                vcpu_count: Some(job.stack.vcpu_count),
                mem_size_mib: Some(job.stack.mem_size_mib),
                restore_from: Some(job.snapshot.clone()),
                ..VmSpec::default()
            };

            // A creation failure abandons the rest of this job; the bucket is
            // reconsidered on the next tick. The error itself is dropped.
            if let Err(_e) = provider.create_vm_with_spec(&vm_id, &spec) {
                break;
            }
            state.counters.created.fetch_add(1, Ordering::Relaxed);

            match validator.validate(&vm_id) {
                ValidationResult::Healthy => {
                    let mut guard = lock_buckets(&state.buckets);
                    match guard.get_mut(&job.stack) {
                        Some(bucket) => {
                            bucket.entries.push(PoolEntry {
                                vm_id,
                                created_at: Instant::now(),
                            });
                        }
                        // The bucket disappeared while the VM was being
                        // created: release the lock, then destroy the VM.
                        None => {
                            drop(guard);
                            let _ = provider.destroy_vm(&vm_id);
                            state.counters.evicted.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
                ValidationResult::Unhealthy(_reason) => {
                    state
                        .counters
                        .validation_failures
                        .fetch_add(1, Ordering::Relaxed);
                    let _ = provider.destroy_vm(&vm_id);
                    state.counters.evicted.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }
}
551
/// Returns `input` with every character that is not an ASCII letter or digit
/// replaced by `_`, making it usable inside a generated VM id.
fn sanitize(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    for ch in input.chars() {
        if ch.is_ascii_alphanumeric() {
            cleaned.push(ch);
        } else {
            cleaned.push('_');
        }
    }
    cleaned
}
563
564fn lock_buckets(
565 m: &Mutex<HashMap<StackKey, BucketState>>,
566) -> MutexGuard<'_, HashMap<StackKey, BucketState>> {
567 match m.lock() {
570 Ok(guard) => guard,
571 Err(poisoned) => poisoned.into_inner(),
572 }
573}
574
/// Waits up to `timeout` for the thread behind `handle` to finish, polling
/// every 5 ms.
///
/// A finished thread is joined (its result is ignored). If the timeout
/// expires first the handle is dropped, which detaches the thread and lets it
/// keep running.
fn join_with_timeout(handle: JoinHandle<()>, timeout: Duration) {
    let give_up_at = Instant::now() + timeout;
    loop {
        if Instant::now() >= give_up_at {
            // Returning drops `handle`, detaching the still-running thread.
            return;
        }
        if handle.is_finished() {
            let _ = handle.join();
            return;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592 use microvm_runtime::InMemoryVmProvider;
593 use microvm_runtime::model::{VmStatus, VmView};
594 use std::sync::Mutex as StdMutex;
595
    /// Fixture: the small "alpha" stack (1 vCPU, 128 MiB).
    fn stack_alpha() -> StackKey {
        StackKey {
            stack_name: "alpha".into(),
            version: "1.0.0".into(),
            vcpu_count: 1,
            mem_size_mib: 128,
        }
    }
604
    /// Fixture: a second stack ("beta", 2 vCPUs, 256 MiB) that differs from
    /// `stack_alpha` in every field.
    fn stack_beta() -> StackKey {
        StackKey {
            stack_name: "beta".into(),
            version: "2.0.0".into(),
            vcpu_count: 2,
            mem_size_mib: 256,
        }
    }
613
    /// Fixture: a snapshot reference whose ids are derived from the stack's
    /// name and version, so tests can tell snapshots of different stacks apart.
    fn snapshot_for(stack: &StackKey) -> SnapshotRef {
        SnapshotRef {
            vm_id: format!("tpl-{}", stack.stack_name),
            snapshot_id: format!("snap-{}-{}", stack.stack_name, stack.version),
            resume_immediately: true,
            network_overrides: Vec::new(),
        }
    }
622
    /// Fixture: refill below 2 entries, fill to 3, tick every `refill_ms`
    /// milliseconds, and keep entries for 600 s (effectively no aging).
    fn fast_config(refill_ms: u64) -> WarmPoolConfig {
        WarmPoolConfig {
            min_depth: 2,
            max_depth: 3,
            refill_interval: Duration::from_millis(refill_ms),
            entry_max_age: Duration::from_secs(600),
        }
    }
631
    /// Validator that accepts every VM.
    struct AlwaysHealthy;
    impl EntryValidator for AlwaysHealthy {
        fn validate(&self, _vm_id: &str) -> ValidationResult {
            ValidationResult::Healthy
        }
    }
639
640 fn wait_until<F: FnMut() -> bool>(timeout: Duration, mut f: F) -> bool {
641 let deadline = Instant::now() + timeout;
642 while Instant::now() < deadline {
643 if f() {
644 return true;
645 }
646 thread::sleep(Duration::from_millis(10));
647 }
648 false
649 }
650
651 fn depth<P>(pool: &WarmPool<P>, stack: &StackKey) -> usize
652 where
653 P: VmProvider + VmQuery + Clone + 'static,
654 {
655 let guard = lock_buckets(&pool.state.buckets);
656 guard.get(stack).map(|b| b.entries.len()).unwrap_or(0)
657 }
658
    /// A registered stack is filled to `max_depth` and then left alone.
    #[test]
    fn refill_brings_bucket_to_max_depth() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        let reached = wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3);
        assert!(
            reached,
            "expected depth == max_depth (3), got {}",
            depth(&pool, &stack)
        );

        let m = pool.metrics();
        assert_eq!(m.created_total, 3, "should have created exactly 3 entries");
        assert_eq!(m.acquired_total, 0);
        assert_eq!(m.validation_failures_total, 0);

        // Several more ticks (20 ms each) must not overfill a full bucket.
        thread::sleep(Duration::from_millis(150));
        assert_eq!(depth(&pool, &stack), 3);
        assert_eq!(pool.metrics().created_total, 3);
    }
684
    /// `acquire` yields `None` both for an unknown stack and for a registered
    /// but still empty bucket.
    #[test]
    fn acquire_returns_none_when_bucket_empty() {
        let provider = InMemoryVmProvider::default();
        // 60 s interval: after the start-up tick the refill thread stays idle
        // for the rest of the test.
        let pool = WarmPool::start(
            provider,
            WarmPoolConfig {
                refill_interval: Duration::from_secs(60),
                ..fast_config(60)
            },
            Arc::new(AlwaysHealthy),
        );
        let stack = stack_alpha();
        assert!(pool.acquire(&stack).is_none(), "no bucket registered");

        pool.register(stack.clone(), snapshot_for(&stack));
        assert!(
            pool.acquire(&stack).is_none(),
            "registered bucket should still be empty"
        );
        assert_eq!(pool.metrics().acquired_total, 0);
    }
707
    /// An acquired handle carries the requested stack, the registered
    /// snapshot and a pool-generated VM id.
    #[test]
    fn acquire_pops_healthy_entry_with_correct_snapshot() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        let snap = snapshot_for(&stack);
        pool.register(stack.clone(), snap.clone());

        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) >= 2));

        let handle = pool.acquire(&stack).expect("warm entry available");
        assert_eq!(handle.stack, stack);
        assert_eq!(handle.source_snapshot.vm_id, snap.vm_id);
        assert_eq!(handle.source_snapshot.snapshot_id, snap.snapshot_id);
        // "1.0.0" is sanitized to "1_0_0" inside the generated id.
        assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));

        let m = pool.metrics();
        assert_eq!(m.acquired_total, 1);
        assert_eq!(m.validation_failures_total, 0);
    }
728
    /// `acquire` destroys entries the validator rejects and keeps going until
    /// it finds a healthy one.
    #[test]
    fn acquire_evicts_unhealthy_entries_and_returns_next_healthy() {
        let provider = InMemoryVmProvider::default();
        // Scripted validator. Phase 0: everything is healthy, so the pool can
        // fill. Any other phase: the next `unhealthy_left` validations fail,
        // after which everything is healthy again.
        struct Phased {
            phase: StdMutex<u8>, unhealthy_left: StdMutex<u32>,
        }
        impl EntryValidator for Phased {
            fn validate(&self, _vm_id: &str) -> ValidationResult {
                let phase = *self.phase.lock().expect("phase");
                if phase == 0 {
                    return ValidationResult::Healthy;
                }
                let mut left = self.unhealthy_left.lock().expect("uh");
                if *left > 0 {
                    *left -= 1;
                    ValidationResult::Unhealthy("scripted bad".into())
                } else {
                    ValidationResult::Healthy
                }
            }
        }
        let validator = Arc::new(Phased {
            phase: StdMutex::new(0),
            unhealthy_left: StdMutex::new(2),
        });
        let validator_for_pool: Arc<dyn EntryValidator> = validator.clone();

        let pool = WarmPool::start(provider.clone(), fast_config(20), validator_for_pool);
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));

        // Switch the validator so that the next two checks fail.
        *validator.phase.lock().expect("phase") = 1;

        let handle = pool.acquire(&stack).expect("eventually returns healthy");
        assert!(handle.source_vm_id.starts_with("warm-alpha-1_0_0-"));

        let m = pool.metrics();
        assert_eq!(m.validation_failures_total, 2, "two entries rejected");
        assert_eq!(m.acquired_total, 1, "one entry served");
        assert_eq!(m.evicted_total, 2);

        let vms = provider.list_vms().expect("list");
        let destroyed = vms
            .iter()
            .filter(|v| v.status == VmStatus::Destroyed)
            .count();
        assert_eq!(destroyed, 2, "two rejected entries should be destroyed");
    }
796
    /// With a validator that rejects everything, nothing is ever pooled and
    /// `acquire` returns `None`.
    #[test]
    fn acquire_returns_none_after_all_entries_rejected() {
        struct AlwaysBad;
        impl EntryValidator for AlwaysBad {
            fn validate(&self, _vm_id: &str) -> ValidationResult {
                ValidationResult::Unhealthy("never trust this VM".into())
            }
        }
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysBad));
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        // Wait until the refill thread has tried (and rejected) a full batch.
        assert!(wait_until(Duration::from_secs(2), || {
            pool.metrics().validation_failures_total >= 3
        }));

        assert_eq!(depth(&pool, &stack), 0);
        assert!(pool.acquire(&stack).is_none());
    }
820
    /// Entries older than `entry_max_age` are evicted and replaced by newly
    /// created ones.
    #[test]
    fn entry_max_age_eviction() {
        let provider = InMemoryVmProvider::default();
        let config = WarmPoolConfig {
            min_depth: 2,
            max_depth: 3,
            refill_interval: Duration::from_millis(30),
            entry_max_age: Duration::from_millis(80),
        };
        let pool = WarmPool::start(provider.clone(), config, Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
        let after_fill = pool.metrics().created_total;
        assert_eq!(after_fill, 3);

        // The first batch ages out after 80 ms and a second batch is created.
        assert!(wait_until(Duration::from_secs(3), || {
            let m = pool.metrics();
            m.evicted_total >= 3 && m.created_total >= 6
        }));
    }
847
    /// Unregistering one stack removes its bucket and destroys its entries
    /// without touching other buckets.
    #[test]
    fn register_and_unregister() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
        let alpha = stack_alpha();
        let beta = stack_beta();

        pool.register(alpha.clone(), snapshot_for(&alpha));
        pool.register(beta.clone(), snapshot_for(&beta));

        assert!(wait_until(Duration::from_secs(2), || {
            depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
        }));

        let evictions_before = pool.metrics().evicted_total;
        pool.unregister(&alpha);

        {
            // Scoped so the bucket lock is released before `metrics()`.
            let guard = lock_buckets(&pool.state.buckets);
            assert!(!guard.contains_key(&alpha));
            assert_eq!(guard.get(&beta).map(|b| b.entries.len()), Some(3));
        }

        let m = pool.metrics();
        assert_eq!(m.evicted_total - evictions_before, 3);
    }
876
    /// Registering a stack a second time replaces the bucket's snapshot.
    #[test]
    fn register_replaces_snapshot_for_existing_bucket() {
        let provider = InMemoryVmProvider::default();
        // 60 s interval keeps the refill thread idle after its start-up tick.
        let pool = WarmPool::start(
            provider,
            WarmPoolConfig {
                refill_interval: Duration::from_secs(60),
                ..fast_config(60)
            },
            Arc::new(AlwaysHealthy),
        );
        let stack = stack_alpha();
        let snap_v1 = SnapshotRef {
            vm_id: "tpl-v1".into(),
            snapshot_id: "snap-v1".into(),
            resume_immediately: true,
            network_overrides: Vec::new(),
        };
        let snap_v2 = SnapshotRef {
            vm_id: "tpl-v2".into(),
            snapshot_id: "snap-v2".into(),
            resume_immediately: true,
            network_overrides: Vec::new(),
        };
        pool.register(stack.clone(), snap_v1);
        pool.register(stack.clone(), snap_v2.clone());

        let guard = lock_buckets(&pool.state.buckets);
        let bucket = guard.get(&stack).expect("bucket");
        assert_eq!(bucket.source_snapshot.snapshot_id, snap_v2.snapshot_id);
    }
908
    /// Counters reflect creations, acquisitions and refill ticks.
    #[test]
    fn metrics_increment_correctly() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
        let _h1 = pool.acquire(&stack).expect("h1");
        let _h2 = pool.acquire(&stack).expect("h2");

        let m = pool.metrics();
        // `>=` because the refill thread may already be replacing the
        // acquired entries.
        assert!(m.created_total >= 3);
        assert_eq!(m.acquired_total, 2);
        assert!(m.refill_runs_total >= 1);
    }
926
    /// `shutdown` returns quickly even with a long refill interval and leaves
    /// no pooled entries behind.
    #[test]
    fn shutdown_stops_refill_within_budget() {
        let provider = InMemoryVmProvider::default();
        let config = WarmPoolConfig {
            min_depth: 1,
            max_depth: 2,
            refill_interval: Duration::from_secs(1),
            entry_max_age: Duration::from_secs(600),
        };
        let mut pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));
        // Let the refill thread get into its 1 s wait before shutting down.
        thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        pool.shutdown();
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(500),
            "shutdown took {elapsed:?}, expected < 500ms"
        );

        let guard = lock_buckets(&pool.state.buckets);
        for (_k, bucket) in guard.iter() {
            assert!(bucket.entries.is_empty(), "all entries should be drained");
        }
    }
957
958 #[test]
959 fn drop_destroys_remaining_entries() {
960 let provider = InMemoryVmProvider::default();
961 let stack = stack_alpha();
962 {
963 let pool = WarmPool::start(provider.clone(), fast_config(20), Arc::new(AlwaysHealthy));
964 pool.register(stack.clone(), snapshot_for(&stack));
965 assert!(wait_until(Duration::from_secs(2), || depth(&pool, &stack) == 3));
966 } let vms = provider.list_vms().expect("list");
970 let alpha_vms: Vec<&VmView> = vms
971 .iter()
972 .filter(|v| v.vm_id.starts_with("warm-alpha-"))
973 .collect();
974 assert_eq!(alpha_vms.len(), 3);
975 for v in alpha_vms {
976 assert_eq!(
977 v.status,
978 VmStatus::Destroyed,
979 "vm {} not destroyed",
980 v.vm_id
981 );
982 }
983 }
984
    /// With a 50 ms interval the refill thread ticks repeatedly even when no
    /// stack is registered.
    #[test]
    fn refill_interval_is_honored_under_short_budget() {
        let provider = InMemoryVmProvider::default();
        let config = WarmPoolConfig {
            min_depth: 1,
            max_depth: 1,
            refill_interval: Duration::from_millis(50),
            entry_max_age: Duration::from_secs(600),
        };
        let pool = WarmPool::start(provider, config, Arc::new(AlwaysHealthy));
        thread::sleep(Duration::from_millis(350));
        let runs = pool.metrics().refill_runs_total;
        assert!(
            runs >= 3,
            "expected refill thread to tick at least 3 times in 350ms, got {runs}"
        );
    }
1008
    /// Validation failures during refill show up in the
    /// `validation_failures_total` counter.
    #[test]
    fn validator_failure_reason_is_logged_via_metrics_counter() {
        // Validator that alternates: even-numbered calls pass, odd ones fail.
        struct Counting {
            calls: AtomicU64,
        }
        impl EntryValidator for Counting {
            fn validate(&self, _vm_id: &str) -> ValidationResult {
                let n = self.calls.fetch_add(1, Ordering::SeqCst);
                if n.is_multiple_of(2) {
                    ValidationResult::Healthy
                } else {
                    ValidationResult::Unhealthy(format!("scripted-fail-{n}"))
                }
            }
        }
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(
            provider,
            fast_config(20),
            Arc::new(Counting {
                calls: AtomicU64::new(0),
            }),
        );
        let stack = stack_alpha();
        pool.register(stack.clone(), snapshot_for(&stack));

        assert!(wait_until(Duration::from_secs(3), || {
            pool.metrics().validation_failures_total >= 1
        }));
        // The first failure is the second validation, hence two creations.
        assert!(pool.metrics().created_total >= 2);
    }
1047
    /// Acquiring from one bucket leaves the other bucket's depth untouched.
    #[test]
    fn separate_buckets_do_not_interfere() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider, fast_config(20), Arc::new(AlwaysHealthy));
        let alpha = stack_alpha();
        let beta = stack_beta();
        pool.register(alpha.clone(), snapshot_for(&alpha));
        pool.register(beta.clone(), snapshot_for(&beta));
        assert!(wait_until(Duration::from_secs(2), || {
            depth(&pool, &alpha) == 3 && depth(&pool, &beta) == 3
        }));

        for _ in 0..3 {
            assert!(pool.acquire(&alpha).is_some());
        }
        assert_eq!(depth(&pool, &beta), 3);
    }
1066
    /// Unregistering a stack that was never registered evicts nothing.
    #[test]
    fn unregister_unknown_key_is_noop() {
        let provider = InMemoryVmProvider::default();
        let pool = WarmPool::start(provider, fast_config(60), Arc::new(AlwaysHealthy));
        let stack = stack_alpha();
        pool.unregister(&stack);
        assert_eq!(pool.metrics().evicted_total, 0);
    }
1075
1076 #[test]
1077 fn sanitize_handles_special_characters() {
1078 assert_eq!(sanitize("alpha"), "alpha");
1079 assert_eq!(sanitize("1.0.0"), "1_0_0");
1080 assert_eq!(sanitize("a/b c"), "a_b_c");
1081 assert_eq!(sanitize("ALPHA-9"), "ALPHA_9");
1082 }
1083}