//! `omega_cache/clock.rs` — a concurrent Clock (second-chance) cache implementation.
1use crate::core::engine::CacheEngine;
2use crate::core::entry::Entry;
3use crate::core::entry_ref::Ref;
4use crate::core::index::IndexTable;
5use crate::core::key::Key;
6use crate::core::request_quota::RequestQuota;
7use crate::core::ring::RingQueue;
8use crate::core::tag::{Index, Tag};
9use crate::core::thread_context::ThreadContext;
10use crate::core::utils::hash;
11use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
12use crossbeam::epoch::{Atomic, pin};
13use crossbeam::utils::CachePadded;
14use crossbeam_epoch::{Owned, Shared};
15use std::borrow::Borrow;
16use std::hash::Hash;
17use std::ptr::NonNull;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
20use std::time::Instant;
21
/// A single physical cache slot: an atomic state word plus an
/// epoch-protected pointer to the stored entry.
#[derive(Debug)]
struct Slot<K, V>
where
    K: Eq + Hash,
{
    /// Packed state word (see `Tag`): versioned index, Hot/Cold frequency
    /// bit, and a Busy bit. All synchronization on the slot goes through
    /// loads/CASes of this tag.
    tag: AtomicU64,
    /// The stored entry; evicted values are reclaimed via epoch-based GC
    /// (`defer_destroy`), never freed in place.
    entry: Atomic<Entry<K, V>>,
}
30
31impl<K, V> Default for Slot<K, V>
32where
33    K: Eq + Hash,
34{
35    fn default() -> Self {
36        Self {
37            tag: AtomicU64::default(),
38            entry: Default::default(),
39        }
40    }
41}
42
/// An implementation of a concurrent Clock cache.
///
/// The Clock algorithm is an efficient $O(1)$ approximation of Least Recently Used (LRU).
/// It uses a circular "clock hand" logic to iterate over slots and evict entries that
/// haven't been recently accessed.
///
/// ### Concurrency Model
/// This implementation is **lock-free for reads** and uses fine-grained atomic state
/// transitions for concurrent writes and evictions.
///
/// * **Tag-based Synchronization:** Each slot is protected by an `AtomicU64` tag that
///   combines a versioned index, a frequency bit (Hot/Cold), and a "Busy" bit.
/// * **Memory Safety:** Uses Epoch-based Reclamation (via `crossbeam-epoch`) to ensure
///   safe destruction of evicted entries while other threads hold references.
pub struct ClockCache<K, V>
where
    K: Eq + Hash,
{
    /// Key → slot index mapping. Provides $O(1)$ lookup to find the physical slot.
    index_table: IndexTable<K>,
    /// Fixed-size array of slots. CachePadded to prevent false sharing between cores.
    slots: Box<[CachePadded<Slot<K, V>>]>,
    /// MPMC Pool of available indices.
    ///
    /// **Implementation Note:** The pool is sized to `capacity` but utilizes
    /// `tail - head` logical checks to distinguish between a "Busy Slot"
    /// (transient state) and a "Full Queue" (physical saturation).
    index_pool: RingQueue,
    /// Total number of slots; fixed at construction time.
    capacity: usize,
    /// Sharded hit/miss/eviction/latency counters.
    metrics: Metrics,
}
74
75impl<K, V> ClockCache<K, V>
76where
77    K: Eq + Hash,
78{
79    /// Creates a new Clock cache with a fixed capacity.
80    ///
81    /// ### Initialization Logic
82    /// 1. **Slots:** Allocates a contiguous block of memory for `capacity` slots.
83    ///    Each slot is wrapped in `CachePadded` to eliminate L1 cache-line contention
84    ///    between concurrent readers and writers.
85    /// 2. **Index Pool:** Initializes the MPMC `RingQueue`.
86    ///    *Note:* The pool is populated with all available slot indices (0..capacity)
87    ///    immediately. This "cold start" state treats every slot as vacant and
88    ///    ready for the first wave of insertions.
89    /// 3. **Metrics:** Sets up sharded atomic counters for tracking hits, misses,
90    ///    and eviction latency without creating a global bottleneck.
91    pub fn new(capacity: usize, metrics_config: MetricsConfig) -> Self {
92        let slots = (0..capacity)
93            .map(|_| CachePadded::new(Slot::default()))
94            .collect::<Vec<_>>()
95            .into_boxed_slice();
96
97        let index_pool = RingQueue::new(capacity);
98
99        let context = ThreadContext::default();
100
101        for index in 0..capacity {
102            index_pool
103                .push(index as u64, &context)
104                .expect("index pool can't be overflowed");
105        }
106        Self {
107            index_table: IndexTable::new(),
108            slots,
109            index_pool,
110            capacity,
111            metrics: Metrics::new(metrics_config),
112        }
113    }
114
115    /// Retrieves an entry from the cache.
116    ///
117    /// Uses an `Acquire` load on the slot tag to synchronize with the `Release`
118    /// store of the last writer. If the entry is `Cold`, it is upgraded to `Hot`
119    /// via a `compare_exchange` to protect it from the next eviction cycle.
120    pub fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
121    where
122        Key<K>: Borrow<Q>,
123        Q: Eq + Hash + ?Sized,
124    {
125        let started_at = Instant::now();
126        let guard = pin();
127        let hash = hash(key);
128
129        loop {
130            match self.index_table.get(key) {
131                Some(index) => {
132                    let index = Index::from(index);
133
134                    let slot = &self.slots[index.slot_index()];
135                    let mut tag = Tag::from(slot.tag.load(Acquire));
136
137                    if !tag.is_match(index, hash) {
138                        let latency = started_at.elapsed().as_millis() as u64;
139                        self.metrics.record_miss();
140                        self.metrics.record_latency(latency);
141                        return None;
142                    }
143
144                    let entry = slot.entry.load(Acquire, &guard);
145
146                    match unsafe { entry.as_ref() } {
147                        None => {
148                            let latency = started_at.elapsed().as_millis() as u64;
149                            self.metrics.record_miss();
150                            self.metrics.record_latency(latency);
151                            break None;
152                        }
153                        Some(entry_ref) => {
154                            if entry_ref.key().borrow() != key || entry_ref.is_expired() {
155                                let latency = started_at.elapsed().as_millis() as u64;
156                                self.metrics.record_miss();
157                                self.metrics.record_latency(latency);
158                                break None;
159                            }
160
161                            match slot.tag.compare_exchange_weak(
162                                tag.into(),
163                                tag.increment_frequency().into(),
164                                Release,
165                                Acquire,
166                            ) {
167                                Ok(_) => {
168                                    context.decay();
169                                }
170                                Err(latest) => {
171                                    tag = Tag::from(latest);
172                                    context.wait();
173                                    continue;
174                                }
175                            }
176
177                            self.metrics.record_hit();
178                            self.metrics
179                                .record_latency(started_at.elapsed().as_millis() as u64);
180
181                            break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
182                        }
183                    }
184                }
185                None => {
186                    self.metrics.record_miss();
187                    self.metrics
188                        .record_latency(started_at.elapsed().as_millis() as u64);
189                    return None;
190                }
191            }
192        }
193    }
194
195    /// Inserts a new entry or updates an existing one.
196    ///
197    /// If the key is missing, the thread initiates the **Eviction Loop**:
198    /// 1. Pops an index from the `index_pool`.
199    /// 2. If the slot is `Hot`: Decrements frequency and pushes it back (Second Chance).
200    /// 3. If the slot is `Cold`: Claims the slot using a `Busy` bit, swaps the entry,
201    ///    and updates the `IndexTable`.
202    ///
203    /// ### Memory Ordering Logic
204    /// 1. **Claiming:** `AcqRel` on the tag ensures exclusive access to the slot.
205    /// 2. **Writing:** `Relaxed` store on the `entry` is safe because it is
206    ///    guarded by the surrounding Tag barriers.
207    /// 3. **Publishing:** `Release` store on the final `Tag` makes the new
208    ///    entry visible to all concurrent readers.
209    pub fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
210        let started_at = Instant::now();
211        let guard = pin();
212        let hash = hash(entry.key());
213
214        while quota.consume() {
215            match self.index_table.get(entry.key()).map(Index::from) {
216                Some(index) => {
217                    let slot = &self.slots[index.slot_index()];
218                    let tag = Tag::from(slot.tag.load(Acquire));
219
220                    if !tag.is_match(index, hash) {
221                        context.wait();
222                        continue;
223                    }
224
225                    match slot.tag.compare_exchange_weak(
226                        tag.into(),
227                        tag.busy().into(),
228                        AcqRel,
229                        Relaxed,
230                    ) {
231                        Ok(_) => {
232                            context.decay();
233                        }
234                        Err(_) => {
235                            context.wait();
236                            continue;
237                        }
238                    }
239
240                    let old_entry = slot.entry.load(Relaxed, &guard);
241
242                    if let Some(old_entry_ref) = unsafe { old_entry.as_ref() } {
243                        if old_entry_ref.key() != entry.key() {
244                            context.wait();
245                            continue;
246                        }
247
248                        unsafe { guard.defer_destroy(old_entry) };
249                    }
250
251                    slot.entry.store(Owned::new(entry), Relaxed);
252                    slot.tag.store(tag.increment_frequency().into(), Release);
253
254                    self.metrics
255                        .record_latency(started_at.elapsed().as_millis() as u64);
256
257                    return;
258                }
259                None => match self.index_pool.pop(context) {
260                    Some(index) => {
261                        let index = Index::from(index);
262
263                        let slot = &self.slots[index.slot_index()];
264                        let mut tag = Tag::from(slot.tag.load(Acquire));
265
266                        loop {
267                            if tag.is_hot() {
268                                if let Err(latest) = slot.tag.compare_exchange_weak(
269                                    tag.into(),
270                                    tag.decrement_frequency().into(),
271                                    Release,
272                                    Acquire,
273                                ) {
274                                    tag = Tag::from(latest);
275                                    context.wait();
276                                    continue;
277                                }
278
279                                self.index_pool
280                                    .push(index.into(), context)
281                                    .expect("index pool can't be overflowed");
282
283                                context.decay();
284
285                                break;
286                            }
287
288                            if let Err(latest) = slot.tag.compare_exchange_weak(
289                                tag.into(),
290                                tag.busy().into(),
291                                AcqRel,
292                                Acquire,
293                            ) {
294                                tag = Tag::from(latest);
295                                context.wait();
296                                continue;
297                            }
298
299                            let entry = Owned::new(entry);
300                            let key = entry.key().clone();
301
302                            let victim = slot.entry.swap(entry, Relaxed, &guard);
303
304                            if let Some(victim_ref) = unsafe { victim.as_ref() } {
305                                self.index_table.remove(victim_ref.key());
306                                self.metrics.record_eviction();
307                                unsafe { guard.defer_destroy(victim) };
308                            }
309
310                            let (tag, index) = tag.advance(index);
311                            let tag = tag.with_signature(hash);
312
313                            slot.tag.store(tag.into(), Release);
314
315                            self.index_pool
316                                .push(index.into(), context)
317                                .expect("index pool can't be overflowed");
318
319                            context.decay();
320
321                            self.index_table.insert(key, index.into());
322
323                            self.metrics
324                                .record_latency(started_at.elapsed().as_millis() as u64);
325
326                            return;
327                        }
328                    }
329                    None => context.wait(),
330                },
331            }
332        }
333    }
334
335    /// Removes an entry from the cache and resets its physical slot.
336    ///
337    /// This operation performs a two-stage teardown:
338    /// 1. **Logical Removal:** Atomically removes the key from the `index_table`.
339    /// 2. **Physical Invalidation:** Transitions the associated slot's `Tag` to a
340    ///    reset/vacant state via a `compare_exchange` loop.
341    ///
342    /// ### Synchronization
343    /// * **Tag Match:** If the `Tag` no longer matches the expected index or hash
344    ///   (due to a concurrent eviction), the method returns `true` immediately,
345    ///   as the target entry is already gone.
346    /// * **Reset:** A successful `tag.reset()` ensures that any "in-flight" `get`
347    ///   requests observing this slot will see a signature mismatch and return `None`.
348    ///
349    /// ### Memory Reclamation
350    /// Note that while the `Tag` is reset, the `Entry` itself remains in the slot
351    /// and the `index` is not pushed back to the `index_pool`. The physical
352    /// memory is reclaimed by the `insert` logic when the Clock hand eventually
353    /// encounters this reset slot.
354    ///
355    /// # Parameters
356    /// * `key`: The key to be removed.
357    /// * `context`: The thread context for backoff coordination during contention.
358    ///
359    /// # Returns
360    /// Returns `true` if the entry was found and successfully invalidated.
361    pub fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
362    where
363        Key<K>: Borrow<Q>,
364        Q: Eq + Hash + ?Sized,
365    {
366        match self.index_table.remove(key) {
367            Some(index) => {
368                let index = Index::from(index);
369
370                let slot = &self.slots[index.slot_index()];
371                let mut tag = Tag::from(slot.tag.load(Acquire));
372
373                let hash = hash(key);
374
375                loop {
376                    if !tag.is_match(index, hash) {
377                        return true;
378                    }
379
380                    if let Err(latest) = slot.tag.compare_exchange_weak(
381                        tag.into(),
382                        tag.reset().into(),
383                        Release,
384                        Acquire,
385                    ) {
386                        tag = Tag::from(latest);
387                        context.wait();
388                        continue;
389                    }
390
391                    return true;
392                }
393            }
394            None => false,
395        }
396    }
397}
398
// Adapter wiring `ClockCache`'s inherent methods into the generic
// `CacheEngine` trait; every method is a thin delegation.
impl<K, V> CacheEngine<K, V> for ClockCache<K, V>
where
    K: Eq + Hash,
{
    // Delegates to the inherent `get`.
    fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
    where
        Key<K>: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        self.get(key, context)
    }

    // Delegates to the inherent `insert`.
    fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
        self.insert(entry, context, quota)
    }

    // Delegates to the inherent `remove`.
    fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
    where
        Key<K>: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        self.remove(key, context)
    }

    // The fixed number of slots chosen at construction.
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    // Point-in-time snapshot of the sharded metrics counters.
    #[inline]
    fn metrics(&self) -> MetricsSnapshot {
        self.metrics.snapshot()
    }
}
433
434impl<K, V> Drop for ClockCache<K, V>
435where
436    K: Eq + Hash,
437{
438    fn drop(&mut self) {
439        let guard = pin();
440
441        for slot in &self.slots {
442            let shared_old = slot.entry.swap(Shared::null(), AcqRel, &guard);
443
444            if !shared_old.is_null() {
445                unsafe { guard.defer_destroy(shared_old) }
446            }
447        }
448
449        guard.flush();
450    }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::random_string;
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    use rand::rng;
    use std::hash::Hash;
    use std::sync::{Arc, Mutex};
    use std::thread::scope;

    // Convenience constructor using the default metrics configuration.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> ClockCache<K, V>
    where
        K: Eq + Hash,
    {
        ClockCache::new(capacity, MetricsConfig::default())
    }

    // Basic round-trip: an inserted entry is retrievable by its key.
    #[test]
    fn test_clock_cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry = cache.get(&key, &context).expect("must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value);
    }

    // Inserting the same key twice replaces the value (update path).
    #[test]
    fn test_clock_cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let value1 = random_string();
        let value2 = random_string();

        cache.insert(
            Entry::new(key.clone(), value1),
            &context,
            &mut RequestQuota::default(),
        );
        cache.insert(
            Entry::new(key.clone(), value2.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry = cache.get(&key, &context).expect("entry must present");

        assert_eq!(entry.key(), &key);
        assert_eq!(entry.value(), &value2);
    }

    // remove() makes the key unreachable via get().
    #[test]
    fn test_clock_cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        cache.insert(
            Entry::new(key.clone(), value),
            &context,
            &mut RequestQuota::default(),
        );

        assert!(cache.get(&key, &context).is_some());

        assert!(cache.remove(&key, &context));

        assert!(cache.get(&key, &context).is_none());
    }

    // A Ref held across many evictions must keep its entry alive
    // (epoch-based reclamation), even after the key is gone from the cache.
    #[test]
    fn test_clock_cache_ghost_reference_safety_should_protect_memory() {
        let cache = create_cache(2);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        cache.insert(
            Entry::new(key.clone(), value.clone()),
            &context,
            &mut RequestQuota::default(),
        );

        let entry_ref = cache.get(&key, &context).expect("key should present");

        // Churn the tiny cache hard so the original entry is evicted.
        for _ in 0..10000 {
            let (key, value) = (random_string(), random_string());
            cache.insert(
                Entry::new(key.clone(), value),
                &context,
                &mut RequestQuota::default(),
            );
        }

        assert!(cache.get(&key, &context).is_none());

        // The pinned reference still reads the original value.
        assert_eq!(entry_ref.value(), &value);
    }

    // A key touched by get() becomes Hot and should survive the next
    // eviction round (second-chance semantics).
    #[test]
    fn test_clock_cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(2);
        let context = ThreadContext::default();

        cache.insert(
            Entry::new(1, random_string()),
            &context,
            &mut RequestQuota::default(),
        );
        cache.insert(
            Entry::new(2, random_string()),
            &context,
            &mut RequestQuota::default(),
        );

        let _ = cache.get(&1, &context);

        cache.insert(
            Entry::new(3, random_string()),
            &context,
            &mut RequestQuota::default(),
        );

        assert!(
            cache.get(&1, &context).is_some(),
            "K1 should have been protected by Hot state"
        );
        assert!(
            cache.get(&2, &context).is_none(),
            "K2 should have been the first choice for eviction"
        );
    }

    // Expired entries are hidden from get() even while physically present.
    #[test]
    fn test_clock_cache_ttl_expiration_should_hide_expired_items() {
        let cache = create_cache(10);
        let context = ThreadContext::default();
        let key = random_string();
        let value = random_string();

        let expired = Arc::new(Mutex::new(false));

        // Expiration toggled externally via a shared flag.
        let is_expired = {
            let expired = expired.clone();
            move || *expired.lock().unwrap()
        };

        cache.insert(
            Entry::with_custom_expiration(key.clone(), value.clone(), is_expired),
            &context,
            &mut RequestQuota::default(),
        );

        assert!(cache.get(&key, &context).is_some());

        *expired.lock().unwrap() = true;

        assert!(cache.get(&key, &context).is_none());
    }

    // Stress test: mixed insert/get/remove from many threads must neither
    // crash nor deadlock (the test hanging would indicate a livelock).
    #[test]
    fn test_clock_cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(1024);
        let num_threads = 16;
        let ops_per_thread = 10000;

        scope(|s| {
            for thread_id in 0..num_threads {
                let cache = &cache;
                s.spawn(move || {
                    let context = ThreadContext::default();
                    for i in 0..ops_per_thread {
                        let key = (thread_id * 100) + (i % 50);
                        let value = random_string();
                        cache.insert(
                            Entry::new(key, value),
                            &context,
                            &mut RequestQuota::default(),
                        );
                        let _ = cache.get(&key, &context);
                        if i % 10 == 0 {
                            cache.remove(&key, &context);
                        }
                    }
                });
            }
        });
    }

    // Under a skewed (Zipf-like) workload, a meaningful share of the most
    // frequent keys should still be resident after heavy concurrent churn.
    #[test]
    fn test_clock_cache_should_preserve_hot_set() {
        let capacity = 1024;
        let cache = create_cache(capacity);
        let context = ThreadContext::default();

        let num_threads = 16;
        let ops_per_thread = 15_000;
        let workload_generator = WorkloadGenerator::new(10000, 1.2);
        let workload_statistics = WorkloadStatistics::new();

        let mut rand = rng();

        // Warm the cache to capacity before the concurrent phase.
        for _ in 0..capacity {
            let key = workload_generator.key(&mut rand);
            cache.insert(
                Entry::new(key.clone(), "value"),
                &context,
                &mut RequestQuota::default(),
            );
            workload_statistics.record(key);
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut rand = rng();
                    let context = ThreadContext::default();
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut rand);

                        if cache.get(&key, &context).is_none() {
                            cache.insert(
                                Entry::new(key.clone(), "value"),
                                &context,
                                &mut RequestQuota::default(),
                            );
                            workload_statistics.record(key);
                        }
                    }
                });
            }
        });

        // Count how many of the 500 hottest keys are still cached.
        let count = workload_statistics
            .frequent_keys(500)
            .iter()
            .fold(0, |acc, key| {
                if cache.get(key, &context).is_some() {
                    acc + 1
                } else {
                    acc
                }
            });

        assert!(count >= 200)
    }
}