use crate::daemon::{Command, Daemon};
use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
use ahash::RandomState;
use crossbeam_channel::{Sender, bounded};
use crossbeam_utils::CachePadded;
use std::cell::RefCell;
use std::hash::{BuildHasher, Hash};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
pub mod daemon;
pub mod unsafe_core;
/// Tuning parameters for a [`DualCacheFF`] instance.
pub struct Config {
    /// Total slot count of the main cache; must be a power of two.
    pub capacity: usize,
    /// Slot count of the hot T1 tier; must be a power of two, at most 4096.
    pub t1_slots: usize,
    /// Slot count of the T2 tier; must be a power of two.
    pub t2_slots: usize,
    /// Tick interval handed to the daemon, which drives the TTL epoch clock.
    pub duration: u32,
    /// Number of worker threads that will use the cache concurrently.
    pub threads: usize,
}
impl Config {
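    /// Derives a configuration from a RAM budget in MiB, assuming roughly
    /// 128 bytes per cached entry and rounding capacity up to a power of
    /// two. T2 gets about a fifth of the capacity (minimum 4096 slots), T1
    /// keeps a 2048-slot default, and the thread count falls back to 16 if
    /// the available parallelism cannot be queried.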
pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
let raw_capacity = (ram_mb * 1024 * 1024) / 128;
let capacity = raw_capacity.next_power_of_two();
Self {
capacity,
t1_slots: 2048,
t2_slots: (capacity / 5).next_power_of_two().max(4096),
duration,
threads: std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(16),
}
}
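    /// Fully manual constructor for expert tuning. Panics unless `capacity`,
    /// `t1_slots`, and `t2_slots` are all powers of two and
    /// `t1_slots <= 4096`.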
pub fn new_expert(
capacity: usize,
t1_slots: usize,
t2_slots: usize,
duration: u32,
threads: usize,
) -> Self {
assert!(
capacity.is_power_of_two(),
"Capacity MUST be a power of two"
);
assert!(
t1_slots.is_power_of_two(),
"T1 slots MUST be a power of two"
);
assert!(
t2_slots.is_power_of_two(),
"T2 slots MUST be a power of two"
);
        assert!(
            t1_slots <= 4096,
            "T1 slot count must not exceed 4096 so it stays L1-cache resident"
        );
Self {
capacity,
t1_slots,
t2_slots,
duration,
threads,
}
}
}
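// Per-thread state: each thread takes a unique worker ID on first use,
// batches hit reports into a 64-entry buffer, and keeps a 4096-slot counting
// filter (aged by halving every counter once per 4096 insert attempts) that
// serves as an admission doorkeeper, similar in spirit to TinyLFU's.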
static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
}
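/// Global reclamation epoch. Readers publish the value they observe into
/// their [`WorkerState`] while dereferencing shared nodes, so the reclaimer
/// can tell when retired nodes are safe to free.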
pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);
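/// Per-worker epoch slot: non-zero while the worker is inside a lookup and
/// may hold raw pointers, zero when quiescent. Cache-padded to avoid false
/// sharing between neighboring workers.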
pub struct WorkerState {
pub local_epoch: CachePadded<AtomicUsize>,
}
impl WorkerState {
pub fn new() -> Self {
Self {
local_epoch: CachePadded::new(AtomicUsize::new(0)),
}
}
}
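/// Cache handle with a lock-free read path: lookups probe T1, T2, and the
/// main cache directly, while all writes are batched through a single
/// background daemon thread.
///
/// # Example
///
/// A minimal sketch, assuming the crate is consumed under the illustrative
/// name `dual_cache_ff` and that the daemon has applied the queued write by
/// the time `sync` returns:
///
/// ```ignore
/// use dual_cache_ff::{Config, DualCacheFF};
///
/// let cache: DualCacheFF<&'static str, u64> =
///     DualCacheFF::new(Config::with_memory_budget(64, 1));
/// cache.insert("answer", 42); // first touch: noted by the doorkeeper, dropped
/// cache.insert("answer", 42); // second touch: admitted and buffered
/// cache.sync();               // flush buffers and wait for the daemon
/// assert_eq!(cache.get(&"answer"), Some(42));
/// ```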
pub struct DualCacheFF<K, V, S = RandomState> {
    pub hasher: S,
    /// Hot tier, sized to fit in L1 cache.
    pub t1: Arc<T1<K, V>>,
    /// Second tier.
    pub t2: Arc<T2<K, V>>,
    /// Main cache storage and tag index.
    pub cache: Arc<Cache<K, V>>,
    /// Command queue (batched inserts, removes, sync, clear) to the daemon.
    pub cmd_tx: Sender<Command<K, V>>,
    /// Batches of 64 hit indices reported to the daemon.
    pub hit_tx: Sender<[usize; 64]>,
    /// Coarse TTL clock compared against each node's `expire_at`.
    pub epoch: Arc<AtomicU32>,
    /// One epoch slot per worker thread (see [`WorkerState`]).
    pub worker_states: Arc<[WorkerState]>,
    /// One write buffer per worker thread, drained into `cmd_tx`.
    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
}
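// Handles are cheap to clone: only the `Arc`s are bumped and the hasher is
// cloned, so this impl needs no `K: Clone` or `V: Clone` bound.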
impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
fn clone(&self) -> Self {
Self {
hasher: self.hasher.clone(),
t1: self.t1.clone(),
t2: self.t2.clone(),
cache: self.cache.clone(),
cmd_tx: self.cmd_tx.clone(),
hit_tx: self.hit_tx.clone(),
epoch: self.epoch.clone(),
worker_states: self.worker_states.clone(),
miss_buffers: self.miss_buffers.clone(),
}
}
}
impl<K, V> DualCacheFF<K, V, RandomState>
where
K: Hash + Eq + Send + Sync + Clone + 'static,
V: Send + Sync + Clone + 'static,
{
    /// Builds the cache, allocates one write buffer and one epoch slot per
    /// worker thread, and spawns the detached daemon thread that applies
    /// queued writes.
pub fn new(config: Config) -> Self {
let hasher = RandomState::new();
let t1 = Arc::new(T1::new(config.t1_slots));
let t2 = Arc::new(T2::new(config.t2_slots));
let cache = Arc::new(Cache::new(config.capacity));
        // Bounded queues: the hot paths use `try_send` and shed load under
        // backpressure, while `sync` and `clear` block.
        let (cmd_tx, cmd_rx) = bounded(8192);
        let (hit_tx, hit_rx) = bounded(1024);
        let epoch = Arc::new(AtomicU32::new(0));
let mut buffers = Vec::with_capacity(config.threads);
let mut states = Vec::with_capacity(config.threads);
for _ in 0..config.threads {
buffers.push(WorkerSlot::new());
states.push(WorkerState::new());
}
let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
let worker_states: Arc<[_]> = states.into_boxed_slice().into();
let daemon = Daemon::new(
hasher.clone(),
config.capacity,
t1.clone(),
t2.clone(),
cache.clone(),
cmd_rx,
hit_rx,
epoch.clone(),
config.duration,
worker_states.clone(),
);
std::thread::spawn(move || {
daemon.run();
});
Self {
hasher,
t1,
t2,
cache,
cmd_tx,
hit_tx,
epoch,
worker_states,
miss_buffers,
}
}
}
impl<K, V, S> DualCacheFF<K, V, S>
where
K: Hash + Eq + Send + Sync + Clone + 'static,
V: Send + Sync + Clone + 'static,
S: BuildHasher + Clone + Send + 'static,
{
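    /// Flushes this thread's hit buffer and every worker's write buffer, then
    /// round-trips a `Sync` command so the call returns only after the daemon
    /// has caught up. Draining other workers' buffers is unsynchronized, so
    /// this should only run while no other thread is inserting.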
pub fn sync(&self) {
        // Flush this thread's partially filled hit buffer; slots past the
        // fill count still hold indices from earlier batches, so a partial
        // flush may re-report old hits.
        HIT_BUF.with(|buf| {
            let mut state = buf.borrow_mut();
            if state.1 > 0 {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
        for slot in self.miss_buffers.iter() {
            // SAFETY: assumes no worker is pushing into its slot concurrently;
            // see the note in the method docs.
            let buf = unsafe { slot.get_mut_unchecked() };
            if buf.len() > 0 {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.send(Command::BatchInsert(batch));
            }
        }
let (tx, rx) = bounded(1);
if self.cmd_tx.send(Command::Sync(tx)).is_ok() {
let _ = rx.recv();
}
}
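    /// Looks `key` up in T1, then T2, then the main cache index, cloning the
    /// value on a hit and recording the hit for the daemon. The worker's
    /// epoch slot stays published for the duration of the probe so that
    /// reclamation cannot free nodes this thread may still dereference.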
pub fn get(&self, key: &K) -> Option<V> {
let hash = self.hash(key);
let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
        // Pin: publish the observed global epoch so nodes we are about to
        // dereference are not reclaimed under us. (Relaxed ordering suffices
        // only if the daemon adds its own fences when scanning these slots.)
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(global_epoch, Ordering::Relaxed);
            }
        });
let mut res = None;
let mut hit_g_idx = None;
        // Probe the hot T1 tier first.
        let ptr_t1 = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // SAFETY: the epoch published above keeps this node alive for the
            // duration of the probe.
            let node = unsafe { &*ptr_t1 };
            if node.key == *key
                && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
            {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }
        // Fall back to the T2 tier.
        if res.is_none() {
            let ptr_t2 = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                // SAFETY: as above.
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }
        // Last resort: probe the main cache through its index, using the top
        // 16 bits of the hash as the tag.
        if res.is_none() {
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }
        // Unpin: zero the epoch slot to signal quiescence.
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        });
if let Some(g_idx) = hit_g_idx {
self.record_hit(g_idx as usize);
}
res
}
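    /// Records an insert attempt. The per-thread doorkeeper drops a key's
    /// first touch (modulo filter collisions) and admits it on a repeat;
    /// admitted entries are buffered locally and shipped to the daemon once
    /// the buffer fills or [`DualCacheFF::sync`] runs, so writes are not
    /// immediately visible to [`DualCacheFF::get`].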
pub fn insert(&self, key: K, value: V) {
let hash = self.hash(&key);
        let pass = L1_FILTER.with(|f| {
            let mut state = f.borrow_mut();
            let idx = (hash as usize) & 4095;
            let val = state.0[idx];
            // Age the filter: halve every counter once per 4096 attempts.
            state.1 += 1;
            if state.1 >= 4096 {
                for x in state.0.iter_mut() {
                    *x >>= 1;
                }
                state.1 = 0;
            }
            if val < 1 {
                // First sighting: remember it but do not admit yet.
                state.0[idx] = 1;
                false
            } else {
                // Seen before: admit, saturating the counter at 2.
                if val < 2 {
                    state.0[idx] = 2;
                }
                true
            }
        });
if !pass {
return;
}
        WORKER_ID.with(|&id| {
            // Threads beyond the configured worker count have no buffer;
            // their inserts are silently dropped.
            if id >= self.miss_buffers.len() {
                return;
            }
            // SAFETY: each worker ID owns its slot exclusively (barring a
            // concurrent `sync` from another thread).
            let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
            // `push` signals when the buffer is full and ready to drain.
            if buf.push((key, value, hash)) {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        });
}
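    /// Queues a best-effort removal (dropped if the command queue is full).
    /// This worker's pending inserts are flushed first so a buffered insert
    /// of `key` cannot land after the removal.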
pub fn remove(&self, key: &K) {
let hash = self.hash(key);
        WORKER_ID.with(|&id| {
            if id < self.miss_buffers.len() {
                // SAFETY: this worker owns its slot exclusively (see `insert`).
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                if buf.len() > 0 {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                }
            }
        });
let _ = self.cmd_tx.try_send(Command::Remove(key.clone(), hash));
}
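    /// Asks the daemon to drop every entry and blocks until it acknowledges.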
pub fn clear(&self) {
let (tx, rx) = bounded(1);
if self.cmd_tx.send(Command::Clear(tx)).is_ok() {
let _ = rx.recv();
}
}
fn hash(&self, key: &K) -> u64 {
self.hasher.hash_one(key)
}
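    /// Buffers a hit's global index; every 64 hits the batch is handed to the
    /// daemon (dropped rather than blocked on if the channel is full).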
fn record_hit(&self, global_idx: usize) {
HIT_BUF.with(|buf| {
let mut state = buf.borrow_mut();
let idx = state.1;
state.0[idx] = global_idx;
state.1 += 1;
if state.1 == 64 {
let _ = self.hit_tx.try_send(state.0);
state.1 = 0;
}
});
}
}