1#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18pub mod static_cache;
19
20
/// Synchronisation primitives used by the rest of the crate, chosen at compile time.
///
/// With the `loom` feature (or `--cfg loom`) the `loom` types are re-exported;
/// otherwise the `std` / `alloc` / `core` equivalents are used. Other modules
/// import these names instead of naming a backend directly.
pub(crate) mod sync {
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;
    #[cfg(feature = "std")]
    use std::vec::Vec;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    /// Shared, fixed-length sequence of `T` (a reference-counted slice).
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Shared sequence of `T` under loom, where the `Vec` is wrapped as-is.
    // NOTE(review): presumably because loom's `Arc` cannot be built from an
    // unsized slice; confirm against the loom version in use.
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Moves `vec` into an [`ArcSlice`].
    ///
    /// Uses the direct `Vec<T> -> Arc<[T]>` conversion, so the elements are
    /// moved once into the reference-counted allocation without first being
    /// shrunk into an intermediate `Box<[T]>`.
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::from(vec)
    }

    /// Moves `vec` into an [`ArcSlice`] (loom variant: wraps the `Vec` itself).
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Atomic integer / pointer types and `Ordering`, from `core` or `loom`.
    pub mod atomic {
        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with closure-based `with` / `with_mut` accessors.
    ///
    /// Outside loom this is a thin wrapper over `core::cell::UnsafeCell` that
    /// adds those accessors; under loom the loom type is re-exported instead.
    pub mod cell {
        /// Wrapper around `core::cell::UnsafeCell` exposing `with` / `with_mut`.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data` in a new cell.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Returns a raw mutable pointer to the contents.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a raw const pointer to the contents and returns its result.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                f(self.0.get() as *const T)
            }

            /// Calls `f` with a raw mutable pointer to the contents and returns its result.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                f(self.0.get())
            }
        }

        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;
    }
}
112
113#[cfg(not(feature = "std"))]
115use alloc::vec::Vec;
116
117use crate::cache_padded::CachePadded;
118use crate::daemon::{Command, Daemon};
119use crate::lossy_queue::{LossyQueue, OneshotAck};
120use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
121use ahash::RandomState;
122use core::hash::{BuildHasher, Hash};
123use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
124use sync::{Arc, ArcSlice, new_arc_slice};
125
/// Construction parameters for [`DualCacheFF`].
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Size handed to the main cache and to the daemon.
    pub capacity: usize,
    /// Number of slots handed to the T1 table.
    pub t1_slots: usize,
    /// Number of slots handed to the T2 table.
    pub t2_slots: usize,
    /// Forwarded unchanged to the daemon.
    // NOTE(review): presumably an entry lifetime; units are not visible here.
    pub duration: u32,
    /// Number of per-thread worker slots (miss buffers and epoch states) to allocate.
    pub threads: usize,
    /// Forwarded to the daemon; [`Config::with_poll_us`] keeps it in `1_000..=10_000`.
    pub poll_us: u64,
    /// Minimum number of daemon ticks between time-triggered flushes of a
    /// thread's insert buffer. Never below 1 when set via
    /// [`Config::with_flush_tick_threshold`].
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Derives a configuration from a memory budget given in MiB.
    ///
    /// The capacity is `ram_mb` MiB divided by 128, rounded up to a power of
    /// two and never below 256. T1 is fixed at 2048 slots; T2 is a fifth of
    /// the capacity rounded up to a power of two, never below 4096.
    ///
    /// With the `std` feature, `threads` is the available parallelism
    /// (16 if it cannot be queried); without it, 8.
    ///
    /// The byte computation saturates, so an oversized `ram_mb` yields the
    /// largest representable budget instead of overflowing.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // Saturate: a wrapped product would silently produce a tiny cache in
        // release builds and panic in debug builds.
        let budget_bytes = ram_mb.saturating_mul(1024 * 1024);
        let capacity = (budget_bytes / 128).next_power_of_two().max(256);

        #[cfg(feature = "std")]
        let threads = std::thread::available_parallelism()
            .map(|p| p.get())
            .unwrap_or(16);
        #[cfg(not(feature = "std"))]
        let threads = 8;

        Self {
            capacity,
            t1_slots: 2048,
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builds a configuration from explicit sizes.
    ///
    /// `poll_us` and `flush_tick_threshold` start at `1_000` and `1`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Sets `poll_us`, clamped to `1_000..=10_000`.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Sets `flush_tick_threshold`; values below 1 are raised to 1.
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
219
220#[cfg(any(feature = "loom", loom))]
226loom::lazy_static! {
227 pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
228}
229
230#[cfg(not(any(feature = "loom", loom)))]
231pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
232
233pub struct WorkerState {
236 pub local_epoch: CachePadded<AtomicUsize>,
237}
238
239impl WorkerState {
240 pub fn new() -> Self {
241 Self {
242 local_epoch: CachePadded::new(AtomicUsize::new(0)),
243 }
244 }
245}
246
247#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
254use std::sync::Mutex;
255
/// Hands out small integer worker ids and recycles the ids of exited threads.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    /// Ids given back by threads that have exited, available for reuse.
    free_list: Mutex<Vec<usize>>,
    /// Next never-used id, taken when `free_list` is empty.
    next_id: sync::atomic::AtomicUsize,
}

/// The single process-wide id allocator: no recycled ids yet, fresh ids start at 0.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    next_id: sync::atomic::AtomicUsize::new(0),
    free_list: Mutex::new(Vec::new()),
};
267
/// Owns a worker id for the lifetime of a thread and returns it to
/// `ALLOCATOR.free_list` when dropped.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    /// The worker id held by this guard.
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    /// Pushes the id back onto the free list so a later thread can reuse it.
    fn drop(&mut self) {
        // A poisoned lock only means another thread panicked while holding it;
        // the `Vec<usize>` is still usable, so recover the guard rather than
        // leaking this id forever.
        let mut free = ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        free.push(self.id);
    }
}
281
282#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
283use core::cell::{Cell, RefCell};
284
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// This thread's worker id: a recycled id if one is free, otherwise a fresh
    /// one. Assigned on first access; a `ThreadIdGuard` is parked in `GUARD`
    /// so the id is given back when the thread's locals are destroyed.
    static WORKER_ID: usize = {
        // Recover from a poisoned lock so recycled ids stay reachable; the
        // guard is released at the end of this statement.
        let recycled = ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .pop();
        let id = recycled.unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed));

        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    /// Holds the guard that returns `WORKER_ID` to the allocator on thread exit.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    /// Pending hit indices: `(buffer, number of filled entries)`.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    /// Admission filter: `(per-bucket counters, inserts seen since the last halving)`.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    /// Daemon tick at which this thread last flushed its insert buffer.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
315
// Loom build: source of worker ids. Ids start at 0, grow monotonically and are
// never recycled in this configuration.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize =
        loom::sync::atomic::AtomicUsize::new(0);
}
320
321#[cfg(any(feature = "loom", loom))]
322use core::cell::{Cell, RefCell};
323
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    // This thread's worker id, taken from `NEXT_THREAD_ID` on first access.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    // Pending hit indices: (buffer, number of filled entries).
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0usize; 64], 0usize));

    // Admission filter: (per-bucket counters, inserts seen since the last halving).
    // Heap-allocated in this configuration.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> =
        RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0usize));

    // Daemon tick at which this thread last flushed its insert buffer.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0u64);
}
342
343pub struct DualCacheFF<K, V, S = RandomState> {
346 pub hasher: S,
347 pub t1: Arc<T1<K, V>>,
348 pub t2: Arc<T2<K, V>>,
349 pub cache: Arc<Cache<K, V>>,
350 pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
351 pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
352 pub epoch: Arc<AtomicU32>,
353 pub worker_states: ArcSlice<WorkerState>,
355 pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
357 pub daemon_tick: Arc<AtomicU64>,
360 pub flush_tick_threshold: u64,
362 pub is_cold_start: Arc<sync::atomic::AtomicBool>,
364}
365
366impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
367 fn clone(&self) -> Self {
368 Self {
369 hasher: self.hasher.clone(),
370 t1: self.t1.clone(),
371 t2: self.t2.clone(),
372 cache: self.cache.clone(),
373 cmd_tx: self.cmd_tx.clone(),
374 hit_tx: self.hit_tx.clone(),
375 epoch: self.epoch.clone(),
376 worker_states: self.worker_states.clone(),
377 miss_buffers: self.miss_buffers.clone(),
378 daemon_tick: self.daemon_tick.clone(),
379 flush_tick_threshold: self.flush_tick_threshold,
380 is_cold_start: self.is_cold_start.clone(),
381 }
382 }
383}
384
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its daemon on a detached thread.
    ///
    /// Under loom no thread is spawned and the daemon is discarded.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        // Regular build: the daemon runs on its own thread; the join handle is
        // not kept.
        #[cfg(not(any(feature = "loom", loom)))]
        std::thread::spawn(move || daemon.run());

        // Loom build: nothing runs the daemon; this only marks it as used.
        #[cfg(any(feature = "loom", loom))]
        let _ = daemon;

        handle
    }
}
407
408impl<K, V> DualCacheFF<K, V, RandomState>
411where
412 K: Hash + Eq + Send + Sync + Clone + 'static,
413 V: Send + Sync + Clone + 'static,
414{
415 pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
428 let hasher = RandomState::new();
429 let t1 = Arc::new(T1::new(config.t1_slots));
430 let t2 = Arc::new(T2::new(config.t2_slots));
431 let cache = Arc::new(Cache::new(config.capacity));
432 let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
433 let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
434 let epoch = Arc::new(AtomicU32::new(0));
435 let daemon_tick = Arc::new(AtomicU64::new(0));
436 let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
437
438 let mut buffers = Vec::with_capacity(config.threads);
439 let mut states = Vec::with_capacity(config.threads);
440 for _ in 0..config.threads {
441 buffers.push(WorkerSlot::new());
442 states.push(WorkerState::new());
443 }
444 let miss_buffers = new_arc_slice(buffers);
445 let worker_states = new_arc_slice(states);
446
447 let daemon = Daemon::new(
448 hasher.clone(),
449 config.capacity,
450 t1.clone(),
451 t2.clone(),
452 cache.clone(),
453 cmd_q.clone(),
454 hit_q.clone(),
455 epoch.clone(),
456 config.duration,
457 config.poll_us,
458 worker_states.clone(),
459 daemon_tick.clone(),
460 is_cold_start.clone(),
461 );
462
463 let this = Self {
464 hasher,
465 t1,
466 t2,
467 cache,
468 cmd_tx: cmd_q,
469 hit_tx: hit_q,
470 epoch,
471 worker_states,
472 miss_buffers,
473 daemon_tick,
474 flush_tick_threshold: config.flush_tick_threshold,
475 is_cold_start,
476 };
477
478 (this, daemon)
479 }
480}
481
482impl<K, V, S> DualCacheFF<K, V, S>
485where
486 K: Hash + Eq + Send + Sync + Clone + 'static,
487 V: Send + Sync + Clone + 'static,
488 S: BuildHasher + Clone + Send + 'static,
489{
490 pub fn sync(&self) {
494 #[cfg(feature = "std")]
496 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
497 let mut state = buf.borrow_mut();
498 if state.1 > 0_usize {
499 let _ = self.hit_tx.try_send(state.0);
500 state.1 = 0;
501 }
502 });
503
504 #[cfg(feature = "std")]
506 for slot in self.miss_buffers.iter() {
507 let buf = unsafe { slot.get_mut_unchecked() };
508 if buf.len() > 0 {
509 let batch = buf.drain_to_vec();
510 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
511 }
512 }
513
514 let ack = OneshotAck::new();
516 self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
517 ack.wait();
518 }
519
520 pub fn get(&self, key: &K) -> Option<V> {
525 let hash = self.hash(key);
526 let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
527
528 #[cfg(feature = "std")]
530 let mut id_opt = None;
531 #[cfg(feature = "std")]
532 {
533 let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
534 WORKER_ID.with(|&id| {
535 if id < self.worker_states.len() {
536 self.worker_states[id]
537 .local_epoch
538 .store(global_epoch, Ordering::Relaxed);
539 id_opt = Some(id);
540 }
541 });
542 }
543
544 #[cfg(feature = "std")]
545 let has_epoch = id_opt.is_some();
546 #[cfg(not(feature = "std"))]
547 let has_epoch = true;
548
549 let mut res: Option<V> = None;
550 let mut hit_g_idx: Option<u32> = None;
551
552 if has_epoch {
553 let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
555 if !ptr_t1.is_null() {
556 let node = unsafe { &*ptr_t1 };
557 if node.key == *key
558 && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
559 {
560 res = Some(node.value.clone());
561 hit_g_idx = Some(node.g_idx);
562 }
563 }
564
565 if res.is_none() {
567 let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
568 if !ptr_t2.is_null() {
569 let node = unsafe { &*ptr_t2 };
570 if node.key == *key
571 && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
572 {
573 res = Some(node.value.clone());
574 hit_g_idx = Some(node.g_idx);
575 }
576 }
577 }
578
579 if res.is_none() {
581 let tag = (hash >> 48) as u16;
582 if let Some(global_idx) = self.cache.index_probe(hash, tag) {
583 if let Some(v) = self
584 .cache
585 .node_get_full(global_idx, key, current_epoch_cache)
586 {
587 res = Some(v);
588 hit_g_idx = Some(global_idx as u32);
589 }
590 }
591 }
592 }
593
594 #[cfg(feature = "std")]
596 if let Some(id) = id_opt {
597 self.worker_states[id]
598 .local_epoch
599 .store(0, Ordering::Relaxed);
600 }
601
602 if let Some(g_idx) = hit_g_idx {
603 self.record_hit(g_idx as usize);
604 }
605
606 res
607 }
608
609 pub fn insert(&self, key: K, value: V) {
623 let hash = self.hash(&key);
624
625 #[cfg(feature = "std")]
627 {
628 let is_cold = self.is_cold_start.load(Ordering::Relaxed);
629 let mut bypass = is_cold;
630
631 if !bypass {
632 let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
635 let mut id_opt = None;
636 WORKER_ID.with(|&id| {
637 if id < self.worker_states.len() {
638 self.worker_states[id]
639 .local_epoch
640 .store(global_epoch, Ordering::Relaxed);
641 id_opt = Some(id);
642 }
643 });
644
645 if id_opt.is_some() {
646 let ptr_t1 = self.t1.load_slot(hash);
648 if !ptr_t1.is_null() {
649 let node = unsafe { &*ptr_t1 };
650 if node.key == key {
651 bypass = true;
652 }
653 }
654
655 if !bypass {
657 let ptr_t2 = self.t2.load_slot(hash);
658 if !ptr_t2.is_null() {
659 let node = unsafe { &*ptr_t2 };
660 if node.key == key {
661 bypass = true;
662 }
663 }
664 }
665
666 if !bypass {
668 let tag = (hash >> 48) as u16;
669 if let Some(global_idx) = self.cache.index_probe(hash, tag) {
670 let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
671 if !ptr.is_null() {
672 let node = unsafe { &*ptr };
673 if node.key == key {
674 bypass = true;
675 }
676 }
677 }
678 }
679 }
680
681 if let Some(id) = id_opt {
683 self.worker_states[id]
684 .local_epoch
685 .store(0, Ordering::Relaxed);
686 }
687 }
688
689 let pass = if bypass {
690 true
691 } else {
692 #[cfg(any(feature = "loom", loom))]
694 {
695 L1_FILTER.with(|f| {
696 let mut state = f.borrow_mut();
697 let idx = (hash as usize) & 4095_usize;
698 let val = state.0[idx];
699
700 state.1 += 1;
701 if state.1 >= 4096_usize {
702 for x in state.0.iter_mut() {
703 *x >>= 1;
704 }
705 state.1 = 0;
706 }
707
708 if val < 1_u8 {
709 state.0[idx] = 1;
710 false
711 } else {
712 if val < 2_u8 {
713 state.0[idx] = 2;
714 }
715 true
716 }
717 })
718 }
719
720 #[cfg(not(any(feature = "loom", loom)))]
721 {
722 L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
723 let mut state = f.borrow_mut();
724 let idx = (hash as usize) & 4095_usize;
725 let val = state.0[idx];
726
727 state.1 += 1;
728 if state.1 >= 4096_usize {
729 for x in state.0.iter_mut() {
730 *x >>= 1;
731 }
732 state.1 = 0;
733 }
734
735 if val < 1_u8 {
736 state.0[idx] = 1;
737 false
738 } else {
739 if val < 2_u8 {
740 state.0[idx] = 2;
741 }
742 true
743 }
744 })
745 }
746 };
747
748 if !pass {
749 return;
750 }
751
752 let current_tick = self.daemon_tick.load(Ordering::Relaxed);
754 let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
755 current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
756 });
757
758 WORKER_ID.with(|&id| {
760 if id >= self.miss_buffers.len() {
761 let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
763 return;
764 }
765
766 let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
768 let capacity_flush = buf.push((key, value, hash));
769
770 if capacity_flush || (should_time_flush && !buf.is_empty()) {
771 let batch = buf.drain_to_vec();
772 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
773 LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
774 }
775 });
776 }
777
778 #[cfg(not(feature = "std"))]
780 {
781 let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
782 }
783 }
784
785 pub fn remove(&self, key: &K) {
787 let hash = self.hash(key);
788
789 #[cfg(feature = "std")]
791 WORKER_ID.with(|&id| {
792 if id < self.miss_buffers.len() {
793 let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
794 if buf.len() > 0 {
795 let batch = buf.drain_to_vec();
796 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
797 let tick = self.daemon_tick.load(Ordering::Relaxed);
798 LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
799 }
800 }
801 });
802
803 self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
804 }
805
806 pub fn clear(&self) {
808 let ack = OneshotAck::new();
809 self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
810 ack.wait();
811 }
812
813 #[inline(always)]
816 fn hash(&self, key: &K) -> u64 {
817 self.hasher.hash_one(key)
818 }
819
820 #[inline(always)]
825 fn record_hit(&self, global_idx: usize) {
826 #[cfg(feature = "std")]
827 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
828 let mut state = buf.borrow_mut();
829 let idx = state.1;
830 state.0[idx] = global_idx;
831 state.1 += 1;
832 if state.1 == 64_usize {
833 let _ = self.hit_tx.try_send(state.0);
834 state.1 = 0;
835 }
836 });
837
838 #[cfg(not(feature = "std"))]
841 {
842 let mut batch = [0usize; 64];
843 batch[0] = global_idx;
844 let _ = self.hit_tx.try_send(batch);
845 }
846 }
847}
848
849impl<K, V, S> Drop for DualCacheFF<K, V, S> {
850 fn drop(&mut self) {
851 if Arc::strong_count(&self.cmd_tx) <= 2 {
852 let _ = self.cmd_tx.try_send(Command::Shutdown);
853 }
854 }
855}
856