// dualcache_ff/workers.rs

1#[cfg(not(feature = "std"))]
2use alloc::vec::Vec;
3
4use core::cell::UnsafeCell;
5use core::mem::MaybeUninit;
6
/// Number of `(K, V, u64)` entries a batch holds before it must be flushed.
const BATCH_CAPACITY: usize = 32;

/// Zero-allocation batch buffer: fixed-size MaybeUninit array, reused in-place.
/// No Mutex, no Vec, no heap allocation on the hot path.
///
/// Invariant: slots `0..len` are initialized; slots `len..BATCH_CAPACITY`
/// are uninitialized and must never be read or dropped.
///
/// Cache-line aligned to prevent false sharing between worker slots
/// (128 bytes on aarch64/arm, 64 bytes elsewhere).
#[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
#[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
pub struct BatchBuf<K, V> {
    items: [MaybeUninit<(K, V, u64)>; BATCH_CAPACITY],
    len: usize,
}

impl<K, V> BatchBuf<K, V> {
    /// Creates an empty buffer. The backing array is left uninitialized;
    /// nothing is allocated.
    pub fn new() -> Self {
        Self {
            // SAFETY: an array of `MaybeUninit<T>` requires no initialization,
            // so "assuming init" of the *outer* array (not its elements) is sound.
            items: unsafe { MaybeUninit::uninit().assume_init() },
            len: 0,
        }
    }

    /// Appends `item`. Returns `true` when the buffer is now full
    /// (`BATCH_CAPACITY` items) and should be flushed.
    ///
    /// # Panics
    /// Panics (index out of bounds) if called on an already-full buffer;
    /// callers must flush as soon as `push` returns `true`.
    #[inline(always)]
    pub fn push(&mut self, item: (K, V, u64)) -> bool {
        debug_assert!(self.len < BATCH_CAPACITY, "push on a full BatchBuf");
        // Writing through the index only touches an uninitialized slot
        // (`len < BATCH_CAPACITY` means slot `len` is past the live prefix),
        // so no previous value is overwritten without being dropped.
        self.items[self.len] = MaybeUninit::new(item);
        self.len += 1;
        self.len == BATCH_CAPACITY
    }

    /// Number of initialized items currently buffered.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no items are buffered.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Drains all items (in insertion order) into a `Vec`, resetting the
    /// buffer for reuse.
    pub fn drain_to_vec(&mut self) -> Vec<(K, V, u64)> {
        let count = self.len;
        // Reset `len` *before* moving values out: if anything below panics,
        // `Drop` must not re-drop values that were already moved out (that
        // would be a double free / UB). Leaking the unread tail is the safe
        // failure mode.
        self.len = 0;
        let mut batch = Vec::with_capacity(count);
        for slot in &self.items[..count] {
            // SAFETY: slots `0..count` were initialized by `push`, and with
            // `len` already reset they are never read again through `self`.
            batch.push(unsafe { slot.assume_init_read() });
        }
        batch
    }
}

impl<K, V> Default for BatchBuf<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<K, V> Drop for BatchBuf<K, V> {
    fn drop(&mut self) {
        for slot in &mut self.items[..self.len] {
            // SAFETY: `len` counts exactly the initialized prefix, so every
            // slot in `0..len` holds a live `(K, V, u64)`.
            unsafe { slot.assume_init_drop() };
        }
    }
}

// Explicit auto-trait impls documenting thread-safety intent: a BatchBuf is a
// plain value with no interior mutability, so it is Send/Sync exactly when
// its element types are.
unsafe impl<K: Send, V: Send> Send for BatchBuf<K, V> {}
unsafe impl<K: Sync, V: Sync> Sync for BatchBuf<K, V> {}
65
66/// Per-worker exclusive slot holding a `BatchBuf` inside an `UnsafeCell`.
67///
68/// The WORKER_ID TLS invariant guarantees that only one thread ever accesses
69/// any given slot, eliminating the need for any synchronisation primitive on
70/// the insert hot-path (zero atomics, zero locks, pure memory write).
71#[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
72#[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
73pub struct WorkerSlot<K, V> {
74    inner: UnsafeCell<BatchBuf<K, V>>,
75}
76
77impl<K, V> WorkerSlot<K, V> {
78    pub fn new() -> Self {
79        Self {
80            inner: UnsafeCell::new(BatchBuf::new()),
81        }
82    }
83
84    /// Provides exclusive access to the underlying buffer.
85    ///
86    /// # Safety
87    /// The caller must guarantee that only one thread accesses this slot at a time.
88    /// In DualCache-FF this is enforced by the WORKER_ID TLS invariant.
89    #[inline(always)]
90    pub unsafe fn get_mut_unchecked(&self) -> &mut BatchBuf<K, V> {
91        unsafe { &mut *self.inner.get() }
92    }
93}
94
95unsafe impl<K: Send, V: Send> Send for WorkerSlot<K, V> {}
96unsafe impl<K: Send + Sync, V: Send + Sync> Sync for WorkerSlot<K, V> {}