//! S3-FIFO cache implementation (`omega_cache/s3fifo.rs`).
1use crate::core::backoff::{Backoff, BackoffConfig};
2use crate::core::cms::CountMinSketch;
3use crate::core::engine::CacheEngine;
4use crate::core::entry::Entry;
5use crate::core::entry_ref::Ref;
6use crate::core::index::IndexTable;
7use crate::core::key::Key;
8use crate::core::ring::RingQueue;
9use crate::core::utils;
10use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
11use crossbeam::utils::CachePadded;
12use crossbeam_epoch::{Atomic, Owned, pin};
13use crossbeam_epoch::{Guard, Shared};
14use std::borrow::Borrow;
15use std::hash::Hash;
16use std::ptr::NonNull;
17use std::sync::atomic::AtomicU64;
18use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
19use std::time::Instant;
20use utils::hash;
21
/// A 64-bit metadata entry for a cache or hash table slot.
///
/// Layout:
/// - Bits 63-48: 16-bit Version ID (ABA protection)
/// - Bits 47-16: 32-bit Signature (Hashed key identifier)
/// - Bit  15:    Busy Flag (Concurrency lock)
/// - Bits 14-8:  Reserved (always zero)
/// - Bits 7-0:   8-bit Frequency (Access counter)
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Tag(u64);

impl Tag {
    const ID_SHIFT: u64 = 48;
    const SIGNATURE_SHIFT: u64 = 16;
    const SIGNATURE_MASK: u64 = 0xFFFF_FFFF << Self::SIGNATURE_SHIFT;
    const BUSY_MASK: u64 = 1 << 15;
    const FREQUENCY_MASK: u64 = 0xFF;

    // Mixing constants from MurmurHash3 (64-bit finalizer)
    const SIGNATURE_C1: u64 = 0xff51afd7ed558ccd;
    const SIGNATURE_C2: u64 = 0xc4ceb9fe1a85ec53;

    /// Generates a non-zero 32-bit signature from a 64-bit hash.
    /// Uses a branchless bit-avalanche to ensure high entropy.
    #[inline]
    fn make_signature(hash: u64) -> u32 {
        let mut x = hash;
        x ^= x >> 33;
        x = x.wrapping_mul(Self::SIGNATURE_C1);
        x ^= x >> 33;
        x = x.wrapping_mul(Self::SIGNATURE_C2);
        x ^= x >> 33;

        // Force the Least Significant Bit to 1.
        // This guarantees the signature is never 0, making 0 a safe 'Empty' marker.
        (x as u32) | 1
    }

    /// Returns the 16-bit version identifier (bits 63-48).
    #[inline]
    fn id(self) -> u16 {
        (self.0 >> Self::ID_SHIFT) as u16
    }

    /// Returns the 32-bit key signature (bits 47-16).
    #[inline]
    fn signature(self) -> u32 {
        (self.0 >> Self::SIGNATURE_SHIFT) as u32
    }

    /// Returns true if the busy (lock) bit is set.
    #[inline]
    fn is_busy(self) -> bool {
        (self.0 & Self::BUSY_MASK) != 0
    }

    /// Returns the 8-bit access-frequency counter (bits 7-0).
    #[inline]
    fn frequency(self) -> u8 {
        (self.0 & Self::FREQUENCY_MASK) as u8
    }

    /// Returns true if this tag's version matches the version carried by `index`.
    #[inline]
    fn is_epoch_match(self, index: Index) -> bool {
        self.id() == index.id()
    }

    /// Returns true if the entry has been accessed at least once (frequency > 0).
    #[inline]
    fn is_hot(self) -> bool {
        self.frequency() > 0
    }

    /// Returns true if the slot is live, matches the version ID, and the signature.
    #[inline]
    fn is_match(self, index: Index, hash: u64) -> bool {
        self.id() == index.id() && !self.is_busy() && self.signature() == Self::make_signature(hash)
    }

    /// Creates a new Tag with a specific signature based on the provided hash.
    #[inline]
    fn with_signature(self, hash: u64) -> Self {
        let sig = Self::make_signature(hash);
        Self((self.0 & !Self::SIGNATURE_MASK) | (sig as u64) << Self::SIGNATURE_SHIFT)
    }

    /// Sets the busy bit for atomic/CAS operations.
    #[inline]
    fn busy(self) -> Self {
        Self(self.0 | Self::BUSY_MASK)
    }

    /// Increments the frequency counter, saturating at 255.
    #[inline]
    fn increment_frequency(self) -> Self {
        let frequency = self.frequency();
        if frequency < u8::MAX {
            Self((self.0 & !Self::FREQUENCY_MASK) | (frequency + 1) as u64)
        } else {
            self
        }
    }

    /// Decrements the frequency counter, saturating at 0.
    #[inline]
    fn decrement_frequency(self) -> Self {
        let frequency = self.frequency();
        if frequency > 0 {
            Self((self.0 & !Self::FREQUENCY_MASK) | (frequency - 1) as u64)
        } else {
            self
        }
    }

    /// Advances the state for the next inhabitant (ID++, Metadata Cleared).
    /// Returns a synchronized pair of Tag and Index.
    #[inline]
    fn advance(self, index: Index) -> (Self, Index) {
        let next_id = self.id().wrapping_add(1);
        let new_tag = Tag((next_id as u64) << Self::ID_SHIFT);
        let new_index = index.with_id(next_id);
        (new_tag, new_index)
    }

    /// Resets the entry's frequency bits to zero.
    ///
    /// This is used during promotion or specific eviction cycles to clear the
    /// "hotness" of an entry. By masking out the frequency bits, the entry
    /// loses its "second chance" status and must be accessed again to
    /// build up its frequency.
    ///
    /// Returns a new `Tag` with the same metadata and busy status, but
    /// with a frequency of 0.
    ///
    /// Takes `self` by value for consistency with the other methods on this
    /// `Copy` type (avoids clippy's `trivially_copy_pass_by_ref`).
    #[inline]
    fn reset(self) -> Self {
        Tag(self.0 & !Self::FREQUENCY_MASK)
    }
}

impl From<u64> for Tag {
    #[inline]
    fn from(raw: u64) -> Self {
        Self(raw)
    }
}

impl From<Tag> for u64 {
    #[inline]
    fn from(tag: Tag) -> u64 {
        tag.0
    }
}

/// A 64-bit versioned pointer.
///
/// Combines a 16-bit version ID with a 48-bit slot index. This ensures that
/// an index pointing to an old version of a slot cannot be used after the
/// slot has been advanced.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Index(u64);

impl Index {
    const ID_SHIFT: u64 = 48;
    const INDEX_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;

    /// Creates a new versioned Index.
    ///
    /// # Arguments
    /// * `id` - The 16-bit version identifier.
    /// * `slot_index` - The physical offset in the data array (max 48 bits).
    #[inline]
    pub fn new(id: u16, slot_index: usize) -> Self {
        let clean_slot = (slot_index as u64) & Self::INDEX_MASK;
        Self(((id as u64) << Self::ID_SHIFT) | clean_slot)
    }

    /// Returns the 16-bit version identifier.
    #[inline]
    pub fn id(self) -> u16 {
        (self.0 >> Self::ID_SHIFT) as u16
    }

    /// Returns the 48-bit physical slot index.
    #[inline]
    pub fn slot_index(self) -> usize {
        (self.0 & Self::INDEX_MASK) as usize
    }

    /// Returns a copy of the index with an updated version ID.
    ///
    /// This is typically used during an `advance` operation to synchronize
    /// the index with a new `Tag`.
    #[inline]
    pub fn with_id(self, id: u16) -> Self {
        let slot_part = self.0 & Self::INDEX_MASK;
        Self(((id as u64) << Self::ID_SHIFT) | slot_part)
    }
}

impl From<u64> for Index {
    #[inline]
    fn from(raw_index: u64) -> Self {
        Index(raw_index)
    }
}

impl From<Index> for u64 {
    #[inline]
    fn from(index: Index) -> Self {
        index.0
    }
}
225
226pub struct Slot<K, V>
227where
228    K: Eq + Hash,
229{
230    entry: Atomic<Entry<K, V>>,
231    tag: AtomicU64,
232}
233
234impl<K, V> Slot<K, V>
235where
236    K: Eq + Hash,
237{
238    #[inline(always)]
239    fn new() -> Self {
240        Self {
241            entry: Atomic::null(),
242            tag: AtomicU64::default(),
243        }
244    }
245}
246
247impl<K, V> Default for Slot<K, V>
248where
249    K: Eq + Hash,
250{
251    #[inline(always)]
252    fn default() -> Self {
253        Slot::new()
254    }
255}
256
/// A cache implementation based on the S3-FIFO (Simple Scalable Static FIFO) algorithm.
///
/// S3-FIFO uses a three-queue architecture to achieve high hit rates by separating
/// transient "one-hit wonders" from frequently accessed data.
///
/// ### Architecture
/// 1. **Small Queue**: An intake FIFO for new entries (probationary area).
/// 2. **Main Queue**: A FIFO for entries that have proven their value (protected area).
/// 3. **Ghost Queue**: Tracks hashes of recently evicted items to facilitate
///    re-insertion directly into the Main Queue.
pub struct S3FIFOCache<K, V>
where
    K: Eq + Hash,
{
    /// Mapping of keys to versioned indices for fast lookups.
    index_table: IndexTable<K>,
    /// Contiguous storage for cache entries, padded to prevent false sharing.
    slots: Box<[CachePadded<Slot<K, V>>]>,
    /// The protected segment of the cache (~90% of capacity).
    main_queue: RingQueue,
    /// The probationary segment for new data (~10% of capacity).
    small_queue: RingQueue,
    /// Metadata queue for tracking evicted entry hashes (stores hashes, not indices).
    ghost_queue: RingQueue,
    /// Collection of available slot indices ready for new allocations.
    index_pool: RingQueue,
    /// Frequency estimator used to decide if an entry should bypass probation.
    /// Kept in sync with `ghost_queue` by the eviction paths.
    ghost_filter: CountMinSketch,
    /// Configuration for spinning/yielding behavior under high contention.
    backoff_config: BackoffConfig,
    /// Hit/Miss counters and latency tracking.
    metrics: Metrics,
    /// Total number of entries the cache can hold.
    capacity: usize,
}
294
295impl<K, V> S3FIFOCache<K, V>
296where
297    K: Eq + Hash,
298{
299    /// Creates a new `S3Cache` with the specified capacity.
300    ///
301    /// The total capacity is partitioned between a 10% small probationary
302    /// segment and a 90% main protected segment. All slots are initialized
303    /// as empty and their indices are added to the available pool.
304    #[inline]
305    pub fn new(
306        capacity: usize,
307        backoff_config: BackoffConfig,
308        metrics_config: MetricsConfig,
309    ) -> Self {
310        const GHOST_FILTER_DEPTH: usize = 4;
311
312        let small_queue_capacity = (capacity as f64 * 0.1) as usize;
313        let main_queue_capacity = capacity - small_queue_capacity;
314
315        let small_queue = RingQueue::new(small_queue_capacity, backoff_config);
316        let main_queue = RingQueue::new(main_queue_capacity, backoff_config);
317        let ghost_queue = RingQueue::new(capacity, backoff_config);
318
319        let index_pool = RingQueue::new(capacity, backoff_config);
320
321        for index in 0..capacity {
322            let _ = index_pool.push(index as u64);
323        }
324
325        let metrics = Metrics::new(metrics_config);
326
327        let slots = (0..capacity)
328            .map(|_| CachePadded::new(Slot::new()))
329            .collect::<Vec<_>>()
330            .into_boxed_slice();
331
332        Self {
333            index_table: IndexTable::new(),
334            slots,
335            main_queue,
336            small_queue,
337            ghost_queue,
338            index_pool,
339            ghost_filter: CountMinSketch::new(capacity, GHOST_FILTER_DEPTH),
340            backoff_config,
341            metrics,
342            capacity,
343        }
344    }
345
346    /// Retrieves a reference to an entry associated with the provided key.
347    ///
348    /// If the entry exists and is valid, its access frequency is atomically
349    /// incremented. This protects the entry from being removed during the
350    /// next space-reclamation cycle.
351    pub fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
352    where
353        Key<K>: Borrow<Q>,
354        Q: Eq + Hash + ?Sized,
355    {
356        let called_at = Instant::now();
357        let hash = hash(key);
358        let guard = pin();
359        let mut backoff = self.backoff_config.build();
360
361        loop {
362            match self.index_table.get(key) {
363                Some(index) => {
364                    let index = Index::from(index);
365
366                    let slot = &self.slots[index.slot_index()];
367                    let mut tag = Tag::from(slot.tag.load(Acquire));
368
369                    if !tag.is_match(index, hash) {
370                        let latency = called_at.elapsed().as_millis() as u64;
371                        self.metrics.record_miss();
372                        self.metrics.record_latency(latency);
373                        return None;
374                    }
375
376                    let entry = slot.entry.load(Relaxed, &guard);
377
378                    match unsafe { entry.as_ref() } {
379                        None => {
380                            let latency = called_at.elapsed().as_millis() as u64;
381                            self.metrics.record_miss();
382                            self.metrics.record_latency(latency);
383                            break None;
384                        }
385                        Some(entry_ref) => {
386                            if entry_ref.key().borrow() != key {
387                                let latency = called_at.elapsed().as_millis() as u64;
388                                self.metrics.record_miss();
389                                self.metrics.record_latency(latency);
390                                break None;
391                            }
392
393                            if entry_ref.is_expired() {
394                                let latency = called_at.elapsed().as_millis() as u64;
395                                self.metrics.record_miss();
396                                self.metrics.record_latency(latency);
397                                break None;
398                            }
399
400                            if let Err(latest) = slot.tag.compare_exchange_weak(
401                                tag.into(),
402                                tag.increment_frequency().into(),
403                                Release,
404                                Acquire,
405                            ) {
406                                tag = Tag::from(latest);
407                                backoff.backoff();
408                                continue;
409                            }
410
411                            break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
412                        }
413                    }
414                }
415                None => {
416                    self.metrics.record_miss();
417                    self.metrics
418                        .record_latency(called_at.elapsed().as_millis() as u64);
419                    return None;
420                }
421            }
422        }
423    }
424
425    /// Handles the admission of new entries into the probationary segment.
426    ///
427    /// If the segment is full, this function triggers space reclamation.
428    /// The entry is stored in a slot obtained from the available pool and
429    /// then appended to the small queue.
430    pub fn insert_with(
431        &self,
432        key: K,
433        value: V,
434        expired_at: Option<Instant>,
435        admission: impl Fn(&K, &K) -> bool,
436    ) {
437        let entry = Entry::new(key, value, expired_at);
438        let key = entry.key().clone();
439        let hash = hash(entry.key());
440        let guard = pin();
441        let mut backoff = self.backoff_config.build();
442
443        loop {
444            match self.index_table.get(&key).map(Index::from) {
445                Some(index) => {
446                    let slot = &self.slots[index.slot_index()];
447
448                    let tag = Tag::from(slot.tag.load(Acquire));
449
450                    if !(tag.is_match(index, hash)
451                        && slot
452                            .tag
453                            .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Relaxed)
454                            .is_ok())
455                    {
456                        backoff.backoff();
457                        continue;
458                    }
459
460                    let old_entry = slot.entry.swap(Owned::new(entry), Relaxed, &guard);
461                    slot.tag.store(tag.increment_frequency().into(), Release);
462
463                    unsafe { guard.defer_destroy(old_entry) };
464
465                    break;
466                }
467                None => {
468                    if self.ghost_filter.contains(&hash) {
469                        self.push_into_main_queue(entry, admission, &guard, &mut backoff)
470                    } else {
471                        self.push_into_small_queue(entry, admission, &guard, &mut backoff)
472                    }
473
474                    break;
475                }
476            }
477        }
478    }
479
480    /// Manages space reclamation within the probationary segment.
481    ///
482    /// This function iterates through entries in the small queue:
483    /// 1. **Promotion**: Entries with an active access frequency are moved
484    ///    to the main segment.
485    /// 2. **Removal**: Entries that are expired or have no access history
486    ///    are removed. Their hashes are added to the ghost filter, and
487    ///    their indices are returned to the available pool.
488    fn push_into_small_queue(
489        &self,
490        entry: Entry<K, V>,
491        admission: impl Fn(&K, &K) -> bool,
492        guard: &Guard,
493        backoff: &mut Backoff,
494    ) {
495        let index = match self.index_pool.pop() {
496            Some(index) => Index::from(index),
497            None => {
498                match self.evict_from_small_queue(|key| admission(entry.key(), key), guard, backoff)
499                {
500                    Some(index) => index,
501                    None => return,
502                }
503            }
504        };
505
506        let slot = &self.slots[index.slot_index()];
507
508        let tag = Tag::from(slot.tag.load(Acquire));
509
510        let entry = Owned::new(entry);
511        let key = entry.key().clone();
512
513        slot.entry.store(entry, Relaxed);
514
515        let tag = tag.with_signature(hash(key.as_ref()));
516
517        slot.tag.store(tag.into(), Release);
518
519        loop {
520            match self.small_queue.push(index.into()) {
521                Ok(_) => break,
522                Err(_) => {
523                    if let Some(index) = self.evict_from_small_queue(|_| true, guard, backoff) {
524                        self.index_pool
525                            .push(index.into())
526                            .expect("the index pool can't overflow");
527                    } else {
528                        backoff.backoff();
529                    }
530                }
531            }
532        }
533
534        self.index_table.insert(key, index.into());
535    }
536
537    /// Reclaims space from the probationary segment (Small Queue).
538    ///
539    /// This function implements the S3-FIFO admission policy:
540    /// 1. **Promotion**: If an entry has been accessed (`is_hot`), it is moved to the
541    ///    Main Queue and its frequency is reset.
542    /// 2. **Retention**: If the admission policy `allow_eviction` protects the entry,
543    ///    it is re-queued at the back of the Small Queue.
544    /// 3. **Eviction**: If an entry is expired or not valuable, it is moved to the
545    ///    Ghost Queue (tracking its hash) and its slot is returned for reuse.
546    fn evict_from_small_queue(
547        &self,
548        allow_eviction: impl Fn(&K) -> bool,
549        guard: &Guard,
550        backoff: &mut Backoff,
551    ) -> Option<Index> {
552        while let Some(index) = self.small_queue.pop().map(Index::from) {
553            let slot = &self.slots[index.slot_index()];
554            let mut tag = Tag::from(slot.tag.load(Acquire));
555
556            loop {
557                if tag.is_busy() {
558                    tag = Tag::from(slot.tag.load(Acquire));
559                    backoff.backoff();
560                    continue;
561                }
562
563                let entry = slot.entry.load(Relaxed, guard);
564
565                let entry_ref =
566                    unsafe { entry.as_ref().expect("the occupied entry cannot be null") };
567
568                if tag.is_hot() && !entry_ref.is_expired() {
569                    if let Err(latest) = slot.tag.compare_exchange_weak(
570                        tag.into(),
571                        tag.reset().into(),
572                        Release,
573                        Acquire,
574                    ) {
575                        tag = Tag::from(latest);
576                        backoff.backoff();
577                        continue;
578                    }
579
580                    self.promote_index(index, guard, backoff);
581                    break;
582                }
583
584                if !(entry_ref.is_expired() || allow_eviction(entry_ref.key()))
585                    && self.small_queue.push(index.into()).is_ok()
586                {
587                    return None;
588                }
589
590                match slot
591                    .tag
592                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
593                {
594                    Ok(_) => {
595                        let key = entry_ref.key().clone();
596                        self.index_table.remove(key.as_ref());
597                        slot.entry.store(Shared::null(), Relaxed);
598
599                        let (tag, index) = tag.advance(index);
600                        slot.tag.store(tag.into(), Release);
601
602                        self.push_into_ghost_queue(key.as_ref(), backoff);
603
604                        unsafe { guard.defer_destroy(entry) };
605
606                        return Some(index);
607                    }
608                    Err(latest) => {
609                        tag = Tag::from(latest);
610                        backoff.backoff();
611                    }
612                }
613            }
614        }
615
616        None
617    }
618
619    /// Records the hash of an evicted entry into the Ghost Queue and Filter.
620    ///
621    /// The Ghost Queue acts as a temporal buffer for recently evicted keys. If a key is
622    /// re-inserted while its hash is still in this queue, it bypasses the Small Queue
623    /// (probation) and is promoted directly to the Main Queue.
624    ///
625    /// ### Mechanism
626    /// * **Admission**: If the `ghost_queue` is full, it performs a FIFO eviction of the
627    ///   oldest hash to make room for the new one.
628    /// * **Consistency**: It maintains a `ghost_filter` (Count-Min Sketch) in sync with
629    ///   the queue to provide fast $O(1)$ membership tests during insertion.
630    /// * **Contention**: Uses the provided `backoff` strategy if the underlying ring
631    ///   buffer is temporarily locked or contended.
632    #[inline(always)]
633    fn push_into_ghost_queue(&self, key: &K, backoff: &mut Backoff) {
634        let hash = hash(key);
635        loop {
636            match self.ghost_queue.push(hash) {
637                Ok(_) => {
638                    self.ghost_filter.increment(&hash);
639                    break;
640                }
641                Err(_) => {
642                    backoff.backoff();
643
644                    if let Some(hash) = self.ghost_queue.pop() {
645                        self.ghost_filter.decrement(&hash);
646                    }
647                }
648            }
649        }
650    }
651
652    /// Promotes an index into the `main_queue`, potentially triggering eviction.
653    ///
654    /// This method transitions an entry from the admission filter or small queue into
655    /// long-term storage. Because the `main_queue` has a fixed capacity, this operation
656    /// is blocking: if the queue is full, the calling thread is conscripted to perform
657    /// an eviction to make space.
658    ///
659    /// # Invariants
660    /// * The provided `index` must represent a valid, allocated slot.
661    /// * Successful execution ensures the index is pushed to `main_queue`.
662    fn promote_index(&self, index: Index, guard: &Guard, backoff: &mut Backoff) {
663        loop {
664            if self.main_queue.push(index.into()).is_ok() {
665                break;
666            }
667
668            if let Some(index) = self.evict_from_main_queue(|_| true, guard, backoff) {
669                self.index_pool
670                    .push(index.into())
671                    .expect("the index pool can't overflow");
672            } else {
673                backoff.backoff();
674            }
675        }
676    }
677
678    /// Tries to add a new entry to the main queue, making space if necessary.
679    ///
680    /// This function handles the lifecycle of putting data into the cache. It first looks
681    /// for an empty slot, and if none are available, it uses the admission policy to
682    /// decide which old entry to kick out.
683    ///
684    /// **Algorithm**
685    ///
686    /// * **Finding a Slot**: It first tries to receive an index from the pool.
687    ///   If the pool is empty, it triggers the eviction process. It passes the
688    ///   `admission` policy to the evictor to decide if the new entry is "worthy"
689    ///   enough to replace an old one. If the evictor can't free a slot (because
690    ///   the policy protected everything), the function gives up and returns.
691    ///
692    /// * **Preparing the Entry**: Once it has a slot, it stores the new data,
693    ///   calculates a hash signature for quick lookups, and updates the slot's metadata.
694    ///
695    /// * **Queueing**: The new entry is pushed onto the main queue. If the queue
696    ///   is full (which can happen in high-traffic concurrent scenarios), it
697    ///   manually triggers a "forced" eviction—ignoring the admission policy this
698    ///   time—to ensure the new entry can actually fit.
699    ///
700    /// * **Table Registration**: Finally, it records the entry in the index table
701    ///   so other threads can find this data using its key.
702    fn push_into_main_queue(
703        &self,
704        entry: Entry<K, V>,
705        admission: impl Fn(&K, &K) -> bool,
706        guard: &Guard,
707        backoff: &mut Backoff,
708    ) {
709        let index = match self.index_pool.pop() {
710            Some(index) => Index::from(index),
711            None => {
712                match self.evict_from_main_queue(|key| admission(entry.key(), key), guard, backoff)
713                {
714                    Some(index) => index,
715                    None => return,
716                }
717            }
718        };
719
720        let slot = &self.slots[index.slot_index()];
721        let tag = Tag::from(slot.tag.load(Acquire));
722
723        let entry = Owned::new(entry);
724        let key = entry.key().clone();
725
726        slot.entry.store(entry, Relaxed);
727        let tag = tag.with_signature(hash(key.as_ref()));
728        slot.tag.store(tag.into(), Release);
729
730        loop {
731            match self.main_queue.push(index.into()) {
732                Ok(_) => break,
733                Err(_) => {
734                    if let Some(index) = self.evict_from_main_queue(|_| true, guard, backoff) {
735                        self.index_pool
736                            .push(index.into())
737                            .expect("the index pool can't overflow");
738                    } else {
739                        backoff.backoff();
740                    }
741                }
742            }
743        }
744
745        self.index_table.insert(key, index.into());
746    }
747
748    /// Tries to evict an entry from the main queue following the passed admission policy.
749    ///
750    /// This function walks the queue and decides whether to recycle a slot or keep its data:
751    ///
752    /// * **Second-Chance Rotation**: If an entry is "hot" and hasn't expired, we lower its frequency
753    ///   and move it to the back of the queue so it stays in the cache longer.
754    ///
755    /// * **Admission Filter**: If an entry isn't hot, we check the admission policy.
756    ///   If the policy says to keep it, we put it back in the queue and return `None`.
757    ///
758    /// * **Physical Eviction**: If the entry is expired or the policy allows it, we lock the slot,
759    ///   remove the key from the table, and return the cleared index for immediate reuse.
760    fn evict_from_main_queue(
761        &self,
762        allow_eviction: impl Fn(&K) -> bool,
763        guard: &Guard,
764        backoff: &mut Backoff,
765    ) -> Option<Index> {
766        while let Some(index) = self.main_queue.pop().map(Index::from) {
767            let slot = &self.slots[index.slot_index()];
768            let mut tag = Tag::from(slot.tag.load(Acquire));
769
770            loop {
771                if tag.is_busy() {
772                    tag = Tag::from(slot.tag.load(Acquire));
773                    backoff.backoff();
774                    continue;
775                }
776
777                let entry = slot.entry.load(Relaxed, guard);
778                let entry_ref = unsafe { entry.as_ref().expect("occupied entry can't be null") };
779
780                // Phase 1 Second-Chance Rotation:
781                // Attempt to decide whether to evict an entry from the queue based on its frequency
782                // and TTL.
783                if tag.is_hot() && !entry_ref.is_expired() {
784                    let updated_tag = tag.decrement_frequency();
785                    if slot
786                        .tag
787                        .compare_exchange_weak(tag.into(), updated_tag.into(), Release, Acquire)
788                        .is_ok()
789                    {
790                        // Re-insert to the back. If push fails (rare), we fall through to evict.
791                        if self.main_queue.push(index.into()).is_ok() {
792                            break; // Move to the NEXT index in the 'while' loop
793                        }
794                        tag = updated_tag;
795                    } else {
796                        tag = Tag::from(slot.tag.load(Acquire));
797                        backoff.backoff();
798                        continue;
799                    }
800                }
801
802                // 2. Admission Phase
803                // If the entry is not expired, we ask the admission policy if we can evict it.
804                // If the policy says "don't evict" (returns false), we try to insert it back to the main queue.
805                if !(entry_ref.is_expired() || allow_eviction(entry_ref.key()))
806                    && self.main_queue.push(index.into()).is_ok()
807                {
808                    break;
809                }
810
811                // Phase 3 Eviction:
812                // Attempt to lock the entry for eviction; if locking fails, try to insert the index back into the queue.
813                match slot
814                    .tag
815                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
816                {
817                    Ok(_) => {
818                        self.index_table.remove(entry_ref.key());
819                        slot.entry.store(Shared::null(), Relaxed);
820
821                        let (next_tag, next_index) = tag.advance(index);
822                        slot.tag.store(next_tag.into(), Release);
823
824                        unsafe { guard.defer_destroy(entry) };
825                        return Some(next_index);
826                    }
827                    Err(latest) => {
828                        tag = Tag::from(latest);
829                        backoff.backoff();
830
831                        if self.main_queue.push(index.into()).is_ok() {
832                            break;
833                        }
834                    }
835                }
836            }
837        }
838
839        None
840    }
841
    /// Removes the entry associated with `key` from the cache.
    ///
    /// Returns `true` if a live entry was found and invalidated, and `false`
    /// if the key was absent from the index or the slot it pointed at has
    /// already been recycled for a newer entry.
    pub fn remove<Q>(&self, key: &Q) -> bool
    where
        Key<K>: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        // Backoff policy used to yield under CAS contention on the slot tag.
        let mut backoff = self.backoff_config.build();

        // Unlink the key from the index first; the slot itself is only touched
        // if the index actually pointed at one.
        match self.index_table.remove(key).map(Index::from) {
            None => false,
            Some(index) => {
                let slot = &self.slots[index.slot_index()];

                let mut tag = Tag::from(slot.tag.load(Relaxed));

                loop {
                    // If the slot's version no longer matches the one embedded
                    // in the index, the slot was already evicted and reused for
                    // a different entry (ABA protection) — nothing to remove.
                    if !tag.is_epoch_match(index) {
                        return false;
                    }

                    // Try to swap in a reset tag, making the slot invalid for
                    // readers. compare_exchange_weak may fail spuriously, hence
                    // the retry loop with backoff.
                    // NOTE(review): both orderings are Relaxed here, while the
                    // eviction path uses AcqRel/Acquire — presumably safe
                    // because readers re-validate through the tag epoch, but
                    // confirm against the `get` path.
                    if let Err(latest) = slot.tag.compare_exchange_weak(
                        tag.into(),
                        tag.reset().into(),
                        Relaxed,
                        Relaxed,
                    ) {
                        tag = Tag::from(latest);
                        backoff.backoff();
                        continue;
                    }

                    // Tag reset succeeded: the entry is now unreachable.
                    // NOTE(review): unlike the eviction path, this does not
                    // null the entry pointer or `defer_destroy` it — confirm
                    // the entry is reclaimed when the slot is reused.
                    return true;
                }
            }
        }
    }
877}
878
879impl<K, V> CacheEngine<K, V> for S3FIFOCache<K, V>
880where
881    K: Eq + Hash,
882{
883    fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
884    where
885        Key<K>: Borrow<Q>,
886        Q: Eq + Hash + ?Sized,
887    {
888        self.get(key)
889    }
890
891    fn insert_with<A>(&self, key: K, value: V, expired_at: Option<Instant>, admission: A)
892    where
893        A: Fn(&K, &K) -> bool,
894    {
895        self.insert_with(key, value, expired_at, admission)
896    }
897
898    fn remove<Q>(&self, key: &Q) -> bool
899    where
900        Key<K>: Borrow<Q>,
901        Q: Eq + Hash + ?Sized,
902    {
903        self.remove(key)
904    }
905
906    fn capacity(&self) -> usize {
907        self.capacity
908    }
909
910    fn metrics(&self) -> MetricsSnapshot {
911        self.metrics.snapshot()
912    }
913}
914
// Unit tests for the S3-FIFO cache engine: basic insert/get/remove semantics,
// eviction and ghost-queue behavior, concurrency stress, and TTL expiry.
// Several tests are statistical (random keys, threshold assertions) rather
// than exact, since eviction order under S3-FIFO is load-dependent.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::random_string;
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    use rand::distr::{Alphanumeric, SampleString};
    use rand::{RngExt, rng};
    use std::thread::{scope, sleep};
    use std::time::Duration;

    // Builds a cache with the test-wide defaults: exponential backoff capped
    // at 1000 and default metrics configuration.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> S3FIFOCache<K, V>
    where
        K: Eq + Hash,
    {
        S3FIFOCache::new(
            capacity,
            BackoffConfig::exponential(1000),
            MetricsConfig::default(),
        )
    }

    // Generates a random alphanumeric string of `len` characters, used to
    // produce collision-free keys and opaque values.
    #[inline(always)]
    fn random_alphanumeric(len: usize) -> String {
        Alphanumeric.sample_string(&mut rand::rng(), len)
    }

    /// Inserted value must be retrievable under the same key.
    #[test]
    fn test_s3cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);

        let key = random_alphanumeric(32);
        let value = random_alphanumeric(255);

        cache.insert(key.clone(), value.clone(), None);

        let entry = cache.get(&key).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    /// Re-inserting an existing key must replace its value.
    #[test]
    fn test_s3cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);

        let key = random_alphanumeric(32);
        let value1 = random_alphanumeric(255);
        let value2 = random_alphanumeric(255);

        cache.insert(key.clone(), value1, None);
        cache.insert(key.clone(), value2.clone(), None);

        let entry = cache.get(&key).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value2);
    }

    /// `remove` must report success and make the key unreachable.
    #[test]
    fn test_s3cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);

        let key = random_alphanumeric(32);

        cache.insert(key.clone(), random_alphanumeric(255), None);

        assert!(cache.get(&key).is_some());

        assert!(cache.remove(&key));

        assert!(cache.get(&key).is_none());
    }

    /// Smoke test: inserting 10x the capacity must not panic or hang while
    /// the eviction path churns.
    /// NOTE(review): this test has no assertions — it only verifies the
    /// eviction path completes; consider asserting on metrics or capacity.
    #[test]
    fn test_s3cache_fill_beyond_capacity_should_evict_fifo() {
        let cache = create_cache(100);

        for _ in 0..1000 {
            let key = random_alphanumeric(32);
            let value = random_alphanumeric(255);

            cache.insert(key, value, None);
        }
    }

    /// An entry that has been read (frequency bumped) should survive a burst
    /// of cold inserts that churn the small queue.
    #[test]
    fn test_s3cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(1000);

        let key = random_alphanumeric(32);
        let value = random_alphanumeric(255);

        cache.insert(key.clone(), value.clone(), None);

        // The read below marks the entry as accessed before the churn starts.
        let entry = cache.get(&key).expect("must present");

        assert_eq!(entry.value(), &value);

        for _ in 0..250 {
            let key = random_alphanumeric(32);
            let value = random_alphanumeric(255);

            cache.insert(key, value, None);
        }

        let entry = cache.get(&key).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    /// After an entry is evicted (tracked by the ghost filter), re-inserting
    /// it should promote it to the main queue, where it survives heavy churn.
    #[test]
    fn test_s3cache_reinserted_ghost_entry_should_be_promoted_to_main() {
        let cache = create_cache(1000);

        let (key, value) = (random_alphanumeric(32), random_alphanumeric(255));

        cache.insert(key.clone(), value.clone().to_string(), None);

        // Enough cold inserts to push the never-read entry out of the cache.
        for _ in 0..200 {
            let key = random_alphanumeric(32);
            let value = random_alphanumeric(255);

            cache.insert(key, value, None);
        }

        assert!(cache.get(&key).is_none());

        // Second insert of a ghost-tracked key should go to the main queue.
        cache.insert(key.clone(), value.clone().to_string(), None);

        for _ in 0..1000 {
            let key = random_alphanumeric(32);
            let value = random_alphanumeric(255);

            cache.insert(key, value, None);
        }

        let entry = cache.get(&key).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    /// A small hot set, re-read between waves of one-hit-wonder inserts, must
    /// be mostly retained (statistical threshold: at least 4 of 5 survive).
    #[test]
    fn test_s3cache_ghost_filter_should_protect_working_set() {
        let cache = create_cache(1000);
        let hot_entries = vec![("a", "a"), ("b", "b"), ("c", "c"), ("d", "d"), ("e", "e")];

        for &(key, value) in &hot_entries {
            cache.insert(key.to_string(), value.to_string(), None);
        }

        // Interleave cold inserts (even iterations) with hot reads (odd).
        for i in 0..100000 {
            if i % 2 == 0 {
                cache.insert(format!("key-{}", i), format!("value-{}", i), None);
            } else {
                // NOTE(review): `random_range(..len)` passes a `RangeTo` —
                // confirm the rand version in use implements `SampleRange`
                // for `RangeTo`; otherwise this needs `0..len`.
                let index = rng().random_range(..hot_entries.len());
                let key = hot_entries[index].0;
                let _ = cache.get(key);
            }
        }

        let count = hot_entries
            .iter()
            .map(|&(key, _)| cache.get(key))
            .filter(|it| it.is_some())
            .count();

        assert!(count >= 4);
    }

    /// 32 threads hammering a 500-key space with mixed insert/get must not
    /// crash, deadlock, or livelock. No value assertions — liveness only.
    #[test]
    fn test_s3cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(1000);
        let num_threads = 32;
        let ops_per_thread = 5000;

        scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    for i in 0..ops_per_thread {
                        let key = (i % 500).to_string();
                        if i % 2 == 0 {
                            cache.insert(key, random_alphanumeric(255), None);
                        } else {
                            let _ = cache.get(&key);
                        }
                    }
                });
            }
        });
    }

    /// Under a skewed (Zipf-like, exponent 1.3) concurrent workload, the top
    /// 500 most frequent keys must be mostly cached at the end (>= 400 hits).
    #[test]
    fn test_s3_fifo_should_protect_hot_set_under_high_churn() {
        let capacity = 1000;
        let cache = create_cache(capacity);

        let num_threads = 8;
        let ops_per_thread = 10000;

        // 20000-key space with skew 1.3; statistics track the true frequency
        // of each generated key so the "hot set" can be computed afterwards.
        let workload_generator = WorkloadGenerator::new(20000, 1.3);
        let workload_statistics = WorkloadStatistics::new();

        // Warm the cache to capacity before the concurrent phase.
        let mut rand = rng();
        for _ in 0..capacity {
            let key = workload_generator.key(&mut rand);
            cache.insert(key.to_string(), random_string(), None);
            workload_statistics.record(key.to_string());
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut thread_rng = rng();
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut thread_rng);
                        workload_statistics.record(key.to_string());

                        // Read-through pattern: insert only on miss.
                        if cache.get(key).is_none() {
                            let value = random_string();
                            cache.insert(key.to_string(), value, None);
                        }
                    }
                });
            }
        });

        let top_keys_size = 500;
        let frequent_keys = workload_statistics.frequent_keys(top_keys_size);

        // Count how many of the observed-hottest keys are still resident.
        let count = frequent_keys.iter().fold(0, |acc, key| {
            if cache.get(key).is_some() {
                acc + 1
            } else {
                acc
            }
        });

        assert!(
            count >= 400,
            "S3-FIFO efficiency dropped! Captured only {}/{} hot keys",
            count,
            top_keys_size
        );
    }

    /// An entry with a 10ms TTL must be gone after sleeping 50ms.
    #[test]
    fn test_s3cache_ttl_entry_should_expire() {
        let cache = create_cache(10);
        let key = random_string();
        let value = random_string();

        cache.insert(
            key.clone(),
            value.clone(),
            Some(Instant::now() + Duration::from_millis(10)),
        );

        assert!(cache.get(&key).is_some());

        sleep(Duration::from_millis(50));

        // Should be expired now
        assert!(cache.get(&key).is_none(), "Entry should have expired");
    }

    /// An entry with a 10s TTL must still be readable shortly after insert.
    #[test]
    fn test_s3cache_ttl_entry_should_not_expire_early() {
        use std::time::Duration;
        let cache = create_cache(10);
        let key = "not-expire-me".to_string();
        let value = "value".to_string();

        cache.insert(
            key.clone(),
            value.clone(),
            Some(Instant::now() + Duration::from_secs(10)),
        );

        sleep(Duration::from_millis(10));

        assert!(
            cache.get(&key).is_some(),
            "Entry should not have expired yet"
        );
    }

    /// Overwriting a key must also replace its expiry: the short TTL of the
    /// first insert must not apply to the second value.
    #[test]
    fn test_s3cache_ttl_overwrite_should_update_expiry() {
        let cache = create_cache(10);
        let key = random_string();

        cache.insert(
            key.clone(),
            "val1".to_string(),
            Some(Instant::now() + Duration::from_millis(10)),
        );

        cache.insert(
            key.clone(),
            "val2".to_string(),
            Some(Instant::now() + Duration::from_secs(10)),
        );

        sleep(Duration::from_millis(50));

        let entry = cache.get(&key);
        assert!(
            entry.is_some(),
            "Entry should still be present with new TTL"
        );
        assert_eq!(entry.unwrap().value(), "val2");
    }

    /// Once every resident entry has expired, a fresh insert must succeed and
    /// be retrievable (the eviction path reclaims expired slots).
    #[test]
    fn test_s3cache_expired_entries_should_be_evicted() {
        let capacity = 10;
        let cache = create_cache(capacity);

        for _ in 0..capacity {
            let key = random_string();

            cache.insert(
                key.clone(),
                random_string(),
                Some(Instant::now() + Duration::from_millis(300)),
            );

            // Touch each entry so it is not a zero-frequency insert.
            let _ = cache.get(&key);
        }

        sleep(Duration::from_millis(500));

        let latest_key = random_string();

        cache.insert(latest_key.to_string(), random_string(), None);

        assert!(cache.get(&latest_key).is_some());
    }
}