1use std::cell::UnsafeCell;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24
25use crate::error::{BridgeError, Result};
26use crate::metrics::BridgeMetrics;
27
28const CACHE_LINE: usize = 64;
30
31struct Shared<T> {
39 tail: CacheAligned<AtomicU64>,
41
42 head: CacheAligned<AtomicU64>,
44
45 slots: Box<[UnsafeCell<Option<T>>]>,
47
48 capacity: usize,
50
51 mask: usize,
53
54 disconnected: AtomicBool,
56
57 metrics: BridgeMetrics,
59}
60
61unsafe impl<T: Send> Send for Shared<T> {}
66unsafe impl<T: Send> Sync for Shared<T> {}
67
68#[repr(align(64))]
70struct CacheAligned<T> {
71 value: T,
72 _pad: [u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
73}
74
75impl<T: Default> CacheAligned<T> {
76 fn new(value: T) -> Self {
77 Self {
78 value,
79 _pad: [0u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
80 }
81 }
82}
83
84pub struct Producer<T> {
89 shared: Arc<Shared<T>>,
90
91 cached_head: u64,
94}
95
96pub struct Consumer<T> {
101 shared: Arc<Shared<T>>,
102
103 cached_tail: u64,
106}
107
108pub struct RingBuffer;
115
116impl RingBuffer {
117 pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
118 assert!(capacity > 0, "ring buffer capacity must be > 0");
119
120 let capacity = capacity.next_power_of_two();
121 let mask = capacity - 1;
122
123 let mut slots = Vec::with_capacity(capacity);
124 for _ in 0..capacity {
125 slots.push(UnsafeCell::new(None));
126 }
127
128 let shared = Arc::new(Shared {
129 tail: CacheAligned::new(AtomicU64::new(0)),
130 head: CacheAligned::new(AtomicU64::new(0)),
131 slots: slots.into_boxed_slice(),
132 capacity,
133 mask,
134 disconnected: AtomicBool::new(false),
135 metrics: BridgeMetrics::new(),
136 });
137
138 let producer = Producer {
139 shared: Arc::clone(&shared),
140 cached_head: 0,
141 };
142
143 let consumer = Consumer {
144 shared,
145 cached_tail: 0,
146 };
147
148 (producer, consumer)
149 }
150}
151
152impl<T> Producer<T> {
153 pub fn try_push(&mut self, value: T) -> Result<()> {
156 if self.shared.disconnected.load(Ordering::Relaxed) {
157 return Err(BridgeError::Disconnected { side: "consumer" });
158 }
159
160 let tail = self.shared.tail.value.load(Ordering::Relaxed);
161
162 if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
164 self.cached_head = self.shared.head.value.load(Ordering::Acquire);
166
167 if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
168 self.shared.metrics.record_full();
169 return Err(BridgeError::Full {
170 capacity: self.shared.capacity,
171 pending: (tail.wrapping_sub(self.cached_head)) as usize,
172 });
173 }
174 }
175
176 let idx = (tail as usize) & self.shared.mask;
177
178 unsafe {
182 (*self.shared.slots[idx].get()) = Some(value);
183 }
184
185 self.shared
187 .tail
188 .value
189 .store(tail.wrapping_add(1), Ordering::Release);
190
191 self.shared.metrics.record_push();
192 Ok(())
193 }
194
195 pub fn utilization(&self) -> u8 {
197 let tail = self.shared.tail.value.load(Ordering::Relaxed);
198 let head = self.shared.head.value.load(Ordering::Relaxed);
199 let pending = tail.wrapping_sub(head) as usize;
200 ((pending * 100) / self.shared.capacity) as u8
201 }
202
203 pub fn len(&self) -> usize {
205 let tail = self.shared.tail.value.load(Ordering::Relaxed);
206 let head = self.shared.head.value.load(Ordering::Relaxed);
207 tail.wrapping_sub(head) as usize
208 }
209
210 pub fn is_empty(&self) -> bool {
212 self.len() == 0
213 }
214
215 pub fn capacity(&self) -> usize {
217 self.shared.capacity
218 }
219
220 pub fn metrics(&self) -> &BridgeMetrics {
222 &self.shared.metrics
223 }
224}
225
226impl<T> Consumer<T> {
227 pub fn try_pop(&mut self) -> Result<T> {
230 let head = self.shared.head.value.load(Ordering::Relaxed);
231
232 if head == self.cached_tail {
234 self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
236
237 if head == self.cached_tail {
238 if self.shared.disconnected.load(Ordering::Relaxed) {
239 return Err(BridgeError::Disconnected { side: "producer" });
240 }
241 return Err(BridgeError::Empty);
242 }
243 }
244
245 let idx = (head as usize) & self.shared.mask;
246
247 let value = unsafe { (*self.shared.slots[idx].get()).take() };
251
252 self.shared
254 .head
255 .value
256 .store(head.wrapping_add(1), Ordering::Release);
257
258 self.shared.metrics.record_pop();
259
260 Ok(value.expect("BUG: slot was None despite tail > head"))
263 }
264
265 pub fn drain_into(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
270 let head = self.shared.head.value.load(Ordering::Relaxed);
271 self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
272
273 let available = self.cached_tail.wrapping_sub(head) as usize;
274 let count = available.min(max);
275
276 for i in 0..count {
277 let idx = ((head.wrapping_add(i as u64)) as usize) & self.shared.mask;
278 let value = unsafe { (*self.shared.slots[idx].get()).take() };
280 buf.push(value.expect("BUG: slot was None during drain"));
281 }
282
283 if count > 0 {
284 self.shared
285 .head
286 .value
287 .store(head.wrapping_add(count as u64), Ordering::Release);
288 self.shared.metrics.record_pops(count as u64);
289 }
290
291 count
292 }
293
294 pub fn len(&self) -> usize {
296 let tail = self.shared.tail.value.load(Ordering::Relaxed);
297 let head = self.shared.head.value.load(Ordering::Relaxed);
298 tail.wrapping_sub(head) as usize
299 }
300
301 pub fn is_empty(&self) -> bool {
303 self.len() == 0
304 }
305
306 pub fn metrics(&self) -> &BridgeMetrics {
308 &self.shared.metrics
309 }
310}
311
312impl<T> Drop for Producer<T> {
313 fn drop(&mut self) {
314 self.shared.disconnected.store(true, Ordering::Release);
315 }
316}
317
318impl<T> Drop for Consumer<T> {
319 fn drop(&mut self) {
320 self.shared.disconnected.store(true, Ordering::Release);
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[test]
329 fn basic_push_pop() {
330 let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
331
332 tx.try_push(1).unwrap();
333 tx.try_push(2).unwrap();
334 tx.try_push(3).unwrap();
335 tx.try_push(4).unwrap();
336
337 assert!(matches!(tx.try_push(5), Err(BridgeError::Full { .. })));
339
340 assert_eq!(rx.try_pop().unwrap(), 1);
341 assert_eq!(rx.try_pop().unwrap(), 2);
342
343 tx.try_push(5).unwrap();
345 tx.try_push(6).unwrap();
346
347 assert_eq!(rx.try_pop().unwrap(), 3);
348 assert_eq!(rx.try_pop().unwrap(), 4);
349 assert_eq!(rx.try_pop().unwrap(), 5);
350 assert_eq!(rx.try_pop().unwrap(), 6);
351
352 assert!(matches!(rx.try_pop(), Err(BridgeError::Empty)));
353 }
354
355 #[test]
356 fn power_of_two_rounding() {
357 let (tx, _rx) = RingBuffer::channel::<u64>(3);
358 assert_eq!(tx.capacity(), 4);
359
360 let (tx, _rx) = RingBuffer::channel::<u64>(5);
361 assert_eq!(tx.capacity(), 8);
362
363 let (tx, _rx) = RingBuffer::channel::<u64>(8);
364 assert_eq!(tx.capacity(), 8);
365 }
366
367 #[test]
368 fn utilization_tracking() {
369 let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);
370
371 assert_eq!(tx.utilization(), 0);
372
373 for i in 0..6 {
374 tx.try_push(i).unwrap();
375 }
376 assert_eq!(tx.utilization(), 75);
377
378 rx.try_pop().unwrap();
379 rx.try_pop().unwrap();
380 assert_eq!(tx.utilization(), 50);
381 }
382
383 #[test]
384 fn drain_into_batch() {
385 let (mut tx, mut rx) = RingBuffer::channel::<u64>(16);
386
387 for i in 0..10 {
388 tx.try_push(i).unwrap();
389 }
390
391 let mut buf = Vec::new();
392 let drained = rx.drain_into(&mut buf, 5);
393 assert_eq!(drained, 5);
394 assert_eq!(buf, vec![0, 1, 2, 3, 4]);
395
396 let drained = rx.drain_into(&mut buf, 100);
397 assert_eq!(drained, 5);
398 assert_eq!(buf, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
399 }
400
401 #[test]
402 fn disconnect_detection_producer_drops() {
403 let (tx, mut rx) = RingBuffer::channel::<u64>(4);
404 drop(tx);
405
406 assert!(matches!(
407 rx.try_pop(),
408 Err(BridgeError::Disconnected { side: "producer" })
409 ));
410 }
411
412 #[test]
413 fn disconnect_detection_consumer_drops() {
414 let (mut tx, rx) = RingBuffer::channel::<u64>(4);
415 drop(rx);
416
417 assert!(matches!(
418 tx.try_push(1),
419 Err(BridgeError::Disconnected { side: "consumer" })
420 ));
421 }
422
423 #[test]
424 fn wrapping_behavior() {
425 let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
427
428 for round in 0..1000u64 {
429 for i in 0..4 {
430 tx.try_push(round * 4 + i).unwrap();
431 }
432 for i in 0..4 {
433 assert_eq!(rx.try_pop().unwrap(), round * 4 + i);
434 }
435 }
436 }
437
438 #[test]
439 fn metrics_counting() {
440 let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);
441
442 for i in 0..5 {
443 tx.try_push(i).unwrap();
444 }
445 assert_eq!(tx.metrics().pushes(), 5);
446
447 for _ in 0..3 {
448 rx.try_pop().unwrap();
449 }
450 assert_eq!(rx.metrics().pops(), 3);
451 }
452}