#![cfg_attr(not(feature = "std"), no_std)]
#[cfg(not(feature = "std"))]
extern crate alloc;
pub mod arena;
pub mod cache_padded;
pub mod daemon;
pub mod filters;
pub mod lossy_queue;
pub mod storage;
pub mod unsafe_core;
pub mod workers;
/// Internal alias module selecting the `Arc` implementation for the active
/// build: `std`'s by default, `loom`'s under the loom model-checking feature
/// (loom takes precedence when both are enabled), and `alloc`'s for no_std.
pub(crate) mod sync {
    #[cfg(all(feature = "std", not(feature = "loom")))]
    pub use std::sync::Arc;
    #[cfg(feature = "loom")]
    pub use loom::sync::Arc;
    #[cfg(not(any(feature = "std", feature = "loom")))]
    pub use alloc::sync::Arc;
}
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use crate::cache_padded::CachePadded;
use crate::daemon::{Command, Daemon};
use crate::lossy_queue::{LossyQueue, OneshotAck};
use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
use ahash::RandomState;
use core::hash::{BuildHasher, Hash};
use core::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use sync::Arc;
/// Tuning knobs for a [`DualCacheFF`] instance.
pub struct Config {
    /// Total number of cache entries; must be a power of two
    /// (enforced by [`Config::new_expert`]).
    pub capacity: usize,
    /// Slot count of the T1 tier; power of two, capped at 4096.
    pub t1_slots: usize,
    /// Slot count of the T2 tier; power of two.
    pub t2_slots: usize,
    /// Time-related parameter forwarded to the daemon — presumably an
    /// entry TTL/expiry duration in daemon epochs; confirm against `Daemon`.
    pub duration: u32,
    /// Number of worker threads to provision per-thread buffers for.
    pub threads: usize,
    /// Daemon poll interval in microseconds; clamped to [1_000, 10_000].
    pub poll_us: u64,
    /// Minimum daemon ticks between time-based flushes of a worker's
    /// insert buffer; at least 1.
    pub flush_tick_threshold: u64,
}
impl Config {
    /// Derive a configuration from a RAM budget in megabytes.
    ///
    /// Capacity is estimated at roughly 128 bytes per entry, rounded up to
    /// a power of two with a floor of 256. T2 receives about a fifth of
    /// capacity (floor 4096); T1 is fixed at 2048 slots. Thread count
    /// follows available parallelism on std builds (fallback 16) and is
    /// pinned to 8 on no_std builds.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        let budget_bytes = ram_mb * 1024 * 1024;
        let capacity = (budget_bytes / 128).next_power_of_two().max(256);
        #[cfg(feature = "std")]
        let threads = std::thread::available_parallelism()
            .map(|p| p.get())
            .unwrap_or(16);
        #[cfg(not(feature = "std"))]
        let threads = 8;
        Self {
            capacity,
            t1_slots: 2048,
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }
    /// Fully manual configuration. Panics unless every size is a power of
    /// two and `t1_slots` does not exceed 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );
        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }
    /// Builder-style override for the daemon poll interval, constrained
    /// to the supported 1ms..=10ms window.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.max(1_000).min(10_000);
        self
    }
    /// Builder-style override for the flush tick threshold; zero is
    /// bumped to one so a flush is always eventually eligible.
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = if ticks == 0 { 1 } else { ticks };
        self
    }
}
/// Process-wide epoch counter. Readers copy it into their `WorkerState`
/// while probing (see `get`), apparently as a quasi-RCU "I am reading"
/// announcement the daemon can wait on — confirm against `Daemon`.
/// Starts at 1 so that 0 can mean "not currently reading".
pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);
/// Per-worker epoch record, cache-padded to avoid false sharing between
/// adjacent workers' slots in the shared `worker_states` array.
pub struct WorkerState {
    /// 0 while the worker is not reading; otherwise the `GLOBAL_EPOCH`
    /// value observed at the start of the current `get`.
    pub local_epoch: CachePadded<AtomicUsize>,
}
impl WorkerState {
    /// Create a worker state with the local epoch parked at 0,
    /// i.e. "not currently inside a read-side critical section".
    pub fn new() -> Self {
        Self {
            local_epoch: CachePadded::new(AtomicUsize::new(0)),
        }
    }
}

/// `Default` mirrors `new()` so the type composes with generic containers
/// and `..Default::default()` struct updates (clippy: `new_without_default`).
impl Default for WorkerState {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(feature = "std")]
static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
#[cfg(feature = "std")]
use core::cell::{Cell, RefCell};
#[cfg(feature = "std")]
thread_local! {
    // Dense per-thread id handed out on first use; indexes into
    // `worker_states` / `miss_buffers`. Must stay lazily initialized —
    // the fetch_add runs once per thread on first touch.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
    // Hit batch: (64 global indices, fill count). Flushed to `hit_tx`
    // when full, or partially on `sync()`.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
    // Insert doorkeeper: (saturating 2-bit-ish counters, ops-since-decay).
    // First sighting of a hash bucket is rejected; counters halve every
    // 4096 inserts (see `insert`).
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
    // Daemon tick at which this thread last flushed its miss buffer.
    // `const` initializer for consistency with the entries above — it
    // skips the lazy-init check on every access.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
/// Shared handle to a dual-tier cache. Cloning the handle is cheap (all
/// shared parts are `Arc`s); mutation flows through lossy queues to a
/// background [`Daemon`] rather than locking shared state.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Hash builder; must match the one the daemon was constructed with.
    pub hasher: S,
    /// Hot tier probed first on `get`.
    pub t1: Arc<T1<K, V>>,
    /// Second tier probed after a T1 miss.
    pub t2: Arc<T2<K, V>>,
    /// Backing store probed last, via tag index + full key check.
    pub cache: Arc<Cache<K, V>>,
    /// Command channel to the daemon (inserts, removes, sync, clear).
    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
    /// Hit-sample channel: fixed batches of 64 global indices.
    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
    /// Daemon-advanced expiry epoch; entries with `expire_at` below the
    /// current value are treated as dead on `get`.
    pub epoch: Arc<AtomicU32>,
    /// One epoch record per worker thread (see [`WorkerState`]).
    pub worker_states: Arc<[WorkerState]>,
    /// One single-writer insert buffer per worker thread.
    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
    /// Monotonic tick incremented by the daemon; drives time-based flushes.
    pub daemon_tick: Arc<AtomicU64>,
    /// Copied from [`Config::flush_tick_threshold`].
    pub flush_tick_threshold: u64,
}
impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
fn clone(&self) -> Self {
Self {
hasher: self.hasher.clone(),
t1: self.t1.clone(),
t2: self.t2.clone(),
cache: self.cache.clone(),
cmd_tx: self.cmd_tx.clone(),
hit_tx: self.hit_tx.clone(),
epoch: self.epoch.clone(),
worker_states: self.worker_states.clone(),
miss_buffers: self.miss_buffers.clone(),
daemon_tick: self.daemon_tick.clone(),
flush_tick_threshold: self.flush_tick_threshold,
}
}
}
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Build a cache handle and spawn its maintenance daemon on a
    /// dedicated background thread. For manual daemon placement use
    /// [`Self::new_headless`].
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);
        std::thread::spawn(move || daemon.run());
        handle
    }
}
impl<K, V> DualCacheFF<K, V, RandomState>
where
K: Hash + Eq + Send + Sync + Clone + 'static,
V: Send + Sync + Clone + 'static,
{
pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
let hasher = RandomState::new();
let t1 = Arc::new(T1::new(config.t1_slots));
let t2 = Arc::new(T2::new(config.t2_slots));
let cache = Arc::new(Cache::new(config.capacity));
let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
let epoch = Arc::new(AtomicU32::new(0));
let daemon_tick = Arc::new(AtomicU64::new(0));
let mut buffers = Vec::with_capacity(config.threads);
let mut states = Vec::with_capacity(config.threads);
for _ in 0..config.threads {
buffers.push(WorkerSlot::new());
states.push(WorkerState::new());
}
let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
let worker_states: Arc<[_]> = states.into_boxed_slice().into();
let daemon = Daemon::new(
hasher.clone(),
config.capacity,
t1.clone(),
t2.clone(),
cache.clone(),
cmd_q.clone(),
hit_q.clone(),
epoch.clone(),
config.duration,
config.poll_us,
worker_states.clone(),
daemon_tick.clone(),
);
let this = Self {
hasher,
t1,
t2,
cache,
cmd_tx: cmd_q,
hit_tx: hit_q,
epoch,
worker_states,
miss_buffers,
daemon_tick,
flush_tick_threshold: config.flush_tick_threshold,
};
(this, daemon)
}
}
impl<K, V, S> DualCacheFF<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Flush this thread's buffered hits and every worker's pending
    /// insert batch, then block until the daemon acknowledges it has
    /// processed everything queued so far.
    pub fn sync(&self) {
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            if state.1 > 0_usize {
                // NOTE(review): the full 64-slot array is sent even when only
                // `state.1` entries are fresh, so trailing slots may carry
                // stale indices from an earlier batch. Presumably tolerable
                // for a lossy hit sampler — confirm against the daemon's
                // consumer of `hit_tx`.
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
        #[cfg(feature = "std")]
        for slot in self.miss_buffers.iter() {
            // NOTE(review): this drains *every* worker's buffer from the
            // calling thread, while `insert` writes each buffer from its
            // owning thread. Soundness rests on `WorkerSlot::get_mut_unchecked`'s
            // contract — verify it tolerates this cross-thread drain.
            let buf = unsafe { slot.get_mut_unchecked() };
            if buf.len() > 0 {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        }
        // Blocking send + wait gives a happens-before barrier: the daemon
        // acks only after draining commands queued ahead of the Sync.
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
        ack.wait();
    }
    /// Look `key` up in T1, then T2, then the backing cache index.
    /// Returns a clone of the value, or `None` on miss/expiry.
    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        // Expiry snapshot: entries with a nonzero `expire_at` below this
        // epoch are treated as dead.
        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
        #[cfg(feature = "std")]
        {
            // Publish "reading at epoch E" in this worker's slot; appears
            // to act as a quasi-RCU guard the daemon consults before
            // reclaiming nodes — confirm against `Daemon`. Cleared below.
            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            WORKER_ID.with(|&id| {
                if id < self.worker_states.len() {
                    self.worker_states[id]
                        .local_epoch
                        .store(global_epoch, Ordering::Relaxed);
                }
            });
        }
        let mut res: Option<V> = None;
        let mut hit_g_idx: Option<u32> = None;
        // Tier 1 probe: slot holds a raw node pointer or null.
        let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // NOTE(review): deref relies on the epoch guard above keeping
            // the node alive; the no_std build sets no guard — verify the
            // reclamation story for no_std readers.
            let node = unsafe { &*ptr_t1 };
            if node.key == *key
                && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
            {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }
        // Tier 2 probe, only on T1 miss.
        if res.is_none() {
            let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }
        // Backing-store probe: 16-bit tag from the high hash bits narrows
        // the index lookup before the full key comparison.
        if res.is_none() {
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }
        // Leave the read-side critical section (0 = not reading).
        #[cfg(feature = "std")]
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        });
        // Record the hit after dropping the guard; only the index is
        // needed, not the node.
        if let Some(g_idx) = hit_g_idx {
            self.record_hit(g_idx as usize);
        }
        res
    }
    /// Queue an insert. On std builds the key must pass a per-thread
    /// doorkeeper filter (first sighting is dropped), then lands in this
    /// thread's buffer, which is flushed to the daemon when full or when
    /// `flush_tick_threshold` daemon ticks have elapsed. Lossy: sends may
    /// be silently dropped if queues are full.
    pub fn insert(&self, key: K, value: V) {
        let hash = self.hash(&key);
        #[cfg(feature = "std")]
        {
            let pass = L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
                let mut state = f.borrow_mut();
                let idx = (hash as usize) & 4095_usize;
                let val = state.0[idx];
                // Aging: halve every counter once per 4096 inserts so the
                // filter forgets cold keys over time.
                state.1 += 1;
                if state.1 >= 4096_usize {
                    for x in state.0.iter_mut() {
                        *x >>= 1;
                    }
                    state.1 = 0;
                }
                if val < 1_u8 {
                    // First sighting: remember it but reject the insert —
                    // one-hit wonders never reach the daemon.
                    state.0[idx] = 1;
                    false
                } else {
                    if val < 2_u8 {
                        state.0[idx] = 2;
                    }
                    true
                }
            });
            if !pass {
                return;
            }
            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
            // wrapping_sub keeps the comparison correct across tick wraps.
            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
            });
            WORKER_ID.with(|&id| {
                if id >= self.miss_buffers.len() {
                    // More live threads than provisioned buffers: bypass
                    // batching and send the single insert directly.
                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
                    return;
                }
                // NOTE(review): single-writer access — this thread is the
                // only writer of its own slot (but see the drain in `sync`).
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                let capacity_flush = buf.push((key, value, hash));
                if capacity_flush || (should_time_flush && !buf.is_empty()) {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
                }
            });
        }
        #[cfg(not(feature = "std"))]
        {
            // no_std: no thread-locals available, send unbatched.
            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
        }
    }
    /// Queue a removal. Pending buffered inserts from this thread are
    /// flushed first so a buffered insert of `key` cannot be applied
    /// after the remove (ordering through the same command queue).
    pub fn remove(&self, key: &K) {
        let hash = self.hash(key);
        #[cfg(feature = "std")]
        WORKER_ID.with(|&id| {
            if id < self.miss_buffers.len() {
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                if buf.len() > 0 {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                    let tick = self.daemon_tick.load(Ordering::Relaxed);
                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
                }
            }
        });
        // Blocking: removals are not allowed to be lossy.
        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
    }
    /// Ask the daemon to clear the cache and block until it acknowledges.
    pub fn clear(&self) {
        let ack = OneshotAck::new();
        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
        ack.wait();
    }
    /// Hash a key with the handle's `BuildHasher` (must match the daemon's).
    #[inline(always)]
    fn hash(&self, key: &K) -> u64 {
        self.hasher.hash_one(key)
    }
    /// Buffer a hit on `global_idx`; a full batch of 64 is shipped to the
    /// daemon in one lossy send.
    #[inline(always)]
    fn record_hit(&self, global_idx: usize) {
        #[cfg(feature = "std")]
        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
            let mut state = buf.borrow_mut();
            let idx = state.1;
            state.0[idx] = global_idx;
            state.1 += 1;
            if state.1 == 64_usize {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
        #[cfg(not(feature = "std"))]
        {
            // NOTE(review): no_std sends one real index padded with 63
            // zeros — the daemon may misread the padding as hits on index
            // 0. Confirm the consumer's handling of partial batches.
            let mut batch = [0usize; 64];
            batch[0] = global_idx;
            let _ = self.hit_tx.try_send(batch);
        }
    }
}