//! omega_cache/s3fifo.rs — lock-free S3-FIFO cache implementation.
1use crate::core::cms::CountMinSketch;
2use crate::core::engine::CacheEngine;
3use crate::core::entry::Entry;
4use crate::core::entry_ref::Ref;
5use crate::core::index::IndexTable;
6use crate::core::key::Key;
7use crate::core::request_quota::RequestQuota;
8use crate::core::ring::RingQueue;
9use crate::core::tag::{Index, Tag};
10use crate::core::thread_context::ThreadContext;
11use crate::core::utils;
12use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
13use crossbeam::utils::CachePadded;
14use crossbeam_epoch::{Atomic, Owned, pin};
15use crossbeam_epoch::{Guard, Shared};
16use std::borrow::Borrow;
17use std::hash::Hash;
18use std::ptr::NonNull;
19use std::sync::atomic::AtomicU64;
20use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
21use std::time::Instant;
22use utils::hash;
23
/// A single storage cell pairing a cache entry with its atomic metadata tag.
///
/// The `tag` is the synchronization point for the lock-free protocol: readers
/// validate it (signature/index match) before dereferencing `entry`, and
/// writers flip it to a busy state to lock the slot. Based on its usage in
/// this file, the tag packs a signature, an access-frequency counter, and a
/// busy flag (see `Tag`).
pub struct Slot<K, V>
where
    K: Eq + Hash,
{
    /// Epoch-managed pointer to the stored entry; null while the slot is vacant.
    entry: Atomic<Entry<K, V>>,
    /// Packed metadata guarding `entry`; a zero signature marks an uninitialized slot.
    tag: AtomicU64,
}
31
32impl<K, V> Slot<K, V>
33where
34    K: Eq + Hash,
35{
36    #[inline(always)]
37    fn new() -> Self {
38        Self {
39            entry: Atomic::null(),
40            tag: AtomicU64::default(),
41        }
42    }
43}
44
45impl<K, V> Default for Slot<K, V>
46where
47    K: Eq + Hash,
48{
49    #[inline(always)]
50    fn default() -> Self {
51        Slot::new()
52    }
53}
54
/// A high-concurrency, segmented cache implementing the S3-FIFO eviction algorithm.
///
/// This structure organizes memory into a multi-tiered hierarchy to achieve
/// scan-resistance and high hit rates, specifically optimized for modern
/// multicore processors.
///
/// # Architecture
/// S3-FIFO ("Simple, Scalable caching with three Static FIFO queues") extends
/// traditional FIFO by using three distinct queues:
/// 1. **Probationary**: A small FIFO queue (10% of capacity here) for new entries.
/// 2. **Protected**: A large FIFO queue for frequently accessed entries.
/// 3. **Ghost**: A "shadow" queue that tracks the hashes of evicted entries to
///    inform future admission decisions.
///
/// # Concurrency & Performance
/// - **Lock-Free Design**: Uses atomic operations and `crossbeam-epoch` for
///   thread-safe access without global mutexes.
/// - **False Sharing Protection**: Slots are wrapped in `CachePadded` to ensure
///   different threads don't invalidate each other's CPU cache lines.
/// - **Index Pool**: A dedicated queue manages slot reuse, eliminating the
///   need for expensive memory allocations during the steady state.
pub struct S3FIFOCache<K, V>
where
    K: Eq + Hash,
{
    /// Mapping of keys to versioned indices for fast lookups.
    index_table: IndexTable<K>,
    /// Contiguous storage for cache entries, padded to prevent false sharing.
    slots: Box<[CachePadded<Slot<K, V>>]>,
    /// The protected segment of the cache (~90% of capacity, see `new`).
    protected_segment: RingQueue,
    /// The probationary segment for new data (~10% of capacity, see `new`).
    probation_segment: RingQueue,
    /// Metadata queue for tracking evicted entry hashes.
    ghost_queue: RingQueue,
    /// Collection of available slot indices ready for new allocations.
    index_pool: RingQueue,
    /// Frequency estimator used to decide if an entry should bypass probation.
    ghost_filter: CountMinSketch,
    /// Hit/Miss counters and latency tracking.
    metrics: Metrics,
    /// Total number of entries the cache can hold.
    capacity: usize,
}
99
100impl<K, V> S3FIFOCache<K, V>
101where
102    K: Eq + Hash,
103{
104    /// Initializes a new cache instance with a segmented S3-FIFO architecture.
105    ///
106    /// This constructor allocates the underlying slot storage and partitions the
107    /// cache capacity into functional segments designed to balance scan-resistance
108    /// with high hit rates.
109    ///
110    /// # Segmentation Logic
111    /// - **Probation Segment (10%)**: Acts as the initial landing zone for new entries.
112    ///   It prevents "one-hit wonders" from polluting the main cache body.
113    /// - **Protected Segment (90%)**: Houses frequency-proven entries. Data here has
114    ///   survived a probationary period or was identified as frequent via the ghost filter.
115    /// - **Ghost Queue & Filter**: A historical tracking mechanism sized to match total
116    ///   capacity. It records the hashes of recently evicted items, allowing the
117    ///   admission policy to "remember" and promote returning keys.
118    ///
119    /// # Resource Initialization
120    /// - **Index Pool**: Pre-populated with all available slot indices (0 to capacity).
121    ///   This acts as a lock-free allocator for cache slots.
122    /// - **Cache Padding**: Each `Slot` is wrapped in `CachePadded` to prevent
123    ///   "false sharing," a critical optimization for performance on high-core-count
124    ///   processors like the M1 Pro.
125    /// - **Ghost Filter**: Uses a `CountMinSketch` with a depth of 4 to provide space-efficient
126    ///   frequency estimation for the admission policy.
127    ///
128    /// # Parameters
129    /// - `capacity`: The total number of entries the cache can hold. This value is
130    ///   distributed between the probation and protected segments.
131    /// - `metrics_config`: Configuration for hit/miss/latency tracking.
132    #[inline]
133    pub fn new(capacity: usize, metrics_config: MetricsConfig) -> Self {
134        const GHOST_FILTER_DEPTH: usize = 4;
135
136        let probation_segment_capacity = (capacity as f64 * 0.1) as usize;
137        let protected_segment_capacity = capacity - probation_segment_capacity;
138
139        let probation_segment = RingQueue::new(probation_segment_capacity);
140        let protected_segment = RingQueue::new(protected_segment_capacity);
141        let ghost_queue = RingQueue::new(capacity);
142
143        let index_pool = RingQueue::new(capacity);
144
145        let context = ThreadContext::default();
146
147        for index in 0..capacity {
148            let _ = index_pool.push(index as u64, &context);
149        }
150
151        let metrics = Metrics::new(metrics_config);
152
153        let slots = (0..capacity)
154            .map(|_| CachePadded::new(Slot::new()))
155            .collect::<Vec<_>>()
156            .into_boxed_slice();
157
158        Self {
159            index_table: IndexTable::new(),
160            slots,
161            protected_segment,
162            probation_segment,
163            ghost_queue,
164            index_pool,
165            ghost_filter: CountMinSketch::new(capacity, GHOST_FILTER_DEPTH),
166            metrics,
167            capacity,
168        }
169    }
170
171    /// Retrieves a value from the cache, upgrading its frequency on a successful match.
172    ///
173    /// This method implements a lock-free read path that utilizes atomic tags for
174    /// fast validation before accessing the actual entry memory.
175    ///
176    /// # Control Flow
177    /// 1. **Index Resolution**: Performs a lookup in the `index_table`. If the key is
178    ///    not found, records a miss and returns `None`.
179    /// 2. **Tag Validation**: Loads the slot's `Tag` using `Acquire` semantics and
180    ///    validates it against the provided `hash` and `index`. This prevents
181    ///    accessing a slot that has been repurposed (ABA protection).
182    /// 3. **Liveness & Expiration**:
183    ///    - Checks if the `Entry` is null or if the stored key has changed.
184    ///    - Validates the entry's TTL. If expired, it records a miss.
185    /// 4. **Frequency Upgrade**:
186    ///    - Attempts to increment the access frequency in the `Tag` via `compare_exchange_weak`.
187    ///    - On success, the entry is considered "Hot," potentially protecting it from
188    ///      future eviction.
189    ///    - On failure (contention), the thread performs a backoff and retries the loop.
190    /// 5. **Reference Return**: On a successful hit, returns a `Ref` which wraps
191    ///    the entry and the `Guard`, ensuring memory remains valid for the caller.
192    ///
193    /// # Memory Model & Synchronization
194    /// - **Acquire/Release**: The `Tag` load (`Acquire`) synchronizes with the `insert`
195    ///   or `evict` stores (`Release`), ensuring the `entry` pointer is valid.
196    /// - **Lock-Free Reads**: Readers never block writers. The frequency update is
197    ///   optimistic and handles contention via the `ThreadContext` wait/decay mechanism.
198    ///
199    /// # Parameters
200    /// - `key`: The key to look up.
201    /// - `context`: Thread-local state for frequency decay and contention management.
202    ///
203    /// # Returns
204    /// - `Some(Ref<K, V>)`: A handle to the entry if found and valid.
205    /// - `None`: If the key is missing, expired, or the signature mismatch occurs.
206    pub fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
207    where
208        Key<K>: Borrow<Q>,
209        Q: Eq + Hash + ?Sized,
210    {
211        let called_at = Instant::now();
212        let hash = hash(key);
213        let guard = pin();
214
215        loop {
216            match self.index_table.get(key) {
217                Some(index) => {
218                    let index = Index::from(index);
219
220                    let slot = &self.slots[index.slot_index()];
221                    let mut tag = Tag::from(slot.tag.load(Acquire));
222
223                    if !tag.is_match(index, hash) {
224                        let latency = called_at.elapsed().as_millis() as u64;
225                        self.metrics.record_miss();
226                        self.metrics.record_latency(latency);
227                        return None;
228                    }
229
230                    let entry = slot.entry.load(Relaxed, &guard);
231
232                    match unsafe { entry.as_ref() } {
233                        None => {
234                            let latency = called_at.elapsed().as_millis() as u64;
235                            self.metrics.record_miss();
236                            self.metrics.record_latency(latency);
237                            break None;
238                        }
239                        Some(entry_ref) => {
240                            if entry_ref.key().borrow() != key || entry_ref.is_expired() {
241                                let latency = called_at.elapsed().as_millis() as u64;
242                                self.metrics.record_miss();
243                                self.metrics.record_latency(latency);
244                                break None;
245                            }
246
247                            match slot.tag.compare_exchange_weak(
248                                tag.into(),
249                                tag.increment_frequency().into(),
250                                Release,
251                                Acquire,
252                            ) {
253                                Ok(_) => {
254                                    context.decay();
255                                }
256                                Err(latest) => {
257                                    tag = Tag::from(latest);
258                                    context.wait();
259                                    continue;
260                                }
261                            }
262
263                            let latency = called_at.elapsed().as_millis() as u64;
264                            self.metrics.record_hit();
265                            self.metrics.record_latency(latency);
266
267                            break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
268                        }
269                    }
270                }
271                None => {
272                    self.metrics.record_miss();
273                    self.metrics
274                        .record_latency(called_at.elapsed().as_millis() as u64);
275                    return None;
276                }
277            }
278        }
279    }
280
    /// The main entry point for inserting or updating data within the cache.
    ///
    /// This method implements an adaptive admission policy by distinguishing between
    /// existing entries, known "hot" candidates (via the ghost filter), and new arrivals.
    ///
    /// # Control Flow
    /// 1. **Index Lookup**: Checks the `index_table` to see if the key is already resident.
    /// 2. **Update Path (Resident Key)**:
    ///    - If found, it attempts to lock the specific slot by transitioning its `Tag` to `busy`.
    ///    - It validates the signature and `index` to ensure the slot hasn't been repurposed (ABA protection).
    ///    - Upon a successful lock, it swaps the `Entry`, increments the access frequency in the `Tag`,
    ///      and releases the lock.
    ///    - If the lock fails or a mismatch is detected, the thread performs a backoff and retries.
    /// 3. **Admission Path (New Key)**:
    ///    - If the key is not in the index, the `ghost_filter` is consulted.
    ///    - **Promotion**: If the key's hash is present in the ghost filter (indicating it was
    ///      recently evicted or seen), it is inserted directly into the `protected_segment`.
    ///    - **Probation**: If the key is entirely new, it is placed in the probation segment
    ///      via `push_into_probation_queue`.
    ///
    /// # Memory Model & Synchronization
    /// - **AcqRel/Release**: The `compare_exchange_weak` and subsequent `store` on the `Tag`
    ///   ensure that the `Entry` swap is safely published to readers.
    /// - **Epoch-Based Reclamation**: `guard.defer_destroy` ensures the `old_entry` is only
    ///   deallocated when no concurrent readers hold a reference to it.
    /// - **Adaptive Backoff**: Uses `context.wait()` and `context.decay()` to handle contention
    ///   gracefully on highly active slots.
    ///
    /// # Parameters
    /// - `entry`: The key-value pair to insert.
    /// - `context`: Thread-local state for synchronization and performance metrics.
    /// - `quota`: Budget for the operation, primarily used if insertion triggers cascading evictions.
    pub fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
        // Clone the key up front: `entry` is moved into the slot (or into the
        // admission helpers) below, so the lookup key must be independent.
        let key = entry.key().clone();
        let hash = hash(entry.key());
        let guard = pin();

        loop {
            match self.index_table.get(&key).map(Index::from) {
                Some(index) => {
                    let slot = &self.slots[index.slot_index()];
                    let tag = Tag::from(slot.tag.load(Acquire));

                    // Lock the slot by flipping its tag to `busy`, but only if
                    // the tag still matches this key/index (ABA protection).
                    // On any failure, back off and retry from the lookup.
                    if !(tag.is_match(index, hash)
                        && slot
                            .tag
                            .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Relaxed)
                            .is_ok())
                    {
                        context.wait();
                        continue;
                    }

                    context.decay();

                    // Swap in the new entry while the slot is locked; the
                    // Release store below publishes it (and bumps frequency).
                    let old_entry = slot.entry.swap(Owned::new(entry), Relaxed, &guard);
                    slot.tag.store(tag.increment_frequency().into(), Release);

                    // SAFETY: `old_entry` was just unlinked from the slot and
                    // epoch-based reclamation defers the free until no reader
                    // can still hold a reference to it.
                    unsafe { guard.defer_destroy(old_entry) };

                    break;
                }
                None => {
                    // Admission policy: a hash remembered by the ghost filter
                    // means the key was recently evicted (likely hot), so it
                    // bypasses probation entirely.
                    if self.ghost_filter.contains(&hash) {
                        self.push_into_protected_segment(entry, &guard, context, quota)
                    } else {
                        self.push_into_probation_queue(entry, &guard, context, quota)
                    }

                    break;
                }
            }
        }
    }
354
355    /// Inserts a new entry into the probation segment, serving as the entry point for most new data.
356    ///
357    /// # Control Flow
358    /// 1. **Index Acquisition**: Attempts to retrieve an index from the `index_pool`. If exhausted,
359    ///    it triggers `evict_from_probation_segment` to reclaim space.
360    /// 2. **Segment Insertion**: Attempts to push the acquired index into the `probation_segment`.
361    /// 3. **Data Publication**:
362    ///    - Stores the `Entry` into the designated slot.
363    ///    - Maps the key to the index within the `IndexTable`.
364    ///    - Computes and stores a new `Tag` signature to validate the slot and release the write
365    ///      to concurrent readers.
366    /// 4. **Contention & Overflow**: If the segment push fails (full queue), the thread performs
367    ///    emergency eviction. The reclaimed index is returned to the pool, and the thread retries
368    ///    until successful or the `quota` is depleted.
369    ///
370    /// # Memory Model & Synchronization
371    /// - **Visibility Barrier**: The `Tag` store uses `Release` semantics, ensuring all previous
372    ///   writes (Entry and IndexTable) are visible to any thread that performs an `Acquire`
373    ///   load on that same tag.
374    /// - **Resource Safety**: In cases of quota exhaustion or unexpected failure, indices are
375    ///   restored to the `index_pool` to prevent permanent loss of cache capacity.
376    ///
377    /// # Returns
378    /// This function returns early without insertion if the `quota` is exhausted during
379    /// eviction or index recovery attempts.
380    fn push_into_probation_queue(
381        &self,
382        entry: Entry<K, V>,
383        guard: &Guard,
384        context: &ThreadContext,
385        quota: &mut RequestQuota,
386    ) {
387        let index = match self.index_pool.pop(context) {
388            Some(index) => Index::from(index),
389            None => match self.evict_from_probation_segment(guard, context, quota) {
390                Some(index) => index,
391                None => return,
392            },
393        };
394
395        loop {
396            if self.probation_segment.push(index.into(), context).is_ok() {
397                let slot = &self.slots[index.slot_index()];
398
399                let tag = Tag::from(slot.tag.load(Acquire));
400
401                let entry = Owned::new(entry);
402                let key = entry.key().clone();
403
404                slot.entry.store(entry, Relaxed);
405                self.index_table.insert(key.clone(), index.into());
406
407                let tag = tag.with_signature(hash(key.as_ref()));
408                slot.tag.store(tag.into(), Release);
409
410                break;
411            }
412
413            match self.evict_from_probation_segment(guard, context, quota) {
414                Some(evicted_index) => {
415                    self.index_pool
416                        .push(evicted_index.into(), context)
417                        .expect("the index pool can't overflow");
418                }
419                None => {
420                    self.index_pool
421                        .push(index.into(), context)
422                        .expect("the index pool can't overflow");
423
424                    break;
425                }
426            }
427        }
428    }
429
    /// Evicts an entry from the probation segment, implementing a promotion path for frequently accessed keys.
    ///
    /// # Control Flow
    /// 1. **Selection**: Pops an index from the `probation_segment`. Returns `None` if the quota is
    ///    exhausted or the segment is empty.
    /// 2. **Liveness Check**: Skips slots that are `busy` or uninitialized, re-queuing them to maintain
    ///    segment integrity.
    /// 3. **Promotion Path**:
    ///    - If an entry is `Hot` (has been accessed) and is not expired, it qualifies for promotion.
    ///    - The `Tag` is reset (clearing the hot bit), and the thread attempts to move the index
    ///      into the protected segment via `promote_index`.
    ///    - If promotion succeeds, the loop breaks to the next candidate.
    /// 4. **Eviction Path**:
    ///    - If not promoted, the thread attempts to CAS the tag to `busy`.
    ///    - On success:
    ///        - The key is removed from the `IndexTable` and the slot's entry is nullified.
    ///        - The `Tag` signature is advanced to prevent ABA issues.
    ///        - The evicted key is pushed into the `ghost_queue` to track its frequency for
    ///          future admission decisions.
    ///        - The entry's memory is scheduled for deallocation via `guard.defer_destroy`.
    ///
    /// # Memory Model & Synchronization
    /// - **Acquire/Release Semantics**: Synchronizes slot data and index visibility across threads,
    ///   ensuring the `busy` state transition is globally observed before data cleanup begins.
    /// - **Ghost Synchronization**: The handover to the `ghost_queue` occurs after the entry is
    ///   made undiscoverable, ensuring a clean transition from "resident" to "remembered."
    /// - **Retry Logic**: Uses a nested loop and `compare_exchange_weak` to handle high contention
    ///   on the slot's metadata without blocking.
    ///
    /// # Returns
    /// - `Some(Index)`: The index of the successfully evicted slot, ready for reuse.
    /// - `None`: Failure to evict due to quota exhaustion or an empty probation segment.
    fn evict_from_probation_segment(
        &self,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) -> Option<Index> {
        // Each outer iteration consumes one unit of quota and examines one
        // probation candidate (let-chain short-circuits on either condition).
        while quota.consume()
            && let Some(index) = self.probation_segment.pop(context).map(Index::from)
        {
            let slot = &self.slots[index.slot_index()];
            let mut tag = Tag::from(slot.tag.load(Acquire));
            // Tracks whether this thread already cleared the hot bit for this
            // candidate, so the promotion attempt happens at most once.
            let mut reseted = false;

            loop {
                // Busy slot (another writer holds it) or uninitialized slot
                // (zero signature): re-queue the index and move on; if the
                // re-queue itself fails, back off and retry with a fresh tag.
                if tag.is_busy() || tag.signature() == 0 {
                    if self.probation_segment.push(index.into(), context).is_ok() {
                        break;
                    }

                    tag = Tag::from(slot.tag.load(Acquire));
                    context.wait();
                    continue;
                }

                let entry = slot.entry.load(Relaxed, guard);

                // A non-busy slot with a non-zero signature must hold an entry.
                let entry_ref =
                    unsafe { entry.as_ref().expect("the occupied entry cannot be null") };

                // Promotion path: a hot, unexpired entry gets its hot bit
                // cleared and a single attempt to move into protected.
                if !reseted && tag.is_hot() && !entry_ref.is_expired() {
                    let updated_tag = tag.reset();

                    match slot.tag.compare_exchange_weak(
                        tag.into(),
                        updated_tag.into(),
                        Release,
                        Acquire,
                    ) {
                        Ok(_) => {
                            context.decay();
                            reseted = true;
                        }
                        Err(latest) => {
                            tag = Tag::from(latest);
                            context.wait();
                            continue;
                        }
                    }

                    if self.promote_index(index, guard, context, quota) {
                        break;
                    }

                    // Promotion failed (quota/space): fall through to eviction
                    // using the tag value we just published.
                    tag = updated_tag
                }

                // Eviction path: lock the slot via CAS to busy, then tear down.
                match slot
                    .tag
                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
                {
                    Ok(_) => {
                        // Make the entry undiscoverable before recording it in
                        // the ghost structures and freeing its memory.
                        let key = entry_ref.key().clone();
                        self.index_table.remove(key.as_ref());
                        slot.entry.store(Shared::null(), Relaxed);

                        // Advance the signature/version to defeat ABA on reuse.
                        let (tag, index) = tag.advance(index);
                        slot.tag.store(tag.into(), Release);

                        // Best effort: ghost tracking may fail on quota
                        // exhaustion; eviction still succeeded.
                        let _ = self.push_into_ghost_queue(key.as_ref(), context, quota);

                        // SAFETY: the entry was unlinked above; epoch-based
                        // reclamation defers the free past concurrent readers.
                        unsafe { guard.defer_destroy(entry) };

                        return Some(index);
                    }
                    Err(latest) => {
                        tag = Tag::from(latest);

                        // Contended: try to hand the candidate back to the
                        // segment; if that fails, keep working on it.
                        if self.probation_segment.push(index.into(), context).is_ok() {
                            break;
                        }
                    }
                }
            }
        }

        None
    }
549
550    /// Pushes a key's hash into the ghost queue and updates the frequency filter.
551    ///
552    /// # Control Flow
553    /// 1. **Hashing**: Computes the hash of the key to be used as a fingerprint in the ghost structures.
554    /// 2. **Insertion**: Attempts to push the hash into the `ghost_queue`.
555    ///    - If successful, it increments the frequency count in the `ghost_filter` and returns.
556    /// 3. **Queue Maintenance**: If the queue is full:
557    ///    - Checks the `quota`. If exhausted, the operation fails.
558    ///    - Pops the oldest hash from the queue to make room.
559    ///    - Decrements the frequency count for the evicted hash in the `ghost_filter` to keep the filter synchronized with the queue's contents.
560    /// 4. **Retry**: The loop continues until the new hash is successfully pushed or the quota limit is reached.
561    ///
562    /// # Logic & Invariants
563    /// - **Ghost Filter Synchronization**: The `ghost_filter` (likely a Counting Bloom Filter or similar) is strictly tied to the lifetime of hashes within the `ghost_queue`. This prevents "stale" frequency counts for keys that have long since left the ghost segment.
564    /// - **Admission Signaling**: The presence of a high count in the `ghost_filter` typically serves as the signal to promote a probationary entry to the protected segment upon its next access.
565    ///
566    /// # Returns
567    /// - `true`: The hash was successfully added to the ghost queue.
568    /// - `false`: The operation failed due to quota exhaustion.
569    #[inline(always)]
570    fn push_into_ghost_queue(
571        &self,
572        key: &K,
573        context: &ThreadContext,
574        quota: &mut RequestQuota,
575    ) -> bool {
576        let hash = hash(key);
577
578        loop {
579            if self.ghost_queue.push(hash, context).is_ok() {
580                self.ghost_filter.increment(&hash, context);
581                return true;
582            }
583
584            if !quota.consume() {
585                return false;
586            }
587
588            if let Some(oldest_hash) = self.ghost_queue.pop(context) {
589                self.ghost_filter.decrement(&oldest_hash, context);
590            }
591        }
592    }
593
594    /// Promotes an index into the protected segment, reclaiming space if necessary.
595    ///
596    /// # Control Flow
597    /// 1. **Initial Push**: Attempts to move the provided `index` into the `protected_segment`.
598    ///    If the segment has immediate capacity, the promotion is successful.
599    /// 2. **Eviction Loop**: If the segment is full, the thread attempts to free a slot by
600    ///    invoking `evict_from_protected_segment`.
601    /// 3. **Index Recovery**: Indices reclaimed via eviction are returned to the `index_pool`
602    ///    to maintain the total available slot count.
603    /// 4. **Termination**: The process repeats until the original index is successfully
604    ///    pushed or the `evict_from_protected_segment` call returns `None` (due to quota
605    ///    exhaustion or an empty segment), signaling a failed promotion.
606    ///
607    /// # Invariants
608    /// - **Index Conservation**: Every index evicted to make room for the promotion is
609    ///   pushed to the `index_pool` to ensure no slots are "lost" during high-contention
610    ///   re-balancing.
611    /// - **Panic Safety**: The `index_pool` push uses an expectation that the pool
612    ///   cannot overflow, assuming the pool capacity matches the total cache capacity.
613    ///
614    /// # Returns
615    /// - `true`: The index was successfully promoted into the protected segment.
616    /// - `false`: Promotion failed because the quota was exhausted before space could be cleared.
617    fn promote_index(
618        &self,
619        index: Index,
620        guard: &Guard,
621        context: &ThreadContext,
622        quota: &mut RequestQuota,
623    ) -> bool {
624        loop {
625            if self.protected_segment.push(index.into(), context).is_ok() {
626                return true;
627            }
628
629            match self.evict_from_protected_segment(guard, context, quota) {
630                Some(evicted_index) => {
631                    self.index_pool
632                        .push(evicted_index.into(), context)
633                        .expect("the index pool can't overflow");
634                }
635                None => return false,
636            }
637        }
638    }
639
640    /// Inserts a new entry into the protected segment, potentially triggering eviction if the segment is full.
641    ///
642    /// # Control Flow
643    /// 1. **Index Acquisition**: Attempts to pop an available index from the `index_pool`.
644    ///    If empty, it invokes `evict_from_protected_segment` to reclaim a slot.
645    /// 2. **Segment Placement**: Attempts to push the index into the `protected_segment` queue.
646    /// 3. **Data Publication**:
647    ///    - Stores the new `Entry` into the resolved slot.
648    ///    - Updates the `IndexTable` to map the key to the slot index.
649    ///    - Calculates a new `Tag` signature based on the key's hash and stores it to
650    ///      mark the slot as initialized and valid for readers.
    /// 4. **Contention Handling**: If the segment push fails (queue full), it performs
    ///    an emergency eviction. The evicted index is returned to the pool, and the
    ///    original operation retries until successful or the quota is exhausted.
    ///
    /// # Memory Model & Synchronization
    /// - **Publication Order**: The `Entry` is stored `Relaxed`, followed by the `IndexTable`
    ///   insertion. The `Tag` is stored with `Release` semantics, acting as the memory
    ///   barrier that makes the entry visible to concurrent readers.
    /// - **Resource Recovery**: On failed pushes or quota exhaustion, indices are
    ///   explicitly pushed back to the `index_pool` to prevent slot leakage.
    ///
    /// # Parameters
    /// - `entry`: The data to be cached.
    /// - `guard`: Epoch guard for memory reclamation safety.
    /// - `context`: Thread-local state for queue operations and backoff.
    /// - `quota`: Execution budget to prevent unbound searching during high pressure.
    fn push_into_protected_segment(
        &self,
        entry: Entry<K, V>,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) {
        // Acquire a free slot index: prefer the free pool, otherwise reclaim one by
        // evicting. If eviction also fails (quota exhausted / segment empty), the
        // insertion is silently dropped — `entry` is freed on return.
        let index = match self.index_pool.pop(context) {
            Some(index) => Index::from(index),
            None => match self.evict_from_protected_segment(guard, context, quota) {
                Some(index) => index,
                None => return,
            },
        };

        loop {
            if self.protected_segment.push(index.into(), context).is_ok() {
                let slot = &self.slots[index.slot_index()];
                let tag = Tag::from(slot.tag.load(Acquire));

                let entry = Owned::new(entry);
                let key = entry.key().clone();

                // Publication protocol: the entry pointer and the index-table mapping
                // are written first with no ordering of their own; the final `Release`
                // store of the tag (now carrying the key signature) is the barrier
                // that makes both writes visible to readers observing the tag.
                slot.entry.store(entry, Relaxed);
                self.index_table.insert(key.clone(), index.into());

                let tag = tag.with_signature(hash(key.as_ref()));
                slot.tag.store(tag.into(), Release);

                break;
            }

            // Segment push failed (queue full): evict to make room and retry. The
            // freshly-evicted index is returned to the pool — we keep retrying with
            // the index we already own.
            match self.evict_from_protected_segment(guard, context, quota) {
                Some(evicted_index) => {
                    self.index_pool
                        .push(evicted_index.into(), context)
                        .expect("the index pool can't overflow");
                }
                None => {
                    // Quota exhausted: give our slot index back so it is not leaked,
                    // and abandon the insertion (`entry` was not yet published).
                    self.index_pool
                        .push(index.into(), context)
                        .expect("the index pool can't overflow");

                    break;
                }
            }
        }
    }
715
    /// Evicts an entry from the protected segment using a lock-free, second-chance algorithm.
    ///
    /// # Control Flow
    /// 1. **Selection**: Pops an index from the `protected_segment`. If the quota is
    ///    exhausted or the segment is empty, returns `None`.
    /// 2. **Liveness Check**: Validates if the slot is currently `busy` or uninitialized.
    ///    Contended slots are pushed back to the segment to maintain system liveness.
    /// 3. **Phase 1 (Second-Chance Rotation)**:
    ///    - If an entry is `Hot` and not expired, it receives a "second chance."
    ///    - Its frequency is aged (decremented), and it is re-inserted into the protected segment.
    /// 4. **Phase 2 (Atomic Eviction)**:
    ///    - If the entry is eligible for eviction, the thread attempts to CAS the slot tag to `busy`.
    ///    - On success, it synchronizes the `IndexTable`, nullifies the slot entry,
    ///      and advances the tag signature to prevent ABA issues during subsequent lookups.
    ///
    /// # Memory Model & Synchronization
    /// - **Acquire/Release Semantics**: Ensures that memory writes to the `Entry` and `IndexTable`
    ///   are visible to other threads before the `Tag` state transition is observed.
    /// - **RCU-style Reclamation**: Utilizes `guard.defer_destroy` to ensure that memory is
    ///   only reclaimed after all concurrent readers have finished their operations.
    /// - **Atomic Bit-Packing**: The `Tag` integrates the busy-lock, frequency, and signature
    ///   into a single word to allow atomic state transitions without mutexes.
    ///
    /// # Returns
    /// - `Some(Index)`: The index of the successfully cleared slot, ready for reuse.
    /// - `None`: Eviction failed due to quota exhaustion or an empty segment.
    fn evict_from_protected_segment(
        &self,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) -> Option<Index> {
        // Each candidate costs one quota unit; the outer loop ends when either the
        // budget is spent or the segment has no more indices to offer.
        while quota.consume()
            && let Some(index) = self.protected_segment.pop(context).map(Index::from)
        {
            let slot = &self.slots[index.slot_index()];
            let mut tag = Tag::from(slot.tag.load(Acquire));

            loop {
                // A busy slot is locked by another thread; a zero signature means the
                // slot was never published. Either way, put the index back and move
                // on. If the push-back fails, re-read the tag and spin until it does.
                if tag.is_busy() || tag.signature() == 0 {
                    if self.protected_segment.push(index.into(), context).is_ok() {
                        break;
                    }

                    tag = Tag::from(slot.tag.load(Acquire));
                    context.wait();
                    continue;
                }

                context.decay();

                let entry = slot.entry.load(Relaxed, guard);
                // The non-zero signature checked above implies the slot is occupied.
                let entry_ref = unsafe { entry.as_ref().expect("occupied entry can't be null") };

                // Phase 1 Second-Chance Rotation:
                // Attempt to decide whether to evict an entry from the queue based on its frequency
                // and TTL.
                if tag.is_hot() && !entry_ref.is_expired() {
                    let updated_tag = tag.decrement_frequency();

                    match slot.tag.compare_exchange_weak(
                        tag.into(),
                        updated_tag.into(),
                        Release,
                        Acquire,
                    ) {
                        Ok(_) => {
                            if self.protected_segment.push(index.into(), context).is_ok() {
                                context.decay();
                                break;
                            }

                            // Segment full: the second chance cannot be granted, so
                            // fall through to Phase 2 and evict despite the heat.
                            tag = updated_tag;
                        }
                        Err(latest) => {
                            // Lost the race; retry the whole decision with the
                            // freshest tag value returned by the failed CAS.
                            tag = Tag::from(latest);
                            context.wait();
                            continue;
                        }
                    }
                }

                // Phase 2 Eviction:
                // Attempt to lock the entry for eviction; if locking fails, try to insert the index back into the queue.
                match slot
                    .tag
                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
                {
                    Ok(_) => {
                        // We own the slot: unlink the key, detach the entry, then
                        // publish the advanced tag (new epoch/signature) so stale
                        // index handles fail their epoch check (ABA protection).
                        self.index_table.remove(entry_ref.key());
                        slot.entry.store(Shared::null(), Relaxed);

                        let (next_tag, next_index) = tag.advance(index);
                        slot.tag.store(next_tag.into(), Release);

                        // SAFETY: the entry was detached above; epoch-based deferral
                        // delays destruction until concurrent readers unpin.
                        unsafe { guard.defer_destroy(entry) };
                        return Some(next_index);
                    }
                    Err(latest) => {
                        tag = Tag::from(latest);
                        context.wait();

                        // Contended: try to return the index and pick another victim;
                        // if the segment is full, keep working on this slot.
                        if self.protected_segment.push(index.into(), context).is_ok() {
                            break;
                        }
                    }
                }
            }
        }

        None
    }
828
829    /// Removes an entry from the cache by key, ensuring safe synchronization with concurrent readers and writers.
830    ///
831    /// This method uses a two-phase approach to safely invalidate a slot: first by locking the
832    /// metadata via a `busy` bit, and then by verifying the key identity before final removal.
833    ///
834    /// # Control Flow
835    /// 1. **Index Resolution**: Performs a lookup in the `index_table`. Returns `false` immediately
836    ///    if the key is not present.
837    /// 2. **Epoch & Liveness Check**: Validates the `Tag` against the provided `index` to ensure
838    ///    the slot hasn't been repurposed (ABA protection). If the slot is `busy`, it performs
839    ///    an adaptive backoff.
840    /// 3. **Atomic Lock**: Attempts to CAS the slot tag to a `busy` state. This grants exclusive
841    ///    access to the slot's entry pointer for the duration of the removal.
842    /// 4. **Key Verification**: Once locked, it loads the `Entry` and performs a final check
843    ///    to ensure the resident key matches the target `key`.
844    ///    - **Mismatch**: If the key changed during the lock acquisition, the tag is restored
845    ///      to its original state and returns `false`.
846    ///    - **Match**: The key is removed from the `index_table`, and the slot tag is `reset`
847    ///      (clearing frequency and busy bits) before being released.
848    ///
849    /// # Memory Model & Synchronization
850    /// - **AcqRel/Release**: Ensures that the `IndexTable` removal and any local modifications
851    ///   are globally visible before the slot's `busy` bit is cleared.
852    /// - **Spin-Reduction**: Utilizes `context.wait()` and `context.decay()` to prevent
853    ///   CPU-churn when multiple threads attempt to remove or update the same hot key.
854    /// - **Epoch Safety**: Uses a `pin()` guard to safely inspect the entry pointer without
855    ///   risking a use-after-free, even if another thread is concurrently evicting the slot.
856    ///
857    /// # Parameters
858    /// - `key`: The key of the entry to be removed.
859    /// - `context`: Thread-local state for managing backoff and contention.
860    ///
861    /// # Returns
862    /// - `true`: The entry was found and successfully removed.
863    /// - `false`: The entry was not found or the key did not match the current slot occupant.
864    pub fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
865    where
866        Key<K>: Borrow<Q>,
867        Q: Eq + Hash + ?Sized,
868    {
869        let index = match self.index_table.get(key) {
870            None => return false,
871            Some(index) => Index::from(index),
872        };
873
874        let slot = &self.slots[index.slot_index()];
875        let mut tag = Tag::from(slot.tag.load(Acquire));
876
877        loop {
878            if !tag.is_epoch_match(index) {
879                return false;
880            }
881
882            if tag.is_busy() {
883                context.wait();
884                continue;
885            }
886
887            match slot
888                .tag
889                .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
890            {
891                Ok(_) => {
892                    context.decay();
893                }
894                Err(latest) => {
895                    tag = Tag::from(latest);
896                    context.wait();
897                    continue;
898                }
899            }
900
901            let guard = pin();
902
903            let entry = slot.entry.load(Relaxed, &guard);
904
905            let is_key_match = unsafe { entry.as_ref() }
906                .map(|entry_ref| entry_ref.key().borrow() == key)
907                .unwrap_or(false);
908
909            if !is_key_match {
910                slot.tag.store(tag.into(), Release);
911                return false;
912            }
913
914            self.index_table.remove(key);
915
916            slot.tag.store(tag.reset().into(), Release);
917
918            return true;
919        }
920    }
921}
922
923impl<K, V> CacheEngine<K, V> for S3FIFOCache<K, V>
924where
925    K: Eq + Hash,
926{
927    fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
928    where
929        Key<K>: Borrow<Q>,
930        Q: Eq + Hash + ?Sized,
931    {
932        self.get(key, context)
933    }
934
935    fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
936        self.insert(entry, context, quota);
937    }
938
939    fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
940    where
941        Key<K>: Borrow<Q>,
942        Q: Eq + Hash + ?Sized,
943    {
944        self.remove(key, context)
945    }
946
947    fn capacity(&self) -> usize {
948        self.capacity
949    }
950
951    fn metrics(&self) -> MetricsSnapshot {
952        self.metrics.snapshot()
953    }
954}
955
956impl<K, V> Drop for S3FIFOCache<K, V>
957where
958    K: Eq + Hash,
959{
960    fn drop(&mut self) {
961        let guard = pin();
962
963        for slot in &self.slots {
964            let entry = slot.entry.swap(Shared::null(), Relaxed, &guard);
965
966            if !entry.is_null() {
967                unsafe { guard.defer_destroy(entry) }
968            }
969        }
970    }
971}
972
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::random_string;
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    use rand::{RngExt, rng};
    use std::sync::{Arc, Mutex};
    use std::thread::scope;

    /// Builds a cache with the given capacity and default metrics configuration.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> S3FIFOCache<K, V>
    where
        K: Eq + Hash,
    {
        S3FIFOCache::new(capacity, MetricsConfig::default())
    }

    // Basic round-trip: an inserted entry must be retrievable by its key.
    #[test]
    fn test_s3cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();
        let entry = Entry::new(key.clone(), value.clone());

        cache.insert(entry, &context, &mut RequestQuota::default());

        let entry_ref = cache.get(&key, &context).expect("must present");

        assert_eq!(entry_ref.key(), &key);
        assert_eq!(entry_ref.value(), &value);
    }

    // Re-inserting the same key must replace the old value, not duplicate it.
    #[test]
    fn test_s3cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let key_ref: &str = key.as_ref();
        let value1 = random_string();
        let value2 = random_string();

        cache.insert(
            Entry::new(key.clone(), value1.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(key_ref, &context);
        assert!(entry_ref.is_some(), "the entry must present");
        assert_eq!(entry_ref.unwrap().value(), &value1);

        cache.insert(
            Entry::new(key.clone(), value2.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        // The second insert must win: lookups now observe value2.
        let entry_ref = cache.get(key_ref, &context);
        assert!(entry_ref.is_some(), "the entry must present");
        assert_eq!(entry_ref.unwrap().value(), &value2);
    }

    // remove() must report success for a present key and make later gets miss.
    #[test]
    fn test_s3cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry_ref.key(), &key);
        assert_eq!(entry_ref.value(), &value);

        assert!(cache.remove(&key, &context));

        assert!(cache.get(&key, &context).is_none());
    }

    // Smoke test: inserting 10x the capacity must evict without panicking
    // or hanging (no assertion on which entries survive).
    #[test]
    fn test_s3cache_fill_beyond_capacity_should_evict_fifo() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();
            let entry = Entry::new(key, value);
            cache.insert(entry, &context, &mut RequestQuota::default());
        }
    }

    // A recently-read (hot) entry should survive a burst of unrelated inserts.
    #[test]
    fn test_s3cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(1000);
        let context = &ThreadContext::default();

        let key = random_string();
        let value = random_string();
        let entry = Entry::new(key.clone(), value.clone());

        cache.insert(entry, context, &mut RequestQuota::default());

        // Touch the entry once so its frequency marks it as hot.
        let entry_ref = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry_ref.value(), &value);

        // Churn: 250 cold inserts that compete for space.
        for _ in 0..250 {
            cache.insert(
                Entry::new(random_string(), random_string()),
                context,
                &mut RequestQuota::default(),
            );
        }

        let entry = cache.get(&key, &context).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    // S3-FIFO ghost behavior: an entry evicted once (tracked by the ghost
    // queue) should be promoted on re-insertion and survive the next churn wave.
    #[test]
    fn test_s3cache_reinserted_ghost_entry_should_be_promoted_to_main() {
        let cache = create_cache(1000);
        let context = ThreadContext::default();

        let (key, value) = (random_string(), random_string());

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        // First churn wave: push enough entries to evict the target key.
        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();
            cache.insert(
                Entry::new(key.clone(), value.clone()),
                &context,
                &mut RequestQuota::default(),
            );
        }

        assert!(cache.get(&key, &context).is_none());

        // Re-insert: the ghost record should promote it past probation.
        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        // Second churn wave: the promoted entry must now survive it.
        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();

            cache.insert(
                Entry::new(key.clone(), value.clone()),
                &context,
                &mut RequestQuota::default(),
            );
        }

        let entry = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    // Scan resistance: a small hot set, interleaved with a long one-hit scan,
    // should mostly stay resident (allow one casualty out of five).
    #[test]
    fn test_s3cache_ghost_filter_should_protect_working_set() {
        let cache = create_cache(1000);
        let context = ThreadContext::default();

        let hot_entries = vec![
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
        ];

        for (key, value) in &hot_entries {
            let key = key.clone();
            let value = value.clone();

            cache.insert(
                Entry::new(key, value),
                &context,
                &mut RequestQuota::default(),
            );
        }

        // Alternate cold inserts (the scan) with random hot-key reads.
        for i in 0..100000 {
            if i % 2 == 0 {
                let key = format!("key-{}", i);
                let value = format!("value-{}", i);
                let entry = Entry::new(key, value);
                cache.insert(entry, &context, &mut RequestQuota::default());
            } else {
                let index = rng().random_range(..hot_entries.len());
                let key = hot_entries[index].0.as_str();
                let _ = cache.get(key, &context);
            }
        }

        let count = hot_entries
            .iter()
            .map(|(key, _)| cache.get(key, &context))
            .filter(Option::is_some)
            .count();

        assert!(count >= 4);
    }

    // Stress test: 32 threads hammering a shared 500-key space must neither
    // crash nor deadlock (no value assertions, liveness only).
    #[test]
    fn test_s3cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(1000);
        let num_threads = 32;
        let ops_per_thread = 5000;

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let context = ThreadContext::default();
                    for op in 0..ops_per_thread {
                        let key = (op % 500).to_string();
                        if op % 2 == 0 {
                            cache.insert(
                                Entry::new(key, random_string()),
                                &context,
                                &mut RequestQuota::default(),
                            );
                        } else {
                            let _ = cache.get(&key, &context);
                        }
                    }
                });
            }
        });
    }

    // Hit-rate quality gate: under a 16-thread Zipfian workload over 20k keys,
    // at least 80% of the 500 most frequent keys must remain resident.
    #[test]
    fn test_s3_fifo_should_protect_hot_set_under_high_churn() {
        let capacity = 1000;
        let cache = create_cache(capacity);
        let context = ThreadContext::default();

        let num_threads = 16;
        let ops_per_thread = 10000;

        // Zipf-like generator (skew 1.3) over a 20k key universe.
        let workload_generator = WorkloadGenerator::new(20000, 1.3);
        let workload_statistics = WorkloadStatistics::new();

        let mut rand = rng();

        // Warm-up: pre-fill the cache to capacity and record the key frequencies.
        for _ in 0..capacity {
            let key = workload_generator.key(&mut rand);
            cache.insert(
                Entry::new(key.clone(), "value"),
                &context,
                &mut RequestQuota::default(),
            );
            workload_statistics.record(key);
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut thread_rng = rng();
                    let context = ThreadContext::default();

                    // Read-through pattern: insert only on a miss.
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut thread_rng);
                        workload_statistics.record(key.clone());

                        if cache.get(&key, &context).is_none() {
                            cache.insert(
                                Entry::new(key, "value"),
                                &context,
                                &mut RequestQuota::default(),
                            );
                        }
                    }
                });
            }
        });

        let top_keys_size = 500;
        let frequent_keys = workload_statistics.frequent_keys(top_keys_size);

        let count = frequent_keys.iter().fold(0, |acc, key| {
            if cache.get(key, &context).is_some() {
                acc + 1
            } else {
                acc
            }
        });

        assert!(
            count >= 400,
            "S3-FIFO efficiency dropped! Captured only {}/{} hot keys",
            count,
            top_keys_size
        );
    }

    // Custom-expiration entries must become invisible the moment their
    // predicate flips to expired, without an explicit remove().
    #[test]
    fn test_s3cache_ttl_entry_should_expire() {
        let cache = create_cache(10);
        let context = ThreadContext::default();
        let key = random_string();
        let value = random_string();

        let expired = Arc::new(Mutex::new(false));

        // Closure-backed expiration flag shared with the test body.
        let is_expired = {
            let expired = expired.clone();
            move || *expired.lock().unwrap()
        };

        cache.insert(
            Entry::with_custom_expiration(key.clone(), value.clone(), is_expired),
            &context,
            &mut RequestQuota::default(),
        );

        assert!(cache.get(&key, &context).is_some());

        *expired.lock().unwrap() = true;

        assert!(
            cache.get(&key, &context).is_none(),
            "Entry should have expired"
        );
    }
}