1#[cfg(test)]
14use std::collections::BTreeSet;
15use std::collections::{BTreeMap, VecDeque};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum S3FifoFallbackReason {
20 SignalQualityInsufficient,
21 FairnessInstability,
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum S3FifoTier {
27 Small,
28 Main,
29 Ghost,
30 Fallback,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum S3FifoDecisionKind {
36 AdmitSmall,
37 PromoteSmallToMain,
38 HitMain,
39 AdmitFromGhost,
40 RejectFairnessBudget,
41 FallbackBypass,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub struct S3FifoDecision {
47 pub kind: S3FifoDecisionKind,
48 pub tier: S3FifoTier,
49 pub ghost_hit: bool,
50 pub fallback_reason: Option<S3FifoFallbackReason>,
51 pub live_depth: usize,
52 pub small_depth: usize,
53 pub main_depth: usize,
54 pub ghost_depth: usize,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct S3FifoConfig {
60 pub live_capacity: usize,
61 pub small_capacity: usize,
62 pub ghost_capacity: usize,
63 pub max_entries_per_owner: usize,
64 pub fallback_window: usize,
65 pub min_ghost_hits_in_window: usize,
66 pub max_budget_rejections_in_window: usize,
67}
68
69impl Default for S3FifoConfig {
70 fn default() -> Self {
71 Self {
72 live_capacity: 256,
73 small_capacity: 64,
74 ghost_capacity: 512,
75 max_entries_per_owner: 64,
76 fallback_window: 32,
77 min_ghost_hits_in_window: 2,
78 max_budget_rejections_in_window: 12,
79 }
80 }
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84enum LiveTier {
85 Small,
86 Main,
87}
88
89#[derive(Debug, Clone, Copy)]
90struct DecisionSignal {
91 ghost_hit: bool,
92 budget_rejected: bool,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct S3FifoTelemetry {
98 pub fallback_reason: Option<S3FifoFallbackReason>,
99 pub small_depth: usize,
100 pub main_depth: usize,
101 pub ghost_depth: usize,
102 pub live_depth: usize,
103 pub ghost_hits_total: u64,
104 pub admissions_total: u64,
105 pub promotions_total: u64,
106 pub budget_rejections_total: u64,
107 pub owner_live_counts: BTreeMap<String, usize>,
108}
109
110#[derive(Debug, Clone)]
112pub struct S3FifoPolicy<K: Ord + Clone> {
113 cfg: S3FifoConfig,
114 small: VecDeque<K>,
115 main: VecDeque<K>,
116 ghost_order: BTreeMap<u64, K>,
117 ghost_lookup: BTreeMap<K, u64>,
118 ghost_gen: u64,
119 live_tiers: BTreeMap<K, LiveTier>,
120 live_owners: BTreeMap<K, String>,
121 owner_live_counts: BTreeMap<String, usize>,
122 recent_signals: VecDeque<DecisionSignal>,
123 fallback_reason: Option<S3FifoFallbackReason>,
124 ghost_hits_total: u64,
125 admissions_total: u64,
126 promotions_total: u64,
127 budget_rejections_total: u64,
128}
129
130impl<K: Ord + Clone> S3FifoPolicy<K> {
131 #[must_use]
132 pub fn new(config: S3FifoConfig) -> Self {
133 let live_capacity = config.live_capacity.max(2);
134 let small_cap_floor = live_capacity.saturating_sub(1).max(1);
135 let small_capacity = config.small_capacity.max(1).min(small_cap_floor);
136 let ghost_capacity = config.ghost_capacity.max(1);
137 let max_entries_per_owner = config.max_entries_per_owner.max(1);
138 let fallback_window = config.fallback_window.max(1);
139
140 Self {
141 cfg: S3FifoConfig {
142 live_capacity,
143 small_capacity,
144 ghost_capacity,
145 max_entries_per_owner,
146 fallback_window,
147 min_ghost_hits_in_window: config.min_ghost_hits_in_window.min(fallback_window),
148 max_budget_rejections_in_window: config
149 .max_budget_rejections_in_window
150 .min(fallback_window),
151 },
152 small: VecDeque::new(),
153 main: VecDeque::new(),
154 ghost_order: BTreeMap::new(),
155 ghost_lookup: BTreeMap::new(),
156 ghost_gen: 0,
157 live_tiers: BTreeMap::new(),
158 live_owners: BTreeMap::new(),
159 owner_live_counts: BTreeMap::new(),
160 recent_signals: VecDeque::new(),
161 fallback_reason: None,
162 ghost_hits_total: 0,
163 admissions_total: 0,
164 promotions_total: 0,
165 budget_rejections_total: 0,
166 }
167 }
168
169 #[must_use]
170 pub const fn config(&self) -> S3FifoConfig {
171 self.cfg
172 }
173
174 #[must_use]
175 pub fn telemetry(&self) -> S3FifoTelemetry {
176 S3FifoTelemetry {
177 fallback_reason: self.fallback_reason,
178 small_depth: self.small.len(),
179 main_depth: self.main.len(),
180 ghost_depth: self.ghost_lookup.len(),
181 live_depth: self.live_depth(),
182 ghost_hits_total: self.ghost_hits_total,
183 admissions_total: self.admissions_total,
184 promotions_total: self.promotions_total,
185 budget_rejections_total: self.budget_rejections_total,
186 owner_live_counts: self.owner_live_counts.clone(),
187 }
188 }
189
190 #[must_use]
191 pub fn live_depth(&self) -> usize {
192 self.small.len().saturating_add(self.main.len())
193 }
194
195 pub fn clear_fallback(&mut self) {
196 self.fallback_reason = None;
197 self.recent_signals.clear();
198 }
199
200 pub fn access(&mut self, owner: &str, key: K) -> S3FifoDecision {
201 if let Some(reason) = self.fallback_reason {
202 return self.decision(
203 S3FifoDecisionKind::FallbackBypass,
204 S3FifoTier::Fallback,
205 false,
206 Some(reason),
207 );
208 }
209
210 let mut ghost_hit = false;
211 let kind = if matches!(self.live_tiers.get(&key), Some(LiveTier::Main)) {
212 self.touch_main(&key);
213 S3FifoDecisionKind::HitMain
214 } else if matches!(self.live_tiers.get(&key), Some(LiveTier::Small)) {
215 self.promote_small_to_main(&key);
216 self.promotions_total = self.promotions_total.saturating_add(1);
217 S3FifoDecisionKind::PromoteSmallToMain
218 } else if self.ghost_lookup.contains_key(&key) {
219 ghost_hit = true;
220 self.ghost_hits_total = self.ghost_hits_total.saturating_add(1);
221 if self.owner_at_budget(owner) {
222 self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
223 S3FifoDecisionKind::RejectFairnessBudget
224 } else {
225 self.admit_from_ghost(owner, key);
226 self.admissions_total = self.admissions_total.saturating_add(1);
227 S3FifoDecisionKind::AdmitFromGhost
228 }
229 } else if self.owner_at_budget(owner) {
230 self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
231 S3FifoDecisionKind::RejectFairnessBudget
232 } else {
233 self.admit_small(owner, key);
234 self.admissions_total = self.admissions_total.saturating_add(1);
235 S3FifoDecisionKind::AdmitSmall
236 };
237
238 let signal = DecisionSignal {
239 ghost_hit,
240 budget_rejected: kind == S3FifoDecisionKind::RejectFairnessBudget,
241 };
242 self.record_signal(signal);
243 self.evaluate_fallback();
244
245 let tier = Self::resolve_tier(kind, ghost_hit);
246 self.decision(kind, tier, ghost_hit, self.fallback_reason)
247 }
248
249 const fn resolve_tier(kind: S3FifoDecisionKind, ghost_hit: bool) -> S3FifoTier {
250 match kind {
251 S3FifoDecisionKind::HitMain
252 | S3FifoDecisionKind::PromoteSmallToMain
253 | S3FifoDecisionKind::AdmitFromGhost => S3FifoTier::Main,
254 S3FifoDecisionKind::AdmitSmall => S3FifoTier::Small,
255 S3FifoDecisionKind::RejectFairnessBudget => {
256 if ghost_hit {
257 S3FifoTier::Ghost
258 } else {
259 S3FifoTier::Small
260 }
261 }
262 S3FifoDecisionKind::FallbackBypass => S3FifoTier::Fallback,
263 }
264 }
265
266 fn decision(
267 &self,
268 kind: S3FifoDecisionKind,
269 tier: S3FifoTier,
270 ghost_hit: bool,
271 fallback_reason: Option<S3FifoFallbackReason>,
272 ) -> S3FifoDecision {
273 S3FifoDecision {
274 kind,
275 tier,
276 ghost_hit,
277 fallback_reason,
278 live_depth: self.live_depth(),
279 small_depth: self.small.len(),
280 main_depth: self.main.len(),
281 ghost_depth: self.ghost_lookup.len(),
282 }
283 }
284
285 fn owner_at_budget(&self, owner: &str) -> bool {
286 self.owner_live_counts.get(owner).copied().unwrap_or(0) >= self.cfg.max_entries_per_owner
287 }
288
289 const fn main_capacity(&self) -> usize {
290 self.cfg
291 .live_capacity
292 .saturating_sub(self.cfg.small_capacity)
293 }
294
295 fn admit_small(&mut self, owner: &str, key: K) {
296 self.purge_key(&key);
297 self.small.push_back(key.clone());
298 self.live_tiers.insert(key.clone(), LiveTier::Small);
299 self.live_owners.insert(key, owner.to_string());
300 self.increment_owner(owner);
301 self.enforce_small_capacity();
302 self.enforce_live_capacity();
303 }
304
305 fn admit_from_ghost(&mut self, owner: &str, key: K) {
306 self.remove_ghost(&key);
307 self.main.push_back(key.clone());
308 self.live_tiers.insert(key.clone(), LiveTier::Main);
309 self.live_owners.insert(key, owner.to_string());
310 self.increment_owner(owner);
311 self.enforce_main_capacity();
312 self.enforce_live_capacity();
313 }
314
315 fn promote_small_to_main(&mut self, key: &K) {
316 remove_from_queue(&mut self.small, key);
317 self.main.push_back(key.clone());
318 self.live_tiers.insert(key.clone(), LiveTier::Main);
319 self.enforce_main_capacity();
320 self.enforce_live_capacity();
321 }
322
323 fn touch_main(&mut self, key: &K) {
324 remove_from_queue(&mut self.main, key);
325 self.main.push_back(key.clone());
326 }
327
328 fn enforce_small_capacity(&mut self) {
329 while self.small.len() > self.cfg.small_capacity {
330 self.evict_small_front_to_ghost();
331 }
332 }
333
334 fn enforce_main_capacity(&mut self) {
335 while self.main.len() > self.main_capacity() {
336 self.evict_main_front_to_ghost();
337 }
338 }
339
340 fn enforce_live_capacity(&mut self) {
341 while self.live_depth() > self.cfg.live_capacity {
342 if self.small.is_empty() {
343 self.evict_main_front_to_ghost();
344 } else {
345 self.evict_small_front_to_ghost();
346 }
347 }
348 }
349
350 fn evict_small_front_to_ghost(&mut self) {
351 if let Some(key) = self.small.pop_front() {
352 self.live_tiers.remove(&key);
353 self.remove_owner_for_key(&key);
354 self.push_ghost(key);
355 }
356 }
357
358 fn evict_main_front_to_ghost(&mut self) {
359 if let Some(key) = self.main.pop_front() {
360 self.live_tiers.remove(&key);
361 self.remove_owner_for_key(&key);
362 self.push_ghost(key);
363 }
364 }
365
366 fn purge_key(&mut self, key: &K) {
367 self.remove_live_key(key);
368 self.remove_ghost(key);
369 }
370
371 fn remove_live_key(&mut self, key: &K) {
372 if let Some(tier) = self.live_tiers.remove(key) {
373 match tier {
374 LiveTier::Small => remove_from_queue(&mut self.small, key),
375 LiveTier::Main => remove_from_queue(&mut self.main, key),
376 }
377 self.remove_owner_for_key(key);
378 }
379 }
380
381 fn remove_owner_for_key(&mut self, key: &K) {
382 if let Some(owner) = self.live_owners.remove(key) {
383 self.decrement_owner(&owner);
384 }
385 }
386
387 fn push_ghost(&mut self, key: K) {
388 if self.ghost_gen == u64::MAX {
390 self.ghost_gen = 0;
391 self.ghost_order.clear();
392 self.ghost_lookup.clear();
393 }
394
395 self.ghost_gen = self.ghost_gen.saturating_add(1);
396
397 if let Some(gen_mut) = self.ghost_lookup.get_mut(&key) {
398 let old_gen = *gen_mut;
399 *gen_mut = self.ghost_gen;
400 if let Some(k_reused) = self.ghost_order.remove(&old_gen) {
401 self.ghost_order.insert(self.ghost_gen, k_reused);
402 }
403 } else {
404 self.ghost_lookup.insert(key.clone(), self.ghost_gen);
405 self.ghost_order.insert(self.ghost_gen, key);
406 }
407
408 while self.ghost_lookup.len() > self.cfg.ghost_capacity {
409 if let Some((_, popped_key)) = self.ghost_order.pop_first() {
410 self.ghost_lookup.remove(&popped_key);
411 } else {
412 break;
413 }
414 }
415 }
416
417 fn remove_ghost(&mut self, key: &K) {
418 if let Some(generation) = self.ghost_lookup.remove(key) {
419 self.ghost_order.remove(&generation);
420 }
421 }
422
423 fn increment_owner(&mut self, owner: &str) {
424 if let Some(count) = self.owner_live_counts.get_mut(owner) {
425 *count = count.saturating_add(1);
426 } else {
427 self.owner_live_counts.insert(owner.to_string(), 1);
428 }
429 }
430
431 fn decrement_owner(&mut self, owner: &str) {
432 let should_remove = self.owner_live_counts.get_mut(owner).is_some_and(|count| {
433 if *count > 1 {
434 *count -= 1;
435 false
436 } else {
437 true
438 }
439 });
440
441 if should_remove {
442 self.owner_live_counts.remove(owner);
443 }
444 }
445
446 fn record_signal(&mut self, signal: DecisionSignal) {
447 self.recent_signals.push_back(signal);
448 while self.recent_signals.len() > self.cfg.fallback_window {
449 self.recent_signals.pop_front();
450 }
451 }
452
453 fn evaluate_fallback(&mut self) {
454 if self.fallback_reason.is_some() || self.recent_signals.len() < self.cfg.fallback_window {
455 return;
456 }
457
458 let mut ghost_hits = 0usize;
459 let mut budget_rejections = 0usize;
460 for signal in &self.recent_signals {
461 if signal.ghost_hit {
462 ghost_hits = ghost_hits.saturating_add(1);
463 }
464 if signal.budget_rejected {
465 budget_rejections = budget_rejections.saturating_add(1);
466 }
467 }
468
469 if ghost_hits < self.cfg.min_ghost_hits_in_window {
470 self.fallback_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
471 } else if budget_rejections > self.cfg.max_budget_rejections_in_window {
472 self.fallback_reason = Some(S3FifoFallbackReason::FairnessInstability);
473 }
474 }
475}
476
477fn remove_from_queue<K: Ord>(queue: &mut VecDeque<K>, key: &K) {
478 if let Some(index) = queue.iter().position(|existing| existing == key) {
479 queue.remove(index);
480 }
481}
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486
487 fn config() -> S3FifoConfig {
488 S3FifoConfig {
489 live_capacity: 4,
490 small_capacity: 2,
491 ghost_capacity: 4,
492 max_entries_per_owner: 2,
493 fallback_window: 4,
494 min_ghost_hits_in_window: 1,
495 max_budget_rejections_in_window: 2,
496 }
497 }
498
499 fn assert_no_duplicates(policy: &S3FifoPolicy<String>) {
500 let small: BTreeSet<_> = policy.small.iter().cloned().collect();
501 let main: BTreeSet<_> = policy.main.iter().cloned().collect();
502 let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
503
504 assert!(small.is_disjoint(&main));
505 assert!(small.is_disjoint(&ghost));
506 assert!(main.is_disjoint(&ghost));
507 assert_eq!(small.len() + main.len(), policy.live_tiers.len());
508 }
509
510 #[test]
511 fn small_hit_promotes_to_main() {
512 let mut policy = S3FifoPolicy::new(config());
513 let first = policy.access("ext-a", "k1".to_string());
514 assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
515
516 let second = policy.access("ext-a", "k1".to_string());
517 assert_eq!(second.kind, S3FifoDecisionKind::PromoteSmallToMain);
518 assert_eq!(second.tier, S3FifoTier::Main);
519 assert_eq!(second.main_depth, 1);
520 assert_eq!(second.small_depth, 0);
521 assert_no_duplicates(&policy);
522 }
523
524 #[test]
525 fn ghost_hit_reenters_live_set() {
526 let mut policy = S3FifoPolicy::new(S3FifoConfig {
527 small_capacity: 1,
528 ..config()
529 });
530
531 policy.access("ext-a", "k1".to_string());
532 policy.access("ext-a", "k2".to_string());
533 let decision = policy.access("ext-a", "k1".to_string());
534
535 assert_eq!(decision.kind, S3FifoDecisionKind::AdmitFromGhost);
536 assert!(decision.ghost_hit);
537 assert_eq!(decision.tier, S3FifoTier::Main);
538 assert_eq!(policy.telemetry().ghost_hits_total, 1);
539 assert_no_duplicates(&policy);
540 }
541
542 #[test]
543 fn fairness_budget_rejects_owner_overflow() {
544 let mut policy = S3FifoPolicy::new(S3FifoConfig {
545 max_entries_per_owner: 1,
546 ..config()
547 });
548
549 let admitted = policy.access("ext-a", "k1".to_string());
550 let rejected = policy.access("ext-a", "k2".to_string());
551
552 assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
553 assert_eq!(rejected.kind, S3FifoDecisionKind::RejectFairnessBudget);
554 assert_eq!(policy.live_depth(), 1);
555 assert_eq!(policy.telemetry().budget_rejections_total, 1);
556 assert_no_duplicates(&policy);
557 }
558
559 #[test]
560 fn fallback_triggers_on_low_signal_quality() {
561 let mut policy = S3FifoPolicy::new(S3FifoConfig {
562 min_ghost_hits_in_window: 2,
563 fallback_window: 4,
564 ..config()
565 });
566
567 for idx in 0..4 {
568 let key = format!("cold-{idx}");
569 let _ = policy.access("ext-a", key);
570 }
571
572 assert_eq!(
573 policy.telemetry().fallback_reason,
574 Some(S3FifoFallbackReason::SignalQualityInsufficient)
575 );
576
577 let bypass = policy.access("ext-a", "late-key".to_string());
578 assert_eq!(bypass.kind, S3FifoDecisionKind::FallbackBypass);
579 assert_eq!(bypass.tier, S3FifoTier::Fallback);
580 }
581
582 #[test]
583 fn fallback_triggers_on_rejection_spike() {
584 let mut policy = S3FifoPolicy::new(S3FifoConfig {
585 max_entries_per_owner: 1,
586 fallback_window: 3,
587 min_ghost_hits_in_window: 0,
588 max_budget_rejections_in_window: 1,
589 ..config()
590 });
591
592 let _ = policy.access("ext-a", "k1".to_string());
593 let _ = policy.access("ext-a", "k2".to_string());
594 let _ = policy.access("ext-a", "k3".to_string());
595
596 assert_eq!(
597 policy.telemetry().fallback_reason,
598 Some(S3FifoFallbackReason::FairnessInstability)
599 );
600 }
601
602 #[test]
603 fn fallback_latches_until_clear_and_reason_stays_stable() {
604 let mut policy = S3FifoPolicy::new(S3FifoConfig {
605 min_ghost_hits_in_window: 3,
606 fallback_window: 3,
607 ..config()
608 });
609
610 let _ = policy.access("ext-a", "k1".to_string());
612 let _ = policy.access("ext-a", "k2".to_string());
613 let _ = policy.access("ext-a", "k3".to_string());
614
615 let expected_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
616 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
617
618 for key in ["k4", "k5", "k6"] {
621 let decision = policy.access("ext-b", key.to_string());
622 assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
623 assert_eq!(decision.tier, S3FifoTier::Fallback);
624 assert_eq!(decision.fallback_reason, expected_reason);
625 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
626 }
627
628 policy.clear_fallback();
629 assert_eq!(policy.telemetry().fallback_reason, None);
630
631 let post_clear = policy.access("ext-b", "k7".to_string());
632 assert_ne!(post_clear.kind, S3FifoDecisionKind::FallbackBypass);
633 }
634
635 #[test]
636 fn fairness_fallback_reports_same_reason_during_repeated_bypass() {
637 let mut policy = S3FifoPolicy::new(S3FifoConfig {
638 max_entries_per_owner: 1,
639 fallback_window: 3,
640 min_ghost_hits_in_window: 0,
641 max_budget_rejections_in_window: 1,
642 ..config()
643 });
644
645 let _ = policy.access("ext-a", "k1".to_string());
647 let _ = policy.access("ext-a", "k2".to_string());
648 let _ = policy.access("ext-a", "k3".to_string());
649
650 let expected_reason = Some(S3FifoFallbackReason::FairnessInstability);
651 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
652
653 for key in ["k4", "k5"] {
654 let decision = policy.access("ext-c", key.to_string());
655 assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
656 assert_eq!(decision.fallback_reason, expected_reason);
657 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
658 }
659 }
660
661 #[test]
662 fn fallback_reason_transitions_low_signal_to_fairness_after_clear() {
663 let mut policy = S3FifoPolicy::new(S3FifoConfig {
664 live_capacity: 2,
665 small_capacity: 1,
666 ghost_capacity: 4,
667 max_entries_per_owner: 1,
668 fallback_window: 3,
669 min_ghost_hits_in_window: 1,
670 max_budget_rejections_in_window: 1,
671 });
672
673 let _ = policy.access("ext-a", "ls-live-1".to_string());
675 let _ = policy.access("ext-b", "ls-ghost-seed".to_string());
676 let _ = policy.access("ext-a", "ls-live-2".to_string());
677
678 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
679 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
680 let low_signal_bypass = policy.access("ext-z", "ls-bypass".to_string());
681 assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
682 assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
683
684 policy.clear_fallback();
685 assert_eq!(policy.telemetry().fallback_reason, None);
686
687 let first = policy.access("ext-a", "ls-live-1".to_string());
689 assert_eq!(first.kind, S3FifoDecisionKind::RejectFairnessBudget);
690 assert!(first.ghost_hit);
691 let _ = policy.access("ext-a", "fair-rej-1".to_string());
692 let _ = policy.access("ext-a", "fair-rej-2".to_string());
693
694 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
695 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
696 let fairness_bypass = policy.access("ext-z", "fair-bypass".to_string());
697 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
698 assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
699 }
700
701 #[test]
702 fn fallback_reason_transitions_fairness_to_low_signal_after_clear() {
703 let mut policy = S3FifoPolicy::new(S3FifoConfig {
704 live_capacity: 2,
705 small_capacity: 1,
706 ghost_capacity: 4,
707 max_entries_per_owner: 1,
708 fallback_window: 3,
709 min_ghost_hits_in_window: 1,
710 max_budget_rejections_in_window: 0,
711 });
712
713 let _ = policy.access("ext-b", "ff-ghost-seed".to_string());
715 let _ = policy.access("ext-a", "ff-live".to_string());
716 let trigger = policy.access("ext-a", "ff-ghost-seed".to_string());
717 assert_eq!(trigger.kind, S3FifoDecisionKind::RejectFairnessBudget);
718 assert!(trigger.ghost_hit);
719
720 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
721 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
722 let fairness_bypass = policy.access("ext-z", "ff-bypass".to_string());
723 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
724 assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
725
726 policy.clear_fallback();
727 assert_eq!(policy.telemetry().fallback_reason, None);
728
729 let _ = policy.access("ext-c", "ff-low-1".to_string());
731 let _ = policy.access("ext-d", "ff-low-2".to_string());
732 let _ = policy.access("ext-e", "ff-low-3".to_string());
733
734 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
735 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
736 let low_signal_bypass = policy.access("ext-z", "ff-low-bypass".to_string());
737 assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
738 assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
739 }
740
741 #[test]
742 fn single_window_clear_cycles_preserve_reason_precedence_and_bypass_counters() {
743 let mut policy = S3FifoPolicy::new(S3FifoConfig {
744 live_capacity: 4,
745 small_capacity: 2,
746 ghost_capacity: 4,
747 max_entries_per_owner: 1,
748 fallback_window: 1,
749 min_ghost_hits_in_window: 1,
750 max_budget_rejections_in_window: 0,
751 });
752
753 let first = policy.access("ext-a", "cold-1".to_string());
755 assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
756 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
757 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
758
759 let low_baseline = policy.telemetry();
761 let low_bypass = policy.access("ext-a", "low-bypass".to_string());
762 assert_eq!(low_bypass.kind, S3FifoDecisionKind::FallbackBypass);
763 let low_after = policy.telemetry();
764 assert_eq!(low_after.ghost_hits_total, low_baseline.ghost_hits_total);
765 assert_eq!(
766 low_after.budget_rejections_total,
767 low_baseline.budget_rejections_total
768 );
769
770 policy.clear_fallback();
771 assert_eq!(policy.telemetry().fallback_reason, None);
772
773 policy.push_ghost("ghost-hot".to_string());
775 policy
776 .owner_live_counts
777 .insert("ext-a".to_string(), policy.config().max_entries_per_owner);
778
779 let fairness_trigger = policy.access("ext-a", "ghost-hot".to_string());
780 assert_eq!(
781 fairness_trigger.kind,
782 S3FifoDecisionKind::RejectFairnessBudget
783 );
784 assert!(fairness_trigger.ghost_hit);
785
786 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
787 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
788
789 let fairness_baseline = policy.telemetry();
791 let fairness_bypass = policy.access("ext-a", "fair-bypass".to_string());
792 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
793 let fairness_after = policy.telemetry();
794 assert_eq!(
795 fairness_after.ghost_hits_total,
796 fairness_baseline.ghost_hits_total
797 );
798 assert_eq!(
799 fairness_after.budget_rejections_total,
800 fairness_baseline.budget_rejections_total
801 );
802 }
803
804 #[test]
807 fn config_clamps_minimums_and_ceilings() {
808 let tiny = S3FifoConfig {
809 live_capacity: 0,
810 small_capacity: 0,
811 ghost_capacity: 0,
812 max_entries_per_owner: 0,
813 fallback_window: 0,
814 min_ghost_hits_in_window: 100,
815 max_budget_rejections_in_window: 100,
816 };
817 let policy = S3FifoPolicy::<String>::new(tiny);
818 let cfg = policy.config();
819 assert!(cfg.live_capacity >= 2, "live_capacity min is 2");
820 assert!(cfg.small_capacity >= 1, "small_capacity min is 1");
821 assert!(cfg.ghost_capacity >= 1, "ghost_capacity min is 1");
822 assert!(cfg.max_entries_per_owner >= 1, "per-owner min is 1");
823 assert!(cfg.fallback_window >= 1, "fallback_window min is 1");
824 assert!(cfg.min_ghost_hits_in_window <= cfg.fallback_window);
826 assert!(cfg.max_budget_rejections_in_window <= cfg.fallback_window);
828 }
829
830 #[test]
831 fn live_depth_reflects_small_plus_main() {
832 let mut policy = S3FifoPolicy::new(config());
833 assert_eq!(policy.live_depth(), 0);
834
835 policy.access("ext-a", "k1".to_string());
837 assert_eq!(policy.live_depth(), 1);
838
839 policy.access("ext-b", "k2".to_string());
841 assert_eq!(policy.live_depth(), 2);
842
843 policy.access("ext-a", "k1".to_string());
845 assert_eq!(policy.live_depth(), 2);
847 assert_no_duplicates(&policy);
848 }
849
850 #[test]
851 fn telemetry_counters_accumulate_correctly() {
852 let mut policy = S3FifoPolicy::new(S3FifoConfig {
853 small_capacity: 1,
854 max_entries_per_owner: 3,
855 ..config()
856 });
857
858 policy.access("ext-a", "k1".to_string());
860 let tel = policy.telemetry();
861 assert_eq!(tel.admissions_total, 1);
862 assert_eq!(tel.promotions_total, 0);
863 assert_eq!(tel.ghost_hits_total, 0);
864 assert_eq!(tel.budget_rejections_total, 0);
865 assert_eq!(tel.small_depth, 1);
866 assert_eq!(tel.main_depth, 0);
867
868 policy.access("ext-a", "k1".to_string());
870 let tel = policy.telemetry();
871 assert_eq!(tel.promotions_total, 1);
872 assert_eq!(tel.small_depth, 0);
873 assert_eq!(tel.main_depth, 1);
874
875 policy.access("ext-a", "k2".to_string());
877 policy.access("ext-a", "k3".to_string());
878 let tel = policy.telemetry();
880 assert!(tel.ghost_depth >= 1, "evicted key should be in ghost");
881 }
882
883 #[test]
884 fn telemetry_owner_live_counts_track_per_owner() {
885 let mut policy = S3FifoPolicy::new(config());
886 policy.access("ext-a", "k1".to_string());
887 policy.access("ext-b", "k2".to_string());
888 let tel = policy.telemetry();
889 assert_eq!(tel.owner_live_counts.get("ext-a"), Some(&1));
890 assert_eq!(tel.owner_live_counts.get("ext-b"), Some(&1));
891 }
892
893 #[test]
894 fn hit_main_reorders_without_changing_depth() {
895 let mut policy = S3FifoPolicy::new(S3FifoConfig {
897 fallback_window: 32,
898 min_ghost_hits_in_window: 0,
899 ..config()
900 });
901 policy.access("ext-a", "k1".to_string());
903 policy.access("ext-a", "k1".to_string()); policy.access("ext-b", "k2".to_string());
905 policy.access("ext-b", "k2".to_string()); let before = policy.telemetry();
908 let depth_before = before.main_depth;
909
910 let decision = policy.access("ext-a", "k1".to_string());
912 assert_eq!(decision.kind, S3FifoDecisionKind::HitMain);
913 assert_eq!(policy.telemetry().main_depth, depth_before);
914 assert_no_duplicates(&policy);
915 }
916
917 #[test]
918 fn ghost_queue_evicts_oldest_when_at_capacity() {
919 let mut policy = S3FifoPolicy::new(S3FifoConfig {
920 ghost_capacity: 2,
921 small_capacity: 1,
922 ..config()
923 });
924
925 policy.access("ext-a", "k1".to_string()); policy.access("ext-b", "k2".to_string()); policy.access("ext-a", "k3".to_string()); assert_eq!(policy.telemetry().ghost_depth, 2);
930
931 policy.access("ext-b", "k4".to_string()); assert_eq!(policy.telemetry().ghost_depth, 2);
934 assert_no_duplicates(&policy);
935 }
936
937 #[test]
938 fn push_ghost_reinsertion_updates_recency_without_duplicates() {
939 let mut policy = S3FifoPolicy::new(S3FifoConfig {
940 ghost_capacity: 2,
941 ..config()
942 });
943
944 policy.push_ghost("k1".to_string());
945 policy.push_ghost("k2".to_string());
946 assert_eq!(
947 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
948 vec!["k1".to_string(), "k2".to_string()]
949 );
950 assert_eq!(policy.ghost_lookup.len(), 2);
951
952 policy.push_ghost("k1".to_string());
955 assert_eq!(
956 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
957 vec!["k2".to_string(), "k1".to_string()]
958 );
959 assert_eq!(policy.ghost_lookup.len(), 2);
960
961 policy.push_ghost("k3".to_string());
963 assert_eq!(
964 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
965 vec!["k1".to_string(), "k3".to_string()]
966 );
967 assert_eq!(policy.ghost_lookup.len(), 2);
968 }
969
970 #[test]
971 fn capacity_enforcement_evicts_to_stay_within_live_capacity() {
972 let mut policy = S3FifoPolicy::new(S3FifoConfig {
973 live_capacity: 3,
974 small_capacity: 2,
975 ghost_capacity: 8,
976 max_entries_per_owner: 10,
977 fallback_window: 32,
979 min_ghost_hits_in_window: 0,
980 max_budget_rejections_in_window: 32,
981 });
982
983 for i in 0..6 {
985 policy.access(&format!("ext-{i}"), format!("k{i}"));
986 }
987
988 assert!(policy.live_depth() <= 3, "live_depth must respect capacity");
990 assert!(policy.telemetry().ghost_depth >= 3);
992 assert_no_duplicates(&policy);
993 }
994
995 #[test]
996 fn default_config_has_sensible_values() {
997 let cfg = S3FifoConfig::default();
998 assert_eq!(cfg.live_capacity, 256);
999 assert_eq!(cfg.small_capacity, 64);
1000 assert_eq!(cfg.ghost_capacity, 512);
1001 assert_eq!(cfg.max_entries_per_owner, 64);
1002 assert_eq!(cfg.fallback_window, 32);
1003 assert_eq!(cfg.min_ghost_hits_in_window, 2);
1004 assert_eq!(cfg.max_budget_rejections_in_window, 12);
1005 }
1006
1007 #[test]
1008 fn decision_fields_reflect_current_state() {
1009 let mut policy = S3FifoPolicy::new(config());
1010 let d1 = policy.access("ext-a", "k1".to_string());
1011 assert_eq!(d1.kind, S3FifoDecisionKind::AdmitSmall);
1012 assert_eq!(d1.tier, S3FifoTier::Small);
1013 assert!(!d1.ghost_hit);
1014 assert!(d1.fallback_reason.is_none());
1015 assert_eq!(d1.live_depth, 1);
1016 assert_eq!(d1.small_depth, 1);
1017 assert_eq!(d1.main_depth, 0);
1018 }
1019
1020 #[test]
1021 fn clear_fallback_resets_policy_gate() {
1022 let mut policy = S3FifoPolicy::new(S3FifoConfig {
1023 min_ghost_hits_in_window: 3,
1024 fallback_window: 3,
1025 ..config()
1026 });
1027
1028 let _ = policy.access("ext-a", "k1".to_string());
1029 let _ = policy.access("ext-a", "k2".to_string());
1030 let _ = policy.access("ext-a", "k3".to_string());
1031 assert!(policy.telemetry().fallback_reason.is_some());
1032
1033 policy.clear_fallback();
1034 assert!(policy.telemetry().fallback_reason.is_none());
1035
1036 let decision = policy.access("ext-a", "k4".to_string());
1037 assert_ne!(decision.kind, S3FifoDecisionKind::FallbackBypass);
1038 }
1039
1040 #[test]
1041 fn clear_fallback_clears_signal_window_and_preserves_counters() {
1042 let mut policy = S3FifoPolicy::new(S3FifoConfig {
1043 max_entries_per_owner: 1,
1044 fallback_window: 3,
1045 min_ghost_hits_in_window: 0,
1046 max_budget_rejections_in_window: 1,
1047 ..config()
1048 });
1049
1050 let _ = policy.access("ext-a", "k1".to_string());
1052 let _ = policy.access("ext-a", "k2".to_string());
1053 let _ = policy.access("ext-a", "k3".to_string());
1054 assert_eq!(
1055 policy.telemetry().fallback_reason,
1056 Some(S3FifoFallbackReason::FairnessInstability)
1057 );
1058 assert_eq!(policy.recent_signals.len(), 3);
1059
1060 let before_clear = policy.telemetry();
1061 policy.clear_fallback();
1062
1063 assert_eq!(policy.telemetry().fallback_reason, None);
1064 assert!(
1065 policy.recent_signals.is_empty(),
1066 "clear_fallback should reset signal history"
1067 );
1068
1069 let after_clear = policy.telemetry();
1070 assert_eq!(after_clear.admissions_total, before_clear.admissions_total);
1071 assert_eq!(after_clear.promotions_total, before_clear.promotions_total);
1072 assert_eq!(after_clear.ghost_hits_total, before_clear.ghost_hits_total);
1073 assert_eq!(
1074 after_clear.budget_rejections_total,
1075 before_clear.budget_rejections_total
1076 );
1077
1078 let admitted = policy.access("ext-b", "post-clear-admit".to_string());
1080 assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
1081 assert_eq!(policy.telemetry().fallback_reason, None);
1082 assert_eq!(policy.recent_signals.len(), 1);
1083
1084 let reject_1 = policy.access("ext-a", "post-clear-reject-1".to_string());
1086 assert_eq!(reject_1.kind, S3FifoDecisionKind::RejectFairnessBudget);
1087 assert_eq!(policy.telemetry().fallback_reason, None);
1088
1089 let reject_2 = policy.access("ext-a", "post-clear-reject-2".to_string());
1090 assert_eq!(reject_2.kind, S3FifoDecisionKind::RejectFairnessBudget);
1091 assert_eq!(
1092 policy.telemetry().fallback_reason,
1093 Some(S3FifoFallbackReason::FairnessInstability)
1094 );
1095 }
1096
1097 mod proptest_s3fifo {
1100 use super::*;
1101 use proptest::prelude::*;
1102
1103 fn arb_access() -> impl Strategy<Value = (String, String)> {
1104 let owner = prop::sample::select(vec![
1105 "ext-a".to_string(),
1106 "ext-b".to_string(),
1107 "ext-c".to_string(),
1108 ]);
1109 let key = prop::sample::select(vec![
1110 "k0".to_string(),
1111 "k1".to_string(),
1112 "k2".to_string(),
1113 "k3".to_string(),
1114 "k4".to_string(),
1115 "k5".to_string(),
1116 "k6".to_string(),
1117 "k7".to_string(),
1118 ]);
1119 (owner, key)
1120 }
1121
1122 fn arb_config() -> impl Strategy<Value = S3FifoConfig> {
1123 (2..32usize, 1..16usize, 1..64usize, 1..8usize, 2..16usize).prop_map(
1124 |(live, small, ghost, per_owner, window)| S3FifoConfig {
1125 live_capacity: live,
1126 small_capacity: small,
1127 ghost_capacity: ghost,
1128 max_entries_per_owner: per_owner,
1129 fallback_window: window,
1130 min_ghost_hits_in_window: 0,
1131 max_budget_rejections_in_window: window,
1132 },
1133 )
1134 }
1135
1136 proptest! {
1137 #[test]
1138 fn queues_always_disjoint_after_access_sequence(
1139 cfg in arb_config(),
1140 accesses in prop::collection::vec(arb_access(), 1..50),
1141 ) {
1142 let mut policy = S3FifoPolicy::new(cfg);
1143 for (owner, key) in &accesses {
1144 let _ = policy.access(owner, key.clone());
1145 }
1146
1147 let small: BTreeSet<_> = policy.small.iter().cloned().collect();
1148 let main: BTreeSet<_> = policy.main.iter().cloned().collect();
1149 let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
1150
1151 assert!(small.is_disjoint(&main), "small and main must be disjoint");
1152 assert!(small.is_disjoint(&ghost), "small and ghost must be disjoint");
1153 assert!(main.is_disjoint(&ghost), "main and ghost must be disjoint");
1154 }
1155
1156 #[test]
1157 fn live_depth_never_exceeds_capacity(
1158 cfg in arb_config(),
1159 accesses in prop::collection::vec(arb_access(), 1..50),
1160 ) {
1161 let mut policy = S3FifoPolicy::new(cfg);
1162 for (owner, key) in &accesses {
1163 let _ = policy.access(owner, key.clone());
1164 }
1165 let effective_cap = policy.config().live_capacity;
1166 assert!(
1167 policy.live_depth() <= effective_cap,
1168 "live_depth {} exceeded capacity {}",
1169 policy.live_depth(),
1170 effective_cap,
1171 );
1172 }
1173
1174 #[test]
1175 fn ghost_depth_never_exceeds_capacity(
1176 cfg in arb_config(),
1177 accesses in prop::collection::vec(arb_access(), 1..50),
1178 ) {
1179 let mut policy = S3FifoPolicy::new(cfg);
1180 for (owner, key) in &accesses {
1181 let _ = policy.access(owner, key.clone());
1182 }
1183 let ghost_cap = policy.config().ghost_capacity;
1184 assert!(
1185 policy.telemetry().ghost_depth <= ghost_cap,
1186 "ghost_depth {} exceeded capacity {}",
1187 policy.telemetry().ghost_depth,
1188 ghost_cap,
1189 );
1190 }
1191
1192 #[test]
1193 fn live_depth_equals_small_plus_main(
1194 cfg in arb_config(),
1195 accesses in prop::collection::vec(arb_access(), 1..50),
1196 ) {
1197 let mut policy = S3FifoPolicy::new(cfg);
1198 for (owner, key) in &accesses {
1199 let decision = policy.access(owner, key.clone());
1200 assert_eq!(
1201 decision.live_depth,
1202 decision.small_depth + decision.main_depth,
1203 "live_depth must equal small + main at every step"
1204 );
1205 }
1206 }
1207
1208 #[test]
1209 fn counters_monotonically_nondecreasing(
1210 cfg in arb_config(),
1211 accesses in prop::collection::vec(arb_access(), 1..50),
1212 ) {
1213 let mut policy = S3FifoPolicy::new(cfg);
1214 let mut prev_admissions = 0u64;
1215 let mut prev_promotions = 0u64;
1216 let mut prev_ghost_hits = 0u64;
1217 let mut prev_rejections = 0u64;
1218
1219 for (owner, key) in &accesses {
1220 let _ = policy.access(owner, key.clone());
1221 let tel = policy.telemetry();
1222 assert!(tel.admissions_total >= prev_admissions);
1223 assert!(tel.promotions_total >= prev_promotions);
1224 assert!(tel.ghost_hits_total >= prev_ghost_hits);
1225 assert!(tel.budget_rejections_total >= prev_rejections);
1226 prev_admissions = tel.admissions_total;
1227 prev_promotions = tel.promotions_total;
1228 prev_ghost_hits = tel.ghost_hits_total;
1229 prev_rejections = tel.budget_rejections_total;
1230 }
1231 }
1232
1233 #[test]
1234 fn owner_counts_match_live_keys(
1235 cfg in arb_config(),
1236 accesses in prop::collection::vec(arb_access(), 1..50),
1237 ) {
1238 let mut policy = S3FifoPolicy::new(cfg);
1239 for (owner, key) in &accesses {
1240 let _ = policy.access(owner, key.clone());
1241 }
1242
1243 let mut expected: BTreeMap<String, usize> = BTreeMap::new();
1245 for owner_val in policy.live_owners.values() {
1246 *expected.entry(owner_val.clone()).or_insert(0) += 1;
1247 }
1248 assert_eq!(
1249 policy.owner_live_counts, expected,
1250 "owner_live_counts must match actual live key distribution"
1251 );
1252 }
1253
1254 #[test]
1255 fn live_tier_map_consistent_with_queues(
1256 cfg in arb_config(),
1257 accesses in prop::collection::vec(arb_access(), 1..50),
1258 ) {
1259 let mut policy = S3FifoPolicy::new(cfg);
1260 for (owner, key) in &accesses {
1261 let _ = policy.access(owner, key.clone());
1262 }
1263
1264 for key in &policy.small {
1266 assert_eq!(
1267 policy.live_tiers.get(key),
1268 Some(&LiveTier::Small),
1269 "key in small queue must be tracked as Small"
1270 );
1271 }
1272 for key in &policy.main {
1274 assert_eq!(
1275 policy.live_tiers.get(key),
1276 Some(&LiveTier::Main),
1277 "key in main queue must be tracked as Main"
1278 );
1279 }
1280 assert_eq!(
1282 policy.live_tiers.len(),
1283 policy.small.len() + policy.main.len(),
1284 );
1285 }
1286 }
1287 }
1288}