// omega_cache/s3fifo.rs
1use crate::core::cms::CountMinSketch;
2use crate::core::engine::CacheEngine;
3use crate::core::entry::Entry;
4use crate::core::entry_ref::Ref;
5use crate::core::index::IndexTable;
6use crate::core::key::Key;
7use crate::core::request_quota::RequestQuota;
8use crate::core::ring::RingQueue;
9use crate::core::tag::{Index, Tag};
10use crate::core::thread_context::ThreadContext;
11use crate::core::utils;
12use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
13use crossbeam::utils::CachePadded;
14use crossbeam_epoch::{Atomic, Owned, pin};
15use crossbeam_epoch::{Guard, Shared};
16use std::borrow::Borrow;
17use std::hash::Hash;
18use std::ptr::NonNull;
19use std::sync::atomic::AtomicU64;
20use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
21use std::time::Instant;
22use utils::hash;
23
/// A single storage cell pairing an epoch-managed entry pointer with its
/// packed metadata word.
///
/// The `tag` encodes the slot's validation signature, access-frequency bits,
/// and busy/lock state in one `u64` (see `core::tag::Tag`), allowing atomic
/// state transitions without a mutex. The `entry` pointer is reclaimed via
/// `crossbeam_epoch`, so readers can dereference it without blocking writers.
pub struct Slot<K, V>
where
    K: Eq + Hash,
{
    /// Epoch-guarded pointer to the cached entry; null while the slot is vacant.
    entry: Atomic<Entry<K, V>>,
    /// Packed metadata word (signature, frequency, busy bit) validated by readers.
    tag: AtomicU64,
}
31
32impl<K, V> Slot<K, V>
33where
34 K: Eq + Hash,
35{
36 #[inline(always)]
37 fn new() -> Self {
38 Self {
39 entry: Atomic::null(),
40 tag: AtomicU64::default(),
41 }
42 }
43}
44
45impl<K, V> Default for Slot<K, V>
46where
47 K: Eq + Hash,
48{
49 #[inline(always)]
50 fn default() -> Self {
51 Slot::new()
52 }
53}
54
/// A high-concurrency, segmented cache implementing the S3-FIFO eviction algorithm.
///
/// This structure organizes memory into a multi-tiered hierarchy to achieve
/// scan-resistance and high hit rates, specifically optimized for modern
/// multicore processors.
///
/// # Architecture
/// S3-FIFO ("Simple and Scalable caching with three Static FIFO queues")
/// extends traditional FIFO by using three distinct queues:
/// 1. **Probationary**: A small FIFO queue (typically 10% of capacity) for new entries.
/// 2. **Protected**: A large FIFO queue for frequently accessed entries.
/// 3. **Ghost**: A "shadow" queue that tracks the hashes of evicted entries to
///    inform future admission decisions.
///
/// # Concurrency & Performance
/// - **Lock-Free Design**: Uses atomic operations and `crossbeam-epoch` for
///   thread-safe access without global mutexes.
/// - **False Sharing Protection**: Slots are wrapped in `CachePadded` to ensure
///   different threads don't invalidate each other's CPU cache lines.
/// - **Index Pool**: A dedicated queue manages slot reuse, eliminating the
///   need for expensive memory allocations during the steady state.
pub struct S3FIFOCache<K, V>
where
    K: Eq + Hash,
{
    /// Mapping of keys to versioned indices for fast lookups.
    index_table: IndexTable<K>,
    /// Contiguous storage for cache entries, padded to prevent false sharing.
    slots: Box<[CachePadded<Slot<K, V>>]>,
    /// The protected segment of the cache (frequency-proven entries).
    protected_segment: RingQueue,
    /// The probationary segment for new data (landing zone for admissions).
    probation_segment: RingQueue,
    /// Metadata queue for tracking evicted entry hashes ("ghosts").
    ghost_queue: RingQueue,
    /// Collection of available slot indices ready for new allocations;
    /// acts as a lock-free slot allocator.
    index_pool: RingQueue,
    /// Frequency estimator used to decide if an entry should bypass probation.
    ghost_filter: CountMinSketch,
    /// Hit/Miss counters and latency tracking.
    metrics: Metrics,
    /// Total number of entries the cache can hold (probation + protected slots).
    capacity: usize,
}
99
100impl<K, V> S3FIFOCache<K, V>
101where
102 K: Eq + Hash,
103{
104 /// Initializes a new cache instance with a segmented S3-FIFO architecture.
105 ///
106 /// This constructor allocates the underlying slot storage and partitions the
107 /// cache capacity into functional segments designed to balance scan-resistance
108 /// with high hit rates.
109 ///
110 /// # Segmentation Logic
111 /// - **Probation Segment (10%)**: Acts as the initial landing zone for new entries.
112 /// It prevents "one-hit wonders" from polluting the main cache body.
113 /// - **Protected Segment (90%)**: Houses frequency-proven entries. Data here has
114 /// survived a probationary period or was identified as frequent via the ghost filter.
115 /// - **Ghost Queue & Filter**: A historical tracking mechanism sized to match total
116 /// capacity. It records the hashes of recently evicted items, allowing the
117 /// admission policy to "remember" and promote returning keys.
118 ///
119 /// # Resource Initialization
120 /// - **Index Pool**: Pre-populated with all available slot indices (0 to capacity).
121 /// This acts as a lock-free allocator for cache slots.
122 /// - **Cache Padding**: Each `Slot` is wrapped in `CachePadded` to prevent
123 /// "false sharing," a critical optimization for performance on high-core-count
124 /// processors like the M1 Pro.
125 /// - **Ghost Filter**: Uses a `CountMinSketch` with a depth of 4 to provide space-efficient
126 /// frequency estimation for the admission policy.
127 ///
128 /// # Parameters
129 /// - `capacity`: The total number of entries the cache can hold. This value is
130 /// distributed between the probation and protected segments.
131 /// - `metrics_config`: Configuration for hit/miss/latency tracking.
132 #[inline]
133 pub fn new(capacity: usize, metrics_config: MetricsConfig) -> Self {
134 const GHOST_FILTER_DEPTH: usize = 4;
135
136 let probation_segment_capacity = (capacity as f64 * 0.1) as usize;
137 let protected_segment_capacity = capacity - probation_segment_capacity;
138
139 let probation_segment = RingQueue::new(probation_segment_capacity);
140 let protected_segment = RingQueue::new(protected_segment_capacity);
141 let ghost_queue = RingQueue::new(capacity);
142
143 let index_pool = RingQueue::new(capacity);
144
145 let context = ThreadContext::default();
146
147 for index in 0..capacity {
148 let _ = index_pool.push(index as u64, &context);
149 }
150
151 let metrics = Metrics::new(metrics_config);
152
153 let slots = (0..capacity)
154 .map(|_| CachePadded::new(Slot::new()))
155 .collect::<Vec<_>>()
156 .into_boxed_slice();
157
158 Self {
159 index_table: IndexTable::new(),
160 slots,
161 protected_segment,
162 probation_segment,
163 ghost_queue,
164 index_pool,
165 ghost_filter: CountMinSketch::new(capacity, GHOST_FILTER_DEPTH),
166 metrics,
167 capacity,
168 }
169 }
170
171 /// Retrieves a value from the cache, upgrading its frequency on a successful match.
172 ///
173 /// This method implements a lock-free read path that utilizes atomic tags for
174 /// fast validation before accessing the actual entry memory.
175 ///
176 /// # Control Flow
177 /// 1. **Index Resolution**: Performs a lookup in the `index_table`. If the key is
178 /// not found, records a miss and returns `None`.
179 /// 2. **Tag Validation**: Loads the slot's `Tag` using `Acquire` semantics and
180 /// validates it against the provided `hash` and `index`. This prevents
181 /// accessing a slot that has been repurposed (ABA protection).
182 /// 3. **Liveness & Expiration**:
183 /// - Checks if the `Entry` is null or if the stored key has changed.
184 /// - Validates the entry's TTL. If expired, it records a miss.
185 /// 4. **Frequency Upgrade**:
186 /// - Attempts to increment the access frequency in the `Tag` via `compare_exchange_weak`.
187 /// - On success, the entry is considered "Hot," potentially protecting it from
188 /// future eviction.
189 /// - On failure (contention), the thread performs a backoff and retries the loop.
190 /// 5. **Reference Return**: On a successful hit, returns a `Ref` which wraps
191 /// the entry and the `Guard`, ensuring memory remains valid for the caller.
192 ///
193 /// # Memory Model & Synchronization
194 /// - **Acquire/Release**: The `Tag` load (`Acquire`) synchronizes with the `insert`
195 /// or `evict` stores (`Release`), ensuring the `entry` pointer is valid.
196 /// - **Lock-Free Reads**: Readers never block writers. The frequency update is
197 /// optimistic and handles contention via the `ThreadContext` wait/decay mechanism.
198 ///
199 /// # Parameters
200 /// - `key`: The key to look up.
201 /// - `context`: Thread-local state for frequency decay and contention management.
202 ///
203 /// # Returns
204 /// - `Some(Ref<K, V>)`: A handle to the entry if found and valid.
205 /// - `None`: If the key is missing, expired, or the signature mismatch occurs.
206 pub fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
207 where
208 Key<K>: Borrow<Q>,
209 Q: Eq + Hash + ?Sized,
210 {
211 let called_at = Instant::now();
212 let hash = hash(key);
213 let guard = pin();
214
215 loop {
216 match self.index_table.get(key) {
217 Some(index) => {
218 let index = Index::from(index);
219
220 let slot = &self.slots[index.slot_index()];
221 let mut tag = Tag::from(slot.tag.load(Acquire));
222
223 if !tag.is_match(index, hash) {
224 let latency = called_at.elapsed().as_millis() as u64;
225 self.metrics.record_miss();
226 self.metrics.record_latency(latency);
227 return None;
228 }
229
230 let entry = slot.entry.load(Relaxed, &guard);
231
232 match unsafe { entry.as_ref() } {
233 None => {
234 let latency = called_at.elapsed().as_millis() as u64;
235 self.metrics.record_miss();
236 self.metrics.record_latency(latency);
237 break None;
238 }
239 Some(entry_ref) => {
240 if entry_ref.key().borrow() != key || entry_ref.is_expired() {
241 let latency = called_at.elapsed().as_millis() as u64;
242 self.metrics.record_miss();
243 self.metrics.record_latency(latency);
244 break None;
245 }
246
247 match slot.tag.compare_exchange_weak(
248 tag.into(),
249 tag.increment_frequency().into(),
250 Release,
251 Acquire,
252 ) {
253 Ok(_) => {
254 context.decay();
255 }
256 Err(latest) => {
257 tag = Tag::from(latest);
258 context.wait();
259 continue;
260 }
261 }
262
263 let latency = called_at.elapsed().as_millis() as u64;
264 self.metrics.record_hit();
265 self.metrics.record_latency(latency);
266
267 break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
268 }
269 }
270 }
271 None => {
272 self.metrics.record_miss();
273 self.metrics
274 .record_latency(called_at.elapsed().as_millis() as u64);
275 return None;
276 }
277 }
278 }
279 }
280
    /// The main entry point for inserting or updating data within the cache.
    ///
    /// This method implements an adaptive admission policy by distinguishing between
    /// existing entries, known "hot" candidates (via the ghost filter), and new arrivals.
    ///
    /// # Control Flow
    /// 1. **Index Lookup**: Checks the `index_table` to see if the key is already resident.
    /// 2. **Update Path (Resident Key)**:
    ///    - If found, it attempts to lock the specific slot by transitioning its `Tag` to `busy`.
    ///    - It validates the `signature` and `index` to ensure the slot hasn't been repurposed (ABA protection).
    ///    - Upon a successful lock, it swaps the `Entry`, increments the access frequency in the `Tag`,
    ///      and releases the lock.
    ///    - If the lock fails or a mismatch is detected, the thread performs a backoff and retries.
    /// 3. **Admission Path (New Key)**:
    ///    - If the key is not in the index, the `ghost_filter` is consulted.
    ///    - **Promotion**: If the key's hash is present in the ghost filter (indicating it was
    ///      recently evicted or seen), it is inserted directly into the `protected_segment`.
    ///    - **Probation**: If the key is entirely new, it is placed in the `probation_queue`.
    ///
    /// # Memory Model & Synchronization
    /// - **AcqRel/Release**: The `compare_exchange_weak` and subsequent `store` on the `Tag`
    ///   ensure that the `Entry` swap is safely published to readers.
    /// - **Epoch-Based Reclamation**: `guard.defer_destroy` ensures the `old_entry` is only
    ///   deallocated when no concurrent readers hold a reference to it.
    /// - **Adaptive Backoff**: Uses `context.wait()` and `context.decay()` to handle contention
    ///   gracefully on highly active slots.
    ///
    /// # Parameters
    /// - `entry`: The key-value pair to insert.
    /// - `context`: Thread-local state for synchronization and performance metrics.
    /// - `quota`: Budget for the operation, primarily used if insertion triggers cascading evictions.
    pub fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
        let key = entry.key().clone();
        let hash = hash(entry.key());
        let guard = pin();

        loop {
            match self.index_table.get(&key).map(Index::from) {
                Some(index) => {
                    // Update path: the key is already resident in some slot.
                    let slot = &self.slots[index.slot_index()];
                    let tag = Tag::from(slot.tag.load(Acquire));

                    // Both checks must pass in one shot: the tag still matches
                    // this index/hash (ABA protection), and we win the CAS that
                    // flips the slot to `busy` (exclusive write lock).
                    if !(tag.is_match(index, hash)
                        && slot
                            .tag
                            .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Relaxed)
                            .is_ok())
                    {
                        context.wait();
                        continue;
                    }

                    context.decay();

                    // We hold the busy lock: swap in the new entry, then publish
                    // an incremented-frequency tag (which also clears `busy`)
                    // with Release so readers observe the new entry pointer.
                    let old_entry = slot.entry.swap(Owned::new(entry), Relaxed, &guard);
                    slot.tag.store(tag.increment_frequency().into(), Release);

                    // SAFETY: old_entry was just unlinked; epoch reclamation
                    // defers the free until no reader can still hold it.
                    unsafe { guard.defer_destroy(old_entry) };

                    break;
                }
                None => {
                    // Admission path: a ghost-filter hit means the key was seen
                    // recently, so it bypasses probation.
                    if self.ghost_filter.contains(&hash) {
                        self.push_into_protected_segment(entry, &guard, context, quota)
                    } else {
                        self.push_into_probation_queue(entry, &guard, context, quota)
                    }

                    break;
                }
            }
        }
    }
354
    /// Inserts a new entry into the probation segment, serving as the entry point for most new data.
    ///
    /// # Control Flow
    /// 1. **Index Acquisition**: Attempts to retrieve an index from the `index_pool`. If exhausted,
    ///    it triggers `evict_from_probation_segment` to reclaim space.
    /// 2. **Segment Insertion**: Attempts to push the acquired index into the `probation_segment`.
    /// 3. **Data Publication**:
    ///    - Stores the `Entry` into the designated slot.
    ///    - Maps the key to the index within the `IndexTable`.
    ///    - Computes and stores a new `Tag` signature to validate the slot and release the write
    ///      to concurrent readers.
    /// 4. **Contention & Overflow**: If the segment push fails (full queue), the thread performs
    ///    emergency eviction. The reclaimed index is returned to the pool, and the thread retries
    ///    until successful or the `quota` is depleted.
    ///
    /// # Memory Model & Synchronization
    /// - **Visibility Barrier**: The `Tag` store uses `Release` semantics, ensuring all previous
    ///   writes (Entry and IndexTable) are visible to any thread that performs an `Acquire`
    ///   load on that same tag.
    /// - **Resource Safety**: In cases of quota exhaustion or unexpected failure, indices are
    ///   restored to the `index_pool` to prevent permanent loss of cache capacity.
    ///
    /// # Returns
    /// This function returns early without insertion if the `quota` is exhausted during
    /// eviction or index recovery attempts.
    fn push_into_probation_queue(
        &self,
        entry: Entry<K, V>,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) {
        // Acquire a free slot index: preferably from the pool, otherwise by
        // evicting a probation resident. If neither works the entry is dropped.
        let index = match self.index_pool.pop(context) {
            Some(index) => Index::from(index),
            None => match self.evict_from_probation_segment(guard, context, quota) {
                Some(index) => index,
                None => return,
            },
        };

        loop {
            if self.probation_segment.push(index.into(), context).is_ok() {
                let slot = &self.slots[index.slot_index()];

                let tag = Tag::from(slot.tag.load(Acquire));

                let entry = Owned::new(entry);
                let key = entry.key().clone();

                // Publication order matters: entry first (Relaxed), then the
                // index-table mapping, and finally the signed tag with Release
                // — the tag store is the barrier that makes everything visible.
                slot.entry.store(entry, Relaxed);
                self.index_table.insert(key.clone(), index.into());

                let tag = tag.with_signature(hash(key.as_ref()));
                slot.tag.store(tag.into(), Release);

                break;
            }

            // Segment full: evict to make room, returning the reclaimed slot
            // to the pool, then retry pushing our own index.
            match self.evict_from_probation_segment(guard, context, quota) {
                Some(evicted_index) => {
                    self.index_pool
                        .push(evicted_index.into(), context)
                        .expect("the index pool can't overflow");
                }
                None => {
                    // Quota exhausted or nothing evictable: give the index
                    // back so no slot capacity is leaked; insertion fails.
                    self.index_pool
                        .push(index.into(), context)
                        .expect("the index pool can't overflow");

                    break;
                }
            }
        }
    }
429
    /// Evicts an entry from the probation segment, implementing a promotion path for frequently accessed keys.
    ///
    /// # Control Flow
    /// 1. **Selection**: Pops an index from the `probation_segment`. Returns `None` if the quota is
    ///    exhausted or the segment is empty.
    /// 2. **Liveness Check**: Skips slots that are `busy` or uninitialized, re-queuing them to maintain
    ///    segment integrity.
    /// 3. **Promotion Path**:
    ///    - If an entry is `Hot` (has been accessed) and is not expired, it qualifies for promotion.
    ///    - The `Tag` is reset (clearing the hot bit), and the thread attempts to move the index
    ///      into the protected segment via `promote_index`.
    ///    - If promotion succeeds, the loop breaks to the next candidate.
    /// 4. **Eviction Path**:
    ///    - If not promoted, the thread attempts to CAS the tag to `busy`.
    ///    - On success:
    ///      - The key is removed from the `IndexTable` and the slot's entry is nullified.
    ///      - The `Tag` signature is advanced to prevent ABA issues.
    ///      - The evicted key is pushed into the `ghost_queue` to track its frequency for
    ///        future admission decisions.
    ///      - The entry's memory is scheduled for deallocation via `guard.defer_destroy`.
    ///
    /// # Memory Model & Synchronization
    /// - **Acquire/Release Semantics**: Synchronizes slot data and index visibility across threads,
    ///   ensuring the `busy` state transition is globally observed before data cleanup begins.
    /// - **Ghost Synchronization**: The handover to the `ghost_queue` occurs after the entry is
    ///   made undiscoverable, ensuring a clean transition from "resident" to "remembered."
    /// - **Retry Logic**: Uses a nested loop and `compare_exchange_weak` to handle high contention
    ///   on the slot's metadata without blocking.
    ///
    /// # Returns
    /// - `Some(Index)`: The index of the successfully evicted slot, ready for reuse.
    /// - `None`: Failure to evict due to quota exhaustion or an empty probation segment.
    fn evict_from_probation_segment(
        &self,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) -> Option<Index> {
        while quota.consume()
            && let Some(index) = self.probation_segment.pop(context).map(Index::from)
        {
            let slot = &self.slots[index.slot_index()];
            let mut tag = Tag::from(slot.tag.load(Acquire));
            // `reseted` (sic): set once this thread has already cleared the
            // hot bit for this candidate, so the promotion branch runs at
            // most once per popped index.
            let mut reseted = false;

            loop {
                // Busy or never-published slots are not evictable: put the
                // index back and move on to the next candidate.
                if tag.is_busy() || tag.signature() == 0 {
                    if self.probation_segment.push(index.into(), context).is_ok() {
                        break;
                    }

                    // Couldn't re-queue (segment momentarily full): reload
                    // the tag and retry rather than losing the index.
                    tag = Tag::from(slot.tag.load(Acquire));
                    context.wait();
                    continue;
                }

                let entry = slot.entry.load(Relaxed, guard);

                // A non-busy slot with a non-zero signature must hold an entry.
                let entry_ref =
                    unsafe { entry.as_ref().expect("the occupied entry cannot be null") };

                // Promotion path: a hot, live entry earns a move to the
                // protected segment instead of being evicted.
                if !reseted && tag.is_hot() && !entry_ref.is_expired() {
                    let updated_tag = tag.reset();

                    match slot.tag.compare_exchange_weak(
                        tag.into(),
                        updated_tag.into(),
                        Release,
                        Acquire,
                    ) {
                        Ok(_) => {
                            context.decay();
                            reseted = true;
                        }
                        Err(latest) => {
                            tag = Tag::from(latest);
                            context.wait();
                            continue;
                        }
                    }

                    if self.promote_index(index, guard, context, quota) {
                        break;
                    }

                    // Promotion failed (protected segment could not make
                    // room): fall through to eviction using the reset tag.
                    tag = updated_tag
                }

                // Eviction path: claim the slot by flipping it to busy, then
                // unpublish the entry before any cleanup.
                match slot
                    .tag
                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
                {
                    Ok(_) => {
                        let key = entry_ref.key().clone();
                        self.index_table.remove(key.as_ref());
                        slot.entry.store(Shared::null(), Relaxed);

                        // Advance the signature so stale readers holding the
                        // old (index, tag) pair fail validation (ABA guard).
                        let (tag, index) = tag.advance(index);
                        slot.tag.store(tag.into(), Release);

                        // Best-effort: remember the evicted key's hash so a
                        // returning key can be admitted straight to protected.
                        let _ = self.push_into_ghost_queue(key.as_ref(), context, quota);

                        // SAFETY: entry is unlinked and undiscoverable; epoch
                        // reclamation defers the free past concurrent readers.
                        unsafe { guard.defer_destroy(entry) };

                        return Some(index);
                    }
                    Err(latest) => {
                        tag = Tag::from(latest);

                        if self.probation_segment.push(index.into(), context).is_ok() {
                            break;
                        }
                    }
                }
            }
        }

        None
    }
549
550 /// Pushes a key's hash into the ghost queue and updates the frequency filter.
551 ///
552 /// # Control Flow
553 /// 1. **Hashing**: Computes the hash of the key to be used as a fingerprint in the ghost structures.
554 /// 2. **Insertion**: Attempts to push the hash into the `ghost_queue`.
555 /// - If successful, it increments the frequency count in the `ghost_filter` and returns.
556 /// 3. **Queue Maintenance**: If the queue is full:
557 /// - Checks the `quota`. If exhausted, the operation fails.
558 /// - Pops the oldest hash from the queue to make room.
559 /// - Decrements the frequency count for the evicted hash in the `ghost_filter` to keep the filter synchronized with the queue's contents.
560 /// 4. **Retry**: The loop continues until the new hash is successfully pushed or the quota limit is reached.
561 ///
562 /// # Logic & Invariants
563 /// - **Ghost Filter Synchronization**: The `ghost_filter` (likely a Counting Bloom Filter or similar) is strictly tied to the lifetime of hashes within the `ghost_queue`. This prevents "stale" frequency counts for keys that have long since left the ghost segment.
564 /// - **Admission Signaling**: The presence of a high count in the `ghost_filter` typically serves as the signal to promote a probationary entry to the protected segment upon its next access.
565 ///
566 /// # Returns
567 /// - `true`: The hash was successfully added to the ghost queue.
568 /// - `false`: The operation failed due to quota exhaustion.
569 #[inline(always)]
570 fn push_into_ghost_queue(
571 &self,
572 key: &K,
573 context: &ThreadContext,
574 quota: &mut RequestQuota,
575 ) -> bool {
576 let hash = hash(key);
577
578 loop {
579 if self.ghost_queue.push(hash, context).is_ok() {
580 self.ghost_filter.increment(&hash, context);
581 return true;
582 }
583
584 if !quota.consume() {
585 return false;
586 }
587
588 if let Some(oldest_hash) = self.ghost_queue.pop(context) {
589 self.ghost_filter.decrement(&oldest_hash, context);
590 }
591 }
592 }
593
594 /// Promotes an index into the protected segment, reclaiming space if necessary.
595 ///
596 /// # Control Flow
597 /// 1. **Initial Push**: Attempts to move the provided `index` into the `protected_segment`.
598 /// If the segment has immediate capacity, the promotion is successful.
599 /// 2. **Eviction Loop**: If the segment is full, the thread attempts to free a slot by
600 /// invoking `evict_from_protected_segment`.
601 /// 3. **Index Recovery**: Indices reclaimed via eviction are returned to the `index_pool`
602 /// to maintain the total available slot count.
603 /// 4. **Termination**: The process repeats until the original index is successfully
604 /// pushed or the `evict_from_protected_segment` call returns `None` (due to quota
605 /// exhaustion or an empty segment), signaling a failed promotion.
606 ///
607 /// # Invariants
608 /// - **Index Conservation**: Every index evicted to make room for the promotion is
609 /// pushed to the `index_pool` to ensure no slots are "lost" during high-contention
610 /// re-balancing.
611 /// - **Panic Safety**: The `index_pool` push uses an expectation that the pool
612 /// cannot overflow, assuming the pool capacity matches the total cache capacity.
613 ///
614 /// # Returns
615 /// - `true`: The index was successfully promoted into the protected segment.
616 /// - `false`: Promotion failed because the quota was exhausted before space could be cleared.
617 fn promote_index(
618 &self,
619 index: Index,
620 guard: &Guard,
621 context: &ThreadContext,
622 quota: &mut RequestQuota,
623 ) -> bool {
624 loop {
625 if self.protected_segment.push(index.into(), context).is_ok() {
626 return true;
627 }
628
629 match self.evict_from_protected_segment(guard, context, quota) {
630 Some(evicted_index) => {
631 self.index_pool
632 .push(evicted_index.into(), context)
633 .expect("the index pool can't overflow");
634 }
635 None => return false,
636 }
637 }
638 }
639
    /// Inserts a new entry into the protected segment, potentially triggering eviction if the segment is full.
    ///
    /// # Control Flow
    /// 1. **Index Acquisition**: Attempts to pop an available index from the `index_pool`.
    ///    If empty, it invokes `evict_from_protected_segment` to reclaim a slot.
    /// 2. **Segment Placement**: Attempts to push the index into the `protected_segment` queue.
    /// 3. **Data Publication**:
    ///    - Stores the new `Entry` into the resolved slot.
    ///    - Updates the `IndexTable` to map the key to the slot index.
    ///    - Calculates a new `Tag` signature based on the key's hash and stores it to
    ///      mark the slot as initialized and valid for readers.
    /// 4. **Contention Handling**: If the segment push fails (queue full), it performs
    ///    an emergency eviction. The evicted index is returned to the pool, and the
    ///    original operation retries until successful or the quota is exhausted.
    ///
    /// # Memory Model & Synchronization
    /// - **Publication Order**: The `Entry` is stored `Relaxed`, followed by the `IndexTable`
    ///   insertion. The `Tag` is stored with `Release` semantics, acting as the memory
    ///   barrier that makes the entry visible to concurrent readers.
    /// - **Resource Recovery**: On failed pushes or quota exhaustion, indices are
    ///   explicitly pushed back to the `index_pool` to prevent slot leakage.
    ///
    /// # Parameters
    /// - `entry`: The data to be cached.
    /// - `guard`: Epoch guard for memory reclamation safety.
    /// - `context`: Thread-local state for queue operations and backoff.
    /// - `quota`: Execution budget to prevent unbound searching during high pressure.
    fn push_into_protected_segment(
        &self,
        entry: Entry<K, V>,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) {
        // Acquire a free slot index: from the pool if possible, otherwise by
        // evicting a protected resident. Failing both drops the entry.
        let index = match self.index_pool.pop(context) {
            Some(index) => Index::from(index),
            None => match self.evict_from_protected_segment(guard, context, quota) {
                Some(index) => index,
                None => return,
            },
        };

        loop {
            if self.protected_segment.push(index.into(), context).is_ok() {
                let slot = &self.slots[index.slot_index()];
                let tag = Tag::from(slot.tag.load(Acquire));

                let entry = Owned::new(entry);
                let key = entry.key().clone();

                // Publication order matters: entry first (Relaxed), then the
                // index-table mapping, and finally the signed tag with
                // Release — the tag store publishes everything to readers.
                slot.entry.store(entry, Relaxed);
                self.index_table.insert(key.clone(), index.into());

                let tag = tag.with_signature(hash(key.as_ref()));
                slot.tag.store(tag.into(), Release);

                break;
            }

            // Segment full: evict to make room (reclaimed slot goes back to
            // the pool), then retry pushing our own index.
            match self.evict_from_protected_segment(guard, context, quota) {
                Some(evicted_index) => {
                    self.index_pool
                        .push(evicted_index.into(), context)
                        .expect("the index pool can't overflow");
                }
                None => {
                    // Quota exhausted or nothing evictable: return the index
                    // so capacity is not leaked; the insertion fails.
                    self.index_pool
                        .push(index.into(), context)
                        .expect("the index pool can't overflow");

                    break;
                }
            }
        }
    }
715
    /// Evicts an entry from the protected segment using a lock-free, second-chance algorithm.
    ///
    /// # Control Flow
    /// 1. **Selection**: Pops an index from the `protected_segment`. If the quota is
    ///    exhausted or the segment is empty, returns `None`.
    /// 2. **Liveness Check**: Validates if the slot is currently `busy` or uninitialized.
    ///    Contended slots are pushed back to the segment to maintain system liveness.
    /// 3. **Phase 1 (Second-Chance Rotation)**:
    ///    - If an entry is `Hot` and not expired, it receives a "second chance."
    ///    - Its frequency is aged (decremented), and it is re-inserted into the protected segment.
    /// 4. **Phase 2 (Atomic Eviction)**:
    ///    - If the entry is eligible for eviction, the thread attempts to CAS the slot tag to `busy`.
    ///    - On success, it synchronizes the `IndexTable`, nullifies the slot entry,
    ///      and advances the tag signature to prevent ABA issues during subsequent lookups.
    ///
    /// # Memory Model & Synchronization
    /// - **Acquire/Release Semantics**: Ensures that memory writes to the `Entry` and `IndexTable`
    ///   are visible to other threads before the `Tag` state transition is observed.
    /// - **RCU-style Reclamation**: Utilizes `guard.defer_destroy` to ensure that memory is
    ///   only reclaimed after all concurrent readers have finished their operations.
    /// - **Atomic Bit-Packing**: The `Tag` integrates the busy-lock, frequency, and signature
    ///   into a single word to allow atomic state transitions without mutexes.
    ///
    /// # Returns
    /// - `Some(Index)`: The index of the successfully cleared slot, ready for reuse.
    /// - `None`: Eviction failed due to quota exhaustion or an empty segment.
    fn evict_from_protected_segment(
        &self,
        guard: &Guard,
        context: &ThreadContext,
        quota: &mut RequestQuota,
    ) -> Option<Index> {
        while quota.consume()
            && let Some(index) = self.protected_segment.pop(context).map(Index::from)
        {
            let slot = &self.slots[index.slot_index()];
            let mut tag = Tag::from(slot.tag.load(Acquire));

            loop {
                // Busy or never-published slots are not evictable: re-queue
                // the index and move on to the next candidate.
                if tag.is_busy() || tag.signature() == 0 {
                    if self.protected_segment.push(index.into(), context).is_ok() {
                        break;
                    }

                    // Couldn't re-queue (segment momentarily full): refresh
                    // the tag and retry rather than losing the index.
                    tag = Tag::from(slot.tag.load(Acquire));
                    context.wait();
                    continue;
                }

                context.decay();

                let entry = slot.entry.load(Relaxed, guard);
                // A non-busy slot with a non-zero signature must hold an entry.
                let entry_ref = unsafe { entry.as_ref().expect("occupied entry can't be null") };

                // Phase 1 Second-Chance Rotation:
                // Attempt to decide whether to evict an entry from the queue based on its frequency
                // and TTL.
                if tag.is_hot() && !entry_ref.is_expired() {
                    let updated_tag = tag.decrement_frequency();

                    match slot.tag.compare_exchange_weak(
                        tag.into(),
                        updated_tag.into(),
                        Release,
                        Acquire,
                    ) {
                        Ok(_) => {
                            if self.protected_segment.push(index.into(), context).is_ok() {
                                context.decay();
                                break;
                            }

                            // Re-queue failed: fall through to Phase 2 and
                            // evict this (now aged) entry instead.
                            tag = updated_tag;
                        }
                        Err(latest) => {
                            tag = Tag::from(latest);
                            context.wait();
                            continue;
                        }
                    }
                }

                // Phase 2 Eviction:
                // Attempt to lock the entry for eviction; if locking fails, try to insert the index back into the queue.
                match slot
                    .tag
                    .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
                {
                    Ok(_) => {
                        // Unpublish before cleanup: remove the mapping and
                        // null the pointer while holding the busy lock.
                        self.index_table.remove(entry_ref.key());
                        slot.entry.store(Shared::null(), Relaxed);

                        // Advance the signature so stale readers fail tag
                        // validation (ABA guard), then release the slot.
                        let (next_tag, next_index) = tag.advance(index);
                        slot.tag.store(next_tag.into(), Release);

                        // SAFETY: entry is unlinked and undiscoverable; epoch
                        // reclamation defers the free past concurrent readers.
                        unsafe { guard.defer_destroy(entry) };
                        return Some(next_index);
                    }
                    Err(latest) => {
                        tag = Tag::from(latest);
                        context.wait();

                        if self.protected_segment.push(index.into(), context).is_ok() {
                            break;
                        }
                    }
                }
            }
        }

        None
    }
828
829 /// Removes an entry from the cache by key, ensuring safe synchronization with concurrent readers and writers.
830 ///
831 /// This method uses a two-phase approach to safely invalidate a slot: first by locking the
832 /// metadata via a `busy` bit, and then by verifying the key identity before final removal.
833 ///
834 /// # Control Flow
835 /// 1. **Index Resolution**: Performs a lookup in the `index_table`. Returns `false` immediately
836 /// if the key is not present.
837 /// 2. **Epoch & Liveness Check**: Validates the `Tag` against the provided `index` to ensure
838 /// the slot hasn't been repurposed (ABA protection). If the slot is `busy`, it performs
839 /// an adaptive backoff.
840 /// 3. **Atomic Lock**: Attempts to CAS the slot tag to a `busy` state. This grants exclusive
841 /// access to the slot's entry pointer for the duration of the removal.
842 /// 4. **Key Verification**: Once locked, it loads the `Entry` and performs a final check
843 /// to ensure the resident key matches the target `key`.
844 /// - **Mismatch**: If the key changed during the lock acquisition, the tag is restored
845 /// to its original state and returns `false`.
846 /// - **Match**: The key is removed from the `index_table`, and the slot tag is `reset`
847 /// (clearing frequency and busy bits) before being released.
848 ///
849 /// # Memory Model & Synchronization
850 /// - **AcqRel/Release**: Ensures that the `IndexTable` removal and any local modifications
851 /// are globally visible before the slot's `busy` bit is cleared.
852 /// - **Spin-Reduction**: Utilizes `context.wait()` and `context.decay()` to prevent
853 /// CPU-churn when multiple threads attempt to remove or update the same hot key.
854 /// - **Epoch Safety**: Uses a `pin()` guard to safely inspect the entry pointer without
855 /// risking a use-after-free, even if another thread is concurrently evicting the slot.
856 ///
857 /// # Parameters
858 /// - `key`: The key of the entry to be removed.
859 /// - `context`: Thread-local state for managing backoff and contention.
860 ///
861 /// # Returns
862 /// - `true`: The entry was found and successfully removed.
863 /// - `false`: The entry was not found or the key did not match the current slot occupant.
864 pub fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
865 where
866 Key<K>: Borrow<Q>,
867 Q: Eq + Hash + ?Sized,
868 {
869 let index = match self.index_table.get(key) {
870 None => return false,
871 Some(index) => Index::from(index),
872 };
873
874 let slot = &self.slots[index.slot_index()];
875 let mut tag = Tag::from(slot.tag.load(Acquire));
876
877 loop {
878 if !tag.is_epoch_match(index) {
879 return false;
880 }
881
882 if tag.is_busy() {
883 context.wait();
884 continue;
885 }
886
887 match slot
888 .tag
889 .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
890 {
891 Ok(_) => {
892 context.decay();
893 }
894 Err(latest) => {
895 tag = Tag::from(latest);
896 context.wait();
897 continue;
898 }
899 }
900
901 let guard = pin();
902
903 let entry = slot.entry.load(Relaxed, &guard);
904
905 let is_key_match = unsafe { entry.as_ref() }
906 .map(|entry_ref| entry_ref.key().borrow() == key)
907 .unwrap_or(false);
908
909 if !is_key_match {
910 slot.tag.store(tag.into(), Release);
911 return false;
912 }
913
914 self.index_table.remove(key);
915
916 slot.tag.store(tag.reset().into(), Release);
917
918 return true;
919 }
920 }
921}
922
// Bridges the concrete `S3FIFOCache` into the generic `CacheEngine` trait so
// callers can be written against the abstract engine interface.
//
// Every method is a thin delegation to the inherent method of the same name;
// inherent methods take precedence over trait methods during resolution, so
// these `self.…` calls dispatch to the concrete implementations above and do
// not recurse into this impl.
impl<K, V> CacheEngine<K, V> for S3FIFOCache<K, V>
where
    K: Eq + Hash,
{
    // Delegates to the inherent lock-free lookup.
    fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
    where
        Key<K>: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        self.get(key, context)
    }

    // Delegates to the inherent insert; `quota` bounds per-request work.
    fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
        self.insert(entry, context, quota);
    }

    // Delegates to the inherent two-phase removal.
    fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
    where
        Key<K>: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        self.remove(key, context)
    }

    // Reports the configured capacity (fixed at construction time).
    fn capacity(&self) -> usize {
        self.capacity
    }

    // Produces a point-in-time snapshot of the cache metrics.
    fn metrics(&self) -> MetricsSnapshot {
        self.metrics.snapshot()
    }
}
955
956impl<K, V> Drop for S3FIFOCache<K, V>
957where
958 K: Eq + Hash,
959{
960 fn drop(&mut self) {
961 let guard = pin();
962
963 for slot in &self.slots {
964 let entry = slot.entry.swap(Shared::null(), Relaxed, &guard);
965
966 if !entry.is_null() {
967 unsafe { guard.defer_destroy(entry) }
968 }
969 }
970 }
971}
972
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::random_string;
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    // NOTE(review): `RngExt` is not part of the published `rand` crate API
    // (the standard trait is `Rng`) — presumably a local extension trait
    // re-exported through `rand`; confirm against the crate's dependencies.
    use rand::{RngExt, rng};
    use std::sync::{Arc, Mutex};
    use std::thread::scope;

    // Convenience constructor: a cache with the default metrics configuration.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> S3FIFOCache<K, V>
    where
        K: Eq + Hash,
    {
        S3FIFOCache::new(capacity, MetricsConfig::default())
    }

    // Basic round-trip: an inserted entry is retrievable under the same key
    // and carries the same value.
    #[test]
    fn test_s3cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();
        let entry = Entry::new(key.clone(), value.clone());

        cache.insert(entry, &context, &mut RequestQuota::default());

        let entry_ref = cache.get(&key, &context).expect("must present");

        assert_eq!(entry_ref.key(), &key);
        assert_eq!(entry_ref.value(), &value);
    }

    // Upsert semantics: inserting the same key twice leaves the second value
    // visible; the lookup uses `&str` to exercise the `Borrow<Q>` path.
    #[test]
    fn test_s3cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let key_ref: &str = key.as_ref();
        let value1 = random_string();
        let value2 = random_string();

        cache.insert(
            Entry::new(key.clone(), value1.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(key_ref, &context);
        assert!(entry_ref.is_some(), "the entry must present");
        assert_eq!(entry_ref.unwrap().value(), &value1);

        cache.insert(
            Entry::new(key.clone(), value2.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(key_ref, &context);
        assert!(entry_ref.is_some(), "the entry must present");
        assert_eq!(entry_ref.unwrap().value(), &value2);
    }

    // `remove` invalidates the entry: a subsequent lookup misses.
    #[test]
    fn test_s3cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry_ref.key(), &key);
        assert_eq!(entry_ref.value(), &value);

        assert!(cache.remove(&key, &context));

        assert!(cache.get(&key, &context).is_none());
    }

    // Smoke test only: inserts 10x the capacity and passes as long as nothing
    // panics or hangs while the eviction path churns. There are intentionally
    // no assertions; eviction-ordering behavior is covered by the tests below.
    #[test]
    fn test_s3cache_fill_beyond_capacity_should_evict_fifo() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();
            let entry = Entry::new(key, value);
            cache.insert(entry, &context, &mut RequestQuota::default());
        }
    }

    // Scan resistance: one accessed ("hot") entry must survive a burst of
    // one-shot inserts that would evict it from a plain FIFO.
    #[test]
    fn test_s3cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(1000);
        let context = &ThreadContext::default();

        let key = random_string();
        let value = random_string();
        let entry = Entry::new(key.clone(), value.clone());

        cache.insert(entry, context, &mut RequestQuota::default());

        let entry_ref = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry_ref.value(), &value);

        // Churn: 250 cold inserts pressure the probationary segment.
        for _ in 0..250 {
            cache.insert(
                Entry::new(random_string(), random_string()),
                context,
                &mut RequestQuota::default(),
            );
        }

        let entry = cache.get(&key, &context).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    // Ghost-queue promotion: an entry evicted by churn (lookup misses) that is
    // re-inserted should be admitted to the protected segment and survive a
    // second churn wave of the same size.
    #[test]
    fn test_s3cache_reinserted_ghost_entry_should_be_promoted_to_main() {
        let cache = create_cache(1000);
        let context = ThreadContext::default();

        let (key, value) = (random_string(), random_string());

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        // First churn wave: capacity-sized flood evicts the unaccessed entry.
        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();
            cache.insert(
                Entry::new(key.clone(), value.clone()),
                &context,
                &mut RequestQuota::default(),
            );
        }

        assert!(cache.get(&key, &context).is_none());

        // Re-insert: the ghost queue should remember the key's hash and
        // promote it on admission.
        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        // Second churn wave: the promoted entry must now survive.
        for _ in 0..1000 {
            let key = random_string();
            let value = random_string();

            cache.insert(
                Entry::new(key.clone(), value.clone()),
                &context,
                &mut RequestQuota::default(),
            );
        }

        let entry = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    // Working-set protection: under a 50/50 mix of cold inserts and hot-key
    // reads, at least 4 of the 5 hot entries must remain resident.
    #[test]
    fn test_s3cache_ghost_filter_should_protect_working_set() {
        let cache = create_cache(1000);
        let context = ThreadContext::default();

        let hot_entries = vec![
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
            (random_string(), random_string()),
        ];

        for (key, value) in &hot_entries {
            let key = key.clone();
            let value = value.clone();

            cache.insert(
                Entry::new(key, value),
                &context,
                &mut RequestQuota::default(),
            );
        }

        for i in 0..100000 {
            if i % 2 == 0 {
                let key = format!("key-{}", i);
                let value = format!("value-{}", i);
                let entry = Entry::new(key, value);
                cache.insert(entry, &context, &mut RequestQuota::default());
            } else {
                // NOTE(review): `random_range(..len)` passes a `RangeTo`,
                // which stock `rand` does not accept as a `SampleRange` —
                // presumably supported by the local `RngExt`; confirm.
                let index = rng().random_range(..hot_entries.len());
                let key = hot_entries[index].0.as_str();
                let _ = cache.get(key, &context);
            }
        }

        let count = hot_entries
            .iter()
            .map(|(key, _)| cache.get(key, &context))
            .filter(Option::is_some)
            .count();

        // Probabilistic threshold: tolerate losing at most one hot entry.
        assert!(count >= 4);
    }

    // Stress test: 32 threads hammering a 500-key space with mixed get/insert.
    // Passing criterion is absence of crashes, deadlocks, or UB under race.
    #[test]
    fn test_s3cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(1000);
        let num_threads = 32;
        let ops_per_thread = 5000;

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    // Backoff state is thread-local by design: one per thread.
                    let context = ThreadContext::default();
                    for op in 0..ops_per_thread {
                        let key = (op % 500).to_string();
                        if op % 2 == 0 {
                            cache.insert(
                                Entry::new(key, random_string()),
                                &context,
                                &mut RequestQuota::default(),
                            );
                        } else {
                            let _ = cache.get(&key, &context);
                        }
                    }
                });
            }
        });
    }

    // End-to-end efficiency: under a Zipfian (skew 1.3) read-through workload
    // from 16 threads, at least 80% of the 500 most frequent keys must be
    // resident afterwards.
    #[test]
    fn test_s3_fifo_should_protect_hot_set_under_high_churn() {
        let capacity = 1000;
        let cache = create_cache(capacity);
        let context = ThreadContext::default();

        let num_threads = 16;
        let ops_per_thread = 10000;

        let workload_generator = WorkloadGenerator::new(20000, 1.3);
        let workload_statistics = WorkloadStatistics::new();

        let mut rand = rng();

        // Warm the cache to capacity and record the warmup accesses.
        for _ in 0..capacity {
            let key = workload_generator.key(&mut rand);
            cache.insert(
                Entry::new(key.clone(), "value"),
                &context,
                &mut RequestQuota::default(),
            );
            workload_statistics.record(key);
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut thread_rng = rng();
                    let context = ThreadContext::default();

                    // Read-through pattern: insert only on miss.
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut thread_rng);
                        workload_statistics.record(key.clone());

                        if cache.get(&key, &context).is_none() {
                            cache.insert(
                                Entry::new(key, "value"),
                                &context,
                                &mut RequestQuota::default(),
                            );
                        }
                    }
                });
            }
        });

        let top_keys_size = 500;
        let frequent_keys = workload_statistics.frequent_keys(top_keys_size);

        let count = frequent_keys.iter().fold(0, |acc, key| {
            if cache.get(key, &context).is_some() {
                acc + 1
            } else {
                acc
            }
        });

        assert!(
            count >= 400,
            "S3-FIFO efficiency dropped! Captured only {}/{} hot keys",
            count,
            top_keys_size
        );
    }

    // TTL via custom expiration closure: the entry is visible while the shared
    // flag is false and treated as a miss once the flag flips to true.
    #[test]
    fn test_s3cache_ttl_entry_should_expire() {
        let cache = create_cache(10);
        let context = ThreadContext::default();
        let key = random_string();
        let value = random_string();

        let expired = Arc::new(Mutex::new(false));

        // The closure captures the flag; the cache evaluates it on lookup.
        let is_expired = {
            let expired = expired.clone();
            move || *expired.lock().unwrap()
        };

        cache.insert(
            Entry::with_custom_expiration(key.clone(), value.clone(), is_expired),
            &context,
            &mut RequestQuota::default(),
        );

        assert!(cache.get(&key, &context).is_some());

        *expired.lock().unwrap() = true;

        assert!(
            cache.get(&key, &context).is_none(),
            "Entry should have expired"
        );
    }
}