Skip to main content

dualcache_ff/
workers.rs

1#[cfg(not(feature = "std"))]
2use alloc::vec::Vec;
3
4use crate::sync::cell::UnsafeCell;
5use core::mem::MaybeUninit;
6
/// Number of `(K, V, u64)` entries a [`BatchBuf`] holds before it reports itself full.
///
/// Single source of truth for the array length and the "full" check in `push`.
const BATCH_CAPACITY: usize = 32;

/// Zero-allocation batch buffer: a fixed-size `MaybeUninit` array, reused in place.
///
/// `push` writes straight into the inline array, so it involves no mutex, no
/// `Vec` and no heap allocation. Only the first `len` slots are initialised;
/// every method below maintains that invariant.
///
/// The struct is aligned to 128 bytes on ARM/AArch64 targets and to 64 bytes
/// elsewhere so that neighbouring buffers do not share a cache line.
#[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
#[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
pub struct BatchBuf<K, V> {
    /// Inline storage; slots `0..len` are initialised, the rest are not.
    items: [MaybeUninit<(K, V, u64)>; BATCH_CAPACITY],
    /// Number of initialised slots at the front of `items`.
    len: usize,
}

impl<K, V> BatchBuf<K, V> {
    /// Maximum number of items the buffer can hold before it must be drained.
    pub const CAPACITY: usize = BATCH_CAPACITY;

    /// Creates an empty buffer without initialising any slot.
    pub fn new() -> Self {
        Self {
            // SAFETY: the value being "assumed initialised" is an array of
            // `MaybeUninit`, and `MaybeUninit` itself has no validity
            // requirement, so an uninitialised array of them is a valid value.
            items: unsafe { MaybeUninit::uninit().assume_init() },
            len: 0,
        }
    }

    /// Appends `item` and returns `true` when the buffer has just become full
    /// ([`Self::CAPACITY`] items) and should be flushed.
    ///
    /// # Panics
    /// Panics (index out of bounds) if called while the buffer is already
    /// full, i.e. when a previous `true` result was not followed by
    /// [`Self::drain_to_vec`].
    #[inline(always)]
    pub fn push(&mut self, item: (K, V, u64)) -> bool {
        // Deliberately bounds-checked: overfilling must panic rather than
        // write past the array. Assigning a `MaybeUninit` never drops the
        // previous (uninitialised) contents of the slot.
        self.items[self.len] = MaybeUninit::new(item);
        self.len += 1;
        self.len == Self::CAPACITY
    }

    /// Returns the number of buffered items.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no items are buffered.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Drains all items into a `Vec` (in insertion order), resetting the buffer.
    pub fn drain_to_vec(&mut self) -> Vec<(K, V, u64)> {
        let count = self.len;
        let mut batch = Vec::with_capacity(count);
        // Mark the buffer empty *before* moving anything out: should an unwind
        // ever happen below, the items are leaked at worst, and `Drop` can
        // never free a slot whose contents were already moved into `batch`.
        self.len = 0;
        batch.extend(
            self.items[..count]
                .iter()
                // SAFETY: slots `0..count` were initialised by `push`, and
                // `len` is already zero, so each value is read out exactly once.
                .map(|slot| unsafe { slot.assume_init_read() }),
        );
        batch
    }
}

impl<K, V> Drop for BatchBuf<K, V> {
    /// Drops the items that were pushed but never drained.
    fn drop(&mut self) {
        for slot in &mut self.items[..self.len] {
            // SAFETY: slots `0..len` are initialised and, because the buffer
            // is being destroyed, are never accessed again.
            unsafe { slot.assume_init_drop() };
        }
    }
}

// SAFETY: `BatchBuf` owns its `(K, V, u64)` items inline and has no interior
// mutability, so it can be sent or shared across threads whenever `K` and `V` can.
unsafe impl<K: Send, V: Send> Send for BatchBuf<K, V> {}
unsafe impl<K: Sync, V: Sync> Sync for BatchBuf<K, V> {}
65
66/// Per-worker exclusive slot holding a `BatchBuf` inside an `UnsafeCell`.
67///
68/// The WORKER_ID TLS invariant guarantees that only one thread ever accesses
69/// any given slot, eliminating the need for any synchronisation primitive on
70/// the insert hot-path (zero atomics, zero locks, pure memory write).
71#[cfg_attr(any(target_arch = "aarch64", target_arch = "arm"), repr(C, align(128)))]
72#[cfg_attr(not(any(target_arch = "aarch64", target_arch = "arm")), repr(C, align(64)))]
73#[cfg(not(any(feature = "loom", loom)))]
74pub struct WorkerSlot<K, V> {
75    inner: UnsafeCell<BatchBuf<K, V>>,
76}
77
78#[cfg(any(feature = "loom", loom))]
79pub struct WorkerSlot<K, V> {
80    inner: UnsafeCell<Box<BatchBuf<K, V>>>,
81}
82
83impl<K, V> WorkerSlot<K, V> {
84    #[cfg(not(any(feature = "loom", loom)))]
85    pub fn new() -> Self {
86        Self {
87            inner: UnsafeCell::new(BatchBuf::new()),
88        }
89    }
90
91    #[cfg(any(feature = "loom", loom))]
92    pub fn new() -> Self {
93        Self {
94            inner: UnsafeCell::new(Box::new(BatchBuf::new())),
95        }
96    }
97
98    /// Provides exclusive access to the underlying buffer.
99    ///
100    /// # Safety
101    /// The caller must guarantee that only one thread accesses this slot at a time.
102    /// In DualCache-FF this is enforced by the WORKER_ID TLS invariant.
103    #[inline(always)]
104    #[cfg(not(any(feature = "loom", loom)))]
105    pub unsafe fn get_mut_unchecked(&self) -> &mut BatchBuf<K, V> {
106        self.inner.with_mut(|ptr| unsafe { &mut *ptr })
107    }
108
109    /// Provides exclusive access to the underlying buffer.
110    ///
111    /// # Safety
112    /// The caller must guarantee that only one thread accesses this slot at a time.
113    /// In DualCache-FF this is enforced by the WORKER_ID TLS invariant.
114    #[inline(always)]
115    #[cfg(any(feature = "loom", loom))]
116    pub unsafe fn get_mut_unchecked(&self) -> &mut BatchBuf<K, V> {
117        self.inner.with_mut(|ptr| unsafe { &mut **ptr })
118    }
119}
120
121unsafe impl<K: Send, V: Send> Send for WorkerSlot<K, V> {}
122unsafe impl<K: Send + Sync, V: Send + Sync> Sync for WorkerSlot<K, V> {}