Skip to main content

dualcache_ff/
lossy_queue.rs

1/// State Turnstile MPSC LossyQueue — zero external dependencies.
2///
3/// Replaces `crossbeam_channel::bounded` with a wait-free, lossy ring buffer
4/// designed specifically for the DualCache-FF Worker→Daemon pipeline.
5///
6/// # Design Rationale
7/// crossbeam_channel is a general-purpose channel with thread wake-up
8/// machinery (Parker/Unparker), select! support, and complex state machines.
9/// DualCache-FF only needs an extreme-speed, lossy, single-direction pipe.
10///
11/// # State Turnstile (per-slot AtomicU8)
12/// Each slot carries a 3-state gate to prevent Daemon from reading
13/// half-written data and to prevent Ring Buffer wraparound corruption:
14///
15///   EMPTY(0) ──[Producer CAS]──► WRITING(1) ──[store]──► READY(2)
16///     ▲                                                        │
17///     └──────────────[Daemon store after read]────────────────┘
18///
19/// # MPSC guarantee
20/// Multiple producers atomically CAS EMPTY→WRITING to claim unique slots.
21/// A single Daemon reads sequentially from `head`. No locks required.
22///
23/// # Lossy guarantee
24/// If a slot is not EMPTY (e.g., WRITING or READY — ring buffer lapped),
25/// the producer immediately returns Err(item). No blocking, ever.
26use crate::sync::cell::UnsafeCell;
27use core::mem::MaybeUninit;
28use crate::sync::atomic::{AtomicBool, AtomicUsize, AtomicU8, Ordering};
29
30#[cfg(not(feature = "std"))]
31use alloc::{boxed::Box, vec::Vec};
32
33use crate::sync::Arc;
34
35// ── Slot state constants ───────────────────────────────────────────────────
36
/// Slot holds no value; a producer may claim it.
const EMPTY: u8 = 0;
/// A producer has claimed the slot and is in the middle of writing its value.
const WRITING: u8 = 1;
/// The slot holds a completely written value that the consumer may take.
const READY: u8 = 2;
40
41// ── Slot ──────────────────────────────────────────────────────────────────
42
/// One cell of the ring: a state gate plus storage for a single `T`.
///
/// `state` holds one of `EMPTY`, `WRITING` or `READY`. `data` starts out
/// uninitialised and is only read back while the slot is `READY`.
struct Slot<T> {
    /// Current gate value for this cell.
    state: AtomicU8,
    /// Payload storage; validity is tracked by `state`, not by the type.
    data: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: a slot only ever hands a `T` from one thread to another (it never
// exposes `&T` to several threads), so `T: Send` is sufficient for both
// impls. Access to `data` is serialised by the `state` gate used by the
// queue code in this file.
unsafe impl<T> Send for Slot<T> where T: Send {}
unsafe impl<T> Sync for Slot<T> where T: Send {}
50
51// ── LossyQueue ────────────────────────────────────────────────────────────
52
53/// MPSC wait-free lossy ring buffer.
54pub struct LossyQueue<T> {
55    mask: usize,
56    /// Producer cursor — FAA, unbounded (wraps via mask).
57    tail: AtomicUsize,
58    /// Consumer (Daemon) cursor — single-threaded advance.
59    head: AtomicUsize,
60    buffer: Box<[Slot<T>]>,
61}
62
63unsafe impl<T: Send> Send for LossyQueue<T> {}
64unsafe impl<T: Send> Sync for LossyQueue<T> {}
65
66impl<T> LossyQueue<T> {
67    /// Create a new queue with the given capacity (must be a power of two).
68    pub fn new(capacity: usize) -> Self {
69        assert!(
70            capacity.is_power_of_two(),
71            "LossyQueue capacity must be a power of two"
72        );
73        let mut buf = Vec::with_capacity(capacity);
74        for _ in 0..capacity {
75            buf.push(Slot {
76                state: AtomicU8::new(EMPTY),
77                data: UnsafeCell::new(MaybeUninit::uninit()),
78            });
79        }
80        Self {
81            mask: capacity - 1,
82            tail: AtomicUsize::new(0),
83            head: AtomicUsize::new(0),
84            buffer: buf.into_boxed_slice(),
85        }
86    }
87
88    /// Worker path — try to enqueue an item.
89    ///
90    /// Uses FAA to claim a slot index, then CAS EMPTY→WRITING as a physical
91    /// gate. If the slot is occupied (ring buffer lapped or concurrent writer),
92    /// returns `Err(item)` immediately — never blocks.
93    #[inline(always)]
94    pub fn try_send(&self, item: T) -> Result<(), T> {
95        let tail = self.tail.load(Ordering::Relaxed);
96        let head = self.head.load(Ordering::Acquire);
97
98        // Pre-check: if physically full, don't even try to FAA.
99        // This prevents tail from flying away and causing massive overlaps.
100        if tail.wrapping_sub(head) >= self.buffer.len() {
101            return Err(item);
102        }
103
104        // FAA claims a position.
105        let idx = self.tail.fetch_add(1, Ordering::Relaxed) & self.mask;
106        let slot = &self.buffer[idx];
107
108        // Physical gate: only proceed if the slot is truly empty.
109        if slot
110            .state
111            .compare_exchange(EMPTY, WRITING, Ordering::Acquire, Ordering::Relaxed)
112            .is_err()
113        {
114            return Err(item);
115        }
116
117        slot.data.with_mut(|ptr| unsafe { (*ptr).write(item) });
118        slot.state.store(READY, Ordering::Release);
119        Ok(())
120    }
121
122    /// Blocking send for critical commands (Sync, Clear).
123    /// Spins until the item is successfully enqueued.
124    pub fn send_blocking(&self, mut item: T) {
125        #[cfg(feature = "std")]
126        let mut spins = 0;
127        loop {
128            match self.try_send(item) {
129                Ok(_) => return,
130                Err(returned_item) => {
131                    item = returned_item;
132                    #[cfg(feature = "std")]
133                    {
134                        if spins < 100 {
135                            core::hint::spin_loop();
136                            spins += 1;
137                        } else {
138                            #[cfg(any(feature = "loom", loom))]
139                            loom::thread::yield_now();
140                            #[cfg(not(any(feature = "loom", loom)))]
141                            std::thread::yield_now();
142                        }
143                    }
144                    #[cfg(not(feature = "std"))]
145                    {
146                        core::hint::spin_loop();
147                    }
148                }
149            }
150        }
151    }
152
153    /// Daemon path — try to dequeue one item.
154    ///
155    /// Single-consumer: only Daemon ever calls this.
156    /// Reads from `head`, returns `None` if the slot is not yet READY.
157    #[inline(always)]
158    pub fn try_recv(&self) -> Option<T> {
159        let idx = self.head.load(Ordering::Relaxed) & self.mask;
160        let slot = &self.buffer[idx];
161
162        if slot.state.load(Ordering::Acquire) == READY {
163            // Safe read: we are the exclusive consumer.
164            let item = slot.data.with_mut(|ptr| unsafe { (*ptr).assume_init_read() });
165
166            // Reset gate and advance head.
167            slot.state.store(EMPTY, Ordering::Release);
168            self.head.fetch_add(1, Ordering::Relaxed);
169            Some(item)
170        } else {
171            None
172        }
173    }
174}
175
176impl<T> Drop for LossyQueue<T> {
177    fn drop(&mut self) {
178        // Drain any READY items that were never consumed.
179        loop {
180            let idx = self.head.load(Ordering::Relaxed) & self.mask;
181            let slot = &self.buffer[idx];
182            if slot.state.load(Ordering::Acquire) == READY {
183                slot.data.with_mut(|ptr| unsafe { (*ptr).assume_init_drop() });
184                slot.state.store(EMPTY, Ordering::Relaxed);
185                self.head.fetch_add(1, Ordering::Relaxed);
186            } else {
187                break;
188            }
189        }
190    }
191}
192
193// ── OneshotAck ────────────────────────────────────────────────────────────
194
/// Minimal one-shot acknowledgment flag shared through an `Arc`.
///
/// Stands in for the `crossbeam_channel::bounded(1)` round-trip used by the
/// `Sync` and `Clear` commands: the requester keeps one handle and calls
/// `wait()`, the Daemon receives the other handle and calls `signal()` once
/// the command has been handled.
pub struct OneshotAck {
    /// `false` until `signal()` is called; never reset afterwards.
    ready: AtomicBool,
}

impl OneshotAck {
    /// Creates a fresh handle in the "not yet signalled" state.
    pub fn new() -> Arc<Self> {
        let unsignalled = Self {
            ready: AtomicBool::new(false),
        };
        Arc::new(unsignalled)
    }

    /// Daemon side: mark the command as processed.
    ///
    /// The Release store pairs with the Acquire load in `wait()`.
    #[inline(always)]
    pub fn signal(&self) {
        self.ready.store(true, Ordering::Release);
    }

    /// Caller side: return once `signal()` has been observed.
    ///
    /// Returns immediately if the flag is already set. With the `std`
    /// feature the first 100 unsuccessful polls use a CPU spin hint and
    /// every later one yields the thread; without `std` each unsuccessful
    /// poll uses the spin hint only.
    #[inline(always)]
    pub fn wait(&self) {
        #[cfg(feature = "std")]
        let mut spins = 0;
        loop {
            if self.ready.load(Ordering::Acquire) {
                return;
            }
            #[cfg(feature = "std")]
            {
                if spins < 100 {
                    core::hint::spin_loop();
                    spins += 1;
                } else {
                    #[cfg(any(feature = "loom", loom))]
                    loom::thread::yield_now();
                    #[cfg(not(any(feature = "loom", loom)))]
                    std::thread::yield_now();
                }
            }
            #[cfg(not(feature = "std"))]
            {
                core::hint::spin_loop();
            }
        }
    }
}