Skip to main content

omega_cache/
clock.rs

1use crate::clock::WriteResult::{Rejected, Retry, Written};
2use crate::core::backoff::BackoffConfig;
3use crate::core::engine::CacheEngine;
4use crate::core::entry::Entry;
5use crate::core::entry_ref::Ref;
6use crate::core::key::Key;
7use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
8use SlotState::{Claimed, Cold, Hot, Vacant};
9use crossbeam::epoch::{Atomic, Guard, Owned, pin};
10use crossbeam_epoch::Shared;
11use dashmap::DashMap;
12use std::borrow::Borrow;
13use std::hash::Hash;
14use std::ptr::NonNull;
15use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
16use std::sync::atomic::{AtomicU8, AtomicUsize};
17use std::time::Instant;
18
19/// Represents the state of a slot in the Clock cache.
20///
21/// # Invariants
22/// - A slot always has exactly one state.
23/// - `Vacant` → slot has no entry.
24/// - `Cold` → slot has an entry that is eligible for eviction.
25/// - `Hot` → slot has an entry recently accessed.
26/// - `Claimed` → temporary state during write; readers treat it as empty.
27#[repr(u8)]
28#[derive(Copy, Clone, Debug, PartialEq, Eq)]
29enum SlotState {
30    Vacant = 0,
31    Cold = 1,
32    Hot = 2,
33    Claimed = 3,
34}
35
36impl From<SlotState> for u8 {
37    fn from(clock: SlotState) -> Self {
38        match clock {
39            Vacant => 0,
40            Cold => 1,
41            Hot => 2,
42            Claimed => 3,
43        }
44    }
45}
46
47impl From<u8> for SlotState {
48    fn from(clock: u8) -> Self {
49        match clock {
50            0 => Vacant,
51            1 => Cold,
52            2 => Hot,
53            3 => Claimed,
54            _ => unreachable!("only values 0-3 are supported"),
55        }
56    }
57}
58
59#[derive(Debug)]
60enum WriteResult<K, V>
61where
62    K: Eq + Hash,
63{
64    Written,
65    Retry(Entry<K, V>),
66    Rejected,
67}
68
69/// A single slot in the Clock cache.
70///
71/// Holds an atomic pointer to an `Entry` and a clock state.
72#[derive(Debug)]
73struct Slot<K, V>
74where
75    K: Eq + Hash,
76{
77    /// The atomic entry stored in the slot.
78    entry: Atomic<Entry<K, V>>,
79    /// The clock state for eviction logic.
80    state: AtomicU8,
81}
82
83impl<K, V> Slot<K, V>
84where
85    K: Eq + Hash,
86{
87    /// Creates a new, empty slot.
88    fn empty() -> Self {
89        Self {
90            entry: Atomic::default(),
91            state: AtomicU8::new(Vacant.into()),
92        }
93    }
94
95    /// Attempts to retrieve a value from the slot.
96    ///
97    /// If the slot contains a valid, non-expired entry matching the provided key,
98    /// it returns a `Ref` handle. This handle consumes the provided `Guard` to
99    /// ensure the entry's memory remains valid for the duration of the handle's life.
100    ///
101    /// # State Transitions
102    /// - Accessing a `Cold` entry triggers an asynchronous upgrade to `Hot`.
103    /// - Returns `None` if the slot is `Vacant`, `Claimed`, or contains a different/expired key.
104    fn get<Q>(&self, key: &Q, guard: Guard) -> Option<Ref<K, V>>
105    where
106        Key<K>: Borrow<Q>,
107        Q: Eq + Hash + ?Sized,
108    {
109        let state = self.state();
110
111        match state {
112            Vacant | Claimed => None,
113            Cold | Hot => {
114                let shared_entry = self.entry.load(Acquire, &guard);
115                if shared_entry.is_null() {
116                    return None;
117                }
118
119                let entry = unsafe { shared_entry.deref() };
120
121                if entry.key().borrow() != key || entry.is_expired() {
122                    return None;
123                }
124
125                if state == Cold {
126                    self.upgrade();
127                }
128
129                Some(Ref::new(NonNull::from(entry), guard))
130            }
131        }
132    }
133
134    /// Performs a synchronized write operation on the slot.
135    ///
136    /// This method manages the insertion or replacement of entries using a
137    /// Compare-and-Swap (CAS) on the slot state to prevent race conditions
138    /// between readers and writers.
139    ///
140    /// # Safety
141    /// When an old entry is replaced, its memory is reclaimed via `guard.defer_destroy`
142    /// to ensure any concurrent readers can finish their operations safely.
143    ///
144    /// # Returns
145    /// - `Written`: The new entry was stored.
146    /// - `Rejected`: Admission policy or expiration check denied the write.
147    /// - `Retry`: The slot was hot or claimed; the caller should attempt another slot.
148    fn try_write<A, E, I>(
149        &self,
150        entry: Entry<K, V>,
151        guard: Guard,
152        admission: A,
153        on_evict: E,
154        on_insert: I,
155    ) -> WriteResult<K, V>
156    where
157        A: for<'a> Fn(&'a K, &'a K) -> bool,
158        E: for<'a> FnOnce(&'a Key<K>),
159        I: for<'a> FnOnce(&'a Key<K>),
160    {
161        let state = self.state();
162
163        if !self.is_writable(state, &guard) {
164            if state == Hot {
165                let _ = self.downgrade();
166            }
167
168            return Retry(entry);
169        }
170
171        if !self.claim(state) {
172            return Retry(entry);
173        }
174
175        let shared_current_entry = self.entry.load(Acquire, &guard);
176
177        if let Some(current_entry) = unsafe { shared_current_entry.as_ref() } {
178            if !(current_entry.is_expired() || admission(entry.key(), current_entry.key())) {
179                self.store_state(state);
180                return Rejected;
181            }
182
183            on_evict(current_entry.key());
184            unsafe { guard.defer_destroy(shared_current_entry) };
185        }
186
187        let key = entry.key().clone();
188
189        self.entry.store(Owned::new(entry), Relaxed);
190        on_insert(&key);
191        self.state.store(Cold.into(), Release);
192
193        Written
194    }
195
196    #[inline]
197    fn is_writable(&self, state: SlotState, guard: &Guard) -> bool {
198        let shared_entry = self.entry.load(Acquire, guard);
199
200        match unsafe { shared_entry.as_ref() } {
201            Some(entry) if entry.is_expired() => true,
202            None => true,
203            _ => match state {
204                Vacant | Cold => true,
205                Hot | Claimed => false,
206            },
207        }
208    }
209
210    /// Attempts to transition the slot to the `Claimed` state to secure exclusive write access.
211    ///
212    /// This is a "lock-free" acquisition. If the current state has drifted from the
213    /// provided `state` parameter since it was last read, the exchange will fail,
214    /// signaling that another thread has either updated or claimed the slot.
215    ///
216    /// # Returns
217    /// - `true`: The slot is now `Claimed` by the current thread.
218    /// - `false`: The state changed externally; the caller must re-evaluate.
219    #[inline]
220    fn claim(&self, state: SlotState) -> bool {
221        self.state
222            .compare_exchange_weak(state.into(), Claimed.into(), Relaxed, Relaxed)
223            .is_ok()
224    }
225
226    /// Evaluates whether the slot is eligible for eviction or insertion.
227    ///
228    /// A slot is considered writable if:
229    /// 1. It is currently `Vacant` (empty).
230    /// 2. It is `Cold` (eligible for replacement in the Clock algorithm).
231    /// 3. The existing entry has passed its expiration deadline, regardless of state.
232    ///
233    /// If the slot is `Hot` or already `Claimed`, this returns `false`.
234    fn state(&self) -> SlotState {
235        self.state.load(Acquire).into()
236    }
237
238    /// Store a new clock state.
239    fn store_state(&self, state: SlotState) {
240        self.state.store(state.into(), Release);
241    }
242
243    /// Upgrade a `Cold` entry to `Hot`.
244    fn upgrade(&self) -> bool {
245        self.state
246            .compare_exchange_weak(Cold.into(), Hot.into(), Release, Relaxed)
247            .is_ok()
248    }
249
250    /// Downgrade a `Hot` entry to `Cold`.
251    fn downgrade(&self) -> bool {
252        self.state
253            .compare_exchange_weak(Hot.into(), Cold.into(), Release, Relaxed)
254            .is_ok()
255    }
256}
257
258/// Concurrent fixed-size Clock cache.
259///
260/// Maintains a mapping from keys → slot indices and uses
261/// a clock-hand eviction policy.
262pub struct ClockCache<K, V>
263where
264    K: Eq + Hash,
265{
266    /// Key → slot index mapping.
267    index: DashMap<Key<K>, usize>,
268    /// Fixed-size array of slots.
269    slots: Box<[Slot<K, V>]>,
270    /// Clock hand for eviction.
271    hand: AtomicUsize,
272    /// Capacity mask (capacity must be power-of-two).
273    capacity_mask: usize,
274    /// Maximum number of elements.
275    capacity: usize,
276    backoff_config: BackoffConfig,
277    metrics: Metrics,
278}
279
280impl<K, V> ClockCache<K, V>
281where
282    K: Eq + Hash,
283{
284    /// Create a new Clock cache with the given capacity.
285    pub fn new(
286        capacity: usize,
287        backoff_config: BackoffConfig,
288        metrics_config: MetricsConfig,
289    ) -> Self {
290        let capacity = capacity.next_power_of_two();
291
292        let slots = (0..capacity)
293            .map(|_| Slot::empty())
294            .collect::<Vec<_>>()
295            .into_boxed_slice();
296
297        let capacity_mask = capacity - 1;
298
299        Self {
300            index: DashMap::new(),
301            slots,
302            hand: Default::default(),
303            capacity_mask,
304            capacity,
305            backoff_config,
306            metrics: Metrics::new(metrics_config),
307        }
308    }
309}
310
311impl<K, V> CacheEngine<K, V> for ClockCache<K, V>
312where
313    K: Eq + Hash,
314{
315    /// Get a value by key.
316    ///
317    /// Returns a `Handler` that pins the entry in memory.
318    fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
319    where
320        Key<K>: Borrow<Q>,
321        Q: Eq + Hash + ?Sized,
322    {
323        let called_at = Instant::now();
324        let guard = pin();
325
326        self.index
327            .get(key)
328            .and_then(|entry| {
329                let index = *entry.value();
330
331                match self.slots[index].get(key, guard) {
332                    None => {
333                        self.metrics.record_miss();
334                        let elapsed = called_at.elapsed().as_millis() as u64;
335                        self.metrics.record_latency(elapsed);
336                        None
337                    }
338                    Some(reference) => {
339                        self.metrics.record_hit();
340                        let elapsed = called_at.elapsed().as_millis() as u64;
341                        self.metrics.record_latency(elapsed);
342                        Some(reference)
343                    }
344                }
345            })
346            .or_else(|| {
347                self.metrics.record_miss();
348                let elapsed = called_at.elapsed().as_millis() as u64;
349                self.metrics.record_latency(elapsed);
350                None
351            })
352    }
353
354    fn insert_with<F>(&self, key: K, value: V, expired_at: Option<Instant>, admission: F)
355    where
356        F: Fn(&K, &K) -> bool,
357    {
358        let called_at = Instant::now();
359        let mut entry = Entry::new(key, value, expired_at);
360        let mut iter = SlotIter::new(self);
361        let mut backoff = self.backoff_config.build();
362
363        loop {
364            let (index, slot) = match self.index.get(entry.key()) {
365                None => iter.next().expect("cache has at least one slot"),
366                Some(reference) => {
367                    let index = *reference.value();
368                    let slot = &self.slots[index];
369                    (index, slot)
370                }
371            };
372
373            let guard = pin();
374
375            match slot.try_write(
376                entry,
377                guard,
378                &admission,
379                |key| {
380                    self.index.remove(key);
381                    self.metrics.record_eviction();
382                },
383                |key| {
384                    self.index.insert(key.clone(), index);
385                },
386            ) {
387                Written | Rejected => {
388                    let elapsed = called_at.elapsed().as_millis() as u64;
389                    self.metrics.record_latency(elapsed);
390                    break;
391                }
392                Retry(passed_entry) => {
393                    backoff.backoff();
394                    entry = passed_entry
395                }
396            };
397        }
398    }
399
400    fn remove<Q>(&self, key: &Q) -> bool
401    where
402        Key<K>: Borrow<Q>,
403        Q: Eq + Hash + ?Sized,
404    {
405        self.index.remove(key).is_some()
406    }
407
408    #[inline]
409    fn capacity(&self) -> usize {
410        self.capacity
411    }
412
413    #[inline]
414    fn metrics(&self) -> MetricsSnapshot {
415        self.metrics.snapshot()
416    }
417}
418
419impl<K, V> Drop for ClockCache<K, V>
420where
421    K: Eq + Hash,
422{
423    fn drop(&mut self) {
424        let guard = pin();
425
426        for slot in &self.slots {
427            let shared_old = slot.entry.swap(Shared::null(), AcqRel, &guard);
428
429            if !shared_old.is_null() {
430                unsafe { guard.defer_destroy(shared_old) }
431            }
432        }
433
434        guard.flush();
435    }
436}
437
438/// Iterator over slots for eviction.
439///
440/// Skips `Claimed` slots and advances the clock-hand atomically.
441struct SlotIter<'a, K, V>
442where
443    K: Eq + Hash,
444{
445    slots: &'a [Slot<K, V>],
446    hand: &'a AtomicUsize,
447    capacity_mask: usize,
448}
449
450impl<'a, K, V> SlotIter<'a, K, V>
451where
452    K: Eq + Hash,
453{
454    fn new(cache: &'a ClockCache<K, V>) -> Self {
455        Self {
456            slots: &cache.slots,
457            hand: &cache.hand,
458            capacity_mask: cache.capacity_mask,
459        }
460    }
461}
462
463impl<'a, K, V> Iterator for SlotIter<'a, K, V>
464where
465    K: Eq + Hash,
466{
467    type Item = (usize, &'a Slot<K, V>);
468
469    fn next(&mut self) -> Option<Self::Item> {
470        let mut current = self.hand.load(Acquire);
471
472        loop {
473            let next = (current + 1) & self.capacity_mask;
474
475            match self
476                .hand
477                .compare_exchange_weak(current, next, Release, Acquire)
478            {
479                Ok(_) => {
480                    let slot = &self.slots[current];
481                    let clock = slot.state();
482
483                    if clock == Claimed {
484                        current = next;
485                        continue;
486                    }
487
488                    return Some((current, slot));
489                }
490                Err(value) => {
491                    current = value;
492                }
493            }
494        }
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use crate::core::utils::{random_string, random_string_with_len};
502    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
503    use rand::distr::{Alphanumeric, SampleString};
504    use rand::{RngExt, rng};
505    use std::thread::scope;
506    use std::time::{Duration, Instant};
507
508    #[inline(always)]
509    fn create_cache<K, V>(capacity: usize) -> ClockCache<K, V>
510    where
511        K: Eq + Hash,
512    {
513        ClockCache::new(
514            capacity,
515            BackoffConfig::exponential(1000),
516            MetricsConfig::default(),
517        )
518    }
519
520    #[inline(always)]
521    fn random_alphanumeric(len: usize) -> String {
522        Alphanumeric.sample_string(&mut rand::rng(), len)
523    }
524
525    #[test]
526    fn test_clock_cache_insert_should_retrieve_stored_value() {
527        let cache = create_cache(10);
528
529        let key = random_alphanumeric(32);
530        let value = random_alphanumeric(255);
531
532        cache.insert(key.clone(), value.clone(), None);
533
534        let entry = cache.get(&key).expect("must present");
535
536        assert_eq!(entry.key(), &key);
537        assert_eq!(entry.value(), &value);
538    }
539
540    #[test]
541    fn test_clock_cache_insert_should_overwrite_existing_key() {
542        let cache = create_cache(10);
543
544        let key = random_alphanumeric(32);
545        let value1 = random_alphanumeric(255);
546        let value2 = random_alphanumeric(255);
547
548        cache.insert(key.clone(), value1, None);
549        cache.insert(key.clone(), value2.clone(), None);
550
551        let entry = cache.get(&key).expect("must present");
552
553        assert_eq!(entry.key(), &key);
554        assert_eq!(entry.value(), &value2);
555    }
556
557    #[test]
558    fn test_clock_cache_remove_should_invalidate_entry() {
559        let cache = create_cache(100);
560
561        let key = random_alphanumeric(32);
562
563        cache.insert(key.clone(), random_alphanumeric(255), None);
564
565        assert!(cache.get(&key).is_some());
566
567        assert!(cache.remove(&key));
568
569        assert!(cache.get(&key).is_none());
570    }
571
572    #[test]
573    fn test_clock_cache_ghost_reference_safety_should_protect_memory() {
574        let cache = create_cache(2);
575
576        let key = random_alphanumeric(32);
577        let value = random_alphanumeric(255);
578
579        cache.insert(key.clone(), value.clone(), None);
580
581        let entry_ref = cache.get(&key).expect("key should present");
582
583        for _ in 0..10000 {
584            let (key, value) = (random_alphanumeric(32), random_alphanumeric(255));
585            cache.insert(key, value, None);
586        }
587
588        assert!(cache.get(&key).is_none());
589
590        assert_eq!(entry_ref.value(), &value);
591    }
592
593    #[test]
594    fn test_clock_cache_hot_entry_should_resist_eviction() {
595        let cache = create_cache(2);
596
597        cache.insert(1, random_alphanumeric(32), None);
598        cache.insert(2, random_alphanumeric(32), None);
599
600        let _ = cache.get(&1);
601
602        cache.insert(3, random_alphanumeric(32), None);
603
604        assert!(
605            cache.get(&1).is_some(),
606            "K1 should have been protected by Hot state"
607        );
608        assert!(
609            cache.get(&2).is_none(),
610            "K2 should have been the first choice for eviction"
611        );
612    }
613
614    #[test]
615    fn test_clock_cache_ttl_expiration_should_hide_expired_items() {
616        let cache = create_cache(10);
617        let short_lived = Duration::from_millis(50);
618        let (key, value) = ("key".to_string(), "value".to_string());
619
620        cache.insert(
621            key.clone(),
622            value.clone(),
623            Some(Instant::now() + short_lived),
624        );
625        assert!(cache.get(&key).is_some());
626
627        std::thread::sleep(Duration::from_millis(100));
628
629        assert!(
630            cache.get(&key).is_none(),
631            "Expired item should not be accessible"
632        );
633    }
634
635    #[test]
636    fn test_clock_cache_concurrent_hammer_should_not_crash_or_hang() {
637        let cache = create_cache(64);
638        let num_threads = 8;
639        let ops_per_thread = 1000;
640
641        scope(|s| {
642            for t in 0..num_threads {
643                let cache = &cache;
644                s.spawn(move || {
645                    for i in 0..ops_per_thread {
646                        let key = (t * 100) + (i % 50); // Mix of private and shared keys
647                        cache.insert(key, i, None);
648                        let _ = cache.get(&key);
649                        if i % 10 == 0 {
650                            cache.remove(&key);
651                        }
652                    }
653                });
654            }
655        });
656    }
657
658    #[test]
659    fn test_clock_cache_should_preserve_hot_set() {
660        let capacity = 1000;
661        let cache = create_cache(capacity);
662
663        let num_threads = 8;
664        let ops_per_thread = 15_000;
665        let workload_generator = WorkloadGenerator::new(10000, 1.2);
666        let workload_statistics = WorkloadStatistics::new();
667
668        let mut rand = rng();
669
670        for _ in 0..capacity {
671            let key = workload_generator.key(&mut rand);
672            cache.insert(key.to_string(), random_string(), None);
673            workload_statistics.record(key.to_string());
674        }
675
676        scope(|scope| {
677            for _ in 0..num_threads {
678                scope.spawn(|| {
679                    let mut rand = rng();
680                    for _ in 0..ops_per_thread {
681                        let key = workload_generator.key(&mut rand);
682
683                        if cache.get(key).is_none() {
684                            let value = random_string_with_len(rand.random_range(100..255));
685                            cache.insert(key.to_string(), value, None);
686                            workload_statistics.record(key.to_string());
687                        }
688                    }
689                });
690            }
691        });
692
693        let count = workload_statistics
694            .frequent_keys(500)
695            .iter()
696            .fold(0, |acc, key| {
697                if cache.get(key).is_some() {
698                    acc + 1
699                } else {
700                    acc
701                }
702            });
703
704        assert!(count >= 250)
705    }
706}