1use crate::clock::WriteResult::{Rejected, Retry, Written};
2use crate::core::backoff::BackoffConfig;
3use crate::core::engine::CacheEngine;
4use crate::core::entry::Entry;
5use crate::core::entry_ref::Ref;
6use crate::core::key::Key;
7use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
8use SlotState::{Claimed, Cold, Hot, Vacant};
9use crossbeam::epoch::{Atomic, Guard, Owned, pin};
10use crossbeam_epoch::Shared;
11use dashmap::DashMap;
12use std::borrow::Borrow;
13use std::hash::Hash;
14use std::ptr::NonNull;
15use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
16use std::sync::atomic::{AtomicU8, AtomicUsize};
17use std::time::Instant;
18
/// Lifecycle state of a cache slot, kept as a single byte in the slot's atomic
/// state field.
///
/// The explicit discriminants are the raw values stored in that byte, so they
/// must stay in sync with the `u8` conversions of this type.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[repr(u8)]
enum SlotState {
    /// State of a freshly created slot that has never been written.
    Vacant = 0,
    /// State a slot is left in right after an entry has been written to it;
    /// such a slot may be overwritten by the next writer.
    Cold = 1,
    /// A `Cold` slot that has been promoted by a successful read; writers
    /// demote it back to `Cold` instead of overwriting a live entry.
    Hot = 2,
    /// A writer currently owns the slot; readers treat it as empty.
    Claimed = 3,
}
35
36impl From<SlotState> for u8 {
37 fn from(clock: SlotState) -> Self {
38 match clock {
39 Vacant => 0,
40 Cold => 1,
41 Hot => 2,
42 Claimed => 3,
43 }
44 }
45}
46
47impl From<u8> for SlotState {
48 fn from(clock: u8) -> Self {
49 match clock {
50 0 => Vacant,
51 1 => Cold,
52 2 => Hot,
53 3 => Claimed,
54 _ => unreachable!("only values 0-3 are supported"),
55 }
56 }
57}
58
59#[derive(Debug)]
60enum WriteResult<K, V>
61where
62 K: Eq + Hash,
63{
64 Written,
65 Retry(Entry<K, V>),
66 Rejected,
67}
68
69#[derive(Debug)]
73struct Slot<K, V>
74where
75 K: Eq + Hash,
76{
77 entry: Atomic<Entry<K, V>>,
79 state: AtomicU8,
81}
82
83impl<K, V> Slot<K, V>
84where
85 K: Eq + Hash,
86{
87 fn empty() -> Self {
89 Self {
90 entry: Atomic::default(),
91 state: AtomicU8::new(Vacant.into()),
92 }
93 }
94
95 fn get<Q>(&self, key: &Q, guard: Guard) -> Option<Ref<K, V>>
105 where
106 Key<K>: Borrow<Q>,
107 Q: Eq + Hash + ?Sized,
108 {
109 let state = self.state();
110
111 match state {
112 Vacant | Claimed => None,
113 Cold | Hot => {
114 let shared_entry = self.entry.load(Acquire, &guard);
115 if shared_entry.is_null() {
116 return None;
117 }
118
119 let entry = unsafe { shared_entry.deref() };
120
121 if entry.key().borrow() != key || entry.is_expired() {
122 return None;
123 }
124
125 if state == Cold {
126 self.upgrade();
127 }
128
129 Some(Ref::new(NonNull::from(entry), guard))
130 }
131 }
132 }
133
134 fn try_write<A, E, I>(
149 &self,
150 entry: Entry<K, V>,
151 guard: Guard,
152 admission: A,
153 on_evict: E,
154 on_insert: I,
155 ) -> WriteResult<K, V>
156 where
157 A: for<'a> Fn(&'a K, &'a K) -> bool,
158 E: for<'a> FnOnce(&'a Key<K>),
159 I: for<'a> FnOnce(&'a Key<K>),
160 {
161 let state = self.state();
162
163 if !self.is_writable(state, &guard) {
164 if state == Hot {
165 let _ = self.downgrade();
166 }
167
168 return Retry(entry);
169 }
170
171 if !self.claim(state) {
172 return Retry(entry);
173 }
174
175 let shared_current_entry = self.entry.load(Acquire, &guard);
176
177 if let Some(current_entry) = unsafe { shared_current_entry.as_ref() } {
178 if !(current_entry.is_expired() || admission(entry.key(), current_entry.key())) {
179 self.store_state(state);
180 return Rejected;
181 }
182
183 on_evict(current_entry.key());
184 unsafe { guard.defer_destroy(shared_current_entry) };
185 }
186
187 let key = entry.key().clone();
188
189 self.entry.store(Owned::new(entry), Relaxed);
190 on_insert(&key);
191 self.state.store(Cold.into(), Release);
192
193 Written
194 }
195
196 #[inline]
197 fn is_writable(&self, state: SlotState, guard: &Guard) -> bool {
198 let shared_entry = self.entry.load(Acquire, guard);
199
200 match unsafe { shared_entry.as_ref() } {
201 Some(entry) if entry.is_expired() => true,
202 None => true,
203 _ => match state {
204 Vacant | Cold => true,
205 Hot | Claimed => false,
206 },
207 }
208 }
209
210 #[inline]
220 fn claim(&self, state: SlotState) -> bool {
221 self.state
222 .compare_exchange_weak(state.into(), Claimed.into(), Relaxed, Relaxed)
223 .is_ok()
224 }
225
226 fn state(&self) -> SlotState {
235 self.state.load(Acquire).into()
236 }
237
238 fn store_state(&self, state: SlotState) {
240 self.state.store(state.into(), Release);
241 }
242
243 fn upgrade(&self) -> bool {
245 self.state
246 .compare_exchange_weak(Cold.into(), Hot.into(), Release, Relaxed)
247 .is_ok()
248 }
249
250 fn downgrade(&self) -> bool {
252 self.state
253 .compare_exchange_weak(Hot.into(), Cold.into(), Release, Relaxed)
254 .is_ok()
255 }
256}
257
258pub struct ClockCache<K, V>
263where
264 K: Eq + Hash,
265{
266 index: DashMap<Key<K>, usize>,
268 slots: Box<[Slot<K, V>]>,
270 hand: AtomicUsize,
272 capacity_mask: usize,
274 capacity: usize,
276 backoff_config: BackoffConfig,
277 metrics: Metrics,
278}
279
280impl<K, V> ClockCache<K, V>
281where
282 K: Eq + Hash,
283{
284 pub fn new(
286 capacity: usize,
287 backoff_config: BackoffConfig,
288 metrics_config: MetricsConfig,
289 ) -> Self {
290 let capacity = capacity.next_power_of_two();
291
292 let slots = (0..capacity)
293 .map(|_| Slot::empty())
294 .collect::<Vec<_>>()
295 .into_boxed_slice();
296
297 let capacity_mask = capacity - 1;
298
299 Self {
300 index: DashMap::new(),
301 slots,
302 hand: Default::default(),
303 capacity_mask,
304 capacity,
305 backoff_config,
306 metrics: Metrics::new(metrics_config),
307 }
308 }
309}
310
311impl<K, V> CacheEngine<K, V> for ClockCache<K, V>
312where
313 K: Eq + Hash,
314{
315 fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
319 where
320 Key<K>: Borrow<Q>,
321 Q: Eq + Hash + ?Sized,
322 {
323 let called_at = Instant::now();
324 let guard = pin();
325
326 self.index
327 .get(key)
328 .and_then(|entry| {
329 let index = *entry.value();
330
331 match self.slots[index].get(key, guard) {
332 None => {
333 self.metrics.record_miss();
334 let elapsed = called_at.elapsed().as_millis() as u64;
335 self.metrics.record_latency(elapsed);
336 None
337 }
338 Some(reference) => {
339 self.metrics.record_hit();
340 let elapsed = called_at.elapsed().as_millis() as u64;
341 self.metrics.record_latency(elapsed);
342 Some(reference)
343 }
344 }
345 })
346 .or_else(|| {
347 self.metrics.record_miss();
348 let elapsed = called_at.elapsed().as_millis() as u64;
349 self.metrics.record_latency(elapsed);
350 None
351 })
352 }
353
354 fn insert_with<F>(&self, key: K, value: V, expired_at: Option<Instant>, admission: F)
355 where
356 F: Fn(&K, &K) -> bool,
357 {
358 let called_at = Instant::now();
359 let mut entry = Entry::new(key, value, expired_at);
360 let mut iter = SlotIter::new(self);
361 let mut backoff = self.backoff_config.build();
362
363 loop {
364 let (index, slot) = match self.index.get(entry.key()) {
365 None => iter.next().expect("cache has at least one slot"),
366 Some(reference) => {
367 let index = *reference.value();
368 let slot = &self.slots[index];
369 (index, slot)
370 }
371 };
372
373 let guard = pin();
374
375 match slot.try_write(
376 entry,
377 guard,
378 &admission,
379 |key| {
380 self.index.remove(key);
381 self.metrics.record_eviction();
382 },
383 |key| {
384 self.index.insert(key.clone(), index);
385 },
386 ) {
387 Written | Rejected => {
388 let elapsed = called_at.elapsed().as_millis() as u64;
389 self.metrics.record_latency(elapsed);
390 break;
391 }
392 Retry(passed_entry) => {
393 backoff.backoff();
394 entry = passed_entry
395 }
396 };
397 }
398 }
399
400 fn remove<Q>(&self, key: &Q) -> bool
401 where
402 Key<K>: Borrow<Q>,
403 Q: Eq + Hash + ?Sized,
404 {
405 self.index.remove(key).is_some()
406 }
407
408 #[inline]
409 fn capacity(&self) -> usize {
410 self.capacity
411 }
412
413 #[inline]
414 fn metrics(&self) -> MetricsSnapshot {
415 self.metrics.snapshot()
416 }
417}
418
419impl<K, V> Drop for ClockCache<K, V>
420where
421 K: Eq + Hash,
422{
423 fn drop(&mut self) {
424 let guard = pin();
425
426 for slot in &self.slots {
427 let shared_old = slot.entry.swap(Shared::null(), AcqRel, &guard);
428
429 if !shared_old.is_null() {
430 unsafe { guard.defer_destroy(shared_old) }
431 }
432 }
433
434 guard.flush();
435 }
436}
437
438struct SlotIter<'a, K, V>
442where
443 K: Eq + Hash,
444{
445 slots: &'a [Slot<K, V>],
446 hand: &'a AtomicUsize,
447 capacity_mask: usize,
448}
449
450impl<'a, K, V> SlotIter<'a, K, V>
451where
452 K: Eq + Hash,
453{
454 fn new(cache: &'a ClockCache<K, V>) -> Self {
455 Self {
456 slots: &cache.slots,
457 hand: &cache.hand,
458 capacity_mask: cache.capacity_mask,
459 }
460 }
461}
462
463impl<'a, K, V> Iterator for SlotIter<'a, K, V>
464where
465 K: Eq + Hash,
466{
467 type Item = (usize, &'a Slot<K, V>);
468
469 fn next(&mut self) -> Option<Self::Item> {
470 let mut current = self.hand.load(Acquire);
471
472 loop {
473 let next = (current + 1) & self.capacity_mask;
474
475 match self
476 .hand
477 .compare_exchange_weak(current, next, Release, Acquire)
478 {
479 Ok(_) => {
480 let slot = &self.slots[current];
481 let clock = slot.state();
482
483 if clock == Claimed {
484 current = next;
485 continue;
486 }
487
488 return Some((current, slot));
489 }
490 Err(value) => {
491 current = value;
492 }
493 }
494 }
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::{random_string, random_string_with_len};
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    use rand::distr::{Alphanumeric, SampleString};
    use rand::{RngExt, rng};
    use std::thread::scope;
    use std::time::{Duration, Instant};

    /// Builds a cache with the requested capacity, exponential backoff and
    /// default metrics.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> ClockCache<K, V>
    where
        K: Eq + Hash,
    {
        let backoff = BackoffConfig::exponential(1000);
        let metrics = MetricsConfig::default();

        ClockCache::new(capacity, backoff, metrics)
    }

    /// Returns a random alphanumeric string of `len` characters.
    #[inline(always)]
    fn random_alphanumeric(len: usize) -> String {
        let mut generator = rand::rng();
        Alphanumeric.sample_string(&mut generator, len)
    }

    /// A stored value can be read back under its key.
    #[test]
    fn test_clock_cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);
        let (key, value) = (random_alphanumeric(32), random_alphanumeric(255));

        cache.insert(key.clone(), value.clone(), None);

        let stored = cache.get(&key).expect("must present");

        assert_eq!(stored.key(), &key);
        assert_eq!(stored.value(), &value);
    }

    /// Inserting the same key twice leaves the second value visible.
    #[test]
    fn test_clock_cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);

        let key = random_alphanumeric(32);
        let first = random_alphanumeric(255);
        let second = random_alphanumeric(255);

        cache.insert(key.clone(), first, None);
        cache.insert(key.clone(), second.clone(), None);

        let stored = cache.get(&key).expect("must present");

        assert_eq!(stored.key(), &key);
        assert_eq!(stored.value(), &second);
    }

    /// A removed key is no longer returned by `get`.
    #[test]
    fn test_clock_cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);
        let key = random_alphanumeric(32);

        cache.insert(key.clone(), random_alphanumeric(255), None);
        assert!(cache.get(&key).is_some());

        assert!(cache.remove(&key));
        assert!(cache.get(&key).is_none());
    }

    /// A reference obtained before eviction stays readable afterwards.
    #[test]
    fn test_clock_cache_ghost_reference_safety_should_protect_memory() {
        let cache = create_cache(2);
        let (key, value) = (random_alphanumeric(32), random_alphanumeric(255));

        cache.insert(key.clone(), value.clone(), None);

        let entry_ref = cache.get(&key).expect("key should present");

        // Churn the two slots until the original entry has been evicted.
        (0..10000).for_each(|_| {
            let churn_key = random_alphanumeric(32);
            let churn_value = random_alphanumeric(255);
            cache.insert(churn_key, churn_value, None);
        });

        assert!(cache.get(&key).is_none());

        assert_eq!(entry_ref.value(), &value);
    }

    /// An entry that was read survives the next insert; the unread one is evicted.
    #[test]
    fn test_clock_cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(2);

        for key in [1, 2] {
            cache.insert(key, random_alphanumeric(32), None);
        }

        // Reading key 1 marks its slot as recently used.
        let _ = cache.get(&1);

        cache.insert(3, random_alphanumeric(32), None);

        let first_survived = cache.get(&1).is_some();
        assert!(first_survived, "K1 should have been protected by Hot state");

        let second_evicted = cache.get(&2).is_none();
        assert!(
            second_evicted,
            "K2 should have been the first choice for eviction"
        );
    }

    /// An entry is hidden once its expiration instant has passed.
    #[test]
    fn test_clock_cache_ttl_expiration_should_hide_expired_items() {
        let cache = create_cache(10);
        let short_lived = Duration::from_millis(50);
        let key = "key".to_string();
        let value = "value".to_string();

        let deadline = Instant::now() + short_lived;
        cache.insert(key.clone(), value.clone(), Some(deadline));
        assert!(cache.get(&key).is_some());

        std::thread::sleep(Duration::from_millis(100));

        let hidden = cache.get(&key).is_none();
        assert!(hidden, "Expired item should not be accessible");
    }

    /// Mixed concurrent inserts, reads and removes finish without panicking.
    #[test]
    fn test_clock_cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(64);
        let num_threads = 8;
        let ops_per_thread = 1000;

        scope(|s| {
            for t in 0..num_threads {
                let cache = &cache;
                s.spawn(move || {
                    for i in 0..ops_per_thread {
                        // Each thread cycles through its own window of 50 keys.
                        let key = (t * 100) + (i % 50);
                        cache.insert(key, i, None);
                        let _ = cache.get(&key);

                        if i % 10 == 0 {
                            cache.remove(&key);
                        }
                    }
                });
            }
        });
    }

    /// After a skewed concurrent workload, at least half of the 500 most
    /// frequently recorded keys are still cached.
    #[test]
    fn test_clock_cache_should_preserve_hot_set() {
        let capacity = 1000;
        let cache = create_cache(capacity);

        let num_threads = 8;
        let ops_per_thread = 15_000;
        let workload_generator = WorkloadGenerator::new(10000, 1.2);
        let workload_statistics = WorkloadStatistics::new();

        let mut warmup_rng = rng();

        // Warm the cache with one generated key per requested slot.
        for _ in 0..capacity {
            let key = workload_generator.key(&mut warmup_rng);
            cache.insert(key.to_string(), random_string(), None);
            workload_statistics.record(key.to_string());
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut worker_rng = rng();
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut worker_rng);

                        if cache.get(key).is_some() {
                            continue;
                        }

                        let value = random_string_with_len(worker_rng.random_range(100..255));
                        cache.insert(key.to_string(), value, None);
                        workload_statistics.record(key.to_string());
                    }
                });
            }
        });

        let count = workload_statistics
            .frequent_keys(500)
            .iter()
            .filter(|key| cache.get(*key).is_some())
            .count();

        assert!(count >= 250)
    }
}