#![cfg_attr(not(feature = "std"), no_std)]
#[cfg(not(feature = "std"))]
extern crate alloc;
pub mod arena;
pub mod cache_padded;
pub mod daemon;
pub mod filters;
pub mod lossy_queue;
pub mod storage;
pub mod unsafe_core;
pub mod workers;
/// Synchronization shims.
///
/// In normal builds these resolve to `std`/`alloc`/`core` types; under `--cfg loom`
/// or the `loom` feature they resolve to loom's model-checked equivalents. The rest
/// of the crate can therefore be written once against `crate::sync`.
pub(crate) mod sync {
    // `Vec` is not part of the `core` prelude, and the crate-root
    // `use alloc::vec::Vec` is not visible inside this module. no_std builds
    // therefore need their own import for `ArcSlice` / `new_arc_slice`.
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;
    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;
    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    /// Shared, immutable, reference-counted slice.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;
    /// Loom variant of `ArcSlice`. It wraps a `Vec`, presumably because loom's `Arc`
    /// cannot be built from an unsized `[T]` (TODO confirm).
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Converts an owned vector into an `ArcSlice` without copying elements
    /// one-by-one.
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        vec.into_boxed_slice().into()
    }
    /// Loom variant: wraps the vector itself in an `Arc`.
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Atomic types and `Ordering`, from `core` or from loom.
    pub mod atomic {
        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// Interior-mutability cell.
    pub mod cell {
        /// Thin wrapper over `core::cell::UnsafeCell`.
        ///
        /// It exposes the closure-based `with` / `with_mut` accessors that loom's
        /// `UnsafeCell` uses, so call sites compile unchanged in both modes. All
        /// accessors hand out raw pointers; upholding Rust's aliasing rules when
        /// dereferencing them is entirely the caller's job.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data` in a new cell.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Returns a raw mutable pointer to the contents.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a raw const pointer to the contents and returns its result.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                f(self.0.get() as *const T)
            }

            /// Calls `f` with a raw mutable pointer to the contents and returns its result.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                f(self.0.get())
            }
        }

        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;
    }
}
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use crate::cache_padded::CachePadded;
use crate::daemon::{Command, Daemon};
use crate::lossy_queue::{LossyQueue, OneshotAck};
use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
use ahash::RandomState;
use core::hash::{BuildHasher, Hash};
use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use sync::{Arc, ArcSlice, new_arc_slice};
/// Sizing and timing knobs for a `DualCacheFF` instance.
///
/// Build one with `Config::with_memory_budget` or `Config::new_expert`, then
/// adjust it with the `with_*` builder methods.
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// Number of entries in the main cache; passed to `Cache::new` and `Daemon::new`.
pub capacity: usize,
/// Slot count of the first lookup tier (`T1::new`).
pub t1_slots: usize,
/// Slot count of the second lookup tier (`T2::new`).
pub t2_slots: usize,
/// Forwarded to `Daemon::new` as its `duration` argument. NOTE(review): presumably
/// an entry lifetime measured in cache epochs; confirm in `Daemon`.
pub duration: u32,
/// Number of per-thread worker slots (miss buffers and epoch states). A thread
/// whose worker id is `>= threads` has no slot: its `get` calls return `None` and
/// its inserts are sent to the daemon one at a time.
pub threads: usize,
/// Daemon polling interval, forwarded to `Daemon::new` (presumably microseconds,
/// going by the name). `with_poll_us` clamps it to `1_000..=10_000`.
pub poll_us: u64,
/// Minimum number of daemon ticks between time-based flushes of a thread's miss
/// buffer; `with_flush_tick_threshold` keeps it at least 1.
pub flush_tick_threshold: u64,
}
impl Config {
    /// Derives a configuration from a memory budget of `ram_mb` mebibytes.
    ///
    /// * `capacity`: the budget divided by 128 bytes per entry, rounded up to a
    ///   power of two, with a minimum of 256.
    /// * `t1_slots`: 2048.
    /// * `t2_slots`: `capacity / 5` rounded up to a power of two, with a minimum of 4096.
    /// * `threads`: with `std`, the detected available parallelism (16 if it cannot
    ///   be queried); without `std`, 8.
    /// * `poll_us`: 1_000. `flush_tick_threshold`: 1.
    ///
    /// Budgets too large to express in bytes saturate rather than overflow.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // `ram_mb * 1 MiB` overflows `usize` for budgets >= 4096 MiB on 32-bit
        // targets. That would panic in debug builds and silently wrap to a tiny
        // capacity in release. Saturating keeps the result a large, valid power of two.
        let budget_bytes = ram_mb.saturating_mul(1024 * 1024);
        let raw_capacity = budget_bytes / 128;
        let capacity = raw_capacity.next_power_of_two().max(256);
        Self {
            capacity,
            t1_slots: 2048,
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builds a configuration with explicitly chosen table sizes.
    ///
    /// `poll_us` defaults to 1_000 and `flush_tick_threshold` defaults to 1.
    ///
    /// # Panics
    ///
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two (this
    /// includes 0), or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );
        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Sets the daemon polling interval, clamped to `1_000..=10_000`.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Sets the time-based flush threshold in daemon ticks (values below 1 become 1).
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
// Process-wide reader epoch, shared by every cache instance in the process.
// `get`/`insert` copy its current value into the calling thread's
// `WorkerState::local_epoch` while they dereference node pointers, then store 0
// there when done. It starts at 1, presumably so that a published epoch can never
// be mistaken for that 0 "not reading" marker. Where it is advanced is not
// visible in this file (presumably the daemon); confirm.
// Under loom it is created lazily, presumably because loom atomics cannot be
// constructed in a `const` context.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
}
/// Process-wide reader epoch; see the comment on the loom variant above.
#[cfg(not(any(feature = "loom", loom)))]
pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
/// Per-worker reader state. A slice of these is shared with the daemon
/// (`Daemon::new` receives `worker_states`).
pub struct WorkerState {
/// Epoch this worker is currently reading under. It holds a copy of
/// `GLOBAL_EPOCH` while the worker is inside a read section of `get`/`insert`,
/// and 0 otherwise. Wrapped in `CachePadded`, presumably to keep each worker's
/// counter on its own cache line.
pub local_epoch: CachePadded<AtomicUsize>,
}
impl WorkerState {
    /// Creates a worker state that is not inside a read section.
    ///
    /// `local_epoch` starts at 0, the same value readers store when they leave
    /// a read section.
    pub fn new() -> Self {
        Self {
            local_epoch: CachePadded::new(AtomicUsize::new(0)),
        }
    }
}

// A public no-argument `new()` should be paired with `Default` (clippy `new_without_default`).
impl Default for WorkerState {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
use std::sync::Mutex;
/// Hands out small, dense per-thread worker ids. The ids index
/// `DualCacheFF::worker_states` and `DualCacheFF::miss_buffers`.
///
/// Ids released by exiting threads (via `ThreadIdGuard`) are parked in
/// `free_list` and reused before `next_id` is advanced.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
/// Ids returned by threads that have exited, available for reuse.
free_list: Mutex<Vec<usize>>,
/// Next never-used id.
next_id: sync::atomic::AtomicUsize,
}
/// The single process-wide worker-id allocator. Both fields are
/// `const`-constructible, so no lazy initialisation is needed.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
free_list: Mutex::new(Vec::new()),
next_id: sync::atomic::AtomicUsize::new(0),
};
/// Owns a thread's worker id. It is stored in the `GUARD` thread-local, so its
/// `Drop` impl runs when the thread's locals are destroyed and hands `id` back
/// to `ALLOCATOR`.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
id: usize,
}
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    /// Returns this thread's worker id to `ALLOCATOR.free_list` so a later thread can reuse it.
    fn drop(&mut self) {
        // A poisoned lock only means some holder panicked; the `Vec<usize>` inside is
        // still valid. Recover it instead of silently leaking the id. A leaked id is
        // never reused, so `next_id` keeps growing and later threads end up beyond
        // the configured worker slots.
        ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push(self.id);
    }
}
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
use core::cell::{Cell, RefCell};
// NOTE(review): these thread-locals are per *thread*, not per cache instance.
// A thread that uses two `DualCacheFF`s shares one `HIT_BUF` (hit indices of
// both caches can end up in either cache's hit queue), one `L1_FILTER`, and one
// `LAST_FLUSH_TICK` (compared against each cache's own `daemon_tick`). Confirm
// that a single cache per process is the supported configuration.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// Dense worker id of the current thread, assigned on first use.
    ///
    /// A previously freed id is reused when one is available; otherwise a fresh id
    /// comes from `next_id`. The id is registered in `GUARD` so it is returned to
    /// the allocator when the thread exits.
    static WORKER_ID: usize = {
        // As in `ThreadIdGuard::drop`, a poisoned free list is still a valid
        // `Vec<usize>`. Recover it so parked ids keep being reused.
        let recycled = ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .pop();
        let id = recycled.unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed));
        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };
    /// Holds this thread's `ThreadIdGuard`; dropping it releases `WORKER_ID`.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };
    /// `(indices, count)`: global node indices of recent hits, sent to the daemon
    /// in batches of 64.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
    /// `(counters, ops)`: per-bucket admission counters (values 0..=2) and the
    /// number of filter checks since the last halving pass.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
    /// Daemon tick at which this thread last flushed its miss buffer.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
// Loom stand-in for `IdAllocator`: worker ids are handed out monotonically from
// 0 and are never recycled (there is no guard returning them).
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}
#[cfg(any(feature = "loom", loom))]
use core::cell::{Cell, RefCell};
// Loom mirrors of the std thread-locals. They differ in two ways:
// * `WORKER_ID` is taken straight from `NEXT_THREAD_ID`; there is no `GUARD`,
//   so ids are never returned.
// * `L1_FILTER` is a heap-allocated `Box<[u8]>` of 4096 counters instead of an
//   inline `[u8; 4096]`. NOTE(review): presumably to keep loom's per-thread
//   state small; confirm.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
// Worker id of the current (loom) thread.
static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
// `(indices, count)` of buffered hit indices, sent in batches of 64.
static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));
// `(counters, ops)` for the two-hit admission filter.
static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));
// Daemon tick of this thread's last miss-buffer flush.
static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
/// Cache front-end handle.
///
/// Clones share all state through `Arc`s. A `Daemon` built alongside it (see
/// `new_headless`) receives clones of the command and hit queues, presumably to
/// consume them and perform all structural updates.
pub struct DualCacheFF<K, V, S = RandomState> {
/// Hasher applied to every key (via `BuildHasher::hash_one`).
pub hasher: S,
/// First lookup tier: one node pointer per slot, selected by the key hash.
pub t1: Arc<T1<K, V>>,
/// Second lookup tier, probed when T1 misses.
pub t2: Arc<T2<K, V>>,
/// Main storage. It is probed through `index_probe` with a 16-bit tag taken
/// from the top bits of the hash.
pub cache: Arc<Cache<K, V>>,
/// Command queue to the daemon (inserts, batches, removes, clear, sync, shutdown).
pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
/// Queue of hit batches, each carrying 64 global node indices.
pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
/// Current cache epoch. A node with a non-zero `expire_at` below this value
/// is treated as expired by `get`.
pub epoch: Arc<AtomicU32>,
/// One entry per worker id, holding that worker's published read epoch.
pub worker_states: ArcSlice<WorkerState>,
/// One insert buffer per worker id, flushed to the daemon as `Command::BatchInsert`.
pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
/// Tick counter shared with the daemon; drives time-based miss-buffer flushes.
pub daemon_tick: Arc<AtomicU64>,
/// Minimum tick delta between time-based flushes (copied from `Config`).
pub flush_tick_threshold: u64,
/// While `true`, `insert` skips the L1 admission filter. It starts `true`.
/// NOTE(review): presumably the daemon clears it; confirm.
pub is_cold_start: Arc<sync::atomic::AtomicBool>,
}
impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
    /// Produces another handle to the same cache.
    ///
    /// The hasher is cloned; every shared structure is an `Arc`, so cloning it
    /// only bumps a refcount and no cache data is copied. This impl is written by
    /// hand rather than derived so that `K` and `V` do not need to be `Clone`.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field without handling it here
        // becomes a compile error instead of a silently missing clone.
        let Self {
            hasher,
            t1,
            t2,
            cache,
            cmd_tx,
            hit_tx,
            epoch,
            worker_states,
            miss_buffers,
            daemon_tick,
            flush_tick_threshold,
            is_cold_start,
        } = self;
        Self {
            hasher: hasher.clone(),
            t1: t1.clone(),
            t2: t2.clone(),
            cache: cache.clone(),
            cmd_tx: cmd_tx.clone(),
            hit_tx: hit_tx.clone(),
            epoch: epoch.clone(),
            worker_states: worker_states.clone(),
            miss_buffers: miss_buffers.clone(),
            daemon_tick: daemon_tick.clone(),
            flush_tick_threshold: *flush_tick_threshold,
            is_cold_start: is_cold_start.clone(),
        }
    }
}
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its maintenance daemon.
    ///
    /// Outside loom, the `Daemon` returned by `new_headless` is moved onto a
    /// newly spawned OS thread that runs `Daemon::run`, and the thread is left
    /// detached. Under loom no thread is spawned and the daemon is dropped when
    /// this function returns (presumably loom tests use `new_headless` and drive
    /// the daemon themselves).
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        {
            // Dropping the JoinHandle at the end of this block detaches the thread.
            let _detached = std::thread::spawn(move || daemon.run());
        }
        #[cfg(any(feature = "loom", loom))]
        {
            let _ = daemon;
        }

        handle
    }
}
impl<K, V> DualCacheFF<K, V, RandomState>
where
K: Hash + Eq + Send + Sync + Clone + 'static,
V: Send + Sync + Clone + 'static,
{
pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
let hasher = RandomState::new();
let t1 = Arc::new(T1::new(config.t1_slots));
let t2 = Arc::new(T2::new(config.t2_slots));
let cache = Arc::new(Cache::new(config.capacity));
let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
let epoch = Arc::new(AtomicU32::new(0));
let daemon_tick = Arc::new(AtomicU64::new(0));
let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
let mut buffers = Vec::with_capacity(config.threads);
let mut states = Vec::with_capacity(config.threads);
for _ in 0..config.threads {
buffers.push(WorkerSlot::new());
states.push(WorkerState::new());
}
let miss_buffers = new_arc_slice(buffers);
let worker_states = new_arc_slice(states);
let daemon = Daemon::new(
hasher.clone(),
config.capacity,
t1.clone(),
t2.clone(),
cache.clone(),
cmd_q.clone(),
hit_q.clone(),
epoch.clone(),
config.duration,
config.poll_us,
worker_states.clone(),
daemon_tick.clone(),
is_cold_start.clone(),
);
let this = Self {
hasher,
t1,
t2,
cache,
cmd_tx: cmd_q,
hit_tx: hit_q,
epoch,
worker_states,
miss_buffers,
daemon_tick,
flush_tick_threshold: config.flush_tick_threshold,
is_cold_start,
};
(this, daemon)
}
}
impl<K, V, S> DualCacheFF<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Pushes buffered work to the daemon and blocks until it has processed
    /// everything queued before this call.
    ///
    /// With `std`, the steps are:
    /// 1. Send the calling thread's pending hit batch.
    /// 2. Drain every slot of `miss_buffers` into `Command::BatchInsert`s.
    /// 3. Send `Command::Sync` with `send_blocking` and wait on its ack.
    ///
    /// NOTE(review): step 2 takes `get_mut_unchecked` on *every* worker's buffer,
    /// not just the caller's. That is only sound when no other thread is inside
    /// `insert`/`remove`/`clear` on this cache at the same time (for example after
    /// all writers have been joined). Confirm that this is the intended contract.
    pub fn sync(&self) {
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            if state.1 > 0_usize {
                // NOTE(review): batches carry no length. The entries past `state.1`
                // (zeros, or indices left from the previous batch) are sent as well;
                // confirm the daemon tolerates them.
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
        #[cfg(feature = "std")]
        for slot in self.miss_buffers.iter() {
            // SAFETY: requires that no other thread is writing to `slot` right now;
            // see the NOTE(review) on this method.
            let buf = unsafe { slot.get_mut_unchecked() };
            if !buf.is_empty() {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        }
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
        ack.wait();
    }

    /// Looks up `key`, returning a clone of its value if present and not expired.
    ///
    /// Tiers are probed in order: T1, then T2, then the main cache index. An
    /// entry is live when its `expire_at` is 0 or not below the cache epoch
    /// loaded at the start of the call. Each hit is reported through `record_hit`.
    ///
    /// With `std`, node pointers are only dereferenced while this thread's worker
    /// epoch is pinned. A thread whose worker id has no slot (id `>= threads`)
    /// cannot pin, so it always gets `None`.
    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        let now = self.epoch.load(Ordering::Relaxed);

        #[cfg(feature = "std")]
        let pinned = self.pin_worker();
        #[cfg(feature = "std")]
        let hit = pinned.and_then(|_| self.lookup(key, hash, now));
        // NOTE(review): without `std` there is no epoch pin at all. Node memory is
        // then only safe if nothing reclaims nodes concurrently; confirm.
        #[cfg(not(feature = "std"))]
        let hit = self.lookup(key, hash, now);

        #[cfg(feature = "std")]
        if let Some(id) = pinned {
            self.unpin_worker(id);
        }

        let (value, global_idx) = hit?;
        self.record_hit(global_idx);
        Some(value)
    }

    /// Offers `key` / `value` to the cache.
    ///
    /// Without `std`, the pair is sent straight to the daemon as `Command::Insert`.
    ///
    /// With `std`:
    /// * During cold start, or when `key` is already resident in T1, T2 or the
    ///   main cache, the L1 admission filter is skipped.
    /// * Otherwise the pair must pass this thread's two-hit L1 filter. The first
    ///   offer landing in an empty filter bucket is dropped.
    /// * Admitted pairs are appended to this thread's miss buffer. The buffer is
    ///   flushed as a `BatchInsert` when `push` asks for it, or when
    ///   `flush_tick_threshold` daemon ticks have passed since the last flush.
    ///
    /// Every send here uses `try_send` and ignores the result, so pairs can be
    /// lost if the queue rejects them.
    pub fn insert(&self, key: K, value: V) {
        let hash = self.hash(&key);
        #[cfg(feature = "std")]
        {
            let bypass_filter = self.is_cold_start.load(Ordering::Relaxed)
                || self.key_is_resident(&key, hash);
            if !bypass_filter && !Self::l1_admit(hash) {
                return;
            }
            self.buffer_insert(key, value, hash);
        }
        #[cfg(not(feature = "std"))]
        {
            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
        }
    }

    /// Removes `key`.
    ///
    /// With `std`, this thread's pending miss buffer is flushed first, so a
    /// buffered insert of `key` is queued *before* the removal (assuming the
    /// command queue is FIFO). `Command::Remove` is then sent with `send_blocking`.
    /// The call does not wait for the daemon to apply it.
    pub fn remove(&self, key: &K) {
        let hash = self.hash(key);
        #[cfg(feature = "std")]
        self.flush_local_misses();
        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
    }

    /// Clears the cache and blocks until the daemon acknowledges.
    ///
    /// Like `remove`, this first flushes the calling thread's miss buffer. Without
    /// that flush, inserts buffered before `clear()` would reach the daemon
    /// afterwards and reappear. Other threads' buffers are not touched.
    pub fn clear(&self) {
        #[cfg(feature = "std")]
        self.flush_local_misses();
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
        ack.wait();
    }

    /// Hashes `key` with this cache's `BuildHasher`.
    #[inline(always)]
    fn hash(&self, key: &K) -> u64 {
        self.hasher.hash_one(key)
    }

    /// Reports a hit on `global_idx` to the daemon.
    ///
    /// With `std`, indices accumulate in this thread's `HIT_BUF` and are sent as
    /// one `[usize; 64]` batch once 64 have been collected. Without `std`, each
    /// hit is sent on its own, in a batch whose other 63 entries are 0.
    ///
    /// NOTE(review): batches carry no length, so those zero entries look exactly
    /// like hits on index 0 unless the daemon treats 0 as a sentinel; confirm.
    #[inline(always)]
    fn record_hit(&self, global_idx: usize) {
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            let idx = state.1;
            state.0[idx] = global_idx;
            state.1 += 1;
            if state.1 == 64_usize {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
        #[cfg(not(feature = "std"))]
        {
            let mut batch = [0usize; 64];
            batch[0] = global_idx;
            let _ = self.hit_tx.try_send(batch);
        }
    }

    /// Enters a read section.
    ///
    /// Publishes the current `GLOBAL_EPOCH` in this thread's `WorkerState` and
    /// returns the worker id. Returns `None` (and publishes nothing) when the
    /// thread's worker id has no slot.
    #[cfg(feature = "std")]
    #[inline(always)]
    fn pin_worker(&self) -> Option<usize> {
        let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
        let id = WORKER_ID.with(|&id| id);
        let state = self.worker_states.get(id)?;
        state.local_epoch.store(global_epoch, Ordering::Relaxed);
        // Epoch-based reclamation needs store->load ordering here. The announcement
        // above must be globally visible before any node pointer is loaded.
        // Otherwise the reclaimer can miss the pin and free a node this thread is
        // about to read. A plain store cannot provide that (even on x86 the store
        // may sit in the store buffer), so a full fence is required.
        #[cfg(any(feature = "loom", loom))]
        loom::sync::atomic::fence(Ordering::SeqCst);
        #[cfg(not(any(feature = "loom", loom)))]
        core::sync::atomic::fence(Ordering::SeqCst);
        Some(id)
    }

    /// Leaves the read section entered by `pin_worker`.
    #[cfg(feature = "std")]
    #[inline(always)]
    fn unpin_worker(&self, id: usize) {
        // Release: every node read made inside the section happens-before the
        // reclaimer observing this thread as unpinned.
        self.worker_states[id]
            .local_epoch
            .store(0, Ordering::Release);
    }

    /// Probes T1, T2 and then the main index for a live entry of `key`.
    ///
    /// Returns a clone of its value together with its global node index.
    /// Dereferences raw node pointers, so with `std` it must only be called while
    /// this thread is pinned.
    #[inline(always)]
    fn lookup(&self, key: &K, hash: u64, now: u32) -> Option<(V, usize)> {
        let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // SAFETY: read under the caller's epoch pin. This assumes the daemon
            // defers reclaiming a node while any worker is pinned (TODO confirm in daemon).
            let node = unsafe { &*ptr_t1 };
            if node.key == *key && (node.expire_at == 0 || node.expire_at >= now) {
                return Some((node.value.clone(), node.g_idx as usize));
            }
        }
        let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
        if !ptr_t2.is_null() {
            // SAFETY: as for T1 above.
            let node = unsafe { &*ptr_t2 };
            if node.key == *key && (node.expire_at == 0 || node.expire_at >= now) {
                return Some((node.value.clone(), node.g_idx as usize));
            }
        }
        // The top 16 bits of the hash serve as the index tag.
        let tag = (hash >> 48) as u16;
        let global_idx = self.cache.index_probe(hash, tag)?;
        let value = self.cache.node_get_full(global_idx, key, now)?;
        Some((value, global_idx))
    }

    /// Returns `true` if a node holding `key` currently sits in T1, T2 or the
    /// main index, regardless of expiry.
    ///
    /// Returns `false` without probing when this thread cannot pin.
    #[cfg(feature = "std")]
    fn key_is_resident(&self, key: &K, hash: u64) -> bool {
        let Some(id) = self.pin_worker() else {
            return false;
        };
        let resident = self.probe_resident(key, hash);
        self.unpin_worker(id);
        resident
    }

    /// Key-only residency probe used by `key_is_resident`.
    ///
    /// Expiry is ignored. The caller must hold an epoch pin.
    #[cfg(feature = "std")]
    fn probe_resident(&self, key: &K, hash: u64) -> bool {
        let ptr_t1 = self.t1.load_slot(hash);
        // SAFETY: read under the caller's epoch pin (see `lookup`).
        if !ptr_t1.is_null() && unsafe { (*ptr_t1).key == *key } {
            return true;
        }
        let ptr_t2 = self.t2.load_slot(hash);
        // SAFETY: as above.
        if !ptr_t2.is_null() && unsafe { (*ptr_t2).key == *key } {
            return true;
        }
        let tag = (hash >> 48) as u16;
        let Some(global_idx) = self.cache.index_probe(hash, tag) else {
            return false;
        };
        let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
        // SAFETY: as above.
        !ptr.is_null() && unsafe { (*ptr).key == *key }
    }

    /// Two-hit admission doorkeeper backed by this thread's `L1_FILTER`.
    ///
    /// Returns `false` (and marks the bucket) when the bucket for `hash` is
    /// empty, and `true` once it has been marked before. Every 4096 calls all
    /// counters are halved so that old marks fade.
    #[cfg(feature = "std")]
    #[inline(always)]
    fn l1_admit(hash: u64) -> bool {
        L1_FILTER.with(|filter| {
            let mut guard = filter.borrow_mut();
            let (counters, ops) = &mut *guard;
            let idx = (hash as usize) & 4095_usize;
            let seen = counters[idx];
            *ops += 1;
            if *ops >= 4096_usize {
                for c in counters.iter_mut() {
                    *c >>= 1;
                }
                *ops = 0;
            }
            if seen == 0 {
                counters[idx] = 1;
                false
            } else {
                if seen < 2 {
                    counters[idx] = 2;
                }
                true
            }
        })
    }

    /// Appends an admitted pair to this thread's miss buffer and flushes the
    /// buffer when needed.
    ///
    /// The flush happens when `push` requests it or when the flush-tick threshold
    /// has elapsed. A thread without a worker slot sends a single `Command::Insert`
    /// instead.
    #[cfg(feature = "std")]
    fn buffer_insert(&self, key: K, value: V, hash: u64) {
        let current_tick = self.daemon_tick.load(Ordering::Relaxed);
        let tick_due = LAST_FLUSH_TICK
            .with(|last| current_tick.wrapping_sub(last.get()) >= self.flush_tick_threshold);
        WORKER_ID.with(|&id| {
            let Some(slot) = self.miss_buffers.get(id) else {
                let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
                return;
            };
            // SAFETY: worker ids are unique among live threads, so only this thread
            // writes slot `id` (apart from `sync`'s drain; see its NOTE).
            let buf = unsafe { slot.get_mut_unchecked() };
            let push_requests_flush = buf.push((key, value, hash));
            if push_requests_flush || (tick_due && !buf.is_empty()) {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                LAST_FLUSH_TICK.with(|last| last.set(current_tick));
            }
        });
    }

    /// Sends this thread's pending miss buffer (if any) as a `BatchInsert` and
    /// records the flush tick.
    #[cfg(feature = "std")]
    fn flush_local_misses(&self) {
        WORKER_ID.with(|&id| {
            let Some(slot) = self.miss_buffers.get(id) else {
                return;
            };
            // SAFETY: slot `id` belongs to this thread; see `buffer_insert`.
            let buf = unsafe { slot.get_mut_unchecked() };
            if !buf.is_empty() {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                let tick = self.daemon_tick.load(Ordering::Relaxed);
                LAST_FLUSH_TICK.with(|last| last.set(tick));
            }
        });
    }
}
impl<K, V, S> Drop for DualCacheFF<K, V, S> {
    /// Asks the daemon to shut down when this appears to be the last user handle.
    ///
    /// `cmd_tx` is shared by every handle clone and by the `Daemon` built in
    /// `new_headless`. A strong count of at most 2, observed while this handle is
    /// still alive, therefore means no other handle remains. `Command::Shutdown`
    /// is sent with `try_send` and its result ignored.
    ///
    /// NOTE(review): two shutdown gaps remain.
    /// * The count check is not atomic with other handles' drops: two clones
    ///   dropped concurrently can both observe 3 and both skip the shutdown.
    /// * A full queue silently swallows the `Shutdown`.
    ///
    /// Either way the daemon may never exit. A daemon-side
    /// `Arc::strong_count(&cmd_q) == 1` check would close both gaps.
    fn drop(&mut self) {
        let last_handle = Arc::strong_count(&self.cmd_tx) <= 2;
        if !last_handle {
            return;
        }
        let _ = self.cmd_tx.try_send(Command::Shutdown);
    }
}