
dualcache_ff/lossy_queue.rs

//! State Turnstile MPSC LossyQueue — zero external dependencies.
//!
//! Replaces `crossbeam_channel::bounded` with a wait-free, lossy ring buffer
//! designed specifically for the DualCache-FF Worker→Daemon pipeline.
//!
//! # Design Rationale
//! crossbeam_channel is a general-purpose channel with thread wake-up
//! machinery (Parker/Unparker), select! support, and complex state machines.
//! DualCache-FF only needs an extremely fast, lossy, unidirectional pipe.
//!
//! # State Turnstile (per-slot AtomicU8)
//! Each slot carries a three-state gate that prevents the Daemon from reading
//! half-written data and prevents ring-buffer wraparound corruption:
//!
//!   EMPTY(0) ──[Producer CAS]──► WRITING(1) ──[store]──► READY(2)
//!     ▲                                                     │
//!     └─────────────[Daemon store after read]───────────────┘
//!
//! # MPSC guarantee
//! Multiple producers atomically CAS EMPTY→WRITING to claim unique slots.
//! A single Daemon reads sequentially from `head`. No locks required.
//!
//! # Lossy guarantee
//! If a slot is not EMPTY (e.g., WRITING or READY — ring buffer lapped),
//! the producer immediately returns `Err(item)`. No blocking, ever.
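//!
//! # Example (sketch)
//!
//! A minimal single-threaded round trip, assuming the module is reachable as
//! `dualcache_ff::lossy_queue` (the real path may differ):
//!
//! ```rust,ignore
//! use dualcache_ff::lossy_queue::LossyQueue;
//!
//! let q: LossyQueue<u64> = LossyQueue::new(8);
//! assert!(q.try_send(42).is_ok());
//! assert_eq!(q.try_recv(), Some(42));
//! assert_eq!(q.try_recv(), None); // drained
//! ```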
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};

#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;

// ── Slot state constants ───────────────────────────────────────────────────

/// Slot is vacant and may be claimed by a producer.
const EMPTY: u8 = 0;
/// A producer has claimed the slot and is writing the payload.
const WRITING: u8 = 1;
/// The payload is complete and visible to the Daemon.
const READY: u8 = 2;

// ── Slot ──────────────────────────────────────────────────────────────────

struct Slot<T> {
    /// Turnstile gate: EMPTY → WRITING → READY.
    state: AtomicU8,
    /// Payload; fully initialized exactly when `state == READY`.
    data: UnsafeCell<MaybeUninit<T>>,
}

unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send> Sync for Slot<T> {}

// ── LossyQueue ────────────────────────────────────────────────────────────

/// MPSC wait-free lossy ring buffer.
pub struct LossyQueue<T> {
    mask: usize,
    /// Producer cursor — FAA, unbounded (wraps via mask).
    tail: AtomicUsize,
    /// Consumer (Daemon) cursor — single-threaded advance.
    head: AtomicUsize,
    buffer: Box<[Slot<T>]>,
}

unsafe impl<T: Send> Send for LossyQueue<T> {}
unsafe impl<T: Send> Sync for LossyQueue<T> {}

impl<T> LossyQueue<T> {
    /// Create a new queue with the given capacity (must be a power of two).
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "LossyQueue capacity must be a power of two"
        );
        let mut buf = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buf.push(Slot {
                state: AtomicU8::new(EMPTY),
                data: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }
        Self {
            mask: capacity - 1,
            tail: AtomicUsize::new(0),
            head: AtomicUsize::new(0),
            buffer: buf.into_boxed_slice(),
        }
    }

    /// Worker path — try to enqueue an item.
    ///
    /// Uses FAA to claim a slot index, then CAS EMPTY→WRITING as a physical
    /// gate. If the slot is occupied (ring buffer lapped or concurrent writer),
    /// the claim is rolled back and `Err(item)` is returned immediately.
    /// Never blocks.
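    ///
    /// A sketch of the Worker-side call pattern (`q` and `sample` are
    /// placeholders for the shared queue handle and the event payload):
    ///
    /// ```rust,ignore
    /// if q.try_send(sample).is_err() {
    ///     // Slot occupied or ring lapped: the sample is dropped by design.
    /// }
    /// ```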
    #[inline(always)]
    pub fn try_send(&self, item: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);

        // Pre-check: if physically full, don't even try to FAA.
        // This keeps `tail` from racing far ahead of `head` and causing
        // massive overlaps.
        if tail.wrapping_sub(head) >= self.buffer.len() {
            return Err(item);
        }

        // FAA claims a position.
        let idx = self.tail.fetch_add(1, Ordering::Relaxed) & self.mask;
        let slot = &self.buffer[idx];

        // Physical gate: only proceed if the slot is truly empty.
        if slot
            .state
            .compare_exchange(EMPTY, WRITING, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            // Roll the failed claim back. Without this, every CAS failure
            // permanently widens `tail - head`, and the hole at that logical
            // position would leave the Daemon waiting forever on a slot no
            // producer will ever fill.
            self.tail.fetch_sub(1, Ordering::Relaxed);
            return Err(item);
        }

        unsafe { (*slot.data.get()).write(item) };
        slot.state.store(READY, Ordering::Release);
        Ok(())
    }

    /// Blocking send for critical commands (Sync, Clear).
    /// Spins until the item is successfully enqueued.
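    ///
    /// A sketch (`Command::Clear` is a placeholder for whatever command
    /// enum the pipe actually carries):
    ///
    /// ```rust,ignore
    /// q.send_blocking(Command::Clear);
    /// ```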
    pub fn send_blocking(&self, mut item: T) {
        loop {
            match self.try_send(item) {
                Ok(_) => return,
                Err(returned_item) => {
                    item = returned_item;
                    core::hint::spin_loop();
                }
            }
        }
    }

    /// Daemon path — try to dequeue one item.
    ///
    /// Single-consumer: only the Daemon ever calls this.
    /// Reads from `head`, returns `None` if the slot is not yet READY.
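    ///
    /// A sketch of the Daemon drain loop (`handle` is a placeholder for the
    /// Daemon's processing step):
    ///
    /// ```rust,ignore
    /// while let Some(item) = q.try_recv() {
    ///     handle(item);
    /// }
    /// ```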
    #[inline(always)]
    pub fn try_recv(&self) -> Option<T> {
        let idx = self.head.load(Ordering::Relaxed) & self.mask;
        let slot = &self.buffer[idx];

        if slot.state.load(Ordering::Acquire) == READY {
            // Safe read: we are the exclusive consumer.
            let item = unsafe { (*slot.data.get()).assume_init_read() };

            // Reset gate and advance head.
            slot.state.store(EMPTY, Ordering::Release);
            self.head.fetch_add(1, Ordering::Relaxed);
            Some(item)
        } else {
            None
        }
    }
}

impl<T> Drop for LossyQueue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees no producer is mid-write, so every READY
        // slot holds an initialized item that must be dropped. Scanning the
        // whole buffer (rather than walking from `head` and stopping at the
        // first non-READY slot) also frees items stranded behind a slot that
        // never reached READY.
        for slot in self.buffer.iter() {
            if slot.state.load(Ordering::Acquire) == READY {
                unsafe { (*slot.data.get()).assume_init_drop() };
                slot.state.store(EMPTY, Ordering::Relaxed);
            }
        }
    }
}

// ── OneshotAck ────────────────────────────────────────────────────────────

/// Lightweight one-shot acknowledgment channel.
///
/// Replaces `crossbeam_channel::bounded(1)` used for `Sync` and `Clear`
/// command round-trips. Works in both `std` and `no_std` environments.
///
/// The caller creates an `Arc<OneshotAck>`, sends it in a `Command`, and
/// blocks on `wait()`. The Daemon calls `signal()` after processing.
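///
/// A sketch of the round trip, assuming a hypothetical `Command::Sync`
/// variant that carries the ack handle:
///
/// ```rust,ignore
/// let ack = OneshotAck::new();
/// queue.send_blocking(Command::Sync(ack.clone()));
/// ack.wait(); // returns once the Daemon has called ack.signal()
/// ```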
pub struct OneshotAck {
    ready: AtomicBool,
}

impl OneshotAck {
    /// Allocate a new, unsignalled ack handle.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            ready: AtomicBool::new(false),
        })
    }

    /// Daemon: signal that the command has been processed.
    #[inline(always)]
    pub fn signal(&self) {
        self.ready.store(true, Ordering::Release);
    }

    /// Caller: spin until the signal arrives.
    ///
    /// In `std` mode this is a brief spin (Sync/Clear commands are rare).
    /// In `no_std` / RTOS mode the RTOS scheduler preempts the spinning task.
    #[inline(always)]
    pub fn wait(&self) {
        while !self.ready.load(Ordering::Acquire) {
            core::hint::spin_loop();
        }
    }
}
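
// ── Tests ─────────────────────────────────────────────────────────────────

// Minimal single-threaded smoke tests, assuming the standard test harness.
// They exercise the turnstile protocol (EMPTY → WRITING → READY → EMPTY)
// without cross-thread interleaving; a sketch, not a stress test.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fills_drains_and_rejects_when_full() {
        let q = LossyQueue::new(4);
        for i in 0..4u32 {
            assert!(q.try_send(i).is_ok());
        }
        // All four slots are READY, so the fifth send must be rejected.
        assert!(q.try_send(99).is_err());
        for i in 0..4u32 {
            assert_eq!(q.try_recv(), Some(i));
        }
        assert_eq!(q.try_recv(), None);
    }

    #[test]
    fn ack_wait_returns_after_signal() {
        let ack = OneshotAck::new();
        ack.signal();
        ack.wait(); // must return immediately once signalled
    }
}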