1#[cfg(test)]
14use std::collections::BTreeSet;
15use std::collections::{BTreeMap, VecDeque};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum S3FifoFallbackReason {
20 SignalQualityInsufficient,
21 FairnessInstability,
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum S3FifoTier {
27 Small,
28 Main,
29 Ghost,
30 Fallback,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum S3FifoDecisionKind {
36 AdmitSmall,
37 PromoteSmallToMain,
38 HitMain,
39 AdmitFromGhost,
40 RejectFairnessBudget,
41 FallbackBypass,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub struct S3FifoDecision {
47 pub kind: S3FifoDecisionKind,
48 pub tier: S3FifoTier,
49 pub ghost_hit: bool,
50 pub fallback_reason: Option<S3FifoFallbackReason>,
51 pub live_depth: usize,
52 pub small_depth: usize,
53 pub main_depth: usize,
54 pub ghost_depth: usize,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct S3FifoConfig {
60 pub live_capacity: usize,
61 pub small_capacity: usize,
62 pub ghost_capacity: usize,
63 pub max_entries_per_owner: usize,
64 pub fallback_window: usize,
65 pub min_ghost_hits_in_window: usize,
66 pub max_budget_rejections_in_window: usize,
67}
68
69impl Default for S3FifoConfig {
70 fn default() -> Self {
71 Self {
72 live_capacity: 256,
73 small_capacity: 64,
74 ghost_capacity: 512,
75 max_entries_per_owner: 64,
76 fallback_window: 32,
77 min_ghost_hits_in_window: 2,
78 max_budget_rejections_in_window: 12,
79 }
80 }
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84enum LiveTier {
85 Small,
86 Main,
87}
88
89#[derive(Debug, Clone, Copy)]
90struct DecisionSignal {
91 ghost_hit: bool,
92 budget_rejected: bool,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct S3FifoTelemetry {
98 pub fallback_reason: Option<S3FifoFallbackReason>,
99 pub small_depth: usize,
100 pub main_depth: usize,
101 pub ghost_depth: usize,
102 pub live_depth: usize,
103 pub ghost_hits_total: u64,
104 pub admissions_total: u64,
105 pub promotions_total: u64,
106 pub budget_rejections_total: u64,
107 pub owner_live_counts: BTreeMap<String, usize>,
108}
109
110#[derive(Debug, Clone)]
112pub struct S3FifoPolicy<K: Ord + Clone> {
113 cfg: S3FifoConfig,
114 small: VecDeque<K>,
115 main: VecDeque<K>,
116 ghost_order: BTreeMap<u64, K>,
117 ghost_lookup: BTreeMap<K, u64>,
118 ghost_gen: u64,
119 live_tiers: BTreeMap<K, LiveTier>,
120 live_owners: BTreeMap<K, String>,
121 owner_live_counts: BTreeMap<String, usize>,
122 recent_signals: VecDeque<DecisionSignal>,
123 fallback_reason: Option<S3FifoFallbackReason>,
124 ghost_hits_total: u64,
125 admissions_total: u64,
126 promotions_total: u64,
127 budget_rejections_total: u64,
128}
129
130impl<K: Ord + Clone> S3FifoPolicy<K> {
131 #[must_use]
132 pub fn new(config: S3FifoConfig) -> Self {
133 let live_capacity = config.live_capacity.max(2);
134 let small_cap_floor = live_capacity.saturating_sub(1).max(1);
135 let small_capacity = config.small_capacity.max(1).min(small_cap_floor);
136 let ghost_capacity = config.ghost_capacity.max(1);
137 let max_entries_per_owner = config.max_entries_per_owner.max(1);
138 let fallback_window = config.fallback_window.max(1);
139
140 Self {
141 cfg: S3FifoConfig {
142 live_capacity,
143 small_capacity,
144 ghost_capacity,
145 max_entries_per_owner,
146 fallback_window,
147 min_ghost_hits_in_window: config.min_ghost_hits_in_window.min(fallback_window),
148 max_budget_rejections_in_window: config
149 .max_budget_rejections_in_window
150 .min(fallback_window),
151 },
152 small: VecDeque::new(),
153 main: VecDeque::new(),
154 ghost_order: BTreeMap::new(),
155 ghost_lookup: BTreeMap::new(),
156 ghost_gen: 0,
157 live_tiers: BTreeMap::new(),
158 live_owners: BTreeMap::new(),
159 owner_live_counts: BTreeMap::new(),
160 recent_signals: VecDeque::new(),
161 fallback_reason: None,
162 ghost_hits_total: 0,
163 admissions_total: 0,
164 promotions_total: 0,
165 budget_rejections_total: 0,
166 }
167 }
168
169 #[must_use]
170 pub const fn config(&self) -> S3FifoConfig {
171 self.cfg
172 }
173
174 #[must_use]
175 pub fn telemetry(&self) -> S3FifoTelemetry {
176 S3FifoTelemetry {
177 fallback_reason: self.fallback_reason,
178 small_depth: self.small.len(),
179 main_depth: self.main.len(),
180 ghost_depth: self.ghost_lookup.len(),
181 live_depth: self.live_depth(),
182 ghost_hits_total: self.ghost_hits_total,
183 admissions_total: self.admissions_total,
184 promotions_total: self.promotions_total,
185 budget_rejections_total: self.budget_rejections_total,
186 owner_live_counts: self.owner_live_counts.clone(),
187 }
188 }
189
190 #[must_use]
191 pub fn live_depth(&self) -> usize {
192 self.small.len().saturating_add(self.main.len())
193 }
194
195 pub fn clear_fallback(&mut self) {
196 self.fallback_reason = None;
197 self.recent_signals.clear();
198 }
199
200 pub fn access(&mut self, owner: &str, key: K) -> S3FifoDecision {
201 if let Some(reason) = self.fallback_reason {
202 return self.decision(
203 S3FifoDecisionKind::FallbackBypass,
204 S3FifoTier::Fallback,
205 false,
206 Some(reason),
207 );
208 }
209
210 let mut ghost_hit = false;
211 let kind = if matches!(self.live_tiers.get(&key), Some(LiveTier::Main)) {
212 self.touch_main(&key);
213 S3FifoDecisionKind::HitMain
214 } else if matches!(self.live_tiers.get(&key), Some(LiveTier::Small)) {
215 self.promote_small_to_main(&key);
216 self.promotions_total = self.promotions_total.saturating_add(1);
217 S3FifoDecisionKind::PromoteSmallToMain
218 } else if self.ghost_lookup.contains_key(&key) {
219 ghost_hit = true;
220 self.ghost_hits_total = self.ghost_hits_total.saturating_add(1);
221 if self.owner_at_budget(owner) {
222 self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
223 S3FifoDecisionKind::RejectFairnessBudget
224 } else {
225 self.admit_from_ghost(owner, key);
226 self.admissions_total = self.admissions_total.saturating_add(1);
227 S3FifoDecisionKind::AdmitFromGhost
228 }
229 } else if self.owner_at_budget(owner) {
230 self.budget_rejections_total = self.budget_rejections_total.saturating_add(1);
231 S3FifoDecisionKind::RejectFairnessBudget
232 } else {
233 self.admit_small(owner, key);
234 self.admissions_total = self.admissions_total.saturating_add(1);
235 S3FifoDecisionKind::AdmitSmall
236 };
237
238 let signal = DecisionSignal {
239 ghost_hit,
240 budget_rejected: kind == S3FifoDecisionKind::RejectFairnessBudget,
241 };
242 self.record_signal(signal);
243 self.evaluate_fallback();
244
245 let tier = Self::resolve_tier(kind, ghost_hit);
246 self.decision(kind, tier, ghost_hit, self.fallback_reason)
247 }
248
249 const fn resolve_tier(kind: S3FifoDecisionKind, ghost_hit: bool) -> S3FifoTier {
250 match kind {
251 S3FifoDecisionKind::HitMain
252 | S3FifoDecisionKind::PromoteSmallToMain
253 | S3FifoDecisionKind::AdmitFromGhost => S3FifoTier::Main,
254 S3FifoDecisionKind::AdmitSmall => S3FifoTier::Small,
255 S3FifoDecisionKind::RejectFairnessBudget => {
256 if ghost_hit {
257 S3FifoTier::Ghost
258 } else {
259 S3FifoTier::Small
260 }
261 }
262 S3FifoDecisionKind::FallbackBypass => S3FifoTier::Fallback,
263 }
264 }
265
266 fn decision(
267 &self,
268 kind: S3FifoDecisionKind,
269 tier: S3FifoTier,
270 ghost_hit: bool,
271 fallback_reason: Option<S3FifoFallbackReason>,
272 ) -> S3FifoDecision {
273 S3FifoDecision {
274 kind,
275 tier,
276 ghost_hit,
277 fallback_reason,
278 live_depth: self.live_depth(),
279 small_depth: self.small.len(),
280 main_depth: self.main.len(),
281 ghost_depth: self.ghost_lookup.len(),
282 }
283 }
284
285 fn owner_at_budget(&self, owner: &str) -> bool {
286 self.owner_live_counts.get(owner).copied().unwrap_or(0) >= self.cfg.max_entries_per_owner
287 }
288
289 const fn main_capacity(&self) -> usize {
290 self.cfg
291 .live_capacity
292 .saturating_sub(self.cfg.small_capacity)
293 }
294
295 fn admit_small(&mut self, owner: &str, key: K) {
296 self.purge_key(&key);
297 self.small.push_back(key.clone());
298 self.live_tiers.insert(key.clone(), LiveTier::Small);
299 self.live_owners.insert(key, owner.to_string());
300 self.increment_owner(owner);
301 self.enforce_small_capacity();
302 self.enforce_live_capacity();
303 }
304
305 fn admit_from_ghost(&mut self, owner: &str, key: K) {
306 self.remove_ghost(&key);
307 self.main.push_back(key.clone());
308 self.live_tiers.insert(key.clone(), LiveTier::Main);
309 self.live_owners.insert(key, owner.to_string());
310 self.increment_owner(owner);
311 self.enforce_main_capacity();
312 self.enforce_live_capacity();
313 }
314
315 fn promote_small_to_main(&mut self, key: &K) {
316 remove_from_queue(&mut self.small, key);
317 self.main.push_back(key.clone());
318 self.live_tiers.insert(key.clone(), LiveTier::Main);
319 self.enforce_main_capacity();
320 self.enforce_live_capacity();
321 }
322
323 fn touch_main(&mut self, key: &K) {
324 remove_from_queue(&mut self.main, key);
325 self.main.push_back(key.clone());
326 }
327
328 fn enforce_small_capacity(&mut self) {
329 while self.small.len() > self.cfg.small_capacity {
330 self.evict_small_front_to_ghost();
331 }
332 }
333
334 fn enforce_main_capacity(&mut self) {
335 while self.main.len() > self.main_capacity() {
336 self.evict_main_front_to_ghost();
337 }
338 }
339
340 fn enforce_live_capacity(&mut self) {
341 while self.live_depth() > self.cfg.live_capacity {
342 if self.small.is_empty() {
343 self.evict_main_front_to_ghost();
344 } else {
345 self.evict_small_front_to_ghost();
346 }
347 }
348 }
349
350 fn evict_small_front_to_ghost(&mut self) {
351 if let Some(key) = self.small.pop_front() {
352 self.live_tiers.remove(&key);
353 self.remove_owner_for_key(&key);
354 self.push_ghost(key);
355 }
356 }
357
358 fn evict_main_front_to_ghost(&mut self) {
359 if let Some(key) = self.main.pop_front() {
360 self.live_tiers.remove(&key);
361 self.remove_owner_for_key(&key);
362 self.push_ghost(key);
363 }
364 }
365
366 fn purge_key(&mut self, key: &K) {
367 self.remove_live_key(key);
368 self.remove_ghost(key);
369 }
370
371 fn remove_live_key(&mut self, key: &K) {
372 if let Some(tier) = self.live_tiers.remove(key) {
373 match tier {
374 LiveTier::Small => remove_from_queue(&mut self.small, key),
375 LiveTier::Main => remove_from_queue(&mut self.main, key),
376 }
377 self.remove_owner_for_key(key);
378 }
379 }
380
381 fn remove_owner_for_key(&mut self, key: &K) {
382 if let Some(owner) = self.live_owners.remove(key) {
383 self.decrement_owner(&owner);
384 }
385 }
386
387 fn push_ghost(&mut self, key: K) {
388 if self.ghost_gen == u64::MAX {
390 self.ghost_gen = 0;
391 self.ghost_order.clear();
392 self.ghost_lookup.clear();
393 }
394
395 self.ghost_gen = self.ghost_gen.saturating_add(1);
396
397 if let Some(gen_mut) = self.ghost_lookup.get_mut(&key) {
398 let old_gen = *gen_mut;
399 *gen_mut = self.ghost_gen;
400 if let Some(k_reused) = self.ghost_order.remove(&old_gen) {
401 self.ghost_order.insert(self.ghost_gen, k_reused);
402 }
403 } else {
404 self.ghost_lookup.insert(key.clone(), self.ghost_gen);
405 self.ghost_order.insert(self.ghost_gen, key);
406 }
407
408 while self.ghost_lookup.len() > self.cfg.ghost_capacity {
409 if let Some((_, popped_key)) = self.ghost_order.pop_first() {
410 self.ghost_lookup.remove(&popped_key);
411 } else {
412 break;
413 }
414 }
415 }
416
417 fn remove_ghost(&mut self, key: &K) {
418 if let Some(generation) = self.ghost_lookup.remove(key) {
419 self.ghost_order.remove(&generation);
420 }
421 }
422
423 fn increment_owner(&mut self, owner: &str) {
424 let entry = self.owner_live_counts.entry(owner.to_string()).or_insert(0);
425 *entry = entry.saturating_add(1);
426 }
427
428 fn decrement_owner(&mut self, owner: &str) {
429 let Some(count) = self.owner_live_counts.get_mut(owner) else {
430 return;
431 };
432 if *count > 1 {
433 *count -= 1;
434 } else {
435 self.owner_live_counts.remove(owner);
436 }
437 }
438
439 fn record_signal(&mut self, signal: DecisionSignal) {
440 self.recent_signals.push_back(signal);
441 while self.recent_signals.len() > self.cfg.fallback_window {
442 self.recent_signals.pop_front();
443 }
444 }
445
446 fn evaluate_fallback(&mut self) {
447 if self.fallback_reason.is_some() || self.recent_signals.len() < self.cfg.fallback_window {
448 return;
449 }
450
451 let mut ghost_hits = 0usize;
452 let mut budget_rejections = 0usize;
453 for signal in &self.recent_signals {
454 if signal.ghost_hit {
455 ghost_hits = ghost_hits.saturating_add(1);
456 }
457 if signal.budget_rejected {
458 budget_rejections = budget_rejections.saturating_add(1);
459 }
460 }
461
462 if ghost_hits < self.cfg.min_ghost_hits_in_window {
463 self.fallback_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
464 } else if budget_rejections > self.cfg.max_budget_rejections_in_window {
465 self.fallback_reason = Some(S3FifoFallbackReason::FairnessInstability);
466 }
467 }
468}
469
470fn remove_from_queue<K: Ord>(queue: &mut VecDeque<K>, key: &K) {
471 if let Some(index) = queue.iter().position(|existing| existing == key) {
472 queue.remove(index);
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479
480 fn config() -> S3FifoConfig {
481 S3FifoConfig {
482 live_capacity: 4,
483 small_capacity: 2,
484 ghost_capacity: 4,
485 max_entries_per_owner: 2,
486 fallback_window: 4,
487 min_ghost_hits_in_window: 1,
488 max_budget_rejections_in_window: 2,
489 }
490 }
491
492 fn assert_no_duplicates(policy: &S3FifoPolicy<String>) {
493 let small: BTreeSet<_> = policy.small.iter().cloned().collect();
494 let main: BTreeSet<_> = policy.main.iter().cloned().collect();
495 let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
496
497 assert!(small.is_disjoint(&main));
498 assert!(small.is_disjoint(&ghost));
499 assert!(main.is_disjoint(&ghost));
500 assert_eq!(small.len() + main.len(), policy.live_tiers.len());
501 }
502
503 #[test]
504 fn small_hit_promotes_to_main() {
505 let mut policy = S3FifoPolicy::new(config());
506 let first = policy.access("ext-a", "k1".to_string());
507 assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
508
509 let second = policy.access("ext-a", "k1".to_string());
510 assert_eq!(second.kind, S3FifoDecisionKind::PromoteSmallToMain);
511 assert_eq!(second.tier, S3FifoTier::Main);
512 assert_eq!(second.main_depth, 1);
513 assert_eq!(second.small_depth, 0);
514 assert_no_duplicates(&policy);
515 }
516
517 #[test]
518 fn ghost_hit_reenters_live_set() {
519 let mut policy = S3FifoPolicy::new(S3FifoConfig {
520 small_capacity: 1,
521 ..config()
522 });
523
524 policy.access("ext-a", "k1".to_string());
525 policy.access("ext-a", "k2".to_string());
526 let decision = policy.access("ext-a", "k1".to_string());
527
528 assert_eq!(decision.kind, S3FifoDecisionKind::AdmitFromGhost);
529 assert!(decision.ghost_hit);
530 assert_eq!(decision.tier, S3FifoTier::Main);
531 assert_eq!(policy.telemetry().ghost_hits_total, 1);
532 assert_no_duplicates(&policy);
533 }
534
535 #[test]
536 fn fairness_budget_rejects_owner_overflow() {
537 let mut policy = S3FifoPolicy::new(S3FifoConfig {
538 max_entries_per_owner: 1,
539 ..config()
540 });
541
542 let admitted = policy.access("ext-a", "k1".to_string());
543 let rejected = policy.access("ext-a", "k2".to_string());
544
545 assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
546 assert_eq!(rejected.kind, S3FifoDecisionKind::RejectFairnessBudget);
547 assert_eq!(policy.live_depth(), 1);
548 assert_eq!(policy.telemetry().budget_rejections_total, 1);
549 assert_no_duplicates(&policy);
550 }
551
552 #[test]
553 fn fallback_triggers_on_low_signal_quality() {
554 let mut policy = S3FifoPolicy::new(S3FifoConfig {
555 min_ghost_hits_in_window: 2,
556 fallback_window: 4,
557 ..config()
558 });
559
560 for idx in 0..4 {
561 let key = format!("cold-{idx}");
562 let _ = policy.access("ext-a", key);
563 }
564
565 assert_eq!(
566 policy.telemetry().fallback_reason,
567 Some(S3FifoFallbackReason::SignalQualityInsufficient)
568 );
569
570 let bypass = policy.access("ext-a", "late-key".to_string());
571 assert_eq!(bypass.kind, S3FifoDecisionKind::FallbackBypass);
572 assert_eq!(bypass.tier, S3FifoTier::Fallback);
573 }
574
575 #[test]
576 fn fallback_triggers_on_rejection_spike() {
577 let mut policy = S3FifoPolicy::new(S3FifoConfig {
578 max_entries_per_owner: 1,
579 fallback_window: 3,
580 min_ghost_hits_in_window: 0,
581 max_budget_rejections_in_window: 1,
582 ..config()
583 });
584
585 let _ = policy.access("ext-a", "k1".to_string());
586 let _ = policy.access("ext-a", "k2".to_string());
587 let _ = policy.access("ext-a", "k3".to_string());
588
589 assert_eq!(
590 policy.telemetry().fallback_reason,
591 Some(S3FifoFallbackReason::FairnessInstability)
592 );
593 }
594
595 #[test]
596 fn fallback_latches_until_clear_and_reason_stays_stable() {
597 let mut policy = S3FifoPolicy::new(S3FifoConfig {
598 min_ghost_hits_in_window: 3,
599 fallback_window: 3,
600 ..config()
601 });
602
603 let _ = policy.access("ext-a", "k1".to_string());
605 let _ = policy.access("ext-a", "k2".to_string());
606 let _ = policy.access("ext-a", "k3".to_string());
607
608 let expected_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
609 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
610
611 for key in ["k4", "k5", "k6"] {
614 let decision = policy.access("ext-b", key.to_string());
615 assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
616 assert_eq!(decision.tier, S3FifoTier::Fallback);
617 assert_eq!(decision.fallback_reason, expected_reason);
618 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
619 }
620
621 policy.clear_fallback();
622 assert_eq!(policy.telemetry().fallback_reason, None);
623
624 let post_clear = policy.access("ext-b", "k7".to_string());
625 assert_ne!(post_clear.kind, S3FifoDecisionKind::FallbackBypass);
626 }
627
628 #[test]
629 fn fairness_fallback_reports_same_reason_during_repeated_bypass() {
630 let mut policy = S3FifoPolicy::new(S3FifoConfig {
631 max_entries_per_owner: 1,
632 fallback_window: 3,
633 min_ghost_hits_in_window: 0,
634 max_budget_rejections_in_window: 1,
635 ..config()
636 });
637
638 let _ = policy.access("ext-a", "k1".to_string());
640 let _ = policy.access("ext-a", "k2".to_string());
641 let _ = policy.access("ext-a", "k3".to_string());
642
643 let expected_reason = Some(S3FifoFallbackReason::FairnessInstability);
644 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
645
646 for key in ["k4", "k5"] {
647 let decision = policy.access("ext-c", key.to_string());
648 assert_eq!(decision.kind, S3FifoDecisionKind::FallbackBypass);
649 assert_eq!(decision.fallback_reason, expected_reason);
650 assert_eq!(policy.telemetry().fallback_reason, expected_reason);
651 }
652 }
653
654 #[test]
655 fn fallback_reason_transitions_low_signal_to_fairness_after_clear() {
656 let mut policy = S3FifoPolicy::new(S3FifoConfig {
657 live_capacity: 2,
658 small_capacity: 1,
659 ghost_capacity: 4,
660 max_entries_per_owner: 1,
661 fallback_window: 3,
662 min_ghost_hits_in_window: 1,
663 max_budget_rejections_in_window: 1,
664 });
665
666 let _ = policy.access("ext-a", "ls-live-1".to_string());
668 let _ = policy.access("ext-b", "ls-ghost-seed".to_string());
669 let _ = policy.access("ext-a", "ls-live-2".to_string());
670
671 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
672 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
673 let low_signal_bypass = policy.access("ext-z", "ls-bypass".to_string());
674 assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
675 assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
676
677 policy.clear_fallback();
678 assert_eq!(policy.telemetry().fallback_reason, None);
679
680 let first = policy.access("ext-a", "ls-live-1".to_string());
682 assert_eq!(first.kind, S3FifoDecisionKind::RejectFairnessBudget);
683 assert!(first.ghost_hit);
684 let _ = policy.access("ext-a", "fair-rej-1".to_string());
685 let _ = policy.access("ext-a", "fair-rej-2".to_string());
686
687 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
688 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
689 let fairness_bypass = policy.access("ext-z", "fair-bypass".to_string());
690 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
691 assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
692 }
693
694 #[test]
695 fn fallback_reason_transitions_fairness_to_low_signal_after_clear() {
696 let mut policy = S3FifoPolicy::new(S3FifoConfig {
697 live_capacity: 2,
698 small_capacity: 1,
699 ghost_capacity: 4,
700 max_entries_per_owner: 1,
701 fallback_window: 3,
702 min_ghost_hits_in_window: 1,
703 max_budget_rejections_in_window: 0,
704 });
705
706 let _ = policy.access("ext-b", "ff-ghost-seed".to_string());
708 let _ = policy.access("ext-a", "ff-live".to_string());
709 let trigger = policy.access("ext-a", "ff-ghost-seed".to_string());
710 assert_eq!(trigger.kind, S3FifoDecisionKind::RejectFairnessBudget);
711 assert!(trigger.ghost_hit);
712
713 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
714 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
715 let fairness_bypass = policy.access("ext-z", "ff-bypass".to_string());
716 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
717 assert_eq!(fairness_bypass.fallback_reason, fairness_reason);
718
719 policy.clear_fallback();
720 assert_eq!(policy.telemetry().fallback_reason, None);
721
722 let _ = policy.access("ext-c", "ff-low-1".to_string());
724 let _ = policy.access("ext-d", "ff-low-2".to_string());
725 let _ = policy.access("ext-e", "ff-low-3".to_string());
726
727 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
728 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
729 let low_signal_bypass = policy.access("ext-z", "ff-low-bypass".to_string());
730 assert_eq!(low_signal_bypass.kind, S3FifoDecisionKind::FallbackBypass);
731 assert_eq!(low_signal_bypass.fallback_reason, low_signal_reason);
732 }
733
734 #[test]
735 fn single_window_clear_cycles_preserve_reason_precedence_and_bypass_counters() {
736 let mut policy = S3FifoPolicy::new(S3FifoConfig {
737 live_capacity: 4,
738 small_capacity: 2,
739 ghost_capacity: 4,
740 max_entries_per_owner: 1,
741 fallback_window: 1,
742 min_ghost_hits_in_window: 1,
743 max_budget_rejections_in_window: 0,
744 });
745
746 let first = policy.access("ext-a", "cold-1".to_string());
748 assert_eq!(first.kind, S3FifoDecisionKind::AdmitSmall);
749 let low_signal_reason = Some(S3FifoFallbackReason::SignalQualityInsufficient);
750 assert_eq!(policy.telemetry().fallback_reason, low_signal_reason);
751
752 let low_baseline = policy.telemetry();
754 let low_bypass = policy.access("ext-a", "low-bypass".to_string());
755 assert_eq!(low_bypass.kind, S3FifoDecisionKind::FallbackBypass);
756 let low_after = policy.telemetry();
757 assert_eq!(low_after.ghost_hits_total, low_baseline.ghost_hits_total);
758 assert_eq!(
759 low_after.budget_rejections_total,
760 low_baseline.budget_rejections_total
761 );
762
763 policy.clear_fallback();
764 assert_eq!(policy.telemetry().fallback_reason, None);
765
766 policy.push_ghost("ghost-hot".to_string());
768 policy
769 .owner_live_counts
770 .insert("ext-a".to_string(), policy.config().max_entries_per_owner);
771
772 let fairness_trigger = policy.access("ext-a", "ghost-hot".to_string());
773 assert_eq!(
774 fairness_trigger.kind,
775 S3FifoDecisionKind::RejectFairnessBudget
776 );
777 assert!(fairness_trigger.ghost_hit);
778
779 let fairness_reason = Some(S3FifoFallbackReason::FairnessInstability);
780 assert_eq!(policy.telemetry().fallback_reason, fairness_reason);
781
782 let fairness_baseline = policy.telemetry();
784 let fairness_bypass = policy.access("ext-a", "fair-bypass".to_string());
785 assert_eq!(fairness_bypass.kind, S3FifoDecisionKind::FallbackBypass);
786 let fairness_after = policy.telemetry();
787 assert_eq!(
788 fairness_after.ghost_hits_total,
789 fairness_baseline.ghost_hits_total
790 );
791 assert_eq!(
792 fairness_after.budget_rejections_total,
793 fairness_baseline.budget_rejections_total
794 );
795 }
796
797 #[test]
800 fn config_clamps_minimums_and_ceilings() {
801 let tiny = S3FifoConfig {
802 live_capacity: 0,
803 small_capacity: 0,
804 ghost_capacity: 0,
805 max_entries_per_owner: 0,
806 fallback_window: 0,
807 min_ghost_hits_in_window: 100,
808 max_budget_rejections_in_window: 100,
809 };
810 let policy = S3FifoPolicy::<String>::new(tiny);
811 let cfg = policy.config();
812 assert!(cfg.live_capacity >= 2, "live_capacity min is 2");
813 assert!(cfg.small_capacity >= 1, "small_capacity min is 1");
814 assert!(cfg.ghost_capacity >= 1, "ghost_capacity min is 1");
815 assert!(cfg.max_entries_per_owner >= 1, "per-owner min is 1");
816 assert!(cfg.fallback_window >= 1, "fallback_window min is 1");
817 assert!(cfg.min_ghost_hits_in_window <= cfg.fallback_window);
819 assert!(cfg.max_budget_rejections_in_window <= cfg.fallback_window);
821 }
822
823 #[test]
824 fn live_depth_reflects_small_plus_main() {
825 let mut policy = S3FifoPolicy::new(config());
826 assert_eq!(policy.live_depth(), 0);
827
828 policy.access("ext-a", "k1".to_string());
830 assert_eq!(policy.live_depth(), 1);
831
832 policy.access("ext-b", "k2".to_string());
834 assert_eq!(policy.live_depth(), 2);
835
836 policy.access("ext-a", "k1".to_string());
838 assert_eq!(policy.live_depth(), 2);
840 assert_no_duplicates(&policy);
841 }
842
843 #[test]
844 fn telemetry_counters_accumulate_correctly() {
845 let mut policy = S3FifoPolicy::new(S3FifoConfig {
846 small_capacity: 1,
847 max_entries_per_owner: 3,
848 ..config()
849 });
850
851 policy.access("ext-a", "k1".to_string());
853 let tel = policy.telemetry();
854 assert_eq!(tel.admissions_total, 1);
855 assert_eq!(tel.promotions_total, 0);
856 assert_eq!(tel.ghost_hits_total, 0);
857 assert_eq!(tel.budget_rejections_total, 0);
858 assert_eq!(tel.small_depth, 1);
859 assert_eq!(tel.main_depth, 0);
860
861 policy.access("ext-a", "k1".to_string());
863 let tel = policy.telemetry();
864 assert_eq!(tel.promotions_total, 1);
865 assert_eq!(tel.small_depth, 0);
866 assert_eq!(tel.main_depth, 1);
867
868 policy.access("ext-a", "k2".to_string());
870 policy.access("ext-a", "k3".to_string());
871 let tel = policy.telemetry();
873 assert!(tel.ghost_depth >= 1, "evicted key should be in ghost");
874 }
875
876 #[test]
877 fn telemetry_owner_live_counts_track_per_owner() {
878 let mut policy = S3FifoPolicy::new(config());
879 policy.access("ext-a", "k1".to_string());
880 policy.access("ext-b", "k2".to_string());
881 let tel = policy.telemetry();
882 assert_eq!(tel.owner_live_counts.get("ext-a"), Some(&1));
883 assert_eq!(tel.owner_live_counts.get("ext-b"), Some(&1));
884 }
885
886 #[test]
887 fn hit_main_reorders_without_changing_depth() {
888 let mut policy = S3FifoPolicy::new(S3FifoConfig {
890 fallback_window: 32,
891 min_ghost_hits_in_window: 0,
892 ..config()
893 });
894 policy.access("ext-a", "k1".to_string());
896 policy.access("ext-a", "k1".to_string()); policy.access("ext-b", "k2".to_string());
898 policy.access("ext-b", "k2".to_string()); let before = policy.telemetry();
901 let depth_before = before.main_depth;
902
903 let decision = policy.access("ext-a", "k1".to_string());
905 assert_eq!(decision.kind, S3FifoDecisionKind::HitMain);
906 assert_eq!(policy.telemetry().main_depth, depth_before);
907 assert_no_duplicates(&policy);
908 }
909
910 #[test]
911 fn ghost_queue_evicts_oldest_when_at_capacity() {
912 let mut policy = S3FifoPolicy::new(S3FifoConfig {
913 ghost_capacity: 2,
914 small_capacity: 1,
915 ..config()
916 });
917
918 policy.access("ext-a", "k1".to_string()); policy.access("ext-b", "k2".to_string()); policy.access("ext-a", "k3".to_string()); assert_eq!(policy.telemetry().ghost_depth, 2);
923
924 policy.access("ext-b", "k4".to_string()); assert_eq!(policy.telemetry().ghost_depth, 2);
927 assert_no_duplicates(&policy);
928 }
929
930 #[test]
931 fn push_ghost_reinsertion_updates_recency_without_duplicates() {
932 let mut policy = S3FifoPolicy::new(S3FifoConfig {
933 ghost_capacity: 2,
934 ..config()
935 });
936
937 policy.push_ghost("k1".to_string());
938 policy.push_ghost("k2".to_string());
939 assert_eq!(
940 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
941 vec!["k1".to_string(), "k2".to_string()]
942 );
943 assert_eq!(policy.ghost_lookup.len(), 2);
944
945 policy.push_ghost("k1".to_string());
948 assert_eq!(
949 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
950 vec!["k2".to_string(), "k1".to_string()]
951 );
952 assert_eq!(policy.ghost_lookup.len(), 2);
953
954 policy.push_ghost("k3".to_string());
956 assert_eq!(
957 policy.ghost_order.values().cloned().collect::<Vec<_>>(),
958 vec!["k1".to_string(), "k3".to_string()]
959 );
960 assert_eq!(policy.ghost_lookup.len(), 2);
961 }
962
963 #[test]
964 fn capacity_enforcement_evicts_to_stay_within_live_capacity() {
965 let mut policy = S3FifoPolicy::new(S3FifoConfig {
966 live_capacity: 3,
967 small_capacity: 2,
968 ghost_capacity: 8,
969 max_entries_per_owner: 10,
970 fallback_window: 32,
972 min_ghost_hits_in_window: 0,
973 max_budget_rejections_in_window: 32,
974 });
975
976 for i in 0..6 {
978 policy.access(&format!("ext-{i}"), format!("k{i}"));
979 }
980
981 assert!(policy.live_depth() <= 3, "live_depth must respect capacity");
983 assert!(policy.telemetry().ghost_depth >= 3);
985 assert_no_duplicates(&policy);
986 }
987
988 #[test]
989 fn default_config_has_sensible_values() {
990 let cfg = S3FifoConfig::default();
991 assert_eq!(cfg.live_capacity, 256);
992 assert_eq!(cfg.small_capacity, 64);
993 assert_eq!(cfg.ghost_capacity, 512);
994 assert_eq!(cfg.max_entries_per_owner, 64);
995 assert_eq!(cfg.fallback_window, 32);
996 assert_eq!(cfg.min_ghost_hits_in_window, 2);
997 assert_eq!(cfg.max_budget_rejections_in_window, 12);
998 }
999
1000 #[test]
1001 fn decision_fields_reflect_current_state() {
1002 let mut policy = S3FifoPolicy::new(config());
1003 let d1 = policy.access("ext-a", "k1".to_string());
1004 assert_eq!(d1.kind, S3FifoDecisionKind::AdmitSmall);
1005 assert_eq!(d1.tier, S3FifoTier::Small);
1006 assert!(!d1.ghost_hit);
1007 assert!(d1.fallback_reason.is_none());
1008 assert_eq!(d1.live_depth, 1);
1009 assert_eq!(d1.small_depth, 1);
1010 assert_eq!(d1.main_depth, 0);
1011 }
1012
1013 #[test]
1014 fn clear_fallback_resets_policy_gate() {
1015 let mut policy = S3FifoPolicy::new(S3FifoConfig {
1016 min_ghost_hits_in_window: 3,
1017 fallback_window: 3,
1018 ..config()
1019 });
1020
1021 let _ = policy.access("ext-a", "k1".to_string());
1022 let _ = policy.access("ext-a", "k2".to_string());
1023 let _ = policy.access("ext-a", "k3".to_string());
1024 assert!(policy.telemetry().fallback_reason.is_some());
1025
1026 policy.clear_fallback();
1027 assert!(policy.telemetry().fallback_reason.is_none());
1028
1029 let decision = policy.access("ext-a", "k4".to_string());
1030 assert_ne!(decision.kind, S3FifoDecisionKind::FallbackBypass);
1031 }
1032
1033 #[test]
1034 fn clear_fallback_clears_signal_window_and_preserves_counters() {
1035 let mut policy = S3FifoPolicy::new(S3FifoConfig {
1036 max_entries_per_owner: 1,
1037 fallback_window: 3,
1038 min_ghost_hits_in_window: 0,
1039 max_budget_rejections_in_window: 1,
1040 ..config()
1041 });
1042
1043 let _ = policy.access("ext-a", "k1".to_string());
1045 let _ = policy.access("ext-a", "k2".to_string());
1046 let _ = policy.access("ext-a", "k3".to_string());
1047 assert_eq!(
1048 policy.telemetry().fallback_reason,
1049 Some(S3FifoFallbackReason::FairnessInstability)
1050 );
1051 assert_eq!(policy.recent_signals.len(), 3);
1052
1053 let before_clear = policy.telemetry();
1054 policy.clear_fallback();
1055
1056 assert_eq!(policy.telemetry().fallback_reason, None);
1057 assert!(
1058 policy.recent_signals.is_empty(),
1059 "clear_fallback should reset signal history"
1060 );
1061
1062 let after_clear = policy.telemetry();
1063 assert_eq!(after_clear.admissions_total, before_clear.admissions_total);
1064 assert_eq!(after_clear.promotions_total, before_clear.promotions_total);
1065 assert_eq!(after_clear.ghost_hits_total, before_clear.ghost_hits_total);
1066 assert_eq!(
1067 after_clear.budget_rejections_total,
1068 before_clear.budget_rejections_total
1069 );
1070
1071 let admitted = policy.access("ext-b", "post-clear-admit".to_string());
1073 assert_eq!(admitted.kind, S3FifoDecisionKind::AdmitSmall);
1074 assert_eq!(policy.telemetry().fallback_reason, None);
1075 assert_eq!(policy.recent_signals.len(), 1);
1076
1077 let reject_1 = policy.access("ext-a", "post-clear-reject-1".to_string());
1079 assert_eq!(reject_1.kind, S3FifoDecisionKind::RejectFairnessBudget);
1080 assert_eq!(policy.telemetry().fallback_reason, None);
1081
1082 let reject_2 = policy.access("ext-a", "post-clear-reject-2".to_string());
1083 assert_eq!(reject_2.kind, S3FifoDecisionKind::RejectFairnessBudget);
1084 assert_eq!(
1085 policy.telemetry().fallback_reason,
1086 Some(S3FifoFallbackReason::FairnessInstability)
1087 );
1088 }
1089
1090 mod proptest_s3fifo {
1093 use super::*;
1094 use proptest::prelude::*;
1095
1096 fn arb_access() -> impl Strategy<Value = (String, String)> {
1097 let owner = prop::sample::select(vec![
1098 "ext-a".to_string(),
1099 "ext-b".to_string(),
1100 "ext-c".to_string(),
1101 ]);
1102 let key = prop::sample::select(vec![
1103 "k0".to_string(),
1104 "k1".to_string(),
1105 "k2".to_string(),
1106 "k3".to_string(),
1107 "k4".to_string(),
1108 "k5".to_string(),
1109 "k6".to_string(),
1110 "k7".to_string(),
1111 ]);
1112 (owner, key)
1113 }
1114
1115 fn arb_config() -> impl Strategy<Value = S3FifoConfig> {
1116 (2..32usize, 1..16usize, 1..64usize, 1..8usize, 2..16usize).prop_map(
1117 |(live, small, ghost, per_owner, window)| S3FifoConfig {
1118 live_capacity: live,
1119 small_capacity: small,
1120 ghost_capacity: ghost,
1121 max_entries_per_owner: per_owner,
1122 fallback_window: window,
1123 min_ghost_hits_in_window: 0,
1124 max_budget_rejections_in_window: window,
1125 },
1126 )
1127 }
1128
1129 proptest! {
1130 #[test]
1131 fn queues_always_disjoint_after_access_sequence(
1132 cfg in arb_config(),
1133 accesses in prop::collection::vec(arb_access(), 1..50),
1134 ) {
1135 let mut policy = S3FifoPolicy::new(cfg);
1136 for (owner, key) in &accesses {
1137 let _ = policy.access(owner, key.clone());
1138 }
1139
1140 let small: BTreeSet<_> = policy.small.iter().cloned().collect();
1141 let main: BTreeSet<_> = policy.main.iter().cloned().collect();
1142 let ghost: BTreeSet<_> = policy.ghost_lookup.keys().cloned().collect();
1143
1144 assert!(small.is_disjoint(&main), "small and main must be disjoint");
1145 assert!(small.is_disjoint(&ghost), "small and ghost must be disjoint");
1146 assert!(main.is_disjoint(&ghost), "main and ghost must be disjoint");
1147 }
1148
1149 #[test]
1150 fn live_depth_never_exceeds_capacity(
1151 cfg in arb_config(),
1152 accesses in prop::collection::vec(arb_access(), 1..50),
1153 ) {
1154 let mut policy = S3FifoPolicy::new(cfg);
1155 for (owner, key) in &accesses {
1156 let _ = policy.access(owner, key.clone());
1157 }
1158 let effective_cap = policy.config().live_capacity;
1159 assert!(
1160 policy.live_depth() <= effective_cap,
1161 "live_depth {} exceeded capacity {}",
1162 policy.live_depth(),
1163 effective_cap,
1164 );
1165 }
1166
1167 #[test]
1168 fn ghost_depth_never_exceeds_capacity(
1169 cfg in arb_config(),
1170 accesses in prop::collection::vec(arb_access(), 1..50),
1171 ) {
1172 let mut policy = S3FifoPolicy::new(cfg);
1173 for (owner, key) in &accesses {
1174 let _ = policy.access(owner, key.clone());
1175 }
1176 let ghost_cap = policy.config().ghost_capacity;
1177 assert!(
1178 policy.telemetry().ghost_depth <= ghost_cap,
1179 "ghost_depth {} exceeded capacity {}",
1180 policy.telemetry().ghost_depth,
1181 ghost_cap,
1182 );
1183 }
1184
1185 #[test]
1186 fn live_depth_equals_small_plus_main(
1187 cfg in arb_config(),
1188 accesses in prop::collection::vec(arb_access(), 1..50),
1189 ) {
1190 let mut policy = S3FifoPolicy::new(cfg);
1191 for (owner, key) in &accesses {
1192 let decision = policy.access(owner, key.clone());
1193 assert_eq!(
1194 decision.live_depth,
1195 decision.small_depth + decision.main_depth,
1196 "live_depth must equal small + main at every step"
1197 );
1198 }
1199 }
1200
1201 #[test]
1202 fn counters_monotonically_nondecreasing(
1203 cfg in arb_config(),
1204 accesses in prop::collection::vec(arb_access(), 1..50),
1205 ) {
1206 let mut policy = S3FifoPolicy::new(cfg);
1207 let mut prev_admissions = 0u64;
1208 let mut prev_promotions = 0u64;
1209 let mut prev_ghost_hits = 0u64;
1210 let mut prev_rejections = 0u64;
1211
1212 for (owner, key) in &accesses {
1213 let _ = policy.access(owner, key.clone());
1214 let tel = policy.telemetry();
1215 assert!(tel.admissions_total >= prev_admissions);
1216 assert!(tel.promotions_total >= prev_promotions);
1217 assert!(tel.ghost_hits_total >= prev_ghost_hits);
1218 assert!(tel.budget_rejections_total >= prev_rejections);
1219 prev_admissions = tel.admissions_total;
1220 prev_promotions = tel.promotions_total;
1221 prev_ghost_hits = tel.ghost_hits_total;
1222 prev_rejections = tel.budget_rejections_total;
1223 }
1224 }
1225
1226 #[test]
1227 fn owner_counts_match_live_keys(
1228 cfg in arb_config(),
1229 accesses in prop::collection::vec(arb_access(), 1..50),
1230 ) {
1231 let mut policy = S3FifoPolicy::new(cfg);
1232 for (owner, key) in &accesses {
1233 let _ = policy.access(owner, key.clone());
1234 }
1235
1236 let mut expected: BTreeMap<String, usize> = BTreeMap::new();
1238 for owner_val in policy.live_owners.values() {
1239 *expected.entry(owner_val.clone()).or_insert(0) += 1;
1240 }
1241 assert_eq!(
1242 policy.owner_live_counts, expected,
1243 "owner_live_counts must match actual live key distribution"
1244 );
1245 }
1246
1247 #[test]
1248 fn live_tier_map_consistent_with_queues(
1249 cfg in arb_config(),
1250 accesses in prop::collection::vec(arb_access(), 1..50),
1251 ) {
1252 let mut policy = S3FifoPolicy::new(cfg);
1253 for (owner, key) in &accesses {
1254 let _ = policy.access(owner, key.clone());
1255 }
1256
1257 for key in &policy.small {
1259 assert_eq!(
1260 policy.live_tiers.get(key),
1261 Some(&LiveTier::Small),
1262 "key in small queue must be tracked as Small"
1263 );
1264 }
1265 for key in &policy.main {
1267 assert_eq!(
1268 policy.live_tiers.get(key),
1269 Some(&LiveTier::Main),
1270 "key in main queue must be tracked as Main"
1271 );
1272 }
1273 assert_eq!(
1275 policy.live_tiers.len(),
1276 policy.small.len() + policy.main.len(),
1277 );
1278 }
1279 }
1280 }
1281}