1#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
/// Synchronisation shims.
///
/// Every primitive the crate shares between threads is re-exported from here
/// so that the `loom` feature / `--cfg loom` can swap the real `std`, `alloc`
/// and `core` types for their loom counterparts in one place.
pub(crate) mod sync {
    // ---- Reference-counted pointer -------------------------------------

    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    // ---- Shared, immutable slice ---------------------------------------

    /// Shared slice handle used under loom: an `Arc` around a `Vec`.
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Shared slice handle used outside loom: an `Arc<[T]>`.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Wraps `vec` into an [`ArcSlice`] (loom flavour).
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Converts `vec` into an [`ArcSlice`] by way of a boxed slice.
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::from(vec.into_boxed_slice())
    }

    /// Atomic integer types and `Ordering`, taken from loom or from `core`.
    pub mod atomic {
        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with closure-based accessors.
    pub mod cell {
        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;

        /// Thin wrapper over [`core::cell::UnsafeCell`] that adds `with` and
        /// `with_mut` next to `get` (presumably to match the accessor names of
        /// the loom cell re-exported above, so call sites build either way).
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Creates a cell holding `data`.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Returns the raw mutable pointer to the contents.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a read-only raw pointer to the contents and
            /// returns whatever `f` returns.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                let shared: *const T = self.0.get();
                f(shared)
            }

            /// Calls `f` with a mutable raw pointer to the contents and
            /// returns whatever `f` returns.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                let exclusive: *mut T = self.0.get();
                f(exclusive)
            }
        }
    }
}
105
106#[cfg(not(feature = "std"))]
108use alloc::vec::Vec;
109
110use crate::cache_padded::CachePadded;
111use crate::daemon::{Command, Daemon};
112use crate::lossy_queue::{LossyQueue, OneshotAck};
113use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
114use ahash::RandomState;
115use core::hash::{BuildHasher, Hash};
116use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
117use sync::{Arc, ArcSlice, new_arc_slice};
118
/// Construction parameters for the cache.
pub struct Config {
    /// Size handed to the main cache store and to the daemon.
    /// `new_expert` requires a power of two.
    pub capacity: usize,
    /// Slot count of the T1 tier. `new_expert` requires a power of two, at most 4096.
    pub t1_slots: usize,
    /// Slot count of the T2 tier. `new_expert` requires a power of two.
    pub t2_slots: usize,
    /// Duration value forwarded to the daemon
    /// (presumably an entry lifetime measured in cache epochs — confirm in `daemon`).
    pub duration: u32,
    /// Number of per-worker slots (miss buffers and epoch states) to allocate.
    pub threads: usize,
    /// Daemon polling interval forwarded to the daemon (microseconds, per the name).
    pub poll_us: u64,
    /// Number of daemon ticks after which a worker's pending inserts are flushed.
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Derives a configuration from a memory budget in mebibytes.
    ///
    /// The budget is divided by 128 (presumably an estimated per-entry
    /// footprint in bytes), rounded up to a power of two and never smaller
    /// than 256. T1 is fixed at 2048 slots; T2 is a fifth of the capacity
    /// rounded up to a power of two, with a floor of 4096.
    ///
    /// Oversized budgets saturate at the largest power of two representable
    /// in `usize` instead of overflowing.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // Same value as the former `(ram_mb * 1024 * 1024) / 128`, but without
        // the intermediate product that could overflow.
        const ENTRIES_PER_MB: usize = (1024 * 1024) / 128;
        const LARGEST_POWER_OF_TWO: usize = 1 << (usize::BITS - 1);

        let raw_capacity = ram_mb.saturating_mul(ENTRIES_PER_MB);
        let capacity = raw_capacity
            .checked_next_power_of_two()
            .unwrap_or(LARGEST_POWER_OF_TWO)
            .max(256);

        Self {
            capacity,
            t1_slots: 2048,
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builds a configuration from explicit sizes.
    ///
    /// # Panics
    ///
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Sets the daemon polling interval, clamped to `1_000..=10_000`.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Sets the flush threshold in daemon ticks; values below 1 are raised to 1.
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
211
212#[cfg(any(feature = "loom", loom))]
218loom::lazy_static! {
219 pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
220}
221
222#[cfg(not(any(feature = "loom", loom)))]
223pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
224
225pub struct WorkerState {
228 pub local_epoch: CachePadded<AtomicUsize>,
229}
230
231impl WorkerState {
232 pub fn new() -> Self {
233 Self {
234 local_epoch: CachePadded::new(AtomicUsize::new(0)),
235 }
236 }
237}
238
239#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
246use std::sync::Mutex;
247
/// Hands out small integer worker ids and takes them back for reuse.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    /// Ids returned by threads that have exited, available for reuse.
    free_list: Mutex<Vec<usize>>,
    /// Next never-used id; consulted only when `free_list` has nothing to offer.
    next_id: AtomicUsize,
}

/// The single process-wide allocator instance.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    free_list: Mutex::new(Vec::new()),
    next_id: AtomicUsize::new(0),
};
259
/// Owns a worker id for the lifetime of a thread and returns it to
/// `ALLOCATOR.free_list` when dropped.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    fn drop(&mut self) {
        // A poisoned lock only means another thread panicked while holding it;
        // the list of plain integers is still usable. Recovering the guard
        // keeps the id recyclable instead of leaking it for good.
        ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push(self.id);
    }
}
273
274#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
275use core::cell::{Cell, RefCell};
276
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// This thread's worker id: a recycled id from `ALLOCATOR.free_list` when
    /// one is available, otherwise a fresh one from `ALLOCATOR.next_id`.
    /// Initialisation also installs the `ThreadIdGuard` in `GUARD`.
    static WORKER_ID: usize = {
        // Recover from poisoning rather than skipping the free list; the lock
        // is released at the end of this statement.
        let recycled = ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .pop();
        let id = recycled
            .unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed));

        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    /// Holds the guard whose `Drop` hands the id back when the thread's
    /// thread-locals are destroyed.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    /// Pending hit indices (up to 64) and the number of slots filled so far.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    /// Admission-filter counters (4096 one-byte slots) and the number of
    /// filter calls since the counters were last halved.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    /// Daemon tick observed when this thread last flushed its miss buffer.
    /// Const-initialised like the other cells in this block.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
307
// Loom builds: monotonically increasing source of worker ids.
// Ids are never recycled in this configuration.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
}
312
313#[cfg(any(feature = "loom", loom))]
314use core::cell::{Cell, RefCell};
315
// Loom flavour of the per-thread state used by `get`, `insert`, `remove`
// and `record_hit`.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    // This thread's worker id, taken from `NEXT_THREAD_ID` on first use.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    // Pending hit indices (up to 64) and the number of slots filled so far.
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));

    // Admission-filter counters (4096 one-byte slots, heap-allocated here)
    // and the number of filter calls since the counters were last halved.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));

    // Daemon tick observed when this thread last flushed its miss buffer.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
334
335pub struct DualCacheFF<K, V, S = RandomState> {
338 pub hasher: S,
339 pub t1: Arc<T1<K, V>>,
340 pub t2: Arc<T2<K, V>>,
341 pub cache: Arc<Cache<K, V>>,
342 pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
343 pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
344 pub epoch: Arc<AtomicU32>,
345 pub worker_states: ArcSlice<WorkerState>,
347 pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
349 pub daemon_tick: Arc<AtomicU64>,
352 pub flush_tick_threshold: u64,
354 pub is_cold_start: Arc<sync::atomic::AtomicBool>,
356}
357
358impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
359 fn clone(&self) -> Self {
360 Self {
361 hasher: self.hasher.clone(),
362 t1: self.t1.clone(),
363 t2: self.t2.clone(),
364 cache: self.cache.clone(),
365 cmd_tx: self.cmd_tx.clone(),
366 hit_tx: self.hit_tx.clone(),
367 epoch: self.epoch.clone(),
368 worker_states: self.worker_states.clone(),
369 miss_buffers: self.miss_buffers.clone(),
370 daemon_tick: self.daemon_tick.clone(),
371 flush_tick_threshold: self.flush_tick_threshold,
372 is_cold_start: self.is_cold_start.clone(),
373 }
374 }
375}
376
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its daemon.
    ///
    /// Outside loom the daemon runs on a detached `std::thread`; under loom
    /// no thread is spawned and the daemon value is simply left unused.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        std::thread::spawn(move || daemon.run());

        #[cfg(any(feature = "loom", loom))]
        {
            let _ = daemon;
        }

        handle
    }
}
399
400impl<K, V> DualCacheFF<K, V, RandomState>
403where
404 K: Hash + Eq + Send + Sync + Clone + 'static,
405 V: Send + Sync + Clone + 'static,
406{
407 pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
420 let hasher = RandomState::new();
421 let t1 = Arc::new(T1::new(config.t1_slots));
422 let t2 = Arc::new(T2::new(config.t2_slots));
423 let cache = Arc::new(Cache::new(config.capacity));
424 let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
425 let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
426 let epoch = Arc::new(AtomicU32::new(0));
427 let daemon_tick = Arc::new(AtomicU64::new(0));
428 let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
429
430 let mut buffers = Vec::with_capacity(config.threads);
431 let mut states = Vec::with_capacity(config.threads);
432 for _ in 0..config.threads {
433 buffers.push(WorkerSlot::new());
434 states.push(WorkerState::new());
435 }
436 let miss_buffers = new_arc_slice(buffers);
437 let worker_states = new_arc_slice(states);
438
439 let daemon = Daemon::new(
440 hasher.clone(),
441 config.capacity,
442 t1.clone(),
443 t2.clone(),
444 cache.clone(),
445 cmd_q.clone(),
446 hit_q.clone(),
447 epoch.clone(),
448 config.duration,
449 config.poll_us,
450 worker_states.clone(),
451 daemon_tick.clone(),
452 is_cold_start.clone(),
453 );
454
455 let this = Self {
456 hasher,
457 t1,
458 t2,
459 cache,
460 cmd_tx: cmd_q,
461 hit_tx: hit_q,
462 epoch,
463 worker_states,
464 miss_buffers,
465 daemon_tick,
466 flush_tick_threshold: config.flush_tick_threshold,
467 is_cold_start,
468 };
469
470 (this, daemon)
471 }
472}
473
474impl<K, V, S> DualCacheFF<K, V, S>
477where
478 K: Hash + Eq + Send + Sync + Clone + 'static,
479 V: Send + Sync + Clone + 'static,
480 S: BuildHasher + Clone + Send + 'static,
481{
482 pub fn sync(&self) {
486 #[cfg(feature = "std")]
488 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
489 let mut state = buf.borrow_mut();
490 if state.1 > 0_usize {
491 let _ = self.hit_tx.try_send(state.0);
492 state.1 = 0;
493 }
494 });
495
496 #[cfg(feature = "std")]
498 for slot in self.miss_buffers.iter() {
499 let buf = unsafe { slot.get_mut_unchecked() };
500 if buf.len() > 0 {
501 let batch = buf.drain_to_vec();
502 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
503 }
504 }
505
506 let ack = OneshotAck::new();
508 self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
509 ack.wait();
510 }
511
512 pub fn get(&self, key: &K) -> Option<V> {
517 let hash = self.hash(key);
518 let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
519
520 #[cfg(feature = "std")]
522 let mut id_opt = None;
523 #[cfg(feature = "std")]
524 {
525 let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
526 WORKER_ID.with(|&id| {
527 if id < self.worker_states.len() {
528 self.worker_states[id]
529 .local_epoch
530 .store(global_epoch, Ordering::Relaxed);
531 id_opt = Some(id);
532 }
533 });
534 }
535
536 #[cfg(feature = "std")]
537 let has_epoch = id_opt.is_some();
538 #[cfg(not(feature = "std"))]
539 let has_epoch = true;
540
541 let mut res: Option<V> = None;
542 let mut hit_g_idx: Option<u32> = None;
543
544 if has_epoch {
545 let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
547 if !ptr_t1.is_null() {
548 let node = unsafe { &*ptr_t1 };
549 if node.key == *key
550 && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
551 {
552 res = Some(node.value.clone());
553 hit_g_idx = Some(node.g_idx);
554 }
555 }
556
557 if res.is_none() {
559 let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
560 if !ptr_t2.is_null() {
561 let node = unsafe { &*ptr_t2 };
562 if node.key == *key
563 && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
564 {
565 res = Some(node.value.clone());
566 hit_g_idx = Some(node.g_idx);
567 }
568 }
569 }
570
571 if res.is_none() {
573 let tag = (hash >> 48) as u16;
574 if let Some(global_idx) = self.cache.index_probe(hash, tag) {
575 if let Some(v) = self
576 .cache
577 .node_get_full(global_idx, key, current_epoch_cache)
578 {
579 res = Some(v);
580 hit_g_idx = Some(global_idx as u32);
581 }
582 }
583 }
584 }
585
586 #[cfg(feature = "std")]
588 if let Some(id) = id_opt {
589 self.worker_states[id]
590 .local_epoch
591 .store(0, Ordering::Relaxed);
592 }
593
594 if let Some(g_idx) = hit_g_idx {
595 self.record_hit(g_idx as usize);
596 }
597
598 res
599 }
600
601 pub fn insert(&self, key: K, value: V) {
615 let hash = self.hash(&key);
616
617 #[cfg(feature = "std")]
619 {
620 let is_cold = self.is_cold_start.load(Ordering::Relaxed);
621 let mut bypass = is_cold;
622
623 if !bypass {
624 let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
627 let mut id_opt = None;
628 WORKER_ID.with(|&id| {
629 if id < self.worker_states.len() {
630 self.worker_states[id]
631 .local_epoch
632 .store(global_epoch, Ordering::Relaxed);
633 id_opt = Some(id);
634 }
635 });
636
637 if id_opt.is_some() {
638 let ptr_t1 = self.t1.load_slot(hash);
640 if !ptr_t1.is_null() {
641 let node = unsafe { &*ptr_t1 };
642 if node.key == key {
643 bypass = true;
644 }
645 }
646
647 if !bypass {
649 let ptr_t2 = self.t2.load_slot(hash);
650 if !ptr_t2.is_null() {
651 let node = unsafe { &*ptr_t2 };
652 if node.key == key {
653 bypass = true;
654 }
655 }
656 }
657
658 if !bypass {
660 let tag = (hash >> 48) as u16;
661 if let Some(global_idx) = self.cache.index_probe(hash, tag) {
662 let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
663 if !ptr.is_null() {
664 let node = unsafe { &*ptr };
665 if node.key == key {
666 bypass = true;
667 }
668 }
669 }
670 }
671 }
672
673 if let Some(id) = id_opt {
675 self.worker_states[id]
676 .local_epoch
677 .store(0, Ordering::Relaxed);
678 }
679 }
680
681 let pass = if bypass {
682 true
683 } else {
684 #[cfg(any(feature = "loom", loom))]
686 {
687 L1_FILTER.with(|f| {
688 let mut state = f.borrow_mut();
689 let idx = (hash as usize) & 4095_usize;
690 let val = state.0[idx];
691
692 state.1 += 1;
693 if state.1 >= 4096_usize {
694 for x in state.0.iter_mut() {
695 *x >>= 1;
696 }
697 state.1 = 0;
698 }
699
700 if val < 1_u8 {
701 state.0[idx] = 1;
702 false
703 } else {
704 if val < 2_u8 {
705 state.0[idx] = 2;
706 }
707 true
708 }
709 })
710 }
711
712 #[cfg(not(any(feature = "loom", loom)))]
713 {
714 L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
715 let mut state = f.borrow_mut();
716 let idx = (hash as usize) & 4095_usize;
717 let val = state.0[idx];
718
719 state.1 += 1;
720 if state.1 >= 4096_usize {
721 for x in state.0.iter_mut() {
722 *x >>= 1;
723 }
724 state.1 = 0;
725 }
726
727 if val < 1_u8 {
728 state.0[idx] = 1;
729 false
730 } else {
731 if val < 2_u8 {
732 state.0[idx] = 2;
733 }
734 true
735 }
736 })
737 }
738 };
739
740 if !pass {
741 return;
742 }
743
744 let current_tick = self.daemon_tick.load(Ordering::Relaxed);
746 let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
747 current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
748 });
749
750 WORKER_ID.with(|&id| {
752 if id >= self.miss_buffers.len() {
753 let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
755 return;
756 }
757
758 let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
760 let capacity_flush = buf.push((key, value, hash));
761
762 if capacity_flush || (should_time_flush && !buf.is_empty()) {
763 let batch = buf.drain_to_vec();
764 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
765 LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
766 }
767 });
768 }
769
770 #[cfg(not(feature = "std"))]
772 {
773 let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
774 }
775 }
776
777 pub fn remove(&self, key: &K) {
779 let hash = self.hash(key);
780
781 #[cfg(feature = "std")]
783 WORKER_ID.with(|&id| {
784 if id < self.miss_buffers.len() {
785 let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
786 if buf.len() > 0 {
787 let batch = buf.drain_to_vec();
788 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
789 let tick = self.daemon_tick.load(Ordering::Relaxed);
790 LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
791 }
792 }
793 });
794
795 self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
796 }
797
798 pub fn clear(&self) {
800 let ack = OneshotAck::new();
801 self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
802 ack.wait();
803 }
804
805 #[inline(always)]
808 fn hash(&self, key: &K) -> u64 {
809 self.hasher.hash_one(key)
810 }
811
812 #[inline(always)]
817 fn record_hit(&self, global_idx: usize) {
818 #[cfg(feature = "std")]
819 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
820 let mut state = buf.borrow_mut();
821 let idx = state.1;
822 state.0[idx] = global_idx;
823 state.1 += 1;
824 if state.1 == 64_usize {
825 let _ = self.hit_tx.try_send(state.0);
826 state.1 = 0;
827 }
828 });
829
830 #[cfg(not(feature = "std"))]
833 {
834 let mut batch = [0usize; 64];
835 batch[0] = global_idx;
836 let _ = self.hit_tx.try_send(batch);
837 }
838 }
839}
840
841impl<K, V, S> Drop for DualCacheFF<K, V, S> {
842 fn drop(&mut self) {
843 if Arc::strong_count(&self.cmd_tx) <= 2 {
844 let _ = self.cmd_tx.try_send(Command::Shutdown);
845 }
846 }
847}
848