dualcache_ff/lib.rs

use crate::daemon::{Command, Daemon};
use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
use ahash::RandomState;
use crossbeam_channel::{Sender, bounded};
use crossbeam_utils::CachePadded;
use std::cell::RefCell;
use std::hash::{BuildHasher, Hash};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

pub mod daemon;
pub mod unsafe_core;

pub struct Config {
    pub capacity: usize,
    pub t1_slots: usize,
    pub t2_slots: usize,
    pub duration: u32,
    pub threads: usize,
}

impl Config {
    /// Budget-based initialization for general use.
    /// Tell the engine how much RAM you are willing to spend, and it derives
    /// a configuration that fits the hardware's physical limits.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // Assume total overhead per item is ~128 bytes.
        let raw_capacity = (ram_mb * 1024 * 1024) / 128;
        // Round up to a power of two for bitmask routing. Note this can
        // exceed the stated budget by up to 2x when the raw capacity is not
        // already a power of two.
        let capacity = raw_capacity.next_power_of_two();

        Self {
            capacity,
            // T1 stays within the L1 cache's physical limit: at most 2048
            // pointers (16 KiB).
            t1_slots: 2048,
            // T2 intercepts warm data; sized at ~20% of total capacity
            // (the 80/20 rule), with a floor of 4096 slots.
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            // Default to a reasonable number of threads to avoid TLS bloat.
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
        }
    }

    /// "Form-based" initialization for hardcore users, guarded by physical
    /// safety checks.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        // Physical law 1: bitmask routing requires power-of-two sizes.
        assert!(
            capacity.is_power_of_two(),
            "Capacity MUST be a power of two"
        );
        assert!(
            t1_slots.is_power_of_two(),
            "T1 slots MUST be a power of two"
        );
        assert!(
            t2_slots.is_power_of_two(),
            "T2 slots MUST be a power of two"
        );

        // Physical law 2: T1 must never exceed the L1 cache
        // (assuming a 32 KiB limit, i.e., 4096 8-byte pointers).
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
        }
    }
}

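// A minimal usage sketch of both constructors. The 512 MiB budget, the
// duration of 60, and the expert sizes are illustrative values, not tuning
// guidance.
#[cfg(test)]
mod config_sketch {
    use super::Config;

    #[test]
    fn budget_and_expert_constructors() {
        let cfg = Config::with_memory_budget(512, 60);
        assert!(cfg.capacity.is_power_of_two());
        assert!(cfg.t2_slots.is_power_of_two());
        assert_eq!(cfg.t1_slots, 2048);

        // Expert form: every size must already be a power of two.
        let expert = Config::new_expert(1 << 20, 2048, 1 << 16, 60, 32);
        assert_eq!(expert.capacity, 1 << 20);
    }
}
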
/// Global atomic counter for assigning unique thread IDs.
/// Each thread calls fetch_add(1) exactly once in its lifetime.
static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    /// Lazily assigned, process-unique worker id (first access claims it).
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
    /// Hit batch: (64 pending global indices, fill count).
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
    /// Insert doorkeeper: (4096 tiny counters, inserts since last decay).
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
}

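// Sketch: every thread that touches WORKER_ID gets a distinct id exactly
// once, and the id is stable for the thread's lifetime. (The concrete ids
// depend on which threads ran first in the process, so only uniqueness and
// stability are asserted.)
#[cfg(test)]
mod worker_id_sketch {
    use super::WORKER_ID;

    #[test]
    fn ids_are_unique_and_stable() {
        let here = WORKER_ID.with(|&id| id);
        let there = std::thread::spawn(|| WORKER_ID.with(|&id| id))
            .join()
            .unwrap();
        assert_ne!(here, there);
        assert_eq!(here, WORKER_ID.with(|&id| id));
    }
}
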
/// Global QSBR epoch. Starts at 1 so that 0 can serve as the "checked out"
/// sentinel in each worker's local epoch (see `get()`).
pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);

/// Per-worker QSBR state: the last global epoch this worker observed while
/// inside a read-side critical section, or 0 when it holds no pointers.
pub struct WorkerState {
    pub local_epoch: CachePadded<AtomicUsize>,
}

impl WorkerState {
    pub fn new() -> Self {
        Self {
            local_epoch: CachePadded::new(AtomicUsize::new(0)),
        }
    }
}

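// QSBR lifecycle, as used by `get()` below (only the reader side is visible
// in this file; the reclaiming side lives in the daemon):
//
//   1. Check-in: the reader copies GLOBAL_EPOCH into its WorkerState, which
//      tells the reclaimer that this worker may be holding node pointers.
//   2. The reader dereferences raw T1/T2/cache pointers.
//   3. Check-out: the reader stores 0, signalling that it holds no pointers
//      and is safe to reclaim around.
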
pub struct DualCacheFF<K, V, S = RandomState> {
    pub hasher: S,
    /// Hot tier: tiny, L1-cache-resident slot array.
    pub t1: Arc<T1<K, V>>,
    /// Warm tier: larger slot array for intercepting warm data.
    pub t2: Arc<T2<K, V>>,
    pub cache: Arc<Cache<K, V>>,
    pub cmd_tx: Sender<Command<K, V>>,
    pub hit_tx: Sender<[usize; 64]>,
    pub epoch: Arc<AtomicU32>,
    /// QSBR registry for each worker.
    pub worker_states: Arc<[WorkerState]>,
    /// Per-worker zero-lock batch buffers, indexed by WORKER_ID.
    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
}

// Manual Clone: only the hasher needs `S: Clone`; every shared structure is
// behind an Arc, so no bounds on K or V are required.
impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
    fn clone(&self) -> Self {
        Self {
            hasher: self.hasher.clone(),
            t1: self.t1.clone(),
            t2: self.t2.clone(),
            cache: self.cache.clone(),
            cmd_tx: self.cmd_tx.clone(),
            hit_tx: self.hit_tx.clone(),
            epoch: self.epoch.clone(),
            worker_states: self.worker_states.clone(),
            miss_buffers: self.miss_buffers.clone(),
        }
    }
}

impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    pub fn new(config: Config) -> Self {
        let hasher = RandomState::new();
        let t1 = Arc::new(T1::new(config.t1_slots));
        let t2 = Arc::new(T2::new(config.t2_slots));
        let cache = Arc::new(Cache::new(config.capacity));
        // Bounded channels: back-pressure instead of unbounded growth.
        let (cmd_tx, cmd_rx) = bounded(8192);
        let (hit_tx, hit_rx) = bounded(1024);
        let epoch = Arc::new(AtomicU32::new(0));

        // One exclusive batch slot and one QSBR state per worker thread.
        let mut buffers = Vec::with_capacity(config.threads);
        let mut states = Vec::with_capacity(config.threads);
        for _ in 0..config.threads {
            buffers.push(WorkerSlot::new());
            states.push(WorkerState::new());
        }
        let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
        let worker_states: Arc<[_]> = states.into_boxed_slice().into();

        let daemon = Daemon::new(
            hasher.clone(),
            config.capacity,
            t1.clone(),
            t2.clone(),
            cache.clone(),
            cmd_rx,
            hit_rx,
            epoch.clone(),
            config.duration,
            worker_states.clone(),
        );

        // A single background daemon drains the command and hit channels.
        std::thread::spawn(move || {
            daemon.run();
        });

        Self {
            hasher,
            t1,
            t2,
            cache,
            cmd_tx,
            hit_tx,
            epoch,
            worker_states,
            miss_buffers,
        }
    }
}

impl<K, V, S> DualCacheFF<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    pub fn sync(&self) {
        // 1. Flush this thread's hit buffer.
        HIT_BUF.with(|buf| {
            let mut state = buf.borrow_mut();
            if state.1 > 0 {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });

        // 2. Flush all worker slots (best effort for other threads' slots,
        //    strictly correct only for our own).
        for slot in self.miss_buffers.iter() {
            let buf = unsafe { slot.get_mut_unchecked() };
            if buf.len() > 0 {
                let batch = buf.drain_to_vec();
                // Blocking send: sync() must not silently drop the batch.
                let _ = self.cmd_tx.send(Command::BatchInsert(batch));
            }
        }

        // 3. Round-trip through the daemon so everything above is applied.
        let (tx, rx) = bounded(1);
        if self.cmd_tx.send(Command::Sync(tx)).is_ok() {
            let _ = rx.recv();
        }
    }

    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);

        // QSBR check-in: publish the epoch we are reading under.
        let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(global_epoch, Ordering::Relaxed);
            }
        });

        let mut res = None;
        let mut hit_g_idx = None;

        // T1 check (hot tier)
        let ptr_t1 = self.t1.load_slot(hash);
        if !ptr_t1.is_null() {
            let node = unsafe { &*ptr_t1 };
            if node.key == *key && (node.expire_at == 0 || node.expire_at >= current_epoch_cache) {
                res = Some(node.value.clone());
                hit_g_idx = Some(node.g_idx);
            }
        }

        // T2 check (warm tier)
        if res.is_none() {
            let ptr_t2 = self.t2.load_slot(hash);
            if !ptr_t2.is_null() {
                let node = unsafe { &*ptr_t2 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }
        }

        // Main cache check
        if res.is_none() {
            // Top 16 bits of the hash serve as the probe fingerprint.
            let tag = (hash >> 48) as u16;
            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                if let Some(v) = self
                    .cache
                    .node_get_full(global_idx, key, current_epoch_cache)
                {
                    res = Some(v);
                    hit_g_idx = Some(global_idx as u32);
                }
            }
        }

        // QSBR check-out: we no longer hold any node pointers.
        WORKER_ID.with(|&id| {
            if id < self.worker_states.len() {
                self.worker_states[id]
                    .local_epoch
                    .store(0, Ordering::Relaxed);
            }
        });

        // Record the hit outside the QSBR critical section.
        if let Some(g_idx) = hit_g_idx {
            self.record_hit(g_idx as usize);
        }

        res
    }

    pub fn insert(&self, key: K, value: V) {
        let hash = self.hash(&key);

        // TLS L1 filter (doorkeeper): drop keys seen once, admit keys seen
        // twice or more on this thread.
        let pass = L1_FILTER.with(|f| {
            let mut state = f.borrow_mut();
            let idx = (hash as usize) & 4095;
            let val = state.0[idx];

            // Decay: halve every counter once per 4096 inserts so stale
            // sightings age out.
            state.1 += 1;
            if state.1 >= 4096 {
                for x in state.0.iter_mut() {
                    *x >>= 1;
                }
                state.1 = 0;
            }

            if val < 1 {
                state.0[idx] = 1;
                false
            } else {
                if val < 2 {
                    state.0[idx] = 2;
                }
                true
            }
        });

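        // Filter walk-through for a key hashing to slot `i` (with counters
        // halved once every 4096 inserts on this thread, so old sightings
        // decay):
        //   state.0[i] == 0 → first sighting: counter set to 1, insert dropped.
        //   state.0[i] == 1 → second sighting: counter set to 2, insert admitted.
        //   state.0[i] >= 2 → already warm: admitted, counter unchanged.
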
        if !pass {
            return;
        }

        // Thread-ID mapped wait-free batching:
        // each worker has a unique WORKER_ID → an exclusive UnsafeCell slot.
        // No mutex, no atomics: a plain memory write. Flush every 32 items.
        WORKER_ID.with(|&id| {
            if id >= self.miss_buffers.len() {
                // Overflow protection: more threads than slots, so drop
                // silently (lossy).
                return;
            }

            // Safety: id is unique per thread → exclusive access to this slot.
            let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };

            // push() reports when the slot is full and must be drained.
            if buf.push((key, value, hash)) {
                let batch = buf.drain_to_vec();
                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
            }
        });
    }

    pub fn remove(&self, key: &K) {
        let hash = self.hash(key);
        // Flush this thread's insert buffer first, so the remove cannot be
        // applied before our own pending inserts (causal ordering).
        WORKER_ID.with(|&id| {
            if id < self.miss_buffers.len() {
                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
                if buf.len() > 0 {
                    let batch = buf.drain_to_vec();
                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
                }
            }
        });

        let _ = self.cmd_tx.try_send(Command::Remove(key.clone(), hash));
    }

    pub fn clear(&self) {
        // Synchronous: wait until the daemon acknowledges the wipe.
        let (tx, rx) = bounded(1);
        if self.cmd_tx.send(Command::Clear(tx)).is_ok() {
            let _ = rx.recv();
        }
    }

    fn hash(&self, key: &K) -> u64 {
        self.hasher.hash_one(key)
    }

    /// Buffer a hit locally; ship the full 64-entry batch to the daemon once
    /// the buffer fills (lossy: the batch is dropped if the channel is full).
    fn record_hit(&self, global_idx: usize) {
        HIT_BUF.with(|buf| {
            let mut state = buf.borrow_mut();
            let idx = state.1;
            state.0[idx] = global_idx;
            state.1 += 1;
            if state.1 == 64 {
                let _ = self.hit_tx.try_send(state.0);
                state.1 = 0;
            }
        });
    }
}
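
// End-to-end smoke sketch. The sizes are illustrative, and the assertions
// deliberately avoid claims about admission or eviction policy (those live
// in the daemon and unsafe_core, not in this file); the test only checks
// that the public calls compose.
#[cfg(test)]
mod smoke_sketch {
    use super::{Config, DualCacheFF};

    #[test]
    fn insert_sync_get_roundtrip() {
        let cache: DualCacheFF<u64, u64> =
            DualCacheFF::new(Config::new_expert(1024, 64, 256, 60, 8));

        cache.insert(7, 42); // first sighting: swallowed by the L1 filter
        cache.insert(7, 42); // second sighting: enqueued for the daemon
        cache.sync(); // drain local buffers and wait for the daemon

        let _ = cache.get(&7); // hit or miss depends on admission policy

        cache.remove(&7);
        cache.clear();
    }
}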