// dualcache_ff/lossy_queue.rs
//! State Turnstile MPSC LossyQueue — zero external dependencies.
//!
//! Replaces `crossbeam_channel::bounded` with a wait-free, lossy ring buffer
//! designed specifically for the DualCache-FF Worker→Daemon pipeline.
//!
//! # Design Rationale
//! crossbeam_channel is a general-purpose channel with thread wake-up
//! machinery (Parker/Unparker), select! support, and complex state machines.
//! DualCache-FF only needs an extreme-speed, lossy, single-direction pipe.
//!
//! # State Turnstile (per-slot AtomicU8)
//! Each slot carries a 3-state gate to prevent the Daemon from reading
//! half-written data and to prevent ring-buffer wraparound corruption:
//!
//! ```text
//! EMPTY(0) ──[Producer CAS]──► WRITING(1) ──[store]──► READY(2)
//!    ▲                                                    │
//!    └─────────────[Daemon store after read]──────────────┘
//! ```
//!
//! # MPSC guarantee
//! Multiple producers atomically CAS EMPTY→WRITING to claim unique slots.
//! A single Daemon reads sequentially from `head`. No locks required.
//!
//! # Lossy guarantee
//! If a slot is not EMPTY (e.g., WRITING or READY — ring buffer lapped),
//! the producer immediately returns Err(item). No blocking, ever.
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};

#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;
34
// ── Slot state constants ───────────────────────────────────────────────────

/// Slot holds no payload; a producer may CAS-claim it.
const EMPTY: u8 = 0;
/// A producer won the claim CAS and is writing the payload.
const WRITING: u8 = 1;
/// Payload fully written (Release); the Daemon may read it.
const READY: u8 = 2;
40
// ── Slot ──────────────────────────────────────────────────────────────────

/// One ring-buffer cell: a state turnstile plus the payload it guards.
struct Slot<T> {
    /// EMPTY / WRITING / READY gate serializing all access to `data`.
    state: AtomicU8,
    /// Payload storage; only initialized while `state` is READY
    /// (or WRITING, from the owning producer's point of view).
    data: UnsafeCell<MaybeUninit<T>>,
}
47
// SAFETY: the `state` turnstile serializes every access to `data` — a
// producer writes only after winning the EMPTY→WRITING CAS, and the single
// consumer reads only after observing READY (Acquire) — so shared access
// never overlaps on the payload. `T: Send` is required because the payload
// itself crosses threads.
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send> Sync for Slot<T> {}
50
// ── LossyQueue ────────────────────────────────────────────────────────────

/// MPSC wait-free lossy ring buffer.
pub struct LossyQueue<T> {
    /// `capacity - 1`; maps an unbounded cursor to a slot index via
    /// `cursor & mask` (capacity is asserted to be a power of two in `new`).
    mask: usize,
    /// Producer cursor — FAA, unbounded (wraps via mask).
    tail: AtomicUsize,
    /// Consumer (Daemon) cursor — single-threaded advance.
    head: AtomicUsize,
    /// Fixed slab of slots, each carrying its own state turnstile.
    buffer: Box<[Slot<T>]>,
}
62
// SAFETY: all shared mutable state is either atomic (`head`, `tail`, each
// slot's `state`) or guarded by the per-slot turnstile (`data`), so a
// `&LossyQueue<T>` may be shared and sent across threads whenever the items
// themselves are `Send`.
unsafe impl<T: Send> Send for LossyQueue<T> {}
unsafe impl<T: Send> Sync for LossyQueue<T> {}
65
66impl<T> LossyQueue<T> {
67 /// Create a new queue with the given capacity (must be a power of two).
68 pub fn new(capacity: usize) -> Self {
69 assert!(
70 capacity.is_power_of_two(),
71 "LossyQueue capacity must be a power of two"
72 );
73 let mut buf = Vec::with_capacity(capacity);
74 for _ in 0..capacity {
75 buf.push(Slot {
76 state: AtomicU8::new(EMPTY),
77 data: UnsafeCell::new(MaybeUninit::uninit()),
78 });
79 }
80 Self {
81 mask: capacity - 1,
82 tail: AtomicUsize::new(0),
83 head: AtomicUsize::new(0),
84 buffer: buf.into_boxed_slice(),
85 }
86 }
87
88 /// Worker path — try to enqueue an item.
89 ///
90 /// Uses FAA to claim a slot index, then CAS EMPTY→WRITING as a physical
91 /// gate. If the slot is occupied (ring buffer lapped or concurrent writer),
92 /// returns `Err(item)` immediately — never blocks.
93 #[inline(always)]
94 pub fn try_send(&self, item: T) -> Result<(), T> {
95 let tail = self.tail.load(Ordering::Relaxed);
96 let head = self.head.load(Ordering::Acquire);
97
98 // Pre-check: if physically full, don't even try to FAA.
99 // This prevents tail from flying away and causing massive overlaps.
100 if tail.wrapping_sub(head) >= self.buffer.len() {
101 return Err(item);
102 }
103
104 // FAA claims a position.
105 let idx = self.tail.fetch_add(1, Ordering::Relaxed) & self.mask;
106 let slot = &self.buffer[idx];
107
108 // Physical gate: only proceed if the slot is truly empty.
109 if slot
110 .state
111 .compare_exchange(EMPTY, WRITING, Ordering::Acquire, Ordering::Relaxed)
112 .is_err()
113 {
114 return Err(item);
115 }
116
117 unsafe { (*slot.data.get()).write(item) };
118 slot.state.store(READY, Ordering::Release);
119 Ok(())
120 }
121
122 /// Blocking send for critical commands (Sync, Clear).
123 /// Spins until the item is successfully enqueued.
124 pub fn send_blocking(&self, mut item: T) {
125 loop {
126 match self.try_send(item) {
127 Ok(_) => return,
128 Err(returned_item) => {
129 item = returned_item;
130 core::hint::spin_loop();
131 }
132 }
133 }
134 }
135
136 /// Daemon path — try to dequeue one item.
137 ///
138 /// Single-consumer: only Daemon ever calls this.
139 /// Reads from `head`, returns `None` if the slot is not yet READY.
140 #[inline(always)]
141 pub fn try_recv(&self) -> Option<T> {
142 let idx = self.head.load(Ordering::Relaxed) & self.mask;
143 let slot = &self.buffer[idx];
144
145 if slot.state.load(Ordering::Acquire) == READY {
146 // Safe read: we are the exclusive consumer.
147 let item = unsafe { (*slot.data.get()).assume_init_read() };
148
149 // Reset gate and advance head.
150 slot.state.store(EMPTY, Ordering::Release);
151 self.head.fetch_add(1, Ordering::Relaxed);
152 Some(item)
153 } else {
154 None
155 }
156 }
157}
158
159impl<T> Drop for LossyQueue<T> {
160 fn drop(&mut self) {
161 // Drain any READY items that were never consumed.
162 loop {
163 let idx = self.head.load(Ordering::Relaxed) & self.mask;
164 let slot = &self.buffer[idx];
165 if slot.state.load(Ordering::Acquire) == READY {
166 unsafe { (*slot.data.get()).assume_init_drop() };
167 slot.state.store(EMPTY, Ordering::Relaxed);
168 self.head.fetch_add(1, Ordering::Relaxed);
169 } else {
170 break;
171 }
172 }
173 }
174}
175
// ── OneshotAck ────────────────────────────────────────────────────────────

/// Lightweight one-shot acknowledgment channel.
///
/// Replaces `crossbeam_channel::bounded(1)` used for `Sync` and `Clear`
/// command round-trips. Works in both `std` and `no_std` environments.
///
/// The caller creates an `Arc<OneshotAck>`, sends it in a `Command`, and
/// blocks on `wait()`. The Daemon calls `signal()` after processing.
pub struct OneshotAck {
    /// Flips false → true exactly once (in `signal`); polled by `wait`.
    ready: AtomicBool,
}
188
189impl OneshotAck {
190 /// Allocate a new, unsignalled ack handle.
191 pub fn new() -> Arc<Self> {
192 Arc::new(Self {
193 ready: AtomicBool::new(false),
194 })
195 }
196
    /// Daemon: signal that the command has been processed.
    #[inline(always)]
    pub fn signal(&self) {
        // Release pairs with the Acquire load in `wait`, so any writes the
        // Daemon performed before signalling are visible to the waiter.
        self.ready.store(true, Ordering::Release);
    }
202
203 /// Caller: spin until the signal arrives.
204 ///
205 /// In `std` mode this is a brief spin (Sync/Clear commands are rare).
206 /// In `no_std` / RTOS mode the RTOS scheduler preempts the spinning task.
207 #[inline(always)]
208 pub fn wait(&self) {
209 while !self.ready.load(Ordering::Acquire) {
210 core::hint::spin_loop();
211 }
212 }
213}