1#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
/// Backend-selecting re-export of `Arc`: real `std::sync::Arc` normally,
/// `loom`'s model-checked `Arc` under the `loom` feature, and `alloc`'s
/// for `no_std` builds. Exactly one cfg arm is active per build.
pub(crate) mod sync {
    #[cfg(all(feature = "std", not(feature = "loom")))]
    pub use std::sync::Arc;

    #[cfg(feature = "loom")]
    pub use loom::sync::Arc;

    #[cfg(not(any(feature = "std", feature = "loom")))]
    pub use alloc::sync::Arc;
}
38
39#[cfg(not(feature = "std"))]
41use alloc::vec::Vec;
42
43use crate::cache_padded::CachePadded;
44use crate::daemon::{Command, Daemon};
45use crate::lossy_queue::{LossyQueue, OneshotAck};
46use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
47use ahash::RandomState;
48use core::hash::{BuildHasher, Hash};
49use core::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
50use sync::Arc;
51
/// Tuning knobs for the cache and its maintenance daemon.
pub struct Config {
    /// Total entry capacity of the main store; must be a power of two.
    pub capacity: usize,
    /// Number of tier-1 slots; must be a power of two, at most 4096.
    pub t1_slots: usize,
    /// Number of tier-2 slots; must be a power of two.
    pub t2_slots: usize,
    /// TTL duration forwarded to the daemon.
    /// NOTE(review): units presumably daemon ticks/seconds — confirm in `Daemon`.
    pub duration: u32,
    /// Expected worker-thread count; sizes the per-worker buffers.
    pub threads: usize,
    /// Daemon polling interval, in microseconds.
    pub poll_us: u64,
    /// Minimum daemon ticks between time-based flushes of a worker buffer.
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Derives a configuration from an approximate RAM budget in megabytes,
    /// assuming roughly 128 bytes per cached entry.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        let budget_entries = (ram_mb * 1024 * 1024) / 128;
        // Round up to a power of two (mask-friendly sizing); floor at 256.
        let capacity = budget_entries.next_power_of_two().max(256);

        // Pick the thread count from the platform when std is available,
        // otherwise fall back to a fixed default.
        #[cfg(feature = "std")]
        let threads = std::thread::available_parallelism()
            .map(|p| p.get())
            .unwrap_or(16);
        #[cfg(not(feature = "std"))]
        let threads = 8;

        Self {
            capacity,
            t1_slots: 2048,
            // T2 targets ~20% of capacity, rounded up, never below 4096.
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Fully manual constructor.
    ///
    /// # Panics
    /// Panics when any of `capacity`, `t1_slots`, or `t2_slots` is not a
    /// power of two, or when `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Overrides the daemon poll interval, constrained to 1,000–10,000 µs.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        // Equivalent to clamp(1_000, 10_000).
        self.poll_us = poll_us.max(1_000).min(10_000);
        self
    }

    /// Overrides the flush-tick threshold; zero is promoted to one so a
    /// time-based flush can always eventually fire.
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = if ticks == 0 { 1 } else { ticks };
        self
    }
}
144
145pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);
151
/// Per-worker reclamation state, one entry per expected worker thread.
pub struct WorkerState {
    /// 0 while the worker is outside a read-side section; set to the observed
    /// `GLOBAL_EPOCH` for the duration of `DualCacheFF::get`. Cache-padded to
    /// avoid false sharing between adjacent workers.
    pub local_epoch: CachePadded<AtomicUsize>,
}
157
158impl WorkerState {
159 pub fn new() -> Self {
160 Self {
161 local_epoch: CachePadded::new(AtomicUsize::new(0)),
162 }
163 }
164}
165
/// Source of monotonically increasing worker ids; consumed lazily by the
/// `WORKER_ID` thread-local on each thread's first use. Ids are never reused,
/// so long-running processes that churn threads will exceed the buffer range
/// and fall back to the direct-send path in `insert`/`remove`.
#[cfg(feature = "std")]
static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
175
176#[cfg(feature = "std")]
177use core::cell::{Cell, RefCell};
178
#[cfg(feature = "std")]
thread_local! {
    // This thread's permanent worker id; indexes worker_states/miss_buffers.
    // Cannot be `const`-initialized: it runs code on first access.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    // Hit batch: ([slot indices], fill count). Flushed to hit_tx when full
    // (record_hit) or on sync().
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    // Admission-filter counters plus an insert counter that drives decay
    // (all counters halved every 4096 inserts); see `insert`.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    // Daemon tick at which this thread last time-flushed its miss buffer.
    // `const { … }` init for consistency with the entries above and to take
    // thread_local!'s no-lazy-init fast path (Cell::new is a const fn).
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
196
/// Clone-able cache handle. All heavy state sits behind `Arc`s, so every clone
/// talks to the same storage tiers and the same daemon via the lossy queues.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Hash builder; a clone of it is handed to the daemon at construction.
    pub hasher: S,
    /// Tier-1 slot array — probed first in `get`.
    pub t1: Arc<T1<K, V>>,
    /// Tier-2 slot array — probed on a T1 miss.
    pub t2: Arc<T2<K, V>>,
    /// Main indexed store — probed last, via `index_probe`/`node_get_full`.
    pub cache: Arc<Cache<K, V>>,
    /// Command channel to the daemon. Inserts use lossy `try_send`; remove,
    /// sync and clear use `send_blocking`.
    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
    /// Hit notifications, batched 64 slot indices at a time (lossy).
    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
    /// TTL epoch snapshot source; entries compare `expire_at` against it.
    /// NOTE(review): presumably advanced by the daemon — confirm.
    pub epoch: Arc<AtomicU32>,
    /// Per-worker epoch pins the daemon reads for safe reclamation.
    pub worker_states: Arc<[WorkerState]>,
    /// Per-worker insert staging buffers, indexed by the `WORKER_ID`
    /// thread-local.
    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
    /// Tick counter advanced by the daemon; drives time-based flushes.
    pub daemon_tick: Arc<AtomicU64>,
    /// Minimum daemon ticks between time-based flushes (copied from Config).
    pub flush_tick_threshold: u64,
}
217
218impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
219 fn clone(&self) -> Self {
220 Self {
221 hasher: self.hasher.clone(),
222 t1: self.t1.clone(),
223 t2: self.t2.clone(),
224 cache: self.cache.clone(),
225 cmd_tx: self.cmd_tx.clone(),
226 hit_tx: self.hit_tx.clone(),
227 epoch: self.epoch.clone(),
228 worker_states: self.worker_states.clone(),
229 miss_buffers: self.miss_buffers.clone(),
230 daemon_tick: self.daemon_tick.clone(),
231 flush_tick_threshold: self.flush_tick_threshold,
232 }
233 }
234}
235
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds the cache and drives its daemon on a freshly spawned
    /// background thread (detached — the `JoinHandle` is dropped).
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);
        std::thread::spawn(move || {
            daemon.run();
        });
        handle
    }
}
253
254impl<K, V> DualCacheFF<K, V, RandomState>
257where
258 K: Hash + Eq + Send + Sync + Clone + 'static,
259 V: Send + Sync + Clone + 'static,
260{
261 pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
274 let hasher = RandomState::new();
275 let t1 = Arc::new(T1::new(config.t1_slots));
276 let t2 = Arc::new(T2::new(config.t2_slots));
277 let cache = Arc::new(Cache::new(config.capacity));
278 let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
279 let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
280 let epoch = Arc::new(AtomicU32::new(0));
281 let daemon_tick = Arc::new(AtomicU64::new(0));
282
283 let mut buffers = Vec::with_capacity(config.threads);
284 let mut states = Vec::with_capacity(config.threads);
285 for _ in 0..config.threads {
286 buffers.push(WorkerSlot::new());
287 states.push(WorkerState::new());
288 }
289 let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
290 let worker_states: Arc<[_]> = states.into_boxed_slice().into();
291
292 let daemon = Daemon::new(
293 hasher.clone(),
294 config.capacity,
295 t1.clone(),
296 t2.clone(),
297 cache.clone(),
298 cmd_q.clone(),
299 hit_q.clone(),
300 epoch.clone(),
301 config.duration,
302 config.poll_us,
303 worker_states.clone(),
304 daemon_tick.clone(),
305 );
306
307 let this = Self {
308 hasher,
309 t1,
310 t2,
311 cache,
312 cmd_tx: cmd_q,
313 hit_tx: hit_q,
314 epoch,
315 worker_states,
316 miss_buffers,
317 daemon_tick,
318 flush_tick_threshold: config.flush_tick_threshold,
319 };
320
321 (this, daemon)
322 }
323}
324
impl<K, V, S> DualCacheFF<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Flushes this thread's buffered hits and every worker's staged inserts,
    /// then blocks until the daemon acknowledges a `Sync` command — i.e.
    /// until everything enqueued before the barrier has been processed.
    pub fn sync(&self) {
        // Flush this thread's partially-filled hit batch first.
        // NOTE(review): the whole 64-slot array is sent even when only
        // `state.1` entries are fresh — presumably the daemon tolerates the
        // stale tail entries; confirm.
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            if state.1 > 0_usize {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });

        // Drain every worker's miss buffer, not just the calling thread's.
        #[cfg(feature = "std")]
        for slot in self.miss_buffers.iter() {
            // SAFETY: `get_mut_unchecked` hands out an unsynchronized &mut.
            // This can race with owner threads pushing into their own slots,
            // so it is only sound if callers guarantee quiescence while
            // sync() runs — a contract not enforced here. TODO confirm.
            let buf = unsafe { slot.get_mut_unchecked() };
            if buf.len() > 0 {
                let batch = buf.drain_to_vec();
                // Lossy: a full command queue silently drops the batch.
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        }

        // Round-trip barrier: the daemon fires `ack` once it reaches the
        // Sync command in its queue.
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
        ack.wait();
    }

    /// Looks up `key`: probes tier 1, then tier 2, then the main index, and
    /// returns a clone of the first unexpired match. A hit is buffered for
    /// the daemon's frequency bookkeeping via `record_hit`.
    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        // TTL snapshot: an entry is live when expire_at is 0 (no TTL) or
        // >= this value.
        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);

        #[cfg(feature = "std")]
        {
            // Pin this worker: publish the observed GLOBAL_EPOCH so the
            // daemon can tell a read-side section is in progress and must
            // not reclaim nodes out from under us.
            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            WORKER_ID.with(|&id| {
                if id < self.worker_states.len() {
                    self.worker_states[id]
                        .local_epoch
                        .store(global_epoch, Ordering::Relaxed);
                }
            });
        }

        let mut res: Option<V> = None;
        let mut hit_g_idx: Option<u32> = None;

        // Tier-1 probe (checked first).
        let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // SAFETY: the dereference relies on the epoch pin above keeping
            // this node alive for the duration of the read — TODO confirm
            // the daemon checks `local_epoch` before freeing T1 nodes.
            let node = unsafe { &*ptr_t1 };
            if node.key == *key
                && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
            {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }

        // Tier-2 probe, only on a T1 miss.
        if res.is_none() {
            let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                // SAFETY: same epoch-pin argument as the T1 dereference.
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }

        // Main-store probe when both tiers missed. The top 16 bits of the
        // hash serve as a short tag for the index filter.
        if res.is_none() {
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }

        // Unpin: 0 marks this worker as outside any read-side section.
        #[cfg(feature = "std")]
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        });

        // Hit recording happens after unpinning; only the index is needed.
        if let Some(g_idx) = hit_g_idx {
            self.record_hit(g_idx as usize);
        }

        res
    }

    /// Stages an insert. Under `std`, a thread-local admission filter rejects
    /// first-time hash buckets, and admitted entries are batched into this
    /// worker's miss buffer, flushed to the daemon on capacity or after
    /// enough daemon ticks. Without `std`, the insert is sent (lossily)
    /// straight to the daemon.
    pub fn insert(&self, key: K, value: V) {
        let hash = self.hash(&key);

        #[cfg(feature = "std")]
        {
            // Admission filter: the first sighting of a bucket is rejected
            // (counter 0 -> 1, returns false); later sightings pass (counter
            // saturates at 2). Counters are halved every 4096 inserts so
            // stale traffic decays.
            let pass = L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
                let mut state = f.borrow_mut();
                let idx = (hash as usize) & 4095_usize;
                let val = state.0[idx];

                state.1 += 1;
                if state.1 >= 4096_usize {
                    for x in state.0.iter_mut() {
                        *x >>= 1;
                    }
                    state.1 = 0;
                }

                if val < 1_u8 {
                    state.0[idx] = 1;
                    false
                } else {
                    if val < 2_u8 {
                        state.0[idx] = 2;
                    }
                    true
                }
            });

            if !pass {
                return;
            }

            // Time-based flush check: enough daemon ticks since this thread's
            // last flush? wrapping_sub tolerates tick-counter wraparound.
            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
            });

            WORKER_ID.with(|&id| {
                // Threads beyond the configured worker count have no buffer;
                // fall back to a direct (lossy) single-item send.
                if id >= self.miss_buffers.len() {
                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
                    return;
                }

                // SAFETY: each thread only touches the slot indexed by its
                // unique WORKER_ID, so this &mut is exclusive among workers —
                // except versus sync()/remove() draining slots; see the
                // quiescence note in `sync`.
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                // NOTE(review): `push` appears to return true when the buffer
                // reached capacity — confirm against WorkerSlot::push.
                let capacity_flush = buf.push((key, value, hash));

                if capacity_flush || (should_time_flush && !buf.is_empty()) {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
                }
            });
        }

        #[cfg(not(feature = "std"))]
        {
            // No thread-locals without std: forward every insert directly.
            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
        }
    }

    /// Removes `key`. This thread's staged inserts are flushed first so any
    /// buffered insert of the same key is ordered before the remove in the
    /// daemon's queue; the remove itself is sent blocking (never dropped).
    pub fn remove(&self, key: &K) {
        let hash = self.hash(key);

        #[cfg(feature = "std")]
        WORKER_ID.with(|&id| {
            if id < self.miss_buffers.len() {
                // SAFETY: exclusive access to this thread's own slot via its
                // unique WORKER_ID (same caveat as in `insert`).
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                if buf.len() > 0 {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                    let tick = self.daemon_tick.load(Ordering::Relaxed);
                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
                }
            }
        });

        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
    }

    /// Clears the cache, blocking until the daemon acknowledges.
    pub fn clear(&self) {
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
        ack.wait();
    }

    /// Hashes a key with the configured hash builder.
    #[inline(always)]
    fn hash(&self, key: &K) -> u64 {
        self.hasher.hash_one(key)
    }

    /// Buffers a hit on slot `global_idx`. Under `std`, hits accumulate in a
    /// thread-local batch of 64 and are (lossily) sent only when it fills.
    /// Without `std`, a single-hit batch padded with zeros is sent at once.
    #[inline(always)]
    fn record_hit(&self, global_idx: usize) {
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            let idx = state.1;
            state.0[idx] = global_idx;
            state.1 += 1;
            if state.1 == 64_usize {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });

        #[cfg(not(feature = "std"))]
        {
            // NOTE(review): a real hit on index 0 is indistinguishable from
            // the zero padding here — confirm the daemon treats trailing
            // zeros as no-ops.
            let mut batch = [0usize; 64];
            batch[0] = global_idx;
            let _ = self.hit_tx.try_send(batch);
        }
    }
}