1use crate::core::engine::CacheEngine;
2use crate::core::entry::Entry;
3use crate::core::entry_ref::Ref;
4use crate::core::index::IndexTable;
5use crate::core::key::Key;
6use crate::core::request_quota::RequestQuota;
7use crate::core::ring::RingQueue;
8use crate::core::tag::{Index, Tag};
9use crate::core::thread_context::ThreadContext;
10use crate::core::utils::hash;
11use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
12use crossbeam::epoch::{Atomic, pin};
13use crossbeam::utils::CachePadded;
14use crossbeam_epoch::{Owned, Shared};
15use std::borrow::Borrow;
16use std::hash::Hash;
17use std::ptr::NonNull;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
20use std::time::Instant;
21
22#[derive(Debug)]
23struct Slot<K, V>
24where
25 K: Eq + Hash,
26{
27 tag: AtomicU64,
28 entry: Atomic<Entry<K, V>>,
29}
30
31impl<K, V> Default for Slot<K, V>
32where
33 K: Eq + Hash,
34{
35 fn default() -> Self {
36 Self {
37 tag: AtomicU64::default(),
38 entry: Default::default(),
39 }
40 }
41}
42
43pub struct ClockCache<K, V>
58where
59 K: Eq + Hash,
60{
61 index_table: IndexTable<K>,
63 slots: Box<[CachePadded<Slot<K, V>>]>,
65 index_pool: RingQueue,
71 capacity: usize,
72 metrics: Metrics,
73}
74
75impl<K, V> ClockCache<K, V>
76where
77 K: Eq + Hash,
78{
79 pub fn new(capacity: usize, metrics_config: MetricsConfig) -> Self {
92 let slots = (0..capacity)
93 .map(|_| CachePadded::new(Slot::default()))
94 .collect::<Vec<_>>()
95 .into_boxed_slice();
96
97 let index_pool = RingQueue::new(capacity);
98
99 let context = ThreadContext::default();
100
101 for index in 0..capacity {
102 index_pool
103 .push(index as u64, &context)
104 .expect("index pool can't be overflowed");
105 }
106 Self {
107 index_table: IndexTable::new(),
108 slots,
109 index_pool,
110 capacity,
111 metrics: Metrics::new(metrics_config),
112 }
113 }
114
115 pub fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
121 where
122 Key<K>: Borrow<Q>,
123 Q: Eq + Hash + ?Sized,
124 {
125 let started_at = Instant::now();
126 let guard = pin();
127 let hash = hash(key);
128
129 loop {
130 match self.index_table.get(key) {
131 Some(index) => {
132 let index = Index::from(index);
133
134 let slot = &self.slots[index.slot_index()];
135 let mut tag = Tag::from(slot.tag.load(Acquire));
136
137 if !tag.is_match(index, hash) {
138 let latency = started_at.elapsed().as_millis() as u64;
139 self.metrics.record_miss();
140 self.metrics.record_latency(latency);
141 return None;
142 }
143
144 let entry = slot.entry.load(Acquire, &guard);
145
146 match unsafe { entry.as_ref() } {
147 None => {
148 let latency = started_at.elapsed().as_millis() as u64;
149 self.metrics.record_miss();
150 self.metrics.record_latency(latency);
151 break None;
152 }
153 Some(entry_ref) => {
154 if entry_ref.key().borrow() != key || entry_ref.is_expired() {
155 let latency = started_at.elapsed().as_millis() as u64;
156 self.metrics.record_miss();
157 self.metrics.record_latency(latency);
158 break None;
159 }
160
161 match slot.tag.compare_exchange_weak(
162 tag.into(),
163 tag.increment_frequency().into(),
164 Release,
165 Acquire,
166 ) {
167 Ok(_) => {
168 context.decay();
169 }
170 Err(latest) => {
171 tag = Tag::from(latest);
172 context.wait();
173 continue;
174 }
175 }
176
177 self.metrics.record_hit();
178 self.metrics
179 .record_latency(started_at.elapsed().as_millis() as u64);
180
181 break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
182 }
183 }
184 }
185 None => {
186 self.metrics.record_miss();
187 self.metrics
188 .record_latency(started_at.elapsed().as_millis() as u64);
189 return None;
190 }
191 }
192 }
193 }
194
195 pub fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
210 let started_at = Instant::now();
211 let guard = pin();
212 let hash = hash(entry.key());
213
214 while quota.consume() {
215 match self.index_table.get(entry.key()).map(Index::from) {
216 Some(index) => {
217 let slot = &self.slots[index.slot_index()];
218 let tag = Tag::from(slot.tag.load(Acquire));
219
220 if !tag.is_match(index, hash) {
221 context.wait();
222 continue;
223 }
224
225 match slot.tag.compare_exchange_weak(
226 tag.into(),
227 tag.busy().into(),
228 AcqRel,
229 Relaxed,
230 ) {
231 Ok(_) => {
232 context.decay();
233 }
234 Err(_) => {
235 context.wait();
236 continue;
237 }
238 }
239
240 let old_entry = slot.entry.load(Relaxed, &guard);
241
242 if let Some(old_entry_ref) = unsafe { old_entry.as_ref() } {
243 if old_entry_ref.key() != entry.key() {
244 context.wait();
245 continue;
246 }
247
248 unsafe { guard.defer_destroy(old_entry) };
249 }
250
251 slot.entry.store(Owned::new(entry), Relaxed);
252 slot.tag.store(tag.increment_frequency().into(), Release);
253
254 self.metrics
255 .record_latency(started_at.elapsed().as_millis() as u64);
256
257 return;
258 }
259 None => match self.index_pool.pop(context) {
260 Some(index) => {
261 let index = Index::from(index);
262
263 let slot = &self.slots[index.slot_index()];
264 let mut tag = Tag::from(slot.tag.load(Acquire));
265
266 loop {
267 if tag.is_hot() {
268 if let Err(latest) = slot.tag.compare_exchange_weak(
269 tag.into(),
270 tag.decrement_frequency().into(),
271 Release,
272 Acquire,
273 ) {
274 tag = Tag::from(latest);
275 context.wait();
276 continue;
277 }
278
279 self.index_pool
280 .push(index.into(), context)
281 .expect("index pool can't be overflowed");
282
283 context.decay();
284
285 break;
286 }
287
288 if let Err(latest) = slot.tag.compare_exchange_weak(
289 tag.into(),
290 tag.busy().into(),
291 AcqRel,
292 Acquire,
293 ) {
294 tag = Tag::from(latest);
295 context.wait();
296 continue;
297 }
298
299 let entry = Owned::new(entry);
300 let key = entry.key().clone();
301
302 let victim = slot.entry.swap(entry, Relaxed, &guard);
303
304 if let Some(victim_ref) = unsafe { victim.as_ref() } {
305 self.index_table.remove(victim_ref.key());
306 self.metrics.record_eviction();
307 unsafe { guard.defer_destroy(victim) };
308 }
309
310 let (tag, index) = tag.advance(index);
311 let tag = tag.with_signature(hash);
312
313 slot.tag.store(tag.into(), Release);
314
315 self.index_pool
316 .push(index.into(), context)
317 .expect("index pool can't be overflowed");
318
319 context.decay();
320
321 self.index_table.insert(key, index.into());
322
323 self.metrics
324 .record_latency(started_at.elapsed().as_millis() as u64);
325
326 return;
327 }
328 }
329 None => context.wait(),
330 },
331 }
332 }
333 }
334
335 pub fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
362 where
363 Key<K>: Borrow<Q>,
364 Q: Eq + Hash + ?Sized,
365 {
366 match self.index_table.remove(key) {
367 Some(index) => {
368 let index = Index::from(index);
369
370 let slot = &self.slots[index.slot_index()];
371 let mut tag = Tag::from(slot.tag.load(Acquire));
372
373 let hash = hash(key);
374
375 loop {
376 if !tag.is_match(index, hash) {
377 return true;
378 }
379
380 if let Err(latest) = slot.tag.compare_exchange_weak(
381 tag.into(),
382 tag.reset().into(),
383 Release,
384 Acquire,
385 ) {
386 tag = Tag::from(latest);
387 context.wait();
388 continue;
389 }
390
391 return true;
392 }
393 }
394 None => false,
395 }
396 }
397}
398
399impl<K, V> CacheEngine<K, V> for ClockCache<K, V>
400where
401 K: Eq + Hash,
402{
403 fn get<Q>(&self, key: &Q, context: &ThreadContext) -> Option<Ref<K, V>>
404 where
405 Key<K>: Borrow<Q>,
406 Q: Eq + Hash + ?Sized,
407 {
408 self.get(key, context)
409 }
410
411 fn insert(&self, entry: Entry<K, V>, context: &ThreadContext, quota: &mut RequestQuota) {
412 self.insert(entry, context, quota)
413 }
414
415 fn remove<Q>(&self, key: &Q, context: &ThreadContext) -> bool
416 where
417 Key<K>: Borrow<Q>,
418 Q: Eq + Hash + ?Sized,
419 {
420 self.remove(key, context)
421 }
422
423 #[inline]
424 fn capacity(&self) -> usize {
425 self.capacity
426 }
427
428 #[inline]
429 fn metrics(&self) -> MetricsSnapshot {
430 self.metrics.snapshot()
431 }
432}
433
434impl<K, V> Drop for ClockCache<K, V>
435where
436 K: Eq + Hash,
437{
438 fn drop(&mut self) {
439 let guard = pin();
440
441 for slot in &self.slots {
442 let shared_old = slot.entry.swap(Shared::null(), AcqRel, &guard);
443
444 if !shared_old.is_null() {
445 unsafe { guard.defer_destroy(shared_old) }
446 }
447 }
448
449 guard.flush();
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::utils::random_string;
    use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
    use rand::rng;
    use std::hash::Hash;
    use std::sync::{Arc, Mutex};
    use std::thread::scope;

    /// Builds a cache of the given capacity with the default metrics config.
    #[inline(always)]
    fn create_cache<K, V>(capacity: usize) -> ClockCache<K, V>
    where
        K: Eq + Hash,
    {
        ClockCache::new(capacity, MetricsConfig::default())
    }

    /// Inserts `entry` using a fresh default request quota.
    fn put<K, V>(cache: &ClockCache<K, V>, context: &ThreadContext, entry: Entry<K, V>)
    where
        K: Eq + Hash,
    {
        cache.insert(entry, context, &mut RequestQuota::default());
    }

    /// A stored entry can be read back with its key and value intact.
    #[test]
    fn test_clock_cache_insert_should_retrieve_stored_value() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        put(&cache, &context, Entry::new(key.clone(), value.clone()));

        let found = cache.get(&key, &context).expect("must present");

        assert_eq!(found.key(), &key);
        assert_eq!(found.value(), &value);
    }

    /// Inserting the same key twice leaves the second value visible.
    #[test]
    fn test_clock_cache_insert_should_overwrite_existing_key() {
        let cache = create_cache(10);
        let context = ThreadContext::default();

        let key = random_string();
        let first_value = random_string();
        let second_value = random_string();

        put(&cache, &context, Entry::new(key.clone(), first_value));
        put(&cache, &context, Entry::new(key.clone(), second_value.clone()));

        let found = cache.get(&key, &context).expect("entry must present");

        assert_eq!(found.key(), &key);
        assert_eq!(found.value(), &second_value);
    }

    /// A removed key is no longer returned by `get`.
    #[test]
    fn test_clock_cache_remove_should_invalidate_entry() {
        let cache = create_cache(100);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        put(&cache, &context, Entry::new(key.clone(), value));

        assert!(cache.get(&key, &context).is_some());
        assert!(cache.remove(&key, &context));
        assert!(cache.get(&key, &context).is_none());
    }

    /// A reference obtained before eviction stays readable afterwards.
    #[test]
    fn test_clock_cache_ghost_reference_safety_should_protect_memory() {
        let cache = create_cache(2);
        let context = ThreadContext::default();

        let key = random_string();
        let value = random_string();

        put(&cache, &context, Entry::new(key.clone(), value.clone()));

        let held = cache.get(&key, &context).expect("key should present");

        // Flood the two-slot cache so the original entry gets evicted.
        for _ in 0..10000 {
            put(&cache, &context, Entry::new(random_string(), random_string()));
        }

        assert!(cache.get(&key, &context).is_none());

        assert_eq!(held.value(), &value);
    }

    /// A recently read entry survives an insert that forces an eviction.
    #[test]
    fn test_clock_cache_hot_entry_should_resist_eviction() {
        let cache = create_cache(2);
        let context = ThreadContext::default();

        put(&cache, &context, Entry::new(1, random_string()));
        put(&cache, &context, Entry::new(2, random_string()));

        // Touch key 1 so it is the hotter of the two entries.
        let _ = cache.get(&1, &context);

        put(&cache, &context, Entry::new(3, random_string()));

        assert!(
            cache.get(&1, &context).is_some(),
            "K1 should have been protected by Hot state"
        );
        assert!(
            cache.get(&2, &context).is_none(),
            "K2 should have been the first choice for eviction"
        );
    }

    /// An entry whose expiration predicate turns true is hidden from `get`.
    #[test]
    fn test_clock_cache_ttl_expiration_should_hide_expired_items() {
        let cache = create_cache(10);
        let context = ThreadContext::default();
        let key = random_string();
        let value = random_string();

        let expired = Arc::new(Mutex::new(false));

        let is_expired = {
            let flag = Arc::clone(&expired);
            move || *flag.lock().unwrap()
        };

        put(
            &cache,
            &context,
            Entry::with_custom_expiration(key.clone(), value.clone(), is_expired),
        );

        assert!(cache.get(&key, &context).is_some());

        *expired.lock().unwrap() = true;

        assert!(cache.get(&key, &context).is_none());
    }

    /// Mixed inserts, reads and removes from many threads run to completion.
    #[test]
    fn test_clock_cache_concurrent_hammer_should_not_crash_or_hang() {
        let cache = create_cache(1024);
        let num_threads = 16;
        let ops_per_thread = 10000;

        scope(|s| {
            for thread_id in 0..num_threads {
                let cache = &cache;
                s.spawn(move || {
                    let context = ThreadContext::default();
                    for op in 0..ops_per_thread {
                        let key = (thread_id * 100) + (op % 50);
                        let value = random_string();

                        put(cache, &context, Entry::new(key, value));
                        let _ = cache.get(&key, &context);

                        if op % 10 == 0 {
                            cache.remove(&key, &context);
                        }
                    }
                });
            }
        });
    }

    /// Under a skewed workload a sizeable share of the most frequent keys is
    /// still cached at the end.
    #[test]
    fn test_clock_cache_should_preserve_hot_set() {
        let capacity = 1024;
        let cache = create_cache(capacity);
        let context = ThreadContext::default();

        let num_threads = 16;
        let ops_per_thread = 15_000;
        let workload_generator = WorkloadGenerator::new(10000, 1.2);
        let workload_statistics = WorkloadStatistics::new();

        let mut rand = rng();

        // Warm-up: one generated key per slot.
        for _ in 0..capacity {
            let key = workload_generator.key(&mut rand);
            put(&cache, &context, Entry::new(key.clone(), "value"));
            workload_statistics.record(key);
        }

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let mut rand = rng();
                    let context = ThreadContext::default();
                    for _ in 0..ops_per_thread {
                        let key = workload_generator.key(&mut rand);

                        // Read-through: only misses trigger an insert.
                        if cache.get(&key, &context).is_none() {
                            put(&cache, &context, Entry::new(key.clone(), "value"));
                            workload_statistics.record(key);
                        }
                    }
                });
            }
        });

        let count = workload_statistics
            .frequent_keys(500)
            .iter()
            .filter(|key| cache.get(*key, &context).is_some())
            .count();

        assert!(count >= 200)
    }
}