dualcache_ff/lossy_queue.rs
//! State Turnstile MPSC `LossyQueue` — zero external dependencies.
//!
//! Replaces `crossbeam_channel::bounded` with a non-blocking, lossy ring
//! buffer designed specifically for the DualCache-FF Worker→Daemon pipeline.
//!
//! # Design Rationale
//! `crossbeam_channel` is a general-purpose channel with thread wake-up
//! machinery (`Parker`/`Unparker`), `select!` support, and complex state
//! machines. DualCache-FF only needs an extremely fast, lossy,
//! single-direction pipe.
//!
//! # State Turnstile (per-slot `AtomicU8`)
//! Each slot carries a 3-state gate that prevents the Daemon from reading
//! half-written data and prevents ring-buffer wraparound corruption:
//!
//! ```text
//! EMPTY(0) ──[producer claims slot]──► WRITING(1) ──[store]──► READY(2)
//!   ▲                                                             │
//!   └─────────────────[Daemon stores after read]──────────────────┘
//! ```
//!
//! # MPSC guarantee
//! Producers claim unique positions from the shared `tail` cursor and publish
//! each item through its slot's state gate. A single Daemon reads
//! sequentially from `head`. No locks are required.
//!
//! # Lossy guarantee
//! If a producer cannot claim a free slot (for example because the ring is
//! full), `try_send` immediately returns `Err(item)`. It never blocks.
26use crate::sync::cell::UnsafeCell;
27use core::mem::MaybeUninit;
28use crate::sync::atomic::{AtomicBool, AtomicUsize, AtomicU8, Ordering};
29
30#[cfg(not(feature = "std"))]
31use alloc::{boxed::Box, vec::Vec};
32
33use crate::sync::Arc;
34
35// ── Slot state constants ───────────────────────────────────────────────────
36
/// Slot is free: it stores no item and a producer may claim it.
const EMPTY: u8 = 0;
/// A producer has claimed the slot and is still writing the item into it.
const WRITING: u8 = EMPTY + 1;
/// The item is fully written and can be taken by the consumer.
const READY: u8 = WRITING + 1;
40
41// ── Slot ──────────────────────────────────────────────────────────────────
42
/// One cell of the ring buffer: a state gate plus storage for a single item.
///
/// `state` decides which party may touch `data` at any given moment; `data`
/// holds a live item only while `state` is `READY`.
struct Slot<T> {
    /// One of `EMPTY`, `WRITING` or `READY`.
    state: AtomicU8,
    /// Item storage; treated as uninitialized unless `state` is `READY`.
    data: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: concurrent access to `data` is arbitrated by `state`, and items
// only ever move between threads by value, so `T: Send` is sufficient.
unsafe impl<T: Send> Sync for Slot<T> {}
unsafe impl<T: Send> Send for Slot<T> {}
50
51// ── LossyQueue ────────────────────────────────────────────────────────────
52
53/// MPSC wait-free lossy ring buffer.
54pub struct LossyQueue<T> {
55 mask: usize,
56 /// Producer cursor — FAA, unbounded (wraps via mask).
57 tail: AtomicUsize,
58 /// Consumer (Daemon) cursor — single-threaded advance.
59 head: AtomicUsize,
60 buffer: Box<[Slot<T>]>,
61}
62
63unsafe impl<T: Send> Send for LossyQueue<T> {}
64unsafe impl<T: Send> Sync for LossyQueue<T> {}
65
66impl<T> LossyQueue<T> {
67 /// Create a new queue with the given capacity (must be a power of two).
68 pub fn new(capacity: usize) -> Self {
69 assert!(
70 capacity.is_power_of_two(),
71 "LossyQueue capacity must be a power of two"
72 );
73 let mut buf = Vec::with_capacity(capacity);
74 for _ in 0..capacity {
75 buf.push(Slot {
76 state: AtomicU8::new(EMPTY),
77 data: UnsafeCell::new(MaybeUninit::uninit()),
78 });
79 }
80 Self {
81 mask: capacity - 1,
82 tail: AtomicUsize::new(0),
83 head: AtomicUsize::new(0),
84 buffer: buf.into_boxed_slice(),
85 }
86 }
87
88 /// Worker path — try to enqueue an item.
89 ///
90 /// Uses FAA to claim a slot index, then CAS EMPTY→WRITING as a physical
91 /// gate. If the slot is occupied (ring buffer lapped or concurrent writer),
92 /// returns `Err(item)` immediately — never blocks.
93 #[inline(always)]
94 pub fn try_send(&self, item: T) -> Result<(), T> {
95 let tail = self.tail.load(Ordering::Relaxed);
96 let head = self.head.load(Ordering::Acquire);
97
98 // Pre-check: if physically full, don't even try to FAA.
99 // This prevents tail from flying away and causing massive overlaps.
100 if tail.wrapping_sub(head) >= self.buffer.len() {
101 return Err(item);
102 }
103
104 // FAA claims a position.
105 let idx = self.tail.fetch_add(1, Ordering::Relaxed) & self.mask;
106 let slot = &self.buffer[idx];
107
108 // Physical gate: only proceed if the slot is truly empty.
109 if slot
110 .state
111 .compare_exchange(EMPTY, WRITING, Ordering::Acquire, Ordering::Relaxed)
112 .is_err()
113 {
114 return Err(item);
115 }
116
117 slot.data.with_mut(|ptr| unsafe { (*ptr).write(item) });
118 slot.state.store(READY, Ordering::Release);
119 Ok(())
120 }
121
122 /// Blocking send for critical commands (Sync, Clear).
123 /// Spins until the item is successfully enqueued.
124 pub fn send_blocking(&self, mut item: T) {
125 #[cfg(feature = "std")]
126 let mut spins = 0;
127 loop {
128 match self.try_send(item) {
129 Ok(_) => return,
130 Err(returned_item) => {
131 item = returned_item;
132 #[cfg(feature = "std")]
133 {
134 if spins < 100 {
135 core::hint::spin_loop();
136 spins += 1;
137 } else {
138 #[cfg(any(feature = "loom", loom))]
139 loom::thread::yield_now();
140 #[cfg(not(any(feature = "loom", loom)))]
141 std::thread::yield_now();
142 }
143 }
144 #[cfg(not(feature = "std"))]
145 {
146 core::hint::spin_loop();
147 }
148 }
149 }
150 }
151 }
152
153 /// Daemon path — try to dequeue one item.
154 ///
155 /// Single-consumer: only Daemon ever calls this.
156 /// Reads from `head`, returns `None` if the slot is not yet READY.
157 #[inline(always)]
158 pub fn try_recv(&self) -> Option<T> {
159 let idx = self.head.load(Ordering::Relaxed) & self.mask;
160 let slot = &self.buffer[idx];
161
162 if slot.state.load(Ordering::Acquire) == READY {
163 // Safe read: we are the exclusive consumer.
164 let item = slot.data.with_mut(|ptr| unsafe { (*ptr).assume_init_read() });
165
166 // Reset gate and advance head.
167 slot.state.store(EMPTY, Ordering::Release);
168 self.head.fetch_add(1, Ordering::Relaxed);
169 Some(item)
170 } else {
171 None
172 }
173 }
174}
175
176impl<T> Drop for LossyQueue<T> {
177 fn drop(&mut self) {
178 // Drain any READY items that were never consumed.
179 loop {
180 let idx = self.head.load(Ordering::Relaxed) & self.mask;
181 let slot = &self.buffer[idx];
182 if slot.state.load(Ordering::Acquire) == READY {
183 slot.data.with_mut(|ptr| unsafe { (*ptr).assume_init_drop() });
184 slot.state.store(EMPTY, Ordering::Relaxed);
185 self.head.fetch_add(1, Ordering::Relaxed);
186 } else {
187 break;
188 }
189 }
190 }
191}
192
193// ── OneshotAck ────────────────────────────────────────────────────────────
194
/// One-shot acknowledgement flag for command round-trips.
///
/// Stands in for `crossbeam_channel::bounded(1)` on the `Sync` and `Clear`
/// paths and builds with or without the `std` feature.
///
/// Protocol: the requester creates an `Arc<OneshotAck>`, puts a clone in the
/// `Command`, and blocks in [`OneshotAck::wait`]; the Daemon calls
/// [`OneshotAck::signal`] once the command has been handled.
pub struct OneshotAck {
    /// Set to `true` by `signal`; no method here ever clears it.
    ready: AtomicBool,
}

impl OneshotAck {
    /// Returns a fresh, not-yet-signalled handle, already wrapped in an `Arc`
    /// so it can be shared between the requester and the Daemon.
    pub fn new() -> Arc<Self> {
        let ack = Self {
            ready: AtomicBool::new(false),
        };
        Arc::new(ack)
    }

    /// Daemon side: mark the command as processed.
    ///
    /// The `Release` store pairs with the `Acquire` load in
    /// [`OneshotAck::wait`], so writes made before this call are visible to
    /// the waiter once it returns.
    #[inline(always)]
    pub fn signal(&self) {
        self.ready.store(true, Ordering::Release);
    }

    /// Requester side: busy-wait until [`OneshotAck::signal`] has been called.
    ///
    /// With `std`, the first 100 polls use a CPU spin hint and every later
    /// poll yields the thread so the Daemon gets a chance to run. Without
    /// `std`, it spins continuously and relies on the scheduler (e.g. an RTOS)
    /// to preempt the waiting task.
    #[inline(always)]
    pub fn wait(&self) {
        #[cfg(not(feature = "std"))]
        {
            loop {
                if self.ready.load(Ordering::Acquire) {
                    break;
                }
                core::hint::spin_loop();
            }
        }
        #[cfg(feature = "std")]
        {
            const SPIN_LIMIT: u32 = 100;
            let mut polls: u32 = 0;
            loop {
                if self.ready.load(Ordering::Acquire) {
                    break;
                }
                if polls >= SPIN_LIMIT {
                    #[cfg(any(feature = "loom", loom))]
                    loom::thread::yield_now();
                    #[cfg(not(any(feature = "loom", loom)))]
                    std::thread::yield_now();
                } else {
                    polls += 1;
                    core::hint::spin_loop();
                }
            }
        }
    }
}