
// dualcache_ff/unsafe_core.rs

pub use arena::Arena;
pub use storage::{Cache, Node};
pub use filters::{T1, T2};
pub use workers::{BatchBuf, WorkerSlot};

mod arena {
    /// Slot allocator shared by the cache tiers: a free list of slot indices,
    /// a per-slot frequency rank, and the hash stored in each slot.
    pub struct Arena {
        pub(crate) capacity: usize,
        pub(crate) cursor: usize,
        pub(crate) rank: Vec<u8>,
        pub hashes: Vec<u64>,
        pub(crate) free_list: Vec<usize>,
        pub(crate) count_sum: u64,
    }

    // All fields are already Send + Sync, so these impls are redundant; they
    // are kept explicit to document the intent.
    unsafe impl Send for Arena {}
    unsafe impl Sync for Arena {}

    impl Arena {
        pub fn new(capacity: usize) -> Self {
            Self {
                capacity,
                cursor: 0,
                rank: vec![0; capacity],
                hashes: vec![0; capacity],
                free_list: (0..capacity).collect(),
                count_sum: 0,
            }
        }

        #[inline(always)]
        pub fn pop_free_slot(&mut self) -> Option<usize> {
            self.free_list.pop()
        }

        #[inline(always)]
        pub fn push_free_slot(&mut self, idx: usize) {
            self.free_list.push(idx);
        }

        #[inline(always)]
        pub fn free_list_empty(&self) -> bool {
            self.free_list.is_empty()
        }

        #[inline(always)]
        pub fn free_list_len(&self) -> usize {
            self.free_list.len()
        }

        #[inline(always)]
        pub fn set_hash(&mut self, idx: usize, hash: u64) {
            self.hashes[idx] = hash;
        }

        #[inline(always)]
        pub fn get_hash(&self, idx: usize) -> u64 {
            self.hashes[idx]
        }

        /// Sets a slot's rank while keeping `count_sum` equal to the sum of
        /// all ranks.
        #[inline(always)]
        pub fn set_rank(&mut self, idx: usize, rank: u8) {
            let old = self.rank[idx];
            self.rank[idx] = rank;
            self.count_sum = self.count_sum - old as u64 + rank as u64;
        }

        #[inline(always)]
        pub fn get_rank(&self, idx: usize) -> u8 {
            self.rank[idx]
        }

        /// Saturating decrement: a rank of 0 stays at 0.
        #[inline(always)]
        pub fn decrement_rank(&mut self, idx: usize) {
            if self.rank[idx] > 0 {
                self.rank[idx] -= 1;
                self.count_sum -= 1;
            }
        }

        #[inline(always)]
        pub fn count_sum(&self) -> u64 {
            self.count_sum
        }

        #[inline(always)]
        pub fn cursor(&self) -> usize {
            self.cursor
        }

        #[inline(always)]
        pub fn advance_cursor(&mut self) {
            self.cursor = (self.cursor + 1) % self.capacity;
        }

        pub fn clear(&mut self) {
            self.free_list = (0..self.capacity).collect();
            self.rank.fill(0);
            self.cursor = 0;
            self.count_sum = 0;
            // `hashes` is left untouched: a slot's hash is expected to be
            // overwritten via `set_hash` when the slot is handed out again.
        }
    }
}
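
// Illustrative sketch (ours, not part of the original file): a minimal test
// exercising the Arena free-list and rank-accounting invariants described
// above. The test name and values are assumptions for illustration.
#[cfg(test)]
mod arena_sketch_tests {
    use super::Arena;

    #[test]
    fn rank_accounting_tracks_count_sum() {
        let mut arena = Arena::new(4);
        // All four slots start on the free list.
        assert_eq!(arena.free_list_len(), 4);
        let slot = arena.pop_free_slot().unwrap();

        // count_sum always equals the sum of all per-slot ranks.
        arena.set_rank(slot, 3);
        assert_eq!(arena.count_sum(), 3);
        arena.decrement_rank(slot);
        arena.decrement_rank(slot);
        arena.decrement_rank(slot);
        arena.decrement_rank(slot); // saturates at 0
        assert_eq!(arena.count_sum(), 0);

        arena.push_free_slot(slot);
        arena.clear();
        assert_eq!(arena.free_list_len(), 4);
    }
}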

mod storage {
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};

    pub struct Node<K, V> {
        pub key: K,
        pub value: V,
        /// Expiry epoch; 0 means the entry never expires.
        pub expire_at: u32,
        /// Index of this node's slot in the arena.
        pub g_idx: u32,
    }

    /// Lock-free index over heap-allocated nodes. Each index entry packs a
    /// 16-bit tag into the high bits and a 48-bit arena index into the low
    /// bits; 0 marks an empty slot and u64::MAX marks a tombstone.
    pub struct Cache<K, V> {
        pub(crate) index_mask: usize,
        pub(crate) index: Box<[AtomicU64]>,
        pub(crate) nodes: Box<[AtomicPtr<Node<K, V>>]>,
    }

    unsafe impl<K: Send, V: Send> Send for Cache<K, V> {}
    unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Cache<K, V> {}

    impl<K, V> Cache<K, V> {
        pub fn new(capacity: usize) -> Self {
            // Size the index at 2x capacity, rounded up to a power of two so
            // that masking can replace modulo.
            let index_size = (capacity * 2).next_power_of_two();
            let mut index = Vec::with_capacity(index_size);
            for _ in 0..index_size {
                index.push(AtomicU64::new(0));
            }

            let mut nodes = Vec::with_capacity(capacity);
            for _ in 0..capacity {
                nodes.push(AtomicPtr::new(ptr::null_mut()));
            }

            Self {
                index_mask: index_size - 1,
                index: index.into_boxed_slice(),
                nodes: nodes.into_boxed_slice(),
            }
        }

        /// Linear probe over a bounded window of 16 slots. Returns the arena
        /// index stored alongside a matching tag, or None if an empty slot or
        /// the end of the window is reached. Tombstones (u64::MAX) are
        /// stepped over so probe chains stay intact.
        #[inline(always)]
        pub fn index_probe(&self, hash: u64, tag: u16) -> Option<usize> {
            let mut idx = hash as usize & self.index_mask;
            for _ in 0..16 {
                let entry = self.index[idx].load(Ordering::Acquire);
                if entry == 0 {
                    return None;
                }
                if entry != u64::MAX && (entry >> 48) as u16 == tag {
                    return Some((entry & 0x0000_FFFF_FFFF_FFFF) as usize);
                }
                idx = (idx + 1) & self.index_mask;
            }
            None
        }

        /// Stores an entry in the first empty slot, tombstone, or same-tag
        /// slot within the probe window.
        #[inline(always)]
        pub fn index_store(&self, hash: u64, tag: u16, entry: u64) {
            let mut idx = hash as usize & self.index_mask;
            for _ in 0..16 {
                let prev = self.index[idx].load(Ordering::Acquire);
                if prev == 0 || prev == u64::MAX || (prev >> 48) as u16 == tag {
                    self.index[idx].store(entry, Ordering::Release);
                    return;
                }
                idx = (idx + 1) & self.index_mask;
            }
            // Probe window exhausted: overwrite the home slot rather than
            // dropping the new entry.
            self.index[hash as usize & self.index_mask].store(entry, Ordering::Release);
        }

        /// Replaces the matching entry with a tombstone instead of zeroing
        /// it, so later probes do not terminate early at this slot.
        #[inline(always)]
        pub fn index_remove(&self, hash: u64, tag: u16, g_idx: usize) {
            let mut idx = hash as usize & self.index_mask;
            for _ in 0..16 {
                let entry = self.index[idx].load(Ordering::Acquire);
                if entry == 0 {
                    return;
                }
                if entry != u64::MAX
                    && (entry >> 48) as u16 == tag
                    && (entry & 0x0000_FFFF_FFFF_FFFF) == g_idx as u64
                {
                    self.index[idx].store(u64::MAX, Ordering::Release);
                    return;
                }
                idx = (idx + 1) & self.index_mask;
            }
        }

        #[inline(always)]
        pub fn index_clear_at(&self, idx: usize) {
            self.index[idx].store(0, Ordering::Relaxed);
        }

        #[inline(always)]
        pub fn index_len(&self) -> usize {
            self.index.len()
        }

        /// Loads the node at `idx`, verifies the key, and returns a clone of
        /// the value unless the entry has expired.
        #[inline(always)]
        pub fn node_get_full(&self, idx: usize, key: &K, current_epoch: u32) -> Option<V>
        where
            K: Eq,
            V: Clone,
        {
            let ptr = self.nodes[idx].load(Ordering::Acquire);
            if ptr.is_null() {
                return None;
            }
            // Safety: QSBR guarantees the pointer stays valid while this
            // thread is checked in.
            let node = unsafe { &*ptr };
            if node.key == *key {
                // expire_at == 0 means no TTL.
                if node.expire_at > 0 && node.expire_at < current_epoch {
                    return None;
                }
                Some(node.value.clone())
            } else {
                None
            }
        }

        pub fn clear(&self) {
            for i in 0..self.index.len() {
                self.index[i].store(0, Ordering::Relaxed);
            }
            for i in 0..self.nodes.len() {
                self.nodes[i].store(ptr::null_mut(), Ordering::Release);
            }
        }
    }
}
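
// Illustrative sketch (ours, not from the original source): exercises the
// tag/index packing convention documented on `Cache` by round-tripping an
// entry through `index_store`, `index_probe`, and `index_remove`. All values
// here are arbitrary.
#[cfg(test)]
mod storage_sketch_tests {
    use super::Cache;

    #[test]
    fn index_store_probe_remove_round_trip() {
        let cache: Cache<u64, u64> = Cache::new(8);
        let hash = 0xDEAD_BEEF_u64;
        let tag: u16 = 0x1234;
        let g_idx: usize = 7;
        // High 16 bits carry the tag, low 48 bits the arena index.
        let entry = ((tag as u64) << 48) | g_idx as u64;

        cache.index_store(hash, tag, entry);
        assert_eq!(cache.index_probe(hash, tag), Some(g_idx));

        // Removal tombstones the slot, so the probe no longer matches.
        cache.index_remove(hash, tag, g_idx);
        assert_eq!(cache.index_probe(hash, tag), None);
    }
}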

mod filters {
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};
    use crate::unsafe_core::Node;

    /// First-tier filter: a direct-mapped array of node pointers indexed by
    /// the low bits of the hash.
    pub struct T1<K, V> {
        pub(crate) mask: usize,
        pub(crate) slots: Box<[AtomicPtr<Node<K, V>>]>,
    }

    unsafe impl<K: Send, V: Send> Send for T1<K, V> {}
    unsafe impl<K: Send + Sync, V: Send + Sync> Sync for T1<K, V> {}

    impl<K, V> T1<K, V> {
        pub fn new(slots_count: usize) -> Self {
            // Masking with `slots_count - 1` only works for powers of two.
            debug_assert!(slots_count.is_power_of_two());
            let mut slots = Vec::with_capacity(slots_count);
            for _ in 0..slots_count {
                slots.push(AtomicPtr::new(ptr::null_mut()));
            }
            Self {
                mask: slots_count - 1,
                slots: slots.into_boxed_slice(),
            }
        }

        #[inline(always)]
        pub fn load_slot(&self, hash: u64) -> *mut Node<K, V> {
            let idx = hash as usize & self.mask;
            self.slots[idx].load(Ordering::Acquire)
        }

        #[inline(always)]
        pub fn store_slot(&self, hash: u64, ptr: *mut Node<K, V>) {
            let idx = hash as usize & self.mask;
            self.slots[idx].store(ptr, Ordering::Release);
        }

        /// Clears the slot only if it still holds `expected_ptr`, so a racing
        /// writer that already replaced the slot is not clobbered.
        #[inline(always)]
        pub fn clear_if_matches(&self, hash: u64, expected_ptr: *mut Node<K, V>) {
            let idx = hash as usize & self.mask;
            let _ = self.slots[idx].compare_exchange(
                expected_ptr,
                ptr::null_mut(),
                Ordering::Release,
                Ordering::Relaxed,
            );
        }

        #[inline(always)]
        pub fn clear_at(&self, idx: usize) {
            self.slots[idx].store(ptr::null_mut(), Ordering::Relaxed);
        }

        #[inline(always)]
        pub fn len(&self) -> usize {
            self.slots.len()
        }
    }

    /// Second-tier filter: identical layout and operations to `T1`, but a
    /// distinct type, so the two tiers cannot be passed for one another.
    pub struct T2<K, V> {
        pub(crate) mask: usize,
        pub(crate) slots: Box<[AtomicPtr<Node<K, V>>]>,
    }

    unsafe impl<K: Send, V: Send> Send for T2<K, V> {}
    unsafe impl<K: Send + Sync, V: Send + Sync> Sync for T2<K, V> {}

    impl<K, V> T2<K, V> {
        pub fn new(slots_count: usize) -> Self {
            // Masking with `slots_count - 1` only works for powers of two.
            debug_assert!(slots_count.is_power_of_two());
            let mut slots = Vec::with_capacity(slots_count);
            for _ in 0..slots_count {
                slots.push(AtomicPtr::new(ptr::null_mut()));
            }
            Self {
                mask: slots_count - 1,
                slots: slots.into_boxed_slice(),
            }
        }

        #[inline(always)]
        pub fn load_slot(&self, hash: u64) -> *mut Node<K, V> {
            let idx = hash as usize & self.mask;
            self.slots[idx].load(Ordering::Acquire)
        }

        #[inline(always)]
        pub fn store_slot(&self, hash: u64, ptr: *mut Node<K, V>) {
            let idx = hash as usize & self.mask;
            self.slots[idx].store(ptr, Ordering::Release);
        }

        /// Clears the slot only if it still holds `expected_ptr`, so a racing
        /// writer that already replaced the slot is not clobbered.
        #[inline(always)]
        pub fn clear_if_matches(&self, hash: u64, expected_ptr: *mut Node<K, V>) {
            let idx = hash as usize & self.mask;
            let _ = self.slots[idx].compare_exchange(
                expected_ptr,
                ptr::null_mut(),
                Ordering::Release,
                Ordering::Relaxed,
            );
        }

        #[inline(always)]
        pub fn clear_at(&self, idx: usize) {
            self.slots[idx].store(ptr::null_mut(), Ordering::Relaxed);
        }

        #[inline(always)]
        pub fn len(&self) -> usize {
            self.slots.len()
        }
    }
}
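
// Illustrative sketch (ours): shows the T1 slot lifecycle, including how the
// compare-and-swap in `clear_if_matches` leaves a newer pointer in place.
// Boxes are leaked only so the test has stable raw pointers, then reclaimed.
#[cfg(test)]
mod filter_sketch_tests {
    use super::{Node, T1};

    #[test]
    fn clear_if_matches_only_clears_expected_pointer() {
        let t1: T1<u64, u64> = T1::new(4);
        let a = Box::into_raw(Box::new(Node { key: 1u64, value: 10u64, expire_at: 0, g_idx: 0 }));
        let b = Box::into_raw(Box::new(Node { key: 2u64, value: 20u64, expire_at: 0, g_idx: 1 }));
        let hash = 5u64; // maps to slot 5 & 3 == 1

        t1.store_slot(hash, a);
        assert_eq!(t1.load_slot(hash), a);

        // A newer pointer wins the slot; clearing with the stale one is a no-op.
        t1.store_slot(hash, b);
        t1.clear_if_matches(hash, a);
        assert_eq!(t1.load_slot(hash), b);

        // Clearing with the current pointer empties the slot.
        t1.clear_if_matches(hash, b);
        assert!(t1.load_slot(hash).is_null());

        // Reclaim the boxes leaked above.
        unsafe {
            drop(Box::from_raw(a));
            drop(Box::from_raw(b));
        }
    }
}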

mod workers {
    use std::cell::UnsafeCell;
    use std::mem::MaybeUninit;

    /// Number of pending items buffered per worker before a flush.
    const BATCH_CAPACITY: usize = 32;

    /// Zero-allocation batch buffer: a fixed-size MaybeUninit array, reused
    /// in place. No Mutex, no Vec, no heap allocation on the hot path.
    ///
    /// Aligned to the cache-line size (128 bytes on ARM, 64 elsewhere) to
    /// avoid false sharing between adjacent worker slots.
    #[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
    #[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
    pub struct BatchBuf<K, V> {
        items: [MaybeUninit<(K, V, u64)>; BATCH_CAPACITY],
        len: usize,
    }

    impl<K, V> BatchBuf<K, V> {
        pub fn new() -> Self {
            Self {
                // Safety: an array of MaybeUninit is valid in any byte state,
                // so leaving it uninitialized is sound.
                items: unsafe { MaybeUninit::uninit().assume_init() },
                len: 0,
            }
        }

        /// Appends an item and returns true when the buffer is full. The
        /// caller must drain a full buffer before pushing again; pushing past
        /// capacity panics on the array index.
        #[inline(always)]
        pub fn push(&mut self, item: (K, V, u64)) -> bool {
            self.items[self.len] = MaybeUninit::new(item);
            self.len += 1;
            self.len == BATCH_CAPACITY
        }

        pub fn len(&self) -> usize {
            self.len
        }

        /// Moves the buffered items into a Vec and resets the buffer. The
        /// moved-out slots are logically uninitialized again; resetting `len`
        /// ensures `Drop` never touches them.
        pub fn drain_to_vec(&mut self) -> Vec<(K, V, u64)> {
            let mut batch = Vec::with_capacity(self.len);
            for i in 0..self.len {
                // Safety: slots 0..len were initialized by `push`.
                batch.push(unsafe { self.items[i].assume_init_read() });
            }
            self.len = 0;
            batch
        }
    }

    impl<K, V> Drop for BatchBuf<K, V> {
        fn drop(&mut self) {
            // Only the initialized prefix is dropped.
            for i in 0..self.len {
                unsafe {
                    self.items[i].assume_init_drop();
                }
            }
        }
    }

    // Redundant with the auto-derived impls (MaybeUninit<T> is Send/Sync
    // whenever T is), but kept explicit for symmetry with WorkerSlot.
    unsafe impl<K: Send, V: Send> Send for BatchBuf<K, V> {}
    unsafe impl<K: Sync, V: Sync> Sync for BatchBuf<K, V> {}

    #[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
    #[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
    pub struct WorkerSlot<K, V> {
        inner: UnsafeCell<BatchBuf<K, V>>,
    }

    impl<K, V> WorkerSlot<K, V> {
        pub fn new() -> Self {
            Self {
                inner: UnsafeCell::new(BatchBuf::new()),
            }
        }

        /// Provides exclusive access to the underlying buffer.
        ///
        /// Safety: the caller must ensure that only one thread accesses this
        /// slot at a time. In DualCache-FF, this is guaranteed by the
        /// WORKER_ID invariant.
        #[inline(always)]
        pub unsafe fn get_mut_unchecked(&self) -> &mut BatchBuf<K, V> {
            unsafe { &mut *self.inner.get() }
        }
    }

    unsafe impl<K: Send, V: Send> Send for WorkerSlot<K, V> {}
    // UnsafeCell is !Sync, so this impl is load-bearing; it is sound only
    // under the single-writer invariant documented on `get_mut_unchecked`.
    unsafe impl<K: Send + Sync, V: Send + Sync> Sync for WorkerSlot<K, V> {}
}
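
// Illustrative sketch (ours): drives BatchBuf through a full fill/drain cycle
// via a WorkerSlot, mirroring how a single worker thread would use it.
#[cfg(test)]
mod worker_sketch_tests {
    use super::WorkerSlot;

    #[test]
    fn batch_buf_fills_then_drains() {
        let slot: WorkerSlot<u64, u64> = WorkerSlot::new();
        // Safety: this test is the only accessor, satisfying the
        // single-thread contract of `get_mut_unchecked`.
        let buf = unsafe { slot.get_mut_unchecked() };

        // `push` reports full exactly on the 32nd item.
        for i in 0..32u64 {
            let full = buf.push((i, i * 2, 0));
            assert_eq!(full, i == 31);
        }
        assert_eq!(buf.len(), 32);

        let batch = buf.drain_to_vec();
        assert_eq!(batch.len(), 32);
        assert_eq!(batch[0], (0, 0, 0));
        // The buffer is reusable in place after draining.
        assert_eq!(buf.len(), 0);
    }
}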