1#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
/// Synchronisation primitives used by the rest of the crate.
///
/// Every name in here resolves to one of three back ends selected by `cfg`:
/// `loom` when the `loom` feature or `--cfg loom` is active, `std` when the
/// `std` feature is enabled, and `alloc`/`core` otherwise. Callers import from
/// this module and never name a back end directly.
pub(crate) mod sync {
    // `Vec` is not part of the `no_std` prelude, and a `use` at the crate root
    // does not reach into child modules, so it must be imported here for
    // `new_arc_slice` to resolve when the `std` feature is disabled.
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    /// Shared, immutable sequence of `T`.
    ///
    /// A true `Arc<[T]>` outside of loom builds.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Shared, immutable sequence of `T`.
    ///
    /// Under loom the sequence is an `Arc<Vec<T>>`; it still dereferences to a
    /// slice for indexing, `len` and iteration.
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Moves `vec` into an [`ArcSlice`] by way of a boxed slice.
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        vec.into_boxed_slice().into()
    }

    /// Moves `vec` into an [`ArcSlice`] by wrapping the vector itself.
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Atomic types and `Ordering`, taken from `core` or from loom.
    pub mod atomic {
        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with closure-based accessors (`with` / `with_mut`).
    pub mod cell {
        /// Thin wrapper around `core::cell::UnsafeCell` used outside of loom
        /// builds; loom builds re-export `loom::cell::UnsafeCell` instead.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data` in a new cell.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Returns a raw mutable pointer to the wrapped value.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a raw const pointer to the wrapped value and
            /// returns whatever `f` returns.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                f(self.0.get() as *const T)
            }

            /// Calls `f` with a raw mutable pointer to the wrapped value and
            /// returns whatever `f` returns.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                f(self.0.get())
            }
        }

        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;
    }
}
105
106#[cfg(not(feature = "std"))]
108use alloc::vec::Vec;
109
110use crate::cache_padded::CachePadded;
111use crate::daemon::{Command, Daemon};
112use crate::lossy_queue::{LossyQueue, OneshotAck};
113use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
114use ahash::RandomState;
115use core::hash::{BuildHasher, Hash};
116use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
117use sync::{Arc, ArcSlice, new_arc_slice};
118
/// Sizing and timing parameters for a [`DualCacheFF`] instance.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Size passed to the main `Cache` store and to the daemon.
    pub capacity: usize,
    /// Size passed to the `T1` tier.
    pub t1_slots: usize,
    /// Size passed to the `T2` tier.
    pub t2_slots: usize,
    /// Forwarded unchanged to the daemon.
    // NOTE(review): presumably an entry lifetime measured in epochs — confirm
    // against the daemon implementation.
    pub duration: u32,
    /// Number of per-worker slots (miss buffers and worker states) to allocate.
    pub threads: usize,
    /// Forwarded to the daemon; `with_poll_us` keeps it within 1_000..=10_000.
    pub poll_us: u64,
    /// Minimum number of daemon ticks between time-triggered flushes of a
    /// worker's miss buffer. Never below 1 when set through
    /// `with_flush_tick_threshold`.
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Derives a configuration from a memory budget given in MiB.
    ///
    /// The budget is divided into 128-byte units, rounded up to a power of two
    /// and floored at 256 to obtain `capacity`. `t1_slots` is fixed at 2048 and
    /// `t2_slots` is one fifth of `capacity` rounded up to a power of two, with
    /// a floor of 4096. `threads` comes from
    /// `std::thread::available_parallelism` (16 if unavailable) with the `std`
    /// feature, and is 8 without it.
    ///
    /// A budget too large to express in bytes saturates at `usize::MAX`
    /// instead of overflowing.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        const BYTES_PER_MB: usize = 1024 * 1024;
        const BYTES_PER_SLOT: usize = 128;

        // Saturate rather than overflow: after the division the value is at
        // most `usize::MAX / 128`, so rounding up to a power of two is safe.
        let raw_capacity = ram_mb.saturating_mul(BYTES_PER_MB) / BYTES_PER_SLOT;
        let capacity = raw_capacity.next_power_of_two().max(256);

        Self {
            capacity,
            t1_slots: 2048,
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builds a configuration from explicit sizes.
    ///
    /// `poll_us` starts at 1_000 and `flush_tick_threshold` at 1.
    ///
    /// # Panics
    ///
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Returns the configuration with `poll_us` set, clamped to
    /// `1_000..=10_000`.
    #[must_use]
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Returns the configuration with `flush_tick_threshold` set; values below
    /// 1 are raised to 1.
    #[must_use]
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
212
// Process-wide epoch counter, initialised to 1 (loom variant).
//
// `DualCacheFF::get` and `DualCacheFF::insert` copy the current value into the
// calling worker's `local_epoch` before touching tier pointers and store 0
// there afterwards. Nothing in this file advances the counter.
// NOTE(review): presumably the daemon bumps it as part of memory
// reclamation — confirm in `daemon`.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
}

/// Process-wide epoch counter, initialised to 1.
///
/// `DualCacheFF::get` and `DualCacheFF::insert` copy the current value into
/// the calling worker's `local_epoch` before touching tier pointers and store
/// 0 there afterwards. Nothing in this file advances the counter.
// NOTE(review): presumably the daemon bumps it as part of memory
// reclamation — confirm in `daemon`.
#[cfg(not(any(feature = "loom", loom)))]
pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
225
226pub struct WorkerState {
229 pub local_epoch: CachePadded<AtomicUsize>,
230}
231
232impl WorkerState {
233 pub fn new() -> Self {
234 Self {
235 local_epoch: CachePadded::new(AtomicUsize::new(0)),
236 }
237 }
238}
239
240#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
247use std::sync::Mutex;
248
/// Hands out small integer ids to threads and takes them back for reuse.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    /// Ids returned by `ThreadIdGuard::drop`, available for reuse.
    free_list: Mutex<Vec<usize>>,
    /// Next never-used id; consulted only when `free_list` is empty or its
    /// lock cannot be taken.
    next_id: sync::atomic::AtomicUsize,
}

/// The single process-wide allocator; starts with no recycled ids and
/// `next_id` at 0.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    free_list: Mutex::new(Vec::new()),
    next_id: sync::atomic::AtomicUsize::new(0),
};
260
/// Owns a worker id for as long as the guard is alive and gives it back to
/// `ALLOCATOR` when dropped.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    /// The id to recycle on drop.
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    /// Pushes the id onto the allocator's free list. If the lock is poisoned
    /// the id is simply not recycled.
    fn drop(&mut self) {
        let Ok(mut recycled) = ALLOCATOR.free_list.lock() else {
            return;
        };
        recycled.push(self.id);
    }
}
274
275#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
276use core::cell::{Cell, RefCell};
277
// Per-thread state for `std` builds. These statics are shared by every
// `DualCacheFF` instance used from the same thread.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    // Small integer id of the current thread, assigned on first access.
    // A recycled id from `ALLOCATOR.free_list` is preferred; if the list is
    // empty, or its lock cannot be taken, a fresh id comes from `next_id`.
    static WORKER_ID: usize = {
        let id = if let Ok(mut list) = ALLOCATOR.free_list.lock() {
            list.pop().unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed))
        } else {
            ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed)
        };

        // Park a guard in `GUARD`; dropping it pushes the id back onto the
        // free list.
        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    // Holds the guard that recycles this thread's `WORKER_ID`.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    // Pending hit indices (up to 64) and the number of slots filled so far;
    // written by `record_hit`, flushed by `record_hit` and `sync`.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    // Admission filter used by `insert`: 4096 small counters indexed by the
    // low 12 bits of the hash, plus a call counter that triggers halving of
    // every counter once it reaches 4096.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    // Value of `daemon_tick` recorded when this thread last flushed its miss
    // buffer (set by `insert` and `remove`).
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
308
// Loom builds: monotonically increasing source of worker ids, starting at 0.
// Ids are never recycled in this configuration.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}
313
314#[cfg(any(feature = "loom", loom))]
315use core::cell::{Cell, RefCell};
316
// Per-thread state for loom builds; mirrors the `std` thread-locals.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    // Id of the current thread, taken from `NEXT_THREAD_ID` on first access.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    // Pending hit indices (up to 64) and the number of slots filled so far.
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));

    // Admission filter used by `insert`: 4096 heap-allocated counters plus a
    // call counter that triggers halving of every counter at 4096.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));

    // Value of `daemon_tick` recorded when this thread last flushed its miss
    // buffer.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
335
/// Handle to a multi-tier cache whose mutations are carried out by a separate
/// daemon.
///
/// Cloning the handle is cheap: every shared component sits behind an `Arc`.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Produces the 64-bit hash used to address every tier.
    pub hasher: S,
    /// First tier probed by `get`.
    pub t1: Arc<T1<K, V>>,
    /// Second tier, probed when `t1` misses.
    pub t2: Arc<T2<K, V>>,
    /// Main store, probed last.
    pub cache: Arc<Cache<K, V>>,
    /// Queue carrying `Command`s (inserts, removes, sync, clear, shutdown) to
    /// the daemon.
    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
    /// Queue carrying batches of 64 hit indices to the daemon.
    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
    /// Epoch compared against each node's `expire_at` during lookups; shared
    /// with the daemon.
    pub epoch: Arc<AtomicU32>,
    /// One state per configured thread, indexed by `WORKER_ID`.
    pub worker_states: ArcSlice<WorkerState>,
    /// One insertion buffer per configured thread, indexed by `WORKER_ID`.
    pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
    /// Tick counter shared with the daemon; `insert` reads it to decide on
    /// time-based flushes.
    pub daemon_tick: Arc<AtomicU64>,
    /// Minimum tick distance between time-based flushes of a miss buffer.
    pub flush_tick_threshold: u64,
    /// While true, `insert` skips the admission filter. Starts out true and is
    /// shared with the daemon.
    pub is_cold_start: Arc<sync::atomic::AtomicBool>,
}
358
359impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
360 fn clone(&self) -> Self {
361 Self {
362 hasher: self.hasher.clone(),
363 t1: self.t1.clone(),
364 t2: self.t2.clone(),
365 cache: self.cache.clone(),
366 cmd_tx: self.cmd_tx.clone(),
367 hit_tx: self.hit_tx.clone(),
368 epoch: self.epoch.clone(),
369 worker_states: self.worker_states.clone(),
370 miss_buffers: self.miss_buffers.clone(),
371 daemon_tick: self.daemon_tick.clone(),
372 flush_tick_threshold: self.flush_tick_threshold,
373 is_cold_start: self.is_cold_start.clone(),
374 }
375 }
376}
377
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its daemon.
    ///
    /// Outside of loom builds the daemon runs on a freshly spawned, detached
    /// thread. Under loom no thread is spawned and the daemon is discarded.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        std::thread::spawn(move || daemon.run());

        #[cfg(any(feature = "loom", loom))]
        {
            let _ = daemon;
        }

        handle
    }
}
400
401impl<K, V> DualCacheFF<K, V, RandomState>
404where
405 K: Hash + Eq + Send + Sync + Clone + 'static,
406 V: Send + Sync + Clone + 'static,
407{
408 pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
421 let hasher = RandomState::new();
422 let t1 = Arc::new(T1::new(config.t1_slots));
423 let t2 = Arc::new(T2::new(config.t2_slots));
424 let cache = Arc::new(Cache::new(config.capacity));
425 let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
426 let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
427 let epoch = Arc::new(AtomicU32::new(0));
428 let daemon_tick = Arc::new(AtomicU64::new(0));
429 let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
430
431 let mut buffers = Vec::with_capacity(config.threads);
432 let mut states = Vec::with_capacity(config.threads);
433 for _ in 0..config.threads {
434 buffers.push(WorkerSlot::new());
435 states.push(WorkerState::new());
436 }
437 let miss_buffers = new_arc_slice(buffers);
438 let worker_states = new_arc_slice(states);
439
440 let daemon = Daemon::new(
441 hasher.clone(),
442 config.capacity,
443 t1.clone(),
444 t2.clone(),
445 cache.clone(),
446 cmd_q.clone(),
447 hit_q.clone(),
448 epoch.clone(),
449 config.duration,
450 config.poll_us,
451 worker_states.clone(),
452 daemon_tick.clone(),
453 is_cold_start.clone(),
454 );
455
456 let this = Self {
457 hasher,
458 t1,
459 t2,
460 cache,
461 cmd_tx: cmd_q,
462 hit_tx: hit_q,
463 epoch,
464 worker_states,
465 miss_buffers,
466 daemon_tick,
467 flush_tick_threshold: config.flush_tick_threshold,
468 is_cold_start,
469 };
470
471 (this, daemon)
472 }
473}
474
475impl<K, V, S> DualCacheFF<K, V, S>
478where
479 K: Hash + Eq + Send + Sync + Clone + 'static,
480 V: Send + Sync + Clone + 'static,
481 S: BuildHasher + Clone + Send + 'static,
482{
483 pub fn sync(&self) {
487 #[cfg(feature = "std")]
489 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
490 let mut state = buf.borrow_mut();
491 if state.1 > 0_usize {
492 let _ = self.hit_tx.try_send(state.0);
493 state.1 = 0;
494 }
495 });
496
497 #[cfg(feature = "std")]
499 for slot in self.miss_buffers.iter() {
500 let buf = unsafe { slot.get_mut_unchecked() };
501 if buf.len() > 0 {
502 let batch = buf.drain_to_vec();
503 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
504 }
505 }
506
507 let ack = OneshotAck::new();
509 self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
510 ack.wait();
511 }
512
/// Looks up `key` and returns a clone of the cached value, if any.
///
/// Tiers are probed in order — `t1`, then `t2`, then the main `cache` — and the
/// search stops at the first match. In `t1`/`t2` a node matches when its key
/// equals `key` and its `expire_at` is 0 or not less than the current `epoch`;
/// for the main store the check is delegated to `node_get_full`. A hit is
/// reported through `record_hit`.
///
/// With the `std` feature the probes run only when this thread's `WORKER_ID`
/// indexes into `worker_states`; a thread without a slot always gets `None`.
/// While probing, the worker's `local_epoch` holds the current `GLOBAL_EPOCH`;
/// it is reset to 0 before returning.
pub fn get(&self, key: &K) -> Option<V> {
    let hash = self.hash(key);
    let current_epoch_cache = self.epoch.load(Ordering::Relaxed);

    #[cfg(feature = "std")]
    let mut id_opt = None;
    #[cfg(feature = "std")]
    {
        // Publish the epoch this reader is operating in.
        let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(global_epoch, Ordering::Relaxed);
                id_opt = Some(id);
            }
        });
    }

    // Without `std` there is no per-thread epoch slot and probing is always
    // allowed.
    #[cfg(feature = "std")]
    let has_epoch = id_opt.is_some();
    #[cfg(not(feature = "std"))]
    let has_epoch = true;

    let mut res: Option<V> = None;
    let mut hit_g_idx: Option<u32> = None;

    if has_epoch {
        // Tier 1.
        let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // SAFETY (assumed): the non-null pointer came from `load_slot` and
            // the epoch published above is presumably what keeps the node
            // alive — TODO confirm against the daemon's reclamation logic.
            let node = unsafe { &*ptr_t1 };
            if node.key == *key
                && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
            {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }

        if res.is_none() {
            // Tier 2, same matching rule as tier 1.
            let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                // SAFETY (assumed): same reasoning as for the tier-1 pointer.
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }

        if res.is_none() {
            // Main store: the top 16 bits of the hash serve as the probe tag.
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    // NOTE(review): truncating cast; fine as long as indices
                    // fit in 32 bits — confirm against `Cache`.
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }
    }

    // Leave the read section: 0 is the value the worker state starts with.
    #[cfg(feature = "std")]
    if let Some(id) = id_opt {
        self.worker_states[id]
            .local_epoch
            .store(0, Ordering::Relaxed);
    }

    if let Some(g_idx) = hit_g_idx {
        self.record_hit(g_idx as usize);
    }

    res
}
601
/// Offers `key`/`value` to the cache; the daemon performs the actual
/// insertion after receiving a command.
///
/// With the `std` feature:
/// 1. While `is_cold_start` is set, or when `key` is already found in `t1`,
///    `t2` or the main store, the admission filter is bypassed.
/// 2. Otherwise the thread-local `L1_FILTER` decides: the first sighting of a
///    filter slot is rejected (the call returns without queuing anything),
///    later sightings pass.
/// 3. Admitted entries go into this worker's miss buffer, which is sent as one
///    `Command::BatchInsert` when `push` returns true or when at least
///    `flush_tick_threshold` daemon ticks have passed since this thread's last
///    flush. Threads whose `WORKER_ID` has no miss buffer send a single
///    `Command::Insert` instead.
///
/// Without `std` a single `Command::Insert` is sent directly.
///
/// Every send uses `try_send` and discards the result, so a failed send is
/// silently ignored.
pub fn insert(&self, key: K, value: V) {
    let hash = self.hash(&key);

    #[cfg(feature = "std")]
    {
        let is_cold = self.is_cold_start.load(Ordering::Relaxed);
        let mut bypass = is_cold;

        if !bypass {
            // Publish the current epoch before dereferencing tier pointers.
            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            let mut id_opt = None;
            WORKER_ID.with(|&id| {
                if id < self.worker_states.len() {
                    self.worker_states[id]
                        .local_epoch
                        .store(global_epoch, Ordering::Relaxed);
                    id_opt = Some(id);
                }
            });

            // Existence check: a key that is already cached skips the filter.
            if id_opt.is_some() {
                let ptr_t1 = self.t1.load_slot(hash);
                if !ptr_t1.is_null() {
                    // SAFETY (assumed): pointer obtained from `load_slot`
                    // while the epoch is published — TODO confirm.
                    let node = unsafe { &*ptr_t1 };
                    if node.key == key {
                        bypass = true;
                    }
                }

                if !bypass {
                    let ptr_t2 = self.t2.load_slot(hash);
                    if !ptr_t2.is_null() {
                        // SAFETY (assumed): same reasoning as for tier 1.
                        let node = unsafe { &*ptr_t2 };
                        if node.key == key {
                            bypass = true;
                        }
                    }
                }

                if !bypass {
                    // Main store: top 16 bits of the hash are the probe tag.
                    let tag = (hash >> 48) as u16;
                    if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                        let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
                        if !ptr.is_null() {
                            // SAFETY (assumed): same reasoning as above.
                            let node = unsafe { &*ptr };
                            if node.key == key {
                                bypass = true;
                            }
                        }
                    }
                }
            }

            if let Some(id) = id_opt {
                // Done reading shared nodes; reset the published epoch.
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        }

        let pass = if bypass {
            true
        } else {
            // The two branches below are the same admission filter; they
            // differ only in the storage type of `L1_FILTER`.
            #[cfg(any(feature = "loom", loom))]
            {
                L1_FILTER.with(|f| {
                    let mut state = f.borrow_mut();
                    let idx = (hash as usize) & 4095_usize;
                    // Counter value as seen before any decay below.
                    let val = state.0[idx];

                    // Every 4096 calls all counters are halved.
                    state.1 += 1;
                    if state.1 >= 4096_usize {
                        for x in state.0.iter_mut() {
                            *x >>= 1;
                        }
                        state.1 = 0;
                    }

                    if val < 1_u8 {
                        // First sighting: remember it and reject.
                        state.0[idx] = 1;
                        false
                    } else {
                        // Seen before: saturate the counter at 2 and admit.
                        if val < 2_u8 {
                            state.0[idx] = 2;
                        }
                        true
                    }
                })
            }

            #[cfg(not(any(feature = "loom", loom)))]
            {
                L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
                    let mut state = f.borrow_mut();
                    let idx = (hash as usize) & 4095_usize;
                    // Counter value as seen before any decay below.
                    let val = state.0[idx];

                    // Every 4096 calls all counters are halved.
                    state.1 += 1;
                    if state.1 >= 4096_usize {
                        for x in state.0.iter_mut() {
                            *x >>= 1;
                        }
                        state.1 = 0;
                    }

                    if val < 1_u8 {
                        // First sighting: remember it and reject.
                        state.0[idx] = 1;
                        false
                    } else {
                        // Seen before: saturate the counter at 2 and admit.
                        if val < 2_u8 {
                            state.0[idx] = 2;
                        }
                        true
                    }
                })
            }
        };

        if !pass {
            return;
        }

        // Time-based flush: has the daemon ticked often enough since this
        // thread last flushed?
        let current_tick = self.daemon_tick.load(Ordering::Relaxed);
        let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
            current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
        });

        WORKER_ID.with(|&id| {
            // No miss buffer for this thread: send the entry on its own.
            if id >= self.miss_buffers.len() {
                let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
                return;
            }

            // SAFETY (assumed): slot `id` is only touched by the thread that
            // owns `WORKER_ID == id` — TODO confirm; `sync` also reaches into
            // every slot.
            let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
            // `push` returning true is treated as "flush now".
            let capacity_flush = buf.push((key, value, hash));

            if capacity_flush || (should_time_flush && !buf.is_empty()) {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
            }
        });
    }

    #[cfg(not(feature = "std"))]
    {
        let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
    }
}
777
778 pub fn remove(&self, key: &K) {
780 let hash = self.hash(key);
781
782 #[cfg(feature = "std")]
784 WORKER_ID.with(|&id| {
785 if id < self.miss_buffers.len() {
786 let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
787 if buf.len() > 0 {
788 let batch = buf.drain_to_vec();
789 let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
790 let tick = self.daemon_tick.load(Ordering::Relaxed);
791 LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
792 }
793 }
794 });
795
796 self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
797 }
798
799 pub fn clear(&self) {
801 let ack = OneshotAck::new();
802 self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
803 ack.wait();
804 }
805
806 #[inline(always)]
809 fn hash(&self, key: &K) -> u64 {
810 self.hasher.hash_one(key)
811 }
812
813 #[inline(always)]
818 fn record_hit(&self, global_idx: usize) {
819 #[cfg(feature = "std")]
820 HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
821 let mut state = buf.borrow_mut();
822 let idx = state.1;
823 state.0[idx] = global_idx;
824 state.1 += 1;
825 if state.1 == 64_usize {
826 let _ = self.hit_tx.try_send(state.0);
827 state.1 = 0;
828 }
829 });
830
831 #[cfg(not(feature = "std"))]
834 {
835 let mut batch = [0usize; 64];
836 batch[0] = global_idx;
837 let _ = self.hit_tx.try_send(batch);
838 }
839 }
840}
841
842impl<K, V, S> Drop for DualCacheFF<K, V, S> {
843 fn drop(&mut self) {
844 if Arc::strong_count(&self.cmd_tx) <= 2 {
845 let _ = self.cmd_tx.try_send(Command::Shutdown);
846 }
847 }
848}
849