1use crate::daemon::{Command, Daemon};
2use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
3use ahash::RandomState;
4use crossbeam_channel::{Sender, bounded};
5use crossbeam_utils::CachePadded;
6use std::cell::RefCell;
7use std::hash::{BuildHasher, Hash};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
10
11pub mod daemon;
12pub mod unsafe_core;
13
/// Tuning knobs for a `DualCacheFF` instance.
///
/// All slot/capacity counts must be powers of two so index masking works;
/// `with_memory_budget` guarantees this, `new_expert` asserts it.
pub struct Config {
    pub capacity: usize,
    pub t1_slots: usize,
    pub t2_slots: usize,
    pub duration: u32,
    pub threads: usize,
}

impl Config {
    /// Derives a configuration from a RAM budget in megabytes.
    ///
    /// Assumes roughly 128 bytes per cached entry and rounds the resulting
    /// capacity up to the next power of two. T2 is sized at ~1/5 of capacity
    /// (power-of-two rounded, at least 4096 slots); T1 is fixed at 2048.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        let capacity = ((ram_mb * 1024 * 1024) / 128).next_power_of_two();
        let t2_slots = (capacity / 5).next_power_of_two().max(4096);
        // Fall back to 16 workers when parallelism cannot be queried.
        let threads = match std::thread::available_parallelism() {
            Ok(p) => p.get(),
            Err(_) => 16,
        };

        Self {
            capacity,
            t1_slots: 2048,
            t2_slots,
            duration,
            threads,
        }
    }

    /// Fully explicit constructor for callers that know their workload.
    ///
    /// # Panics
    /// Panics when `capacity`, `t1_slots`, or `t2_slots` is not a power of
    /// two, or when `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
        }
    }
}
83
/// Monotonic counter handing out worker ids; each thread claims one id the
/// first time it touches `WORKER_ID`. Ids are never recycled, so long-lived
/// processes that churn threads will run past `Config::threads` and those
/// threads fall back to the out-of-range paths in `get`/`insert`/`remove`.
static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Lazily assigned per-thread worker id (index into `worker_states` /
    // `miss_buffers` when it is < Config::threads).
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
    // Hit-sample batch: ([global indices], count of valid entries). Flushed
    // to the daemon when full (see `record_hit`) or on `sync`.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
    // Per-thread admission filter: ([4096 saturating 2-bit counters],
    // ops-since-decay). Counters halve every 4096 insert attempts (see `insert`).
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
}

/// Global epoch used for grace-period tracking: readers copy it into their
/// `WorkerState::local_epoch` while probing and store 0 when done (see `get`).
/// Starts at 1 so that 0 can mean "not inside a read section".
pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);
95
96pub struct WorkerState {
97 pub local_epoch: CachePadded<AtomicUsize>,
98}
99
100impl WorkerState {
101 pub fn new() -> Self {
102 Self {
103 local_epoch: CachePadded::new(AtomicUsize::new(0)),
104 }
105 }
106}
107
/// Handle to a multi-tier cache backed by a background daemon thread.
///
/// The handle is cheap to clone (all heavy state is behind `Arc`s / channel
/// senders); clones share the same underlying cache and daemon.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Hasher used by `hash()` to derive the 64-bit key hash for every tier.
    pub hasher: S,
    /// First probe tier — small slot table in `unsafe_core` (checked before T2).
    pub t1: Arc<T1<K, V>>,
    /// Second probe tier — larger slot table, checked when T1 misses.
    pub t2: Arc<T2<K, V>>,
    /// Main store, probed last via `index_probe` + `node_get_full`.
    pub cache: Arc<Cache<K, V>>,
    /// Commands (batch inserts, removes, sync, clear) to the daemon.
    pub cmd_tx: Sender<Command<K, V>>,
    /// Batches of hit indices (64 at a time) for the daemon's frequency stats.
    pub hit_tx: Sender<[usize; 64]>,
    /// TTL clock published by the daemon; `get` compares node `expire_at`
    /// against this. Advanced on the `duration` cadence — presumably; the
    /// daemon owns the writes (TODO confirm in `daemon`).
    pub epoch: Arc<AtomicU32>,
    /// One epoch slot per configured worker thread, indexed by `WORKER_ID`.
    pub worker_states: Arc<[WorkerState]>,
    /// One pending-insert buffer per configured worker thread.
    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
}
121
122impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
123 fn clone(&self) -> Self {
124 Self {
125 hasher: self.hasher.clone(),
126 t1: self.t1.clone(),
127 t2: self.t2.clone(),
128 cache: self.cache.clone(),
129 cmd_tx: self.cmd_tx.clone(),
130 hit_tx: self.hit_tx.clone(),
131 epoch: self.epoch.clone(),
132 worker_states: self.worker_states.clone(),
133 miss_buffers: self.miss_buffers.clone(),
134 }
135 }
136}
137
138impl<K, V> DualCacheFF<K, V, RandomState>
139where
140 K: Hash + Eq + Send + Sync + Clone + 'static,
141 V: Send + Sync + Clone + 'static,
142{
143 #[allow(clippy::too_many_arguments)]
144 pub fn new(config: Config) -> Self {
145 let hasher = RandomState::new();
146 let t1 = Arc::new(T1::new(config.t1_slots));
147 let t2 = Arc::new(T2::new(config.t2_slots));
148 let cache = Arc::new(Cache::new(config.capacity));
149 let (cmd_tx, cmd_rx) = bounded(8192);
150 let (hit_tx, hit_rx) = bounded(1024);
151 let epoch = Arc::new(AtomicU32::new(0));
152
153 let mut buffers = Vec::with_capacity(config.threads);
154 let mut states = Vec::with_capacity(config.threads);
155 for _ in 0..config.threads {
156 buffers.push(WorkerSlot::new());
157 states.push(WorkerState::new());
158 }
159 let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
160 let worker_states: Arc<[_]> = states.into_boxed_slice().into();
161
162 let daemon = Daemon::new(
163 hasher.clone(),
164 config.capacity,
165 t1.clone(),
166 t2.clone(),
167 cache.clone(),
168 cmd_rx,
169 hit_rx,
170 epoch.clone(),
171 config.duration,
172 worker_states.clone(),
173 );
174
175 std::thread::spawn(move || {
176 daemon.run();
177 });
178
179 Self {
180 hasher,
181 t1,
182 t2,
183 cache,
184 cmd_tx,
185 hit_tx,
186 epoch,
187 worker_states,
188 miss_buffers,
189 }
190 }
191}
192
impl<K, V, S> DualCacheFF<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Flushes this thread's pending hit samples and *all* workers' pending
    /// insert batches to the daemon, then round-trips a `Sync` command so the
    /// daemon has processed everything sent before this call returns.
    pub fn sync(&self) {
        // Flush the calling thread's partially-filled hit batch.
        HIT_BUF.with(|buf| {
            let mut state = buf.borrow_mut();
            if state.1 > 0 {
                // NOTE(review): the whole 64-entry array is sent even when only
                // `state.1` entries are fresh; slots past the count still hold
                // indices from earlier batches — confirm the daemon tolerates
                // replayed/stale hit indices.
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });

        // Drain every worker's miss buffer, not just our own.
        for slot in self.miss_buffers.iter() {
            // NOTE(review): this mutably accesses slots owned by *other*
            // threads; soundness hinges entirely on WorkerSlot's internal
            // contract for `get_mut_unchecked` — verify it is safe against a
            // concurrent `insert` on the owning thread.
            let buf = unsafe { slot.get_mut_unchecked() };
            if buf.len() > 0 {
                let batch = buf.drain_to_vec();
                // Blocking send: a sync must not silently drop pending writes.
                let _ = self.cmd_tx.send(Command::BatchInsert(batch));
            }
        }

        // Rendezvous: daemon replies on `tx` once it has drained its queue.
        let (tx, rx) = bounded(1);
        if self.cmd_tx.send(Command::Sync(tx)).is_ok() {
            let _ = rx.recv();
        }
    }

    /// Looks `key` up in T1, then T2, then the main cache, returning a clone
    /// of the value on hit. Expired entries (`expire_at` in the past relative
    /// to the daemon-published epoch) are treated as misses; `expire_at == 0`
    /// means "never expires".
    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        // Snapshot of the TTL clock; all expiry checks below use this value.
        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);

        // Pin: publish the current global epoch in our worker slot so the
        // daemon knows this worker is inside a read section. Must happen
        // before any tier is probed.
        let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(global_epoch, Ordering::Relaxed);
            }
        });

        let mut res = None;
        let mut hit_g_idx = None;

        // Tier 1 probe.
        let ptr_t1 = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            // SAFETY-NOTE(review): dereferencing relies on the epoch pin above
            // keeping the node alive; the reclamation protocol lives in the
            // daemon/unsafe_core — confirm store/load orderings suffice.
            let node = unsafe { &*ptr_t1 };
            if node.key == *key && (node.expire_at == 0 || node.expire_at >= current_epoch_cache) {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }

        // Tier 2 probe (only on T1 miss).
        if res.is_none() {
            let ptr_t2 = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }

        // Main cache probe (only when both tiers miss). The top 16 bits of
        // the hash serve as a short tag for the index probe.
        if res.is_none() {
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }

        // Unpin: 0 marks this worker as outside any read section. Must happen
        // only after all raw-pointer dereferences above are done.
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        });

        // Record the hit for the daemon's frequency accounting (batched).
        if let Some(g_idx) = hit_g_idx {
            self.record_hit(g_idx as usize);
        }

        res
    }

    /// Queues `(key, value)` for insertion by the daemon. Admission is
    /// filtered: a key's first sighting (per thread) only primes the filter
    /// and the insert is dropped; from the second sighting on it is buffered
    /// and shipped to the daemon in batches. Inserts may also be dropped if
    /// the command channel is full (`try_send`) — this is best-effort.
    pub fn insert(&self, key: K, value: V) {
        let hash = self.hash(&key);

        // Thread-local admission filter: 4096 saturating counters (0..=2)
        // indexed by the low 12 bits of the hash, halved every 4096 attempts
        // so stale frequency information decays.
        let pass = L1_FILTER.with(|f| {
            let mut state = f.borrow_mut();
            let idx = (hash as usize) & 4095;
            let val = state.0[idx];

            state.1 += 1;
            if state.1 >= 4096 {
                for x in state.0.iter_mut() {
                    *x >>= 1;
                }
                state.1 = 0;
            }

            if val < 1 {
                // First sighting: prime the counter, reject the insert.
                state.0[idx] = 1;
                false
            } else {
                if val < 2 {
                    state.0[idx] = 2;
                }
                true
            }
        });

        if !pass {
            return;
        }

        WORKER_ID.with(|&id| {
            // Threads beyond the configured worker count have no buffer;
            // their inserts are silently dropped.
            if id >= self.miss_buffers.len() {
                return;
            }

            // SAFETY-NOTE(review): this buffer belongs to the current thread
            // by WORKER_ID convention, but `sync()` also drains it from other
            // threads — see the note there.
            let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };

            // `push` returning true signals the buffer is full: flush it.
            if buf.push((key, value, hash)) {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        });
    }

    /// Queues a removal of `key`. The local insert buffer is flushed first so
    /// a buffered insert of the same key cannot be applied after the remove.
    pub fn remove(&self, key: &K) {
        let hash = self.hash(key);
        WORKER_ID.with(|&id| {
            if id < self.miss_buffers.len() {
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                if buf.len() > 0 {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                }
            }
        });

        // NOTE(review): `try_send` means the Remove itself can be dropped when
        // the channel is full, leaving the entry alive — confirm best-effort
        // removal is the intended contract.
        let _ = self.cmd_tx.try_send(Command::Remove(key.clone(), hash));
    }

    /// Asks the daemon to clear the cache and blocks until it acknowledges.
    pub fn clear(&self) {
        let (tx, rx) = bounded(1);
        if self.cmd_tx.send(Command::Clear(tx)).is_ok() {
            let _ = rx.recv();
        }
    }

    /// Single hash used consistently across T1, T2, the main cache, and the
    /// admission filter.
    fn hash(&self, key: &K) -> u64 {
        self.hasher.hash_one(key)
    }

    /// Buffers a hit's global index in the thread-local batch; ships the full
    /// 64-entry array to the daemon (best-effort `try_send`) when it fills.
    fn record_hit(&self, global_idx: usize) {
        HIT_BUF.with(|buf| {
            let mut state = buf.borrow_mut();
            let idx = state.1;
            state.0[idx] = global_idx;
            state.1 += 1;
            if state.1 == 64 {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
    }
}