Skip to main content

pi/
hostcall_s3_fifo.rs

1//! Deterministic S3-FIFO-inspired admission policy for hostcall queues.
2//!
3//! This module models a tri-queue policy core that can be wired into the
4//! hostcall queue runtime:
5//! - `small`: probationary live entries
6//! - `main`: protected live entries
7//! - `ghost`: recently evicted identifiers for cheap reuse signals
8//!
9//! It is intentionally runtime-agnostic and side-effect free beyond state
10//! mutations, so integration code can compose it with existing queue and
11//! telemetry paths.
12
13#[cfg(test)]
14use std::collections::BTreeSet;
15use std::collections::{BTreeMap, VecDeque};
16
17/// Fallback trigger reason when S3-FIFO policy is disabled at runtime.
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum S3FifoFallbackReason {
20    SignalQualityInsufficient,
21    FairnessInstability,
22}
23
24/// Where a key ends up after one policy decision.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum S3FifoTier {
27    Small,
28    Main,
29    Ghost,
30    Fallback,
31}
32
33/// Deterministic decision kind from one `access` event.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum S3FifoDecisionKind {
36    AdmitSmall,
37    PromoteSmallToMain,
38    HitMain,
39    AdmitFromGhost,
40    RejectFairnessBudget,
41    FallbackBypass,
42}
43
44/// Decision payload produced by [`S3FifoPolicy::access`].
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub struct S3FifoDecision {
47    pub kind: S3FifoDecisionKind,
48    pub tier: S3FifoTier,
49    pub ghost_hit: bool,
50    pub fallback_reason: Option<S3FifoFallbackReason>,
51    pub live_depth: usize,
52    pub small_depth: usize,
53    pub main_depth: usize,
54    pub ghost_depth: usize,
55}
56
57/// Configuration for deterministic S3-FIFO policy behavior.
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct S3FifoConfig {
60    pub live_capacity: usize,
61    pub small_capacity: usize,
62    pub ghost_capacity: usize,
63    pub max_entries_per_owner: usize,
64    pub fallback_window: usize,
65    pub min_ghost_hits_in_window: usize,
66    pub max_budget_rejections_in_window: usize,
67}
68
69impl Default for S3FifoConfig {
70    fn default() -> Self {
71        Self {
72            live_capacity: 256,
73            small_capacity: 64,
74            ghost_capacity: 512,
75            max_entries_per_owner: 64,
76            fallback_window: 32,
77            min_ghost_hits_in_window: 2,
78            max_budget_rejections_in_window: 12,
79        }
80    }
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84enum LiveTier {
85    Small,
86    Main,
87}
88
89#[derive(Debug, Clone, Copy)]
90struct DecisionSignal {
91    ghost_hit: bool,
92    budget_rejected: bool,
93}
94
95/// Snapshot for logs and tests.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct S3FifoTelemetry {
98    pub fallback_reason: Option<S3FifoFallbackReason>,
99    pub small_depth: usize,
100    pub main_depth: usize,
101    pub ghost_depth: usize,
102    pub live_depth: usize,
103    pub ghost_hits_total: u64,
104    pub admissions_total: u64,
105    pub promotions_total: u64,
106    pub budget_rejections_total: u64,
107    pub owner_live_counts: BTreeMap<String, usize>,
108}
109
110/// Deterministic S3-FIFO-inspired tri-queue admission controller.
111#[derive(Debug, Clone)]
112pub struct S3FifoPolicy<K: Ord + Clone> {
113    cfg: S3FifoConfig,
114    small: VecDeque<K>,
115    main: VecDeque<K>,
116    ghost_order: BTreeMap<u64, K>,
117    ghost_lookup: BTreeMap<K, u64>,
118    ghost_gen: u64,
119    live_tiers: BTreeMap<K, LiveTier>,
120    live_owners: BTreeMap<K, String>,
121    owner_live_counts: BTreeMap<String, usize>,
122    recent_signals: VecDeque<DecisionSignal>,
123    fallback_reason: Option<S3FifoFallbackReason>,
124    ghost_hits_total: u64,
125    admissions_total: u64,
126    promotions_total: u64,
127    budget_rejections_total: u64,
128}
129
130impl<K: Ord + Clone> S3FifoPolicy<K> {
131    #[must_use]
132    pub fn new(config: S3FifoConfig) -> Self {
133        let live_capacity = config.live_capacity.max(2);
134        let small_cap_floor = live_capacity.saturating_sub(1).max(1);
135        let small_capacity = config.small_capacity.max(1).min(small_cap_floor);
136        let ghost_capacity = config.ghost_capacity.max(1);
137        let max_entries_per_owner = config.max_entries_per_owner.max(1);
138        let fallback_window = config.fallback_window.max(1);
139
140        Self {
141            cfg: S3FifoConfig {
142                live_capacity,
143                small_capacity,
144                ghost_capacity,
145                max_entries_per_owner,
146                fallback_window,
147                min_ghost_hits_in_window: config.min_ghost_hits_in_window.min(fallback_window),
148                max_budget_rejections_in_window: config
149                    .max_budget_rejections_in_window
150                    .min(fallback_window),
151            },
152            small: VecDeque::new(),
153            main: VecDeque::new(),
154            ghost_order: BTreeMap::new(),
155            ghost_lookup: BTreeMap::new(),
156            ghost_gen: 0,
157            live_tiers: BTreeMap::new(),
158            live_owners: BTreeMap::new(),
159            owner_live_counts: BTreeMap::new(),
160            recent_signals: VecDeque::new(),
161            fallback_reason: None,
162            ghost_hits_total: 0,
163            admissions_total: 0,
164            promotions_total: 0,
165            budget_rejections_total: 0,
166        }
167    }
168
169    #[must_use]
170    pub const fn config(&self) -> S3FifoConfig {
171        self.cfg
172    }
173
174    #[must_use]
175    pub fn telemetry(&self) -> S3FifoTelemetry {
176        S3FifoTelemetry {
177            fallback_reason: self.fallback_reason,
178            small_depth: self.small.len(),
179            main_depth: self.main.len(),
180            ghost_depth: self.ghost_lookup.len(),
181            live_depth: self.live_depth(),
182            ghost_hits_total: self.ghost_hits_total,
183            admissions_total: self.admissions_total,
184            promotions_total: self.promotions_total,
185            budget_rejections_total: self.budget_rejections_total,
186            owner_live_counts: self.owner_live_counts.clone(),
187        }
188    }
189
190    #[must_use]
191    pub fn live_depth(&self) -> usize {
192        self.small.len().saturating_add(self.main.len())
193    }
194
195    pub fn clear_fallback(&mut self) {
196        self.fallback_reason = None;
197        self.recent_signals.clear();
198    }
199
200    pub fn access(&mut self, owner: &str, key: K) -> S3FifoDecision {
201        if let Some(reason) = self.fallback_reason {
202            return self.decision(
203                S3FifoDecisionKind::FallbackBypass,
204                S3FifoTier::Fallback,
205                false,
206                Some(reason),
207            );
208        }
209
210        let mut ghost_hit = false;
211        let kind = if matches!(self.live_tiers.get(&key), Some(LiveTier::Main)) {
212            self.touch_main(&key);
213            S3FifoDecisionKind::HitMain
214        } else if matches!(self.live_tiers.get(&key), Some(LiveTier::Small)) {
215            self.promote_small_to_main(&key);
216            self.promotions_total = self.promotions_total.saturating_add(1);
217            S3FifoDecisionKind::PromoteSmallToMain
218        } else if self.ghost_lookup.contains_key(&key) {
219            ghost_hit = true;
220            self.ghost_hits_total = self.ghost_hits_total.saturating_add(1);
221            if self.owner_at_budget(owner) {
222                self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
223                S3FifoDecisionKind::RejectFairnessBudget
224            } else {
225                self.admit_from_ghost(owner, key);
226                self.admissions_total = self.admissions_total.saturating_add(1);
227                S3FifoDecisionKind::AdmitFromGhost
228            }
229        } else if self.owner_at_budget(owner) {
230            self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
231            S3FifoDecisionKind::RejectFairnessBudget
232        } else {
233            self.admit_small(owner, key);
234            self.admissions_total = self.admissions_total.saturating_add(1);
235            S3FifoDecisionKind::AdmitSmall
236        };
237
238        let signal = DecisionSignal {
239            ghost_hit,
240            budget_rejected: kind == S3FifoDecisionKind::RejectFairnessBudget,
241        };
242        self.record_signal(signal);
243        self.evaluate_fallback();
244
245        let tier = Self::resolve_tier(kind, ghost_hit);
246        self.decision(kind, tier, ghost_hit, self.fallback_reason)
247    }
248
249    const fn resolve_tier(kind: S3FifoDecisionKind, ghost_hit: bool) -> S3FifoTier {
250        match kind {
251            S3FifoDecisionKind::HitMain
252            | S3FifoDecisionKind::PromoteSmallToMain
253            | S3FifoDecisionKind::AdmitFromGhost => S3FifoTier::Main,
254            S3FifoDecisionKind::AdmitSmall => S3FifoTier::Small,
255            S3FifoDecisionKind::RejectFairnessBudget => {
256                if ghost_hit {
257                    S3FifoTier::Ghost
258                } else {
259                    S3FifoTier::Small
260                }
261            }
262            S3FifoDecisionKind::FallbackBypass => S3FifoTier::Fallback,
263        }
264    }
265
266    fn decision(
267        &self,
268        kind: S3FifoDecisionKind,
269        tier: S3FifoTier,
270        ghost_hit: bool,
271        fallback_reason: Option<S3FifoFallbackReason>,
272    ) -> S3FifoDecision {
273        S3FifoDecision {
274            kind,
275            tier,
276            ghost_hit,
277            fallback_reason,
278            live_depth: self.live_depth(),
279            small_depth: self.small.len(),
280            main_depth: self.main.len(),
281            ghost_depth: self.ghost_lookup.len(),
282        }
283    }
284
285    fn owner_at_budget(&self, owner: &str) -> bool {
286        self.owner_live_counts.get(owner).copied().unwrap_or(0) >= self.cfg.max_entries_per_owner
287    }
288
289    const fn main_capacity(&self) -> usize {
290        self.cfg
291            .live_capacity
292            .saturating_sub(self.cfg.small_capacity)
293    }
294
295    fn admit_small(&mut self, owner: &str, key: K) {
296        self.purge_key(&key);
297        self.small.push_back(key.clone());
298        self.live_tiers.insert(key.clone(), LiveTier::Small);
299        self.live_owners.insert(key, owner.to_string());
300        self.increment_owner(owner);
301        self.enforce_small_capacity();
302        self.enforce_live_capacity();
303    }
304
305    fn admit_from_ghost(&mut self, owner: &str, key: K) {
306        self.remove_ghost(&key);
307        self.main.push_back(key.clone());
308        self.live_tiers.insert(key.clone(), LiveTier::Main);
309        self.live_owners.insert(key, owner.to_string());
310        self.increment_owner(owner);
311        self.enforce_main_capacity();
312        self.enforce_live_capacity();
313    }
314
315    fn promote_small_to_main(&mut self, key: &K) {
316        remove_from_queue(&mut self.small, key);
317        self.main.push_back(key.clone());
318        self.live_tiers.insert(key.clone(), LiveTier::Main);
319        self.enforce_main_capacity();
320        self.enforce_live_capacity();
321    }
322
323    fn touch_main(&mut self, key: &K) {
324        remove_from_queue(&mut self.main, key);
325        self.main.push_back(key.clone());
326    }
327
328    fn enforce_small_capacity(&mut self) {
329        while self.small.len() > self.cfg.small_capacity {
330            self.evict_small_front_to_ghost();
331        }
332    }
333
334    fn enforce_main_capacity(&mut self) {
335        while self.main.len() > self.main_capacity() {
336            self.evict_main_front_to_ghost();
337        }
338    }
339
340    fn enforce_live_capacity(&mut self) {
341        while self.live_depth() > self.cfg.live_capacity {
342            if self.small.is_empty() {
343                self.evict_main_front_to_ghost();
344            } else {
345                self.evict_small_front_to_ghost();
346            }
347        }
348    }
349
350    fn evict_small_front_to_ghost(&mut self) {
351        if let Some(key) = self.small.pop_front() {
352            self.live_tiers.remove(&key);
353            self.remove_owner_for_key(&key);
354            self.push_ghost(key);
355        }
356    }
357
358    fn evict_main_front_to_ghost(&mut self) {
359        if let Some(key) = self.main.pop_front() {
360            self.live_tiers.remove(&key);
361            self.remove_owner_for_key(&key);
362            self.push_ghost(key);
363        }
364    }
365
366    fn purge_key(&mut self, key: &K) {
367        self.remove_live_key(key);
368        self.remove_ghost(key);
369    }
370
371    fn remove_live_key(&mut self, key: &K) {
372        if let Some(tier) = self.live_tiers.remove(key) {
373            match tier {
374                LiveTier::Small => remove_from_queue(&mut self.small, key),
375                LiveTier::Main => remove_from_queue(&mut self.main, key),
376            }
377            self.remove_owner_for_key(key);
378        }
379    }
380
381    fn remove_owner_for_key(&mut self, key: &K) {
382        if let Some(owner) = self.live_owners.remove(key) {
383            self.decrement_owner(&owner);
384        }
385    }
386
387    fn push_ghost(&mut self, key: K) {
388        // Prevent generation overflow.
389        if self.ghost_gen == u64::MAX {
390            self.ghost_gen = 0;
391            self.ghost_order.clear();
392            self.ghost_lookup.clear();
393        }
394
395        self.ghost_gen = self.ghost_gen.saturating_add(1);
396
397        if let Some(gen_mut) = self.ghost_lookup.get_mut(&key) {
398            let old_gen = *gen_mut;
399            *gen_mut = self.ghost_gen;
400            if let Some(k_reused) = self.ghost_order.remove(&old_gen) {
401                self.ghost_order.insert(self.ghost_gen, k_reused);
402            }
403        } else {
404            self.ghost_lookup.insert(key.clone(), self.ghost_gen);
405            self.ghost_order.insert(self.ghost_gen, key);
406        }
407
408        while self.ghost_lookup.len() > self.cfg.ghost_capacity {
409            if let Some((_, popped_key)) = self.ghost_order.pop_first() {
410                self.ghost_lookup.remove(&popped_key);
411            } else {
412                break;
413            }
414        }
415    }
416
417    fn remove_ghost(&mut self, key: &K) {
418        if let Some(generation) = self.ghost_lookup.remove(key) {
419            self.ghost_order.remove(&generation);
420        }
421    }
422
423    fn increment_owner(&mut self, owner: &str) {
424        if let Some(count) = self.owner_live_counts.get_mut(owner) {
425            *count = count.saturating_add(1);
426        } else {
427            self.owner_live_counts.insert(owner.to_string(), 1);
428        }
429    }
430
431    fn decrement_owner(&mut self, owner: &str) {
432        let should_remove = self.owner_live_counts.get_mut(owner).is_some_and(|count| {
433            if *count > 1 {
434                *count -= 1;
435                false
436            } else {
437                true
438            }
439        });
440
441        if should_remove {
442            self.owner_live_counts.remove(owner);
443        }
444    }
445
446    fn record_signal(&mut self, signal: DecisionSignal) {
447        self.recent_signals.push_back(signal);
448        while self.recent_signals.len() > self.cfg.fallback_window {
449            self.recent_signals.pop_front();
450        }
451    }
452
453    fn evaluate_fallback(&mut self) {
454        if self.fallback_reason.is_some() || self.recent_signals.len() < self.cfg.fallback_window {
455            return;
456        }
457
458        let mut ghost_hits = 0usize;
459        let mut budget_rejections = 0usize;
460        for signal in &self.recent_signals {
461            if signal.ghost_hit {
462                ghost_hits = ghost_hits.saturating_add(1);
463            }
464            if signal.budget_rejected {
465                budget_rejections = budget_rejections.saturating_add(1);
466            }
467        }
468
469        if ghost_hits < self.cfg.min_ghost_hits_in_window {
470            self.fallback_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
471        } else if budget_rejections > self.cfg.max_budget_rejections_in_window {
472            self.fallback_reason = Some(S3FifoFallbackReason::FairnessInstability);
473        }
474    }
475}
476
477fn remove_from_queue<K: Ord>(queue: &mut VecDeque<K>, key: &K) {
478    if let Some(index) = queue.iter().position(|existing| existing == key) {
479        queue.remove(index);
480    }
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486
487    fn config() -> S3FifoConfig {
488        S3FifoConfig {
489            live_capacity: 4,
490            small_capacity: 2,
491            ghost_capacity: 4,
492            max_entries_per_owner: 2,
493            fallback_window: 4,
494            min_ghost_hits_in_window: 1,
495            max_budget_rejections_in_window: 2,
496        }
497    }
498
499    fn assert_no_duplicates(policy: &S3FifoPolicy<String>) {
500        let small: BTreeSet<_> = policy.small.iter().cloned().collect();
501        let main: BTreeSet<_> = policy.main.iter().cloned().collect();
502        let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
503
504        assert!(small.is_disjoint(&main));
505        assert!(small.is_disjoint(&ghost));
506        assert!(main.is_disjoint(&ghost));
507        assert_eq!(small.len() + main.len(), policy.live_tiers.len());
508    }
509
510    #[test]
511    fn small_hit_promotes_to_main() {
512        let mut policy = S3FifoPolicy::new(config());
513        let first = policy.access("ext-a", "k1".to_string());
514        assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
515
516        let second = policy.access("ext-a", "k1".to_string());
517        assert_eq!(second.kind, S3FifoDecisionKind::PromoteSmallToMain);
518        assert_eq!(second.tier, S3FifoTier::Main);
519        assert_eq!(second.main_depth, 1);
520        assert_eq!(second.small_depth, 0);
521        assert_no_duplicates(&policy);
522    }
523
524    #[test]
525    fn ghost_hit_reenters_live_set() {
526        let mut policy = S3FifoPolicy::new(S3FifoConfig {
527            small_capacity: 1,
528            ..config()
529        });
530
531        policy.access("ext-a", "k1".to_string());
532        policy.access("ext-a", "k2".to_string());
533        let decision = policy.access("ext-a", "k1".to_string());
534
535        assert_eq!(decision.kind, S3FifoDecisionKind::AdmitFromGhost);
536        assert!(decision.ghost_hit);
537        assert_eq!(decision.tier, S3FifoTier::Main);
538        assert_eq!(policy.telemetry().ghost_hits_total, 1);
539        assert_no_duplicates(&policy);
540    }
541
542    #[test]
543    fn fairness_budget_rejects_owner_overflow() {
544        let mut policy = S3FifoPolicy::new(S3FifoConfig {
545            max_entries_per_owner: 1,
546            ..config()
547        });
548
549        let admitted = policy.access("ext-a", "k1".to_string());
550        let rejected = policy.access("ext-a", "k2".to_string());
551
552        assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
553        assert_eq!(rejected.kind, S3FifoDecisionKind::RejectFairnessBudget);
554        assert_eq!(policy.live_depth(), 1);
555        assert_eq!(policy.telemetry().budget_rejections_total, 1);
556        assert_no_duplicates(&policy);
557    }
558
559    #[test]
560    fn fallback_triggers_on_low_signal_quality() {
561        let mut policy = S3FifoPolicy::new(S3FifoConfig {
562            min_ghost_hits_in_window: 2,
563            fallback_window: 4,
564            ..config()
565        });
566
567        for idx in 0..4 {
568            let key = format!("cold-{idx}");
569            let _ = policy.access("ext-a", key);
570        }
571
572        assert_eq!(
573            policy.telemetry().fallback_reason,
574            Some(S3FifoFallbackReason::SignalQualityInsufficient)
575        );
576
577        let bypass = policy.access("ext-a", "late-key".to_string());
578        assert_eq!(bypass.kind, S3FifoDecisionKind::FallbackBypass);
579        assert_eq!(bypass.tier, S3FifoTier::Fallback);
580    }
581
582    #[test]
583    fn fallback_triggers_on_rejection_spike() {
584        let mut policy = S3FifoPolicy::new(S3FifoConfig {
585            max_entries_per_owner: 1,
586            fallback_window: 3,
587            min_ghost_hits_in_window: 0,
588            max_budget_rejections_in_window: 1,
589            ..config()
590        });
591
592        let _ = policy.access("ext-a", "k1".to_string());
593        let _ = policy.access("ext-a", "k2".to_string());
594        let _ = policy.access("ext-a", "k3".to_string());
595
596        assert_eq!(
597            policy.telemetry().fallback_reason,
598            Some(S3FifoFallbackReason::FairnessInstability)
599        );
600    }
601
602    #[test]
603    fn fallback_latches_until_clear_and_reason_stays_stable() {
604        let mut policy = S3FifoPolicy::new(S3FifoConfig {
605            min_ghost_hits_in_window: 3,
606            fallback_window: 3,
607            ..config()
608        });
609
610        // Trigger low-signal fallback (no ghost hits across a full window).
611        let _ = policy.access("ext-a", "k1".to_string());
612        let _ = policy.access("ext-a", "k2".to_string());
613        let _ = policy.access("ext-a", "k3".to_string());
614
615        let expected_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
616        assert_eq!(policy.telemetry().fallback_reason, expected_reason);
617
618        // Once fallback is active, every decision should remain a bypass with the same reason
619        // until `clear_fallback` is explicitly invoked.
620        for key in ["k4", "k5", "k6"] {
621            let decision = policy.access("ext-b", key.to_string());
622            assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
623            assert_eq!(decision.tier, S3FifoTier::Fallback);
624            assert_eq!(decision.fallback_reason, expected_reason);
625            assert_eq!(policy.telemetry().fallback_reason, expected_reason);
626        }
627
628        policy.clear_fallback();
629        assert_eq!(policy.telemetry().fallback_reason, None);
630
631        let post_clear = policy.access("ext-b", "k7".to_string());
632        assert_ne!(post_clear.kind, S3FifoDecisionKind::FallbackBypass);
633    }
634
635    #[test]
636    fn fairness_fallback_reports_same_reason_during_repeated_bypass() {
637        let mut policy = S3FifoPolicy::new(S3FifoConfig {
638            max_entries_per_owner: 1,
639            fallback_window: 3,
640            min_ghost_hits_in_window: 0,
641            max_budget_rejections_in_window: 1,
642            ..config()
643        });
644
645        // Trigger fairness-instability fallback via repeated budget rejections.
646        let _ = policy.access("ext-a", "k1".to_string());
647        let _ = policy.access("ext-a", "k2".to_string());
648        let _ = policy.access("ext-a", "k3".to_string());
649
650        let expected_reason = Some(S3FifoFallbackReason::FairnessInstability);
651        assert_eq!(policy.telemetry().fallback_reason, expected_reason);
652
653        for key in ["k4", "k5"] {
654            let decision = policy.access("ext-c", key.to_string());
655            assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
656            assert_eq!(decision.fallback_reason, expected_reason);
657            assert_eq!(policy.telemetry().fallback_reason, expected_reason);
658        }
659    }
660
661    #[test]
662    fn fallback_reason_transitions_low_signal_to_fairness_after_clear() {
663        let mut policy = S3FifoPolicy::new(S3FifoConfig {
664            live_capacity: 2,
665            small_capacity: 1,
666            ghost_capacity: 4,
667            max_entries_per_owner: 1,
668            fallback_window: 3,
669            min_ghost_hits_in_window: 1,
670            max_budget_rejections_in_window: 1,
671        });
672
673        // First epoch: no ghost hits across a full window -> low-signal fallback.
674        let _ = policy.access("ext-a", "ls-live-1".to_string());
675        let _ = policy.access("ext-b", "ls-ghost-seed".to_string());
676        let _ = policy.access("ext-a", "ls-live-2".to_string());
677
678        let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
679        assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
680        let low_signal_bypass = policy.access("ext-z", "ls-bypass".to_string());
681        assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
682        assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
683
684        policy.clear_fallback();
685        assert_eq!(policy.telemetry().fallback_reason, None);
686
687        // Second epoch: one ghost-hit rejection + two direct rejections -> fairness fallback.
688        let first = policy.access("ext-a", "ls-live-1".to_string());
689        assert_eq!(first.kind, S3FifoDecisionKind::RejectFairnessBudget);
690        assert!(first.ghost_hit);
691        let _ = policy.access("ext-a", "fair-rej-1".to_string());
692        let _ = policy.access("ext-a", "fair-rej-2".to_string());
693
694        let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
695        assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
696        let fairness_bypass = policy.access("ext-z", "fair-bypass".to_string());
697        assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
698        assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
699    }
700
701    #[test]
702    fn fallback_reason_transitions_fairness_to_low_signal_after_clear() {
703        let mut policy = S3FifoPolicy::new(S3FifoConfig {
704            live_capacity: 2,
705            small_capacity: 1,
706            ghost_capacity: 4,
707            max_entries_per_owner: 1,
708            fallback_window: 3,
709            min_ghost_hits_in_window: 1,
710            max_budget_rejections_in_window: 0,
711        });
712
713        // First epoch: produce one ghost-hit budget rejection -> fairness fallback.
714        let _ = policy.access("ext-b", "ff-ghost-seed".to_string());
715        let _ = policy.access("ext-a", "ff-live".to_string());
716        let trigger = policy.access("ext-a", "ff-ghost-seed".to_string());
717        assert_eq!(trigger.kind, S3FifoDecisionKind::RejectFairnessBudget);
718        assert!(trigger.ghost_hit);
719
720        let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
721        assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
722        let fairness_bypass = policy.access("ext-z", "ff-bypass".to_string());
723        assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
724        assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
725
726        policy.clear_fallback();
727        assert_eq!(policy.telemetry().fallback_reason, None);
728
729        // Second epoch: no ghost hits across window -> low-signal fallback.
730        let _ = policy.access("ext-c", "ff-low-1".to_string());
731        let _ = policy.access("ext-d", "ff-low-2".to_string());
732        let _ = policy.access("ext-e", "ff-low-3".to_string());
733
734        let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
735        assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
736        let low_signal_bypass = policy.access("ext-z", "ff-low-bypass".to_string());
737        assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
738        assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
739    }
740
741    #[test]
742    fn single_window_clear_cycles_preserve_reason_precedence_and_bypass_counters() {
743        let mut policy = S3FifoPolicy::new(S3FifoConfig {
744            live_capacity: 4,
745            small_capacity: 2,
746            ghost_capacity: 4,
747            max_entries_per_owner: 1,
748            fallback_window: 1,
749            min_ghost_hits_in_window: 1,
750            max_budget_rejections_in_window: 0,
751        });
752
753        // Epoch 1: one low-signal admission should trip fallback immediately.
754        let first = policy.access("ext-a", "cold-1".to_string());
755        assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
756        let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
757        assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
758
759        // Bypass while latched should not mutate accumulated counters.
760        let low_baseline = policy.telemetry();
761        let low_bypass = policy.access("ext-a", "low-bypass".to_string());
762        assert_eq!(low_bypass.kind, S3FifoDecisionKind::FallbackBypass);
763        let low_after = policy.telemetry();
764        assert_eq!(low_after.ghost_hits_total, low_baseline.ghost_hits_total);
765        assert_eq!(
766            low_after.budget_rejections_total,
767            low_baseline.budget_rejections_total
768        );
769
770        policy.clear_fallback();
771        assert_eq!(policy.telemetry().fallback_reason, None);
772
773        // Epoch 2: force a one-event ghost-hit budget rejection and verify fairness reason wins.
774        policy.push_ghost("ghost-hot".to_string());
775        policy
776            .owner_live_counts
777            .insert("ext-a".to_string(), policy.config().max_entries_per_owner);
778
779        let fairness_trigger = policy.access("ext-a", "ghost-hot".to_string());
780        assert_eq!(
781            fairness_trigger.kind,
782            S3FifoDecisionKind::RejectFairnessBudget
783        );
784        assert!(fairness_trigger.ghost_hit);
785
786        let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
787        assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
788
789        // Bypass while fairness-latched should also keep counters stable.
790        let fairness_baseline = policy.telemetry();
791        let fairness_bypass = policy.access("ext-a", "fair-bypass".to_string());
792        assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
793        let fairness_after = policy.telemetry();
794        assert_eq!(
795            fairness_after.ghost_hits_total,
796            fairness_baseline.ghost_hits_total
797        );
798        assert_eq!(
799            fairness_after.budget_rejections_total,
800            fairness_baseline.budget_rejections_total
801        );
802    }
803
804    // ── Additional public API coverage ──
805
806    #[test]
807    fn config_clamps_minimums_and_ceilings() {
808        let tiny = S3FifoConfig {
809            live_capacity: 0,
810            small_capacity: 0,
811            ghost_capacity: 0,
812            max_entries_per_owner: 0,
813            fallback_window: 0,
814            min_ghost_hits_in_window: 100,
815            max_budget_rejections_in_window: 100,
816        };
817        let policy = S3FifoPolicy::<String>::new(tiny);
818        let cfg = policy.config();
819        assert!(cfg.live_capacity >= 2, "live_capacity min is 2");
820        assert!(cfg.small_capacity >= 1, "small_capacity min is 1");
821        assert!(cfg.ghost_capacity >= 1, "ghost_capacity min is 1");
822        assert!(cfg.max_entries_per_owner >= 1, "per-owner min is 1");
823        assert!(cfg.fallback_window >= 1, "fallback_window min is 1");
824        // min_ghost_hits_in_window clamped to fallback_window
825        assert!(cfg.min_ghost_hits_in_window <= cfg.fallback_window);
826        // max_budget_rejections_in_window clamped to fallback_window
827        assert!(cfg.max_budget_rejections_in_window <= cfg.fallback_window);
828    }
829
830    #[test]
831    fn live_depth_reflects_small_plus_main() {
832        let mut policy = S3FifoPolicy::new(config());
833        assert_eq!(policy.live_depth(), 0);
834
835        // Admit to small
836        policy.access("ext-a", "k1".to_string());
837        assert_eq!(policy.live_depth(), 1);
838
839        // Admit another to small
840        policy.access("ext-b", "k2".to_string());
841        assert_eq!(policy.live_depth(), 2);
842
843        // Promote k1 to main
844        policy.access("ext-a", "k1".to_string());
845        // k1 moved from small to main, live_depth unchanged
846        assert_eq!(policy.live_depth(), 2);
847        assert_no_duplicates(&policy);
848    }
849
850    #[test]
851    fn telemetry_counters_accumulate_correctly() {
852        let mut policy = S3FifoPolicy::new(S3FifoConfig {
853            small_capacity: 1,
854            max_entries_per_owner: 3,
855            ..config()
856        });
857
858        // 1 admission
859        policy.access("ext-a", "k1".to_string());
860        let tel = policy.telemetry();
861        assert_eq!(tel.admissions_total, 1);
862        assert_eq!(tel.promotions_total, 0);
863        assert_eq!(tel.ghost_hits_total, 0);
864        assert_eq!(tel.budget_rejections_total, 0);
865        assert_eq!(tel.small_depth, 1);
866        assert_eq!(tel.main_depth, 0);
867
868        // Promote k1 small → main
869        policy.access("ext-a", "k1".to_string());
870        let tel = policy.telemetry();
871        assert_eq!(tel.promotions_total, 1);
872        assert_eq!(tel.small_depth, 0);
873        assert_eq!(tel.main_depth, 1);
874
875        // Admit k2 → small, evicts to ghost since small_capacity=1
876        policy.access("ext-a", "k2".to_string());
877        policy.access("ext-a", "k3".to_string());
878        // k2 was evicted from small to ghost when k3 entered
879        let tel = policy.telemetry();
880        assert!(tel.ghost_depth >= 1, "evicted key should be in ghost");
881    }
882
883    #[test]
884    fn telemetry_owner_live_counts_track_per_owner() {
885        let mut policy = S3FifoPolicy::new(config());
886        policy.access("ext-a", "k1".to_string());
887        policy.access("ext-b", "k2".to_string());
888        let tel = policy.telemetry();
889        assert_eq!(tel.owner_live_counts.get("ext-a"), Some(&1));
890        assert_eq!(tel.owner_live_counts.get("ext-b"), Some(&1));
891    }
892
893    #[test]
894    fn hit_main_reorders_without_changing_depth() {
895        // Use a large fallback_window so fallback doesn't trigger during setup
896        let mut policy = S3FifoPolicy::new(S3FifoConfig {
897            fallback_window: 32,
898            min_ghost_hits_in_window: 0,
899            ..config()
900        });
901        // Admit and promote both keys
902        policy.access("ext-a", "k1".to_string());
903        policy.access("ext-a", "k1".to_string()); // promote k1 to main
904        policy.access("ext-b", "k2".to_string());
905        policy.access("ext-b", "k2".to_string()); // promote k2 to main
906
907        let before = policy.telemetry();
908        let depth_before = before.main_depth;
909
910        // HitMain on k1 — should just reorder, no depth change
911        let decision = policy.access("ext-a", "k1".to_string());
912        assert_eq!(decision.kind, S3FifoDecisionKind::HitMain);
913        assert_eq!(policy.telemetry().main_depth, depth_before);
914        assert_no_duplicates(&policy);
915    }
916
917    #[test]
918    fn ghost_queue_evicts_oldest_when_at_capacity() {
919        let mut policy = S3FifoPolicy::new(S3FifoConfig {
920            ghost_capacity: 2,
921            small_capacity: 1,
922            ..config()
923        });
924
925        // Fill small with different keys; each evicts the previous to ghost
926        policy.access("ext-a", "k1".to_string()); // small: [k1]
927        policy.access("ext-b", "k2".to_string()); // small: [k2], ghost: [k1]
928        policy.access("ext-a", "k3".to_string()); // small: [k3], ghost: [k1, k2]
929        assert_eq!(policy.telemetry().ghost_depth, 2);
930
931        // One more eviction exceeds ghost_capacity=2 → k1 should be dropped
932        policy.access("ext-b", "k4".to_string()); // small: [k4], ghost: [k2, k3]
933        assert_eq!(policy.telemetry().ghost_depth, 2);
934        assert_no_duplicates(&policy);
935    }
936
937    #[test]
938    fn push_ghost_reinsertion_updates_recency_without_duplicates() {
939        let mut policy = S3FifoPolicy::new(S3FifoConfig {
940            ghost_capacity: 2,
941            ..config()
942        });
943
944        policy.push_ghost("k1".to_string());
945        policy.push_ghost("k2".to_string());
946        assert_eq!(
947            policy.ghost_order.values().cloned().collect::<Vec<_>>(),
948            vec!["k1".to_string(), "k2".to_string()]
949        );
950        assert_eq!(policy.ghost_lookup.len(), 2);
951
952        // Re-inserting an existing ghost key should move it to the newest slot,
953        // not duplicate it.
954        policy.push_ghost("k1".to_string());
955        assert_eq!(
956            policy.ghost_order.values().cloned().collect::<Vec<_>>(),
957            vec!["k2".to_string(), "k1".to_string()]
958        );
959        assert_eq!(policy.ghost_lookup.len(), 2);
960
961        // Capacity enforcement still applies after recency updates.
962        policy.push_ghost("k3".to_string());
963        assert_eq!(
964            policy.ghost_order.values().cloned().collect::<Vec<_>>(),
965            vec!["k1".to_string(), "k3".to_string()]
966        );
967        assert_eq!(policy.ghost_lookup.len(), 2);
968    }
969
970    #[test]
971    fn capacity_enforcement_evicts_to_stay_within_live_capacity() {
972        let mut policy = S3FifoPolicy::new(S3FifoConfig {
973            live_capacity: 3,
974            small_capacity: 2,
975            ghost_capacity: 8,
976            max_entries_per_owner: 10,
977            // Prevent fallback from triggering during the test
978            fallback_window: 32,
979            min_ghost_hits_in_window: 0,
980            max_budget_rejections_in_window: 32,
981        });
982
983        // Fill beyond live_capacity — each key from a different owner
984        for i in 0..6 {
985            policy.access(&format!("ext-{i}"), format!("k{i}"));
986        }
987
988        // live_depth must not exceed live_capacity
989        assert!(policy.live_depth() <= 3, "live_depth must respect capacity");
990        // Evicted keys should be in ghost
991        assert!(policy.telemetry().ghost_depth >= 3);
992        assert_no_duplicates(&policy);
993    }
994
995    #[test]
996    fn default_config_has_sensible_values() {
997        let cfg = S3FifoConfig::default();
998        assert_eq!(cfg.live_capacity, 256);
999        assert_eq!(cfg.small_capacity, 64);
1000        assert_eq!(cfg.ghost_capacity, 512);
1001        assert_eq!(cfg.max_entries_per_owner, 64);
1002        assert_eq!(cfg.fallback_window, 32);
1003        assert_eq!(cfg.min_ghost_hits_in_window, 2);
1004        assert_eq!(cfg.max_budget_rejections_in_window, 12);
1005    }
1006
1007    #[test]
1008    fn decision_fields_reflect_current_state() {
1009        let mut policy = S3FifoPolicy::new(config());
1010        let d1 = policy.access("ext-a", "k1".to_string());
1011        assert_eq!(d1.kind, S3FifoDecisionKind::AdmitSmall);
1012        assert_eq!(d1.tier, S3FifoTier::Small);
1013        assert!(!d1.ghost_hit);
1014        assert!(d1.fallback_reason.is_none());
1015        assert_eq!(d1.live_depth, 1);
1016        assert_eq!(d1.small_depth, 1);
1017        assert_eq!(d1.main_depth, 0);
1018    }
1019
1020    #[test]
1021    fn clear_fallback_resets_policy_gate() {
1022        let mut policy = S3FifoPolicy::new(S3FifoConfig {
1023            min_ghost_hits_in_window: 3,
1024            fallback_window: 3,
1025            ..config()
1026        });
1027
1028        let _ = policy.access("ext-a", "k1".to_string());
1029        let _ = policy.access("ext-a", "k2".to_string());
1030        let _ = policy.access("ext-a", "k3".to_string());
1031        assert!(policy.telemetry().fallback_reason.is_some());
1032
1033        policy.clear_fallback();
1034        assert!(policy.telemetry().fallback_reason.is_none());
1035
1036        let decision = policy.access("ext-a", "k4".to_string());
1037        assert_ne!(decision.kind, S3FifoDecisionKind::FallbackBypass);
1038    }
1039
1040    #[test]
1041    fn clear_fallback_clears_signal_window_and_preserves_counters() {
1042        let mut policy = S3FifoPolicy::new(S3FifoConfig {
1043            max_entries_per_owner: 1,
1044            fallback_window: 3,
1045            min_ghost_hits_in_window: 0,
1046            max_budget_rejections_in_window: 1,
1047            ..config()
1048        });
1049
1050        // Trigger fairness fallback from repeated budget rejections.
1051        let _ = policy.access("ext-a", "k1".to_string());
1052        let _ = policy.access("ext-a", "k2".to_string());
1053        let _ = policy.access("ext-a", "k3".to_string());
1054        assert_eq!(
1055            policy.telemetry().fallback_reason,
1056            Some(S3FifoFallbackReason::FairnessInstability)
1057        );
1058        assert_eq!(policy.recent_signals.len(), 3);
1059
1060        let before_clear = policy.telemetry();
1061        policy.clear_fallback();
1062
1063        assert_eq!(policy.telemetry().fallback_reason, None);
1064        assert!(
1065            policy.recent_signals.is_empty(),
1066            "clear_fallback should reset signal history"
1067        );
1068
1069        let after_clear = policy.telemetry();
1070        assert_eq!(after_clear.admissions_total, before_clear.admissions_total);
1071        assert_eq!(after_clear.promotions_total, before_clear.promotions_total);
1072        assert_eq!(after_clear.ghost_hits_total, before_clear.ghost_hits_total);
1073        assert_eq!(
1074            after_clear.budget_rejections_total,
1075            before_clear.budget_rejections_total
1076        );
1077
1078        // Normal decisions should resume immediately after reset.
1079        let admitted = policy.access("ext-b", "post-clear-admit".to_string());
1080        assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
1081        assert_eq!(policy.telemetry().fallback_reason, None);
1082        assert_eq!(policy.recent_signals.len(), 1);
1083
1084        // Fallback can be re-triggered deterministically as a new window fills.
1085        let reject_1 = policy.access("ext-a", "post-clear-reject-1".to_string());
1086        assert_eq!(reject_1.kind, S3FifoDecisionKind::RejectFairnessBudget);
1087        assert_eq!(policy.telemetry().fallback_reason, None);
1088
1089        let reject_2 = policy.access("ext-a", "post-clear-reject-2".to_string());
1090        assert_eq!(reject_2.kind, S3FifoDecisionKind::RejectFairnessBudget);
1091        assert_eq!(
1092            policy.telemetry().fallback_reason,
1093            Some(S3FifoFallbackReason::FairnessInstability)
1094        );
1095    }
1096
1097    // ── Property tests ──
1098
1099    mod proptest_s3fifo {
1100        use super::*;
1101        use proptest::prelude::*;
1102
1103        fn arb_access() -> impl Strategy<Value = (String, String)> {
1104            let owner = prop::sample::select(vec![
1105                "ext-a".to_string(),
1106                "ext-b".to_string(),
1107                "ext-c".to_string(),
1108            ]);
1109            let key = prop::sample::select(vec![
1110                "k0".to_string(),
1111                "k1".to_string(),
1112                "k2".to_string(),
1113                "k3".to_string(),
1114                "k4".to_string(),
1115                "k5".to_string(),
1116                "k6".to_string(),
1117                "k7".to_string(),
1118            ]);
1119            (owner, key)
1120        }
1121
1122        fn arb_config() -> impl Strategy<Value = S3FifoConfig> {
1123            (2..32usize, 1..16usize, 1..64usize, 1..8usize, 2..16usize).prop_map(
1124                |(live, small, ghost, per_owner, window)| S3FifoConfig {
1125                    live_capacity: live,
1126                    small_capacity: small,
1127                    ghost_capacity: ghost,
1128                    max_entries_per_owner: per_owner,
1129                    fallback_window: window,
1130                    min_ghost_hits_in_window: 0,
1131                    max_budget_rejections_in_window: window,
1132                },
1133            )
1134        }
1135
1136        proptest! {
1137            #[test]
1138            fn queues_always_disjoint_after_access_sequence(
1139                cfg in arb_config(),
1140                accesses in prop::collection::vec(arb_access(), 1..50),
1141            ) {
1142                let mut policy = S3FifoPolicy::new(cfg);
1143                for (owner, key) in &accesses {
1144                    let _ = policy.access(owner, key.clone());
1145                }
1146
1147                let small: BTreeSet<_> = policy.small.iter().cloned().collect();
1148                let main: BTreeSet<_> = policy.main.iter().cloned().collect();
1149                let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
1150
1151                assert!(small.is_disjoint(&main), "small and main must be disjoint");
1152                assert!(small.is_disjoint(&ghost), "small and ghost must be disjoint");
1153                assert!(main.is_disjoint(&ghost), "main and ghost must be disjoint");
1154            }
1155
1156            #[test]
1157            fn live_depth_never_exceeds_capacity(
1158                cfg in arb_config(),
1159                accesses in prop::collection::vec(arb_access(), 1..50),
1160            ) {
1161                let mut policy = S3FifoPolicy::new(cfg);
1162                for (owner, key) in &accesses {
1163                    let _ = policy.access(owner, key.clone());
1164                }
1165                let effective_cap = policy.config().live_capacity;
1166                assert!(
1167                    policy.live_depth() <= effective_cap,
1168                    "live_depth {} exceeded capacity {}",
1169                    policy.live_depth(),
1170                    effective_cap,
1171                );
1172            }
1173
1174            #[test]
1175            fn ghost_depth_never_exceeds_capacity(
1176                cfg in arb_config(),
1177                accesses in prop::collection::vec(arb_access(), 1..50),
1178            ) {
1179                let mut policy = S3FifoPolicy::new(cfg);
1180                for (owner, key) in &accesses {
1181                    let _ = policy.access(owner, key.clone());
1182                }
1183                let ghost_cap = policy.config().ghost_capacity;
1184                assert!(
1185                    policy.telemetry().ghost_depth <= ghost_cap,
1186                    "ghost_depth {} exceeded capacity {}",
1187                    policy.telemetry().ghost_depth,
1188                    ghost_cap,
1189                );
1190            }
1191
1192            #[test]
1193            fn live_depth_equals_small_plus_main(
1194                cfg in arb_config(),
1195                accesses in prop::collection::vec(arb_access(), 1..50),
1196            ) {
1197                let mut policy = S3FifoPolicy::new(cfg);
1198                for (owner, key) in &accesses {
1199                    let decision = policy.access(owner, key.clone());
1200                    assert_eq!(
1201                        decision.live_depth,
1202                        decision.small_depth + decision.main_depth,
1203                        "live_depth must equal small + main at every step"
1204                    );
1205                }
1206            }
1207
1208            #[test]
1209            fn counters_monotonically_nondecreasing(
1210                cfg in arb_config(),
1211                accesses in prop::collection::vec(arb_access(), 1..50),
1212            ) {
1213                let mut policy = S3FifoPolicy::new(cfg);
1214                let mut prev_admissions = 0u64;
1215                let mut prev_promotions = 0u64;
1216                let mut prev_ghost_hits = 0u64;
1217                let mut prev_rejections = 0u64;
1218
1219                for (owner, key) in &accesses {
1220                    let _ = policy.access(owner, key.clone());
1221                    let tel = policy.telemetry();
1222                    assert!(tel.admissions_total >= prev_admissions);
1223                    assert!(tel.promotions_total >= prev_promotions);
1224                    assert!(tel.ghost_hits_total >= prev_ghost_hits);
1225                    assert!(tel.budget_rejections_total >= prev_rejections);
1226                    prev_admissions = tel.admissions_total;
1227                    prev_promotions = tel.promotions_total;
1228                    prev_ghost_hits = tel.ghost_hits_total;
1229                    prev_rejections = tel.budget_rejections_total;
1230                }
1231            }
1232
1233            #[test]
1234            fn owner_counts_match_live_keys(
1235                cfg in arb_config(),
1236                accesses in prop::collection::vec(arb_access(), 1..50),
1237            ) {
1238                let mut policy = S3FifoPolicy::new(cfg);
1239                for (owner, key) in &accesses {
1240                    let _ = policy.access(owner, key.clone());
1241                }
1242
1243                // Verify owner counts by manually counting live_owners
1244                let mut expected: BTreeMap<String, usize> = BTreeMap::new();
1245                for owner_val in policy.live_owners.values() {
1246                    *expected.entry(owner_val.clone()).or_insert(0) += 1;
1247                }
1248                assert_eq!(
1249                    policy.owner_live_counts, expected,
1250                    "owner_live_counts must match actual live key distribution"
1251                );
1252            }
1253
1254            #[test]
1255            fn live_tier_map_consistent_with_queues(
1256                cfg in arb_config(),
1257                accesses in prop::collection::vec(arb_access(), 1..50),
1258            ) {
1259                let mut policy = S3FifoPolicy::new(cfg);
1260                for (owner, key) in &accesses {
1261                    let _ = policy.access(owner, key.clone());
1262                }
1263
1264                // Every key in small must be in live_tiers as Small
1265                for key in &policy.small {
1266                    assert_eq!(
1267                        policy.live_tiers.get(key),
1268                        Some(&LiveTier::Small),
1269                        "key in small queue must be tracked as Small"
1270                    );
1271                }
1272                // Every key in main must be in live_tiers as Main
1273                for key in &policy.main {
1274                    assert_eq!(
1275                        policy.live_tiers.get(key),
1276                        Some(&LiveTier::Main),
1277                        "key in main queue must be tracked as Main"
1278                    );
1279                }
1280                // live_tiers count must match small + main
1281                assert_eq!(
1282                    policy.live_tiers.len(),
1283                    policy.small.len() + policy.main.len(),
1284                );
1285            }
1286        }
1287    }
1288}