1use crate::core::backoff::{Backoff, BackoffConfig};
2use crate::core::cms::CountMinSketch;
3use crate::core::engine::CacheEngine;
4use crate::core::entry::Entry;
5use crate::core::entry_ref::Ref;
6use crate::core::index::IndexTable;
7use crate::core::key::Key;
8use crate::core::ring::RingQueue;
9use crate::core::utils;
10use crate::metrics::{Metrics, MetricsConfig, MetricsSnapshot};
11use crossbeam::utils::CachePadded;
12use crossbeam_epoch::{Atomic, Owned, pin};
13use crossbeam_epoch::{Guard, Shared};
14use std::borrow::Borrow;
15use std::hash::Hash;
16use std::ptr::NonNull;
17use std::sync::atomic::AtomicU64;
18use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
19use std::time::Instant;
20use utils::hash;
21
22#[derive(Debug, Clone, Copy, Eq, PartialEq)]
30pub struct Tag(u64);
31
32impl Tag {
33 const ID_SHIFT: u64 = 48;
34 const SIGNATURE_SHIFT: u64 = 16;
35 const SIGNATURE_MASK: u64 = 0xFFFF_FFFF << Self::SIGNATURE_SHIFT;
36 const BUSY_MASK: u64 = 1 << 15;
37 const FREQUENCY_MASK: u64 = 0xFF;
38
39 const SIGNATURE_C1: u64 = 0xff51afd7ed558ccd;
41 const SIGNATURE_C2: u64 = 0xc4ceb9fe1a85ec53;
42
43 #[inline]
46 fn make_signature(hash: u64) -> u32 {
47 let mut x = hash;
48 x ^= x >> 33;
49 x = x.wrapping_mul(Self::SIGNATURE_C1);
50 x ^= x >> 33;
51 x = x.wrapping_mul(Self::SIGNATURE_C2);
52 x ^= x >> 33;
53
54 (x as u32) | 1
57 }
58
59 #[inline]
60 fn id(self) -> u16 {
61 (self.0 >> Self::ID_SHIFT) as u16
62 }
63 #[inline]
64 fn signature(self) -> u32 {
65 (self.0 >> Self::SIGNATURE_SHIFT) as u32
66 }
67 #[inline]
68 fn is_busy(self) -> bool {
69 (self.0 & Self::BUSY_MASK) != 0
70 }
71 #[inline]
72 fn frequency(self) -> u8 {
73 (self.0 & Self::FREQUENCY_MASK) as u8
74 }
75
76 #[inline]
77 fn is_epoch_match(self, index: Index) -> bool {
78 self.id() == index.id()
79 }
80
81 #[inline]
82 fn is_hot(self) -> bool {
83 self.frequency() > 0
84 }
85
86 #[inline]
88 fn is_match(self, index: Index, hash: u64) -> bool {
89 self.id() == index.id() && !self.is_busy() && self.signature() == Self::make_signature(hash)
90 }
91
92 #[inline]
94 fn with_signature(self, hash: u64) -> Self {
95 let sig = Self::make_signature(hash);
96 Self((self.0 & !Self::SIGNATURE_MASK) | (sig as u64) << Self::SIGNATURE_SHIFT)
97 }
98
99 #[inline]
101 fn busy(self) -> Self {
102 Self(self.0 | Self::BUSY_MASK)
103 }
104
105 #[inline]
107 fn increment_frequency(self) -> Self {
108 let frequency = self.frequency();
109 if frequency < u8::MAX {
110 Self((self.0 & !Self::FREQUENCY_MASK) | (frequency + 1) as u64)
111 } else {
112 self
113 }
114 }
115
116 #[inline]
118 fn decrement_frequency(self) -> Self {
119 let frequency = self.frequency();
120 if frequency > 0 {
121 Self((self.0 & !Self::FREQUENCY_MASK) | (frequency - 1) as u64)
122 } else {
123 self
124 }
125 }
126
127 #[inline]
130 fn advance(self, index: Index) -> (Self, Index) {
131 let next_id = self.id().wrapping_add(1);
132 let new_tag = Tag((next_id as u64) << Self::ID_SHIFT);
133 let new_index = index.with_id(next_id);
134 (new_tag, new_index)
135 }
136
137 #[inline]
147 fn reset(&self) -> Tag {
148 Tag(self.0 & !Self::FREQUENCY_MASK)
149 }
150}
151
152impl From<u64> for Tag {
153 #[inline]
154 fn from(raw: u64) -> Self {
155 Self(raw)
156 }
157}
158
159impl From<Tag> for u64 {
160 #[inline]
161 fn from(tag: Tag) -> u64 {
162 tag.0
163 }
164}
165
/// Packed slot reference: a 16-bit generation id in the top bits and a 48-bit slot
/// position in the low bits.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Index(u64);

impl Index {
    const ID_SHIFT: u64 = 48;
    const INDEX_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;

    /// Packs `id` and `slot_index` together; bits of `slot_index` above the 48th are dropped.
    #[inline]
    pub fn new(id: u16, slot_index: usize) -> Self {
        Self((slot_index as u64) & Self::INDEX_MASK).with_id(id)
    }

    /// Generation id held in the top 16 bits.
    #[inline]
    pub fn id(self) -> u16 {
        let Self(raw) = self;
        (raw >> Self::ID_SHIFT) as u16
    }

    /// Slot position held in the low 48 bits.
    #[inline]
    pub fn slot_index(self) -> usize {
        let Self(raw) = self;
        (raw & Self::INDEX_MASK) as usize
    }

    /// Returns a copy addressing the same slot but stamped with `id`.
    #[inline]
    pub fn with_id(self, id: u16) -> Self {
        let id_bits = u64::from(id) << Self::ID_SHIFT;
        Self(id_bits | (self.0 & Self::INDEX_MASK))
    }
}

impl From<u64> for Index {
    /// Reinterprets a raw word as an index.
    #[inline]
    fn from(raw_index: u64) -> Self {
        Self(raw_index)
    }
}

impl From<Index> for u64 {
    /// Exposes the raw word behind an index.
    #[inline]
    fn from(index: Index) -> Self {
        let Index(raw) = index;
        raw
    }
}
225
226pub struct Slot<K, V>
227where
228 K: Eq + Hash,
229{
230 entry: Atomic<Entry<K, V>>,
231 tag: AtomicU64,
232}
233
234impl<K, V> Slot<K, V>
235where
236 K: Eq + Hash,
237{
238 #[inline(always)]
239 fn new() -> Self {
240 Self {
241 entry: Atomic::null(),
242 tag: AtomicU64::default(),
243 }
244 }
245}
246
247impl<K, V> Default for Slot<K, V>
248where
249 K: Eq + Hash,
250{
251 #[inline(always)]
252 fn default() -> Self {
253 Slot::new()
254 }
255}
256
257pub struct S3FIFOCache<K, V>
270where
271 K: Eq + Hash,
272{
273 index_table: IndexTable<K>,
275 slots: Box<[CachePadded<Slot<K, V>>]>,
277 main_queue: RingQueue,
279 small_queue: RingQueue,
281 ghost_queue: RingQueue,
283 index_pool: RingQueue,
285 ghost_filter: CountMinSketch,
287 backoff_config: BackoffConfig,
289 metrics: Metrics,
291 capacity: usize,
293}
294
295impl<K, V> S3FIFOCache<K, V>
296where
297 K: Eq + Hash,
298{
299 #[inline]
305 pub fn new(
306 capacity: usize,
307 backoff_config: BackoffConfig,
308 metrics_config: MetricsConfig,
309 ) -> Self {
310 const GHOST_FILTER_DEPTH: usize = 4;
311
312 let small_queue_capacity = (capacity as f64 * 0.1) as usize;
313 let main_queue_capacity = capacity - small_queue_capacity;
314
315 let small_queue = RingQueue::new(small_queue_capacity, backoff_config);
316 let main_queue = RingQueue::new(main_queue_capacity, backoff_config);
317 let ghost_queue = RingQueue::new(capacity, backoff_config);
318
319 let index_pool = RingQueue::new(capacity, backoff_config);
320
321 for index in 0..capacity {
322 let _ = index_pool.push(index as u64);
323 }
324
325 let metrics = Metrics::new(metrics_config);
326
327 let slots = (0..capacity)
328 .map(|_| CachePadded::new(Slot::new()))
329 .collect::<Vec<_>>()
330 .into_boxed_slice();
331
332 Self {
333 index_table: IndexTable::new(),
334 slots,
335 main_queue,
336 small_queue,
337 ghost_queue,
338 index_pool,
339 ghost_filter: CountMinSketch::new(capacity, GHOST_FILTER_DEPTH),
340 backoff_config,
341 metrics,
342 capacity,
343 }
344 }
345
346 pub fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
352 where
353 Key<K>: Borrow<Q>,
354 Q: Eq + Hash + ?Sized,
355 {
356 let called_at = Instant::now();
357 let hash = hash(key);
358 let guard = pin();
359 let mut backoff = self.backoff_config.build();
360
361 loop {
362 match self.index_table.get(key) {
363 Some(index) => {
364 let index = Index::from(index);
365
366 let slot = &self.slots[index.slot_index()];
367 let mut tag = Tag::from(slot.tag.load(Acquire));
368
369 if !tag.is_match(index, hash) {
370 let latency = called_at.elapsed().as_millis() as u64;
371 self.metrics.record_miss();
372 self.metrics.record_latency(latency);
373 return None;
374 }
375
376 let entry = slot.entry.load(Relaxed, &guard);
377
378 match unsafe { entry.as_ref() } {
379 None => {
380 let latency = called_at.elapsed().as_millis() as u64;
381 self.metrics.record_miss();
382 self.metrics.record_latency(latency);
383 break None;
384 }
385 Some(entry_ref) => {
386 if entry_ref.key().borrow() != key {
387 let latency = called_at.elapsed().as_millis() as u64;
388 self.metrics.record_miss();
389 self.metrics.record_latency(latency);
390 break None;
391 }
392
393 if entry_ref.is_expired() {
394 let latency = called_at.elapsed().as_millis() as u64;
395 self.metrics.record_miss();
396 self.metrics.record_latency(latency);
397 break None;
398 }
399
400 if let Err(latest) = slot.tag.compare_exchange_weak(
401 tag.into(),
402 tag.increment_frequency().into(),
403 Release,
404 Acquire,
405 ) {
406 tag = Tag::from(latest);
407 backoff.backoff();
408 continue;
409 }
410
411 break Some(Ref::new(NonNull::from_ref(entry_ref), guard));
412 }
413 }
414 }
415 None => {
416 self.metrics.record_miss();
417 self.metrics
418 .record_latency(called_at.elapsed().as_millis() as u64);
419 return None;
420 }
421 }
422 }
423 }
424
425 pub fn insert_with(
431 &self,
432 key: K,
433 value: V,
434 expired_at: Option<Instant>,
435 admission: impl Fn(&K, &K) -> bool,
436 ) {
437 let entry = Entry::new(key, value, expired_at);
438 let key = entry.key().clone();
439 let hash = hash(entry.key());
440 let guard = pin();
441 let mut backoff = self.backoff_config.build();
442
443 loop {
444 match self.index_table.get(&key).map(Index::from) {
445 Some(index) => {
446 let slot = &self.slots[index.slot_index()];
447
448 let tag = Tag::from(slot.tag.load(Acquire));
449
450 if !(tag.is_match(index, hash)
451 && slot
452 .tag
453 .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Relaxed)
454 .is_ok())
455 {
456 backoff.backoff();
457 continue;
458 }
459
460 let old_entry = slot.entry.swap(Owned::new(entry), Relaxed, &guard);
461 slot.tag.store(tag.increment_frequency().into(), Release);
462
463 unsafe { guard.defer_destroy(old_entry) };
464
465 break;
466 }
467 None => {
468 if self.ghost_filter.contains(&hash) {
469 self.push_into_main_queue(entry, admission, &guard, &mut backoff)
470 } else {
471 self.push_into_small_queue(entry, admission, &guard, &mut backoff)
472 }
473
474 break;
475 }
476 }
477 }
478 }
479
480 fn push_into_small_queue(
489 &self,
490 entry: Entry<K, V>,
491 admission: impl Fn(&K, &K) -> bool,
492 guard: &Guard,
493 backoff: &mut Backoff,
494 ) {
495 let index = match self.index_pool.pop() {
496 Some(index) => Index::from(index),
497 None => {
498 match self.evict_from_small_queue(|key| admission(entry.key(), key), guard, backoff)
499 {
500 Some(index) => index,
501 None => return,
502 }
503 }
504 };
505
506 let slot = &self.slots[index.slot_index()];
507
508 let tag = Tag::from(slot.tag.load(Acquire));
509
510 let entry = Owned::new(entry);
511 let key = entry.key().clone();
512
513 slot.entry.store(entry, Relaxed);
514
515 let tag = tag.with_signature(hash(key.as_ref()));
516
517 slot.tag.store(tag.into(), Release);
518
519 loop {
520 match self.small_queue.push(index.into()) {
521 Ok(_) => break,
522 Err(_) => {
523 if let Some(index) = self.evict_from_small_queue(|_| true, guard, backoff) {
524 self.index_pool
525 .push(index.into())
526 .expect("the index pool can't overflow");
527 } else {
528 backoff.backoff();
529 }
530 }
531 }
532 }
533
534 self.index_table.insert(key, index.into());
535 }
536
537 fn evict_from_small_queue(
547 &self,
548 allow_eviction: impl Fn(&K) -> bool,
549 guard: &Guard,
550 backoff: &mut Backoff,
551 ) -> Option<Index> {
552 while let Some(index) = self.small_queue.pop().map(Index::from) {
553 let slot = &self.slots[index.slot_index()];
554 let mut tag = Tag::from(slot.tag.load(Acquire));
555
556 loop {
557 if tag.is_busy() {
558 tag = Tag::from(slot.tag.load(Acquire));
559 backoff.backoff();
560 continue;
561 }
562
563 let entry = slot.entry.load(Relaxed, guard);
564
565 let entry_ref =
566 unsafe { entry.as_ref().expect("the occupied entry cannot be null") };
567
568 if tag.is_hot() && !entry_ref.is_expired() {
569 if let Err(latest) = slot.tag.compare_exchange_weak(
570 tag.into(),
571 tag.reset().into(),
572 Release,
573 Acquire,
574 ) {
575 tag = Tag::from(latest);
576 backoff.backoff();
577 continue;
578 }
579
580 self.promote_index(index, guard, backoff);
581 break;
582 }
583
584 if !(entry_ref.is_expired() || allow_eviction(entry_ref.key()))
585 && self.small_queue.push(index.into()).is_ok()
586 {
587 return None;
588 }
589
590 match slot
591 .tag
592 .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
593 {
594 Ok(_) => {
595 let key = entry_ref.key().clone();
596 self.index_table.remove(key.as_ref());
597 slot.entry.store(Shared::null(), Relaxed);
598
599 let (tag, index) = tag.advance(index);
600 slot.tag.store(tag.into(), Release);
601
602 self.push_into_ghost_queue(key.as_ref(), backoff);
603
604 unsafe { guard.defer_destroy(entry) };
605
606 return Some(index);
607 }
608 Err(latest) => {
609 tag = Tag::from(latest);
610 backoff.backoff();
611 }
612 }
613 }
614 }
615
616 None
617 }
618
619 #[inline(always)]
633 fn push_into_ghost_queue(&self, key: &K, backoff: &mut Backoff) {
634 let hash = hash(key);
635 loop {
636 match self.ghost_queue.push(hash) {
637 Ok(_) => {
638 self.ghost_filter.increment(&hash);
639 break;
640 }
641 Err(_) => {
642 backoff.backoff();
643
644 if let Some(hash) = self.ghost_queue.pop() {
645 self.ghost_filter.decrement(&hash);
646 }
647 }
648 }
649 }
650 }
651
652 fn promote_index(&self, index: Index, guard: &Guard, backoff: &mut Backoff) {
663 loop {
664 if self.main_queue.push(index.into()).is_ok() {
665 break;
666 }
667
668 if let Some(index) = self.evict_from_main_queue(|_| true, guard, backoff) {
669 self.index_pool
670 .push(index.into())
671 .expect("the index pool can't overflow");
672 } else {
673 backoff.backoff();
674 }
675 }
676 }
677
678 fn push_into_main_queue(
703 &self,
704 entry: Entry<K, V>,
705 admission: impl Fn(&K, &K) -> bool,
706 guard: &Guard,
707 backoff: &mut Backoff,
708 ) {
709 let index = match self.index_pool.pop() {
710 Some(index) => Index::from(index),
711 None => {
712 match self.evict_from_main_queue(|key| admission(entry.key(), key), guard, backoff)
713 {
714 Some(index) => index,
715 None => return,
716 }
717 }
718 };
719
720 let slot = &self.slots[index.slot_index()];
721 let tag = Tag::from(slot.tag.load(Acquire));
722
723 let entry = Owned::new(entry);
724 let key = entry.key().clone();
725
726 slot.entry.store(entry, Relaxed);
727 let tag = tag.with_signature(hash(key.as_ref()));
728 slot.tag.store(tag.into(), Release);
729
730 loop {
731 match self.main_queue.push(index.into()) {
732 Ok(_) => break,
733 Err(_) => {
734 if let Some(index) = self.evict_from_main_queue(|_| true, guard, backoff) {
735 self.index_pool
736 .push(index.into())
737 .expect("the index pool can't overflow");
738 } else {
739 backoff.backoff();
740 }
741 }
742 }
743 }
744
745 self.index_table.insert(key, index.into());
746 }
747
748 fn evict_from_main_queue(
761 &self,
762 allow_eviction: impl Fn(&K) -> bool,
763 guard: &Guard,
764 backoff: &mut Backoff,
765 ) -> Option<Index> {
766 while let Some(index) = self.main_queue.pop().map(Index::from) {
767 let slot = &self.slots[index.slot_index()];
768 let mut tag = Tag::from(slot.tag.load(Acquire));
769
770 loop {
771 if tag.is_busy() {
772 tag = Tag::from(slot.tag.load(Acquire));
773 backoff.backoff();
774 continue;
775 }
776
777 let entry = slot.entry.load(Relaxed, guard);
778 let entry_ref = unsafe { entry.as_ref().expect("occupied entry can't be null") };
779
780 if tag.is_hot() && !entry_ref.is_expired() {
784 let updated_tag = tag.decrement_frequency();
785 if slot
786 .tag
787 .compare_exchange_weak(tag.into(), updated_tag.into(), Release, Acquire)
788 .is_ok()
789 {
790 if self.main_queue.push(index.into()).is_ok() {
792 break; }
794 tag = updated_tag;
795 } else {
796 tag = Tag::from(slot.tag.load(Acquire));
797 backoff.backoff();
798 continue;
799 }
800 }
801
802 if !(entry_ref.is_expired() || allow_eviction(entry_ref.key()))
806 && self.main_queue.push(index.into()).is_ok()
807 {
808 break;
809 }
810
811 match slot
814 .tag
815 .compare_exchange_weak(tag.into(), tag.busy().into(), AcqRel, Acquire)
816 {
817 Ok(_) => {
818 self.index_table.remove(entry_ref.key());
819 slot.entry.store(Shared::null(), Relaxed);
820
821 let (next_tag, next_index) = tag.advance(index);
822 slot.tag.store(next_tag.into(), Release);
823
824 unsafe { guard.defer_destroy(entry) };
825 return Some(next_index);
826 }
827 Err(latest) => {
828 tag = Tag::from(latest);
829 backoff.backoff();
830
831 if self.main_queue.push(index.into()).is_ok() {
832 break;
833 }
834 }
835 }
836 }
837 }
838
839 None
840 }
841
842 pub fn remove<Q>(&self, key: &Q) -> bool
843 where
844 Key<K>: Borrow<Q>,
845 Q: Eq + Hash + ?Sized,
846 {
847 let mut backoff = self.backoff_config.build();
848
849 match self.index_table.remove(key).map(Index::from) {
850 None => false,
851 Some(index) => {
852 let slot = &self.slots[index.slot_index()];
853
854 let mut tag = Tag::from(slot.tag.load(Relaxed));
855
856 loop {
857 if !tag.is_epoch_match(index) {
858 return false;
859 }
860
861 if let Err(latest) = slot.tag.compare_exchange_weak(
862 tag.into(),
863 tag.reset().into(),
864 Relaxed,
865 Relaxed,
866 ) {
867 tag = Tag::from(latest);
868 backoff.backoff();
869 continue;
870 }
871
872 return true;
873 }
874 }
875 }
876 }
877}
878
879impl<K, V> CacheEngine<K, V> for S3FIFOCache<K, V>
880where
881 K: Eq + Hash,
882{
883 fn get<Q>(&self, key: &Q) -> Option<Ref<K, V>>
884 where
885 Key<K>: Borrow<Q>,
886 Q: Eq + Hash + ?Sized,
887 {
888 self.get(key)
889 }
890
891 fn insert_with<A>(&self, key: K, value: V, expired_at: Option<Instant>, admission: A)
892 where
893 A: Fn(&K, &K) -> bool,
894 {
895 self.insert_with(key, value, expired_at, admission)
896 }
897
898 fn remove<Q>(&self, key: &Q) -> bool
899 where
900 Key<K>: Borrow<Q>,
901 Q: Eq + Hash + ?Sized,
902 {
903 self.remove(key)
904 }
905
906 fn capacity(&self) -> usize {
907 self.capacity
908 }
909
910 fn metrics(&self) -> MetricsSnapshot {
911 self.metrics.snapshot()
912 }
913}
914
915#[cfg(test)]
916mod tests {
917 use super::*;
918 use crate::core::utils::random_string;
919 use crate::core::workload::{WorkloadGenerator, WorkloadStatistics};
920 use rand::distr::{Alphanumeric, SampleString};
921 use rand::{RngExt, rng};
922 use std::thread::{scope, sleep};
923 use std::time::Duration;
924
925 #[inline(always)]
926 fn create_cache<K, V>(capacity: usize) -> S3FIFOCache<K, V>
927 where
928 K: Eq + Hash,
929 {
930 S3FIFOCache::new(
931 capacity,
932 BackoffConfig::exponential(1000),
933 MetricsConfig::default(),
934 )
935 }
936
937 #[inline(always)]
938 fn random_alphanumeric(len: usize) -> String {
939 Alphanumeric.sample_string(&mut rand::rng(), len)
940 }
941
942 #[test]
943 fn test_s3cache_insert_should_retrieve_stored_value() {
944 let cache = create_cache(10);
945
946 let key = random_alphanumeric(32);
947 let value = random_alphanumeric(255);
948
949 cache.insert(key.clone(), value.clone(), None);
950
951 let entry = cache.get(&key).expect("must present");
952
953 assert_eq!(entry.key(), &key);
954 assert_eq!(entry.value(), &value);
955 }
956
957 #[test]
958 fn test_s3cache_insert_should_overwrite_existing_key() {
959 let cache = create_cache(10);
960
961 let key = random_alphanumeric(32);
962 let value1 = random_alphanumeric(255);
963 let value2 = random_alphanumeric(255);
964
965 cache.insert(key.clone(), value1, None);
966 cache.insert(key.clone(), value2.clone(), None);
967
968 let entry = cache.get(&key).expect("must present");
969
970 assert_eq!(entry.key(), &key);
971 assert_eq!(entry.value(), &value2);
972 }
973
974 #[test]
975 fn test_s3cache_remove_should_invalidate_entry() {
976 let cache = create_cache(100);
977
978 let key = random_alphanumeric(32);
979
980 cache.insert(key.clone(), random_alphanumeric(255), None);
981
982 assert!(cache.get(&key).is_some());
983
984 assert!(cache.remove(&key));
985
986 assert!(cache.get(&key).is_none());
987 }
988
989 #[test]
990 fn test_s3cache_fill_beyond_capacity_should_evict_fifo() {
991 let cache = create_cache(100);
992
993 for _ in 0..1000 {
994 let key = random_alphanumeric(32);
995 let value = random_alphanumeric(255);
996
997 cache.insert(key, value, None);
998 }
999 }
1000
1001 #[test]
1002 fn test_s3cache_hot_entry_should_resist_eviction() {
1003 let cache = create_cache(1000);
1004
1005 let key = random_alphanumeric(32);
1006 let value = random_alphanumeric(255);
1007
1008 cache.insert(key.clone(), value.clone(), None);
1009
1010 let entry = cache.get(&key).expect("must present");
1011
1012 assert_eq!(entry.value(), &value);
1013
1014 for _ in 0..250 {
1015 let key = random_alphanumeric(32);
1016 let value = random_alphanumeric(255);
1017
1018 cache.insert(key, value, None);
1019 }
1020
1021 let entry = cache.get(&key).expect("must present");
1022
1023 assert_eq!(entry.key(), &key);
1024 assert_eq!(entry.value(), &value);
1025 }
1026
1027 #[test]
1028 fn test_s3cache_reinserted_ghost_entry_should_be_promoted_to_main() {
1029 let cache = create_cache(1000);
1030
1031 let (key, value) = (random_alphanumeric(32), random_alphanumeric(255));
1032
1033 cache.insert(key.clone(), value.clone().to_string(), None);
1034
1035 for _ in 0..200 {
1036 let key = random_alphanumeric(32);
1037 let value = random_alphanumeric(255);
1038
1039 cache.insert(key, value, None);
1040 }
1041
1042 assert!(cache.get(&key).is_none());
1043
1044 cache.insert(key.clone(), value.clone().to_string(), None);
1045
1046 for _ in 0..1000 {
1047 let key = random_alphanumeric(32);
1048 let value = random_alphanumeric(255);
1049
1050 cache.insert(key, value, None);
1051 }
1052
1053 let entry = cache.get(&key).expect("must present");
1054
1055 assert_eq!(entry.key(), &key);
1056 assert_eq!(entry.value(), &value);
1057 }
1058
1059 #[test]
1060 fn test_s3cache_ghost_filter_should_protect_working_set() {
1061 let cache = create_cache(1000);
1062 let hot_entries = vec![("a", "a"), ("b", "b"), ("c", "c"), ("d", "d"), ("e", "e")];
1063
1064 for &(key, value) in &hot_entries {
1065 cache.insert(key.to_string(), value.to_string(), None);
1066 }
1067
1068 for i in 0..100000 {
1069 if i % 2 == 0 {
1070 cache.insert(format!("key-{}", i), format!("value-{}", i), None);
1071 } else {
1072 let index = rng().random_range(..hot_entries.len());
1073 let key = hot_entries[index].0;
1074 let _ = cache.get(key);
1075 }
1076 }
1077
1078 let count = hot_entries
1079 .iter()
1080 .map(|&(key, _)| cache.get(key))
1081 .filter(|it| it.is_some())
1082 .count();
1083
1084 assert!(count >= 4);
1085 }
1086
1087 #[test]
1088 fn test_s3cache_concurrent_hammer_should_not_crash_or_hang() {
1089 let cache = create_cache(1000);
1090 let num_threads = 32;
1091 let ops_per_thread = 5000;
1092
1093 scope(|s| {
1094 for _ in 0..num_threads {
1095 s.spawn(|| {
1096 for i in 0..ops_per_thread {
1097 let key = (i % 500).to_string();
1098 if i % 2 == 0 {
1099 cache.insert(key, random_alphanumeric(255), None);
1100 } else {
1101 let _ = cache.get(&key);
1102 }
1103 }
1104 });
1105 }
1106 });
1107 }
1108
1109 #[test]
1110 fn test_s3_fifo_should_protect_hot_set_under_high_churn() {
1111 let capacity = 1000;
1112 let cache = create_cache(capacity);
1113
1114 let num_threads = 8;
1115 let ops_per_thread = 10000;
1116
1117 let workload_generator = WorkloadGenerator::new(20000, 1.3);
1118 let workload_statistics = WorkloadStatistics::new();
1119
1120 let mut rand = rng();
1121 for _ in 0..capacity {
1122 let key = workload_generator.key(&mut rand);
1123 cache.insert(key.to_string(), random_string(), None);
1124 workload_statistics.record(key.to_string());
1125 }
1126
1127 scope(|scope| {
1128 for _ in 0..num_threads {
1129 scope.spawn(|| {
1130 let mut thread_rng = rng();
1131 for _ in 0..ops_per_thread {
1132 let key = workload_generator.key(&mut thread_rng);
1133 workload_statistics.record(key.to_string());
1134
1135 if cache.get(key).is_none() {
1136 let value = random_string();
1137 cache.insert(key.to_string(), value, None);
1138 }
1139 }
1140 });
1141 }
1142 });
1143
1144 let top_keys_size = 500;
1145 let frequent_keys = workload_statistics.frequent_keys(top_keys_size);
1146
1147 let count = frequent_keys.iter().fold(0, |acc, key| {
1148 if cache.get(key).is_some() {
1149 acc + 1
1150 } else {
1151 acc
1152 }
1153 });
1154
1155 assert!(
1156 count >= 400,
1157 "S3-FIFO efficiency dropped! Captured only {}/{} hot keys",
1158 count,
1159 top_keys_size
1160 );
1161 }
1162
1163 #[test]
1164 fn test_s3cache_ttl_entry_should_expire() {
1165 let cache = create_cache(10);
1166 let key = random_string();
1167 let value = random_string();
1168
1169 cache.insert(
1170 key.clone(),
1171 value.clone(),
1172 Some(Instant::now() + Duration::from_millis(10)),
1173 );
1174
1175 assert!(cache.get(&key).is_some());
1176
1177 sleep(Duration::from_millis(50));
1178
1179 assert!(cache.get(&key).is_none(), "Entry should have expired");
1181 }
1182
1183 #[test]
1184 fn test_s3cache_ttl_entry_should_not_expire_early() {
1185 use std::time::Duration;
1186 let cache = create_cache(10);
1187 let key = "not-expire-me".to_string();
1188 let value = "value".to_string();
1189
1190 cache.insert(
1191 key.clone(),
1192 value.clone(),
1193 Some(Instant::now() + Duration::from_secs(10)),
1194 );
1195
1196 sleep(Duration::from_millis(10));
1197
1198 assert!(
1199 cache.get(&key).is_some(),
1200 "Entry should not have expired yet"
1201 );
1202 }
1203
1204 #[test]
1205 fn test_s3cache_ttl_overwrite_should_update_expiry() {
1206 let cache = create_cache(10);
1207 let key = random_string();
1208
1209 cache.insert(
1210 key.clone(),
1211 "val1".to_string(),
1212 Some(Instant::now() + Duration::from_millis(10)),
1213 );
1214
1215 cache.insert(
1216 key.clone(),
1217 "val2".to_string(),
1218 Some(Instant::now() + Duration::from_secs(10)),
1219 );
1220
1221 sleep(Duration::from_millis(50));
1222
1223 let entry = cache.get(&key);
1224 assert!(
1225 entry.is_some(),
1226 "Entry should still be present with new TTL"
1227 );
1228 assert_eq!(entry.unwrap().value(), "val2");
1229 }
1230
1231 #[test]
1232 fn test_s3cache_expired_entries_should_be_evicted() {
1233 let capacity = 10;
1234 let cache = create_cache(capacity);
1235
1236 for _ in 0..capacity {
1237 let key = random_string();
1238
1239 cache.insert(
1240 key.clone(),
1241 random_string(),
1242 Some(Instant::now() + Duration::from_millis(300)),
1243 );
1244
1245 let _ = cache.get(&key);
1246 }
1247
1248 sleep(Duration::from_millis(500));
1249
1250 let latest_key = random_string();
1251
1252 cache.insert(latest_key.to_string(), random_string(), None);
1253
1254 assert!(cache.get(&latest_key).is_some());
1255 }
1256}