1use std::cell::UnsafeCell;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26
27use crate::error::{BridgeError, Result};
28use crate::metrics::BridgeMetrics;
29
30const CACHE_LINE: usize = 64;
32
/// State shared by the [`Producer`] and [`Consumer`] halves of one ring buffer.
///
/// Both halves hold an `Arc` to the same `Shared<T>`; it is created by
/// [`RingBuffer::channel`].
struct Shared<T> {
    /// Monotonically increasing (wrapping) write position. Stored by
    /// `Producer::try_push` and masked with `mask` to obtain a slot index.
    tail: CacheAligned<AtomicU64>,

    /// Monotonically increasing (wrapping) read position. Stored by
    /// `Consumer::try_pop` and `Consumer::drain_into`.
    head: CacheAligned<AtomicU64>,

    /// Slot storage. A slot holds `Some(value)` between a push and the matching
    /// pop, and `None` otherwise. `UnsafeCell` is needed because slots are
    /// written and taken through a shared reference.
    slots: Box<[UnsafeCell<Option<T>>]>,

    /// Number of slots. `RingBuffer::channel` rounds the requested capacity up
    /// to a power of two before storing it here.
    capacity: usize,

    /// `capacity - 1`; ANDing a position with it yields the slot index.
    mask: usize,

    /// Set to `true` when either half is dropped.
    disconnected: AtomicBool,

    /// Counters updated by the push and pop paths.
    metrics: BridgeMetrics,
}
62
// SAFETY: `Shared<T>` owns its `T` values inside `slots`, so moving it to
// another thread moves those values too; that is sound when `T: Send`.
unsafe impl<T: Send> Send for Shared<T> {}
// SAFETY: the `UnsafeCell` slots are the only non-`Sync` part. In this file a
// slot is written only by `Producer::try_push` and emptied only by the
// `Consumer` pop methods, all of which take `&mut self`, and the hand-off of a
// slot between the two sides is ordered by the Release stores / Acquire loads
// on `tail` and `head`. This relies on there being exactly one `Producer` and
// one `Consumer` per `Shared` (neither type is `Clone` here).
// NOTE(review): confirm no other module constructs additional handles.
unsafe impl<T: Send> Sync for Shared<T> {}
69
70#[repr(align(64))]
72struct CacheAligned<T> {
73 value: T,
74 _pad: [u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
75}
76
77impl<T: Default> CacheAligned<T> {
78 fn new(value: T) -> Self {
79 Self {
80 value,
81 _pad: [0u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
82 }
83 }
84}
85
/// Sending half of a ring buffer, returned by [`RingBuffer::channel`].
///
/// Dropping it sets the shared disconnect flag.
pub struct Producer<T> {
    /// State shared with the [`Consumer`].
    shared: Arc<Shared<T>>,

    /// Most recent `head` value loaded by `try_push`. It lets `try_push` skip
    /// the load of the shared `head` counter unless the buffer looks full.
    cached_head: u64,
}
97
/// Receiving half of a ring buffer, returned by [`RingBuffer::channel`].
///
/// Dropping it sets the shared disconnect flag.
pub struct Consumer<T> {
    /// State shared with the [`Producer`].
    shared: Arc<Shared<T>>,

    /// Most recent `tail` value loaded by the pop methods. It lets `try_pop`
    /// skip the load of the shared `tail` counter unless the buffer looks empty.
    cached_tail: u64,
}
109
/// Field-less type that hosts the [`RingBuffer::channel`] constructor.
pub struct RingBuffer;
117
118impl RingBuffer {
119 pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
120 assert!(capacity > 0, "ring buffer capacity must be > 0");
121
122 let capacity = capacity.next_power_of_two();
123 let mask = capacity - 1;
124
125 let mut slots = Vec::with_capacity(capacity);
126 for _ in 0..capacity {
127 slots.push(UnsafeCell::new(None));
128 }
129
130 let shared = Arc::new(Shared {
131 tail: CacheAligned::new(AtomicU64::new(0)),
132 head: CacheAligned::new(AtomicU64::new(0)),
133 slots: slots.into_boxed_slice(),
134 capacity,
135 mask,
136 disconnected: AtomicBool::new(false),
137 metrics: BridgeMetrics::new(),
138 });
139
140 let producer = Producer {
141 shared: Arc::clone(&shared),
142 cached_head: 0,
143 };
144
145 let consumer = Consumer {
146 shared,
147 cached_tail: 0,
148 };
149
150 (producer, consumer)
151 }
152}
153
154impl<T> Producer<T> {
155 pub fn try_push(&mut self, value: T) -> Result<()> {
158 if self.shared.disconnected.load(Ordering::Relaxed) {
159 return Err(BridgeError::Disconnected { side: "consumer" });
160 }
161
162 let tail = self.shared.tail.value.load(Ordering::Relaxed);
163
164 if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
166 self.cached_head = self.shared.head.value.load(Ordering::Acquire);
168
169 if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
170 self.shared.metrics.record_full();
171 return Err(BridgeError::Full {
172 capacity: self.shared.capacity,
173 pending: (tail.wrapping_sub(self.cached_head)) as usize,
174 });
175 }
176 }
177
178 let idx = (tail as usize) & self.shared.mask;
179
180 unsafe {
184 (*self.shared.slots[idx].get()) = Some(value);
185 }
186
187 self.shared
189 .tail
190 .value
191 .store(tail.wrapping_add(1), Ordering::Release);
192
193 self.shared.metrics.record_push();
194 Ok(())
195 }
196
197 pub fn utilization(&self) -> u8 {
199 let tail = self.shared.tail.value.load(Ordering::Relaxed);
200 let head = self.shared.head.value.load(Ordering::Relaxed);
201 let pending = tail.wrapping_sub(head) as usize;
202 ((pending * 100) / self.shared.capacity) as u8
203 }
204
205 pub fn len(&self) -> usize {
207 let tail = self.shared.tail.value.load(Ordering::Relaxed);
208 let head = self.shared.head.value.load(Ordering::Relaxed);
209 tail.wrapping_sub(head) as usize
210 }
211
212 pub fn is_empty(&self) -> bool {
214 self.len() == 0
215 }
216
217 pub fn capacity(&self) -> usize {
219 self.shared.capacity
220 }
221
222 pub fn metrics(&self) -> &BridgeMetrics {
224 &self.shared.metrics
225 }
226}
227
228impl<T> Consumer<T> {
229 pub fn try_pop(&mut self) -> Result<T> {
232 let head = self.shared.head.value.load(Ordering::Relaxed);
233
234 if head == self.cached_tail {
236 self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
238
239 if head == self.cached_tail {
240 if self.shared.disconnected.load(Ordering::Relaxed) {
241 return Err(BridgeError::Disconnected { side: "producer" });
242 }
243 return Err(BridgeError::Empty);
244 }
245 }
246
247 let idx = (head as usize) & self.shared.mask;
248
249 let value = unsafe { (*self.shared.slots[idx].get()).take() };
253
254 self.shared
256 .head
257 .value
258 .store(head.wrapping_add(1), Ordering::Release);
259
260 self.shared.metrics.record_pop();
261
262 Ok(value.expect("BUG: slot was None despite tail > head"))
265 }
266
267 pub fn drain_into(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
272 let head = self.shared.head.value.load(Ordering::Relaxed);
273 self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
274
275 let available = self.cached_tail.wrapping_sub(head) as usize;
276 let count = available.min(max);
277
278 for i in 0..count {
279 let idx = ((head.wrapping_add(i as u64)) as usize) & self.shared.mask;
280 let value = unsafe { (*self.shared.slots[idx].get()).take() };
282 buf.push(value.expect("BUG: slot was None during drain"));
283 }
284
285 if count > 0 {
286 self.shared
287 .head
288 .value
289 .store(head.wrapping_add(count as u64), Ordering::Release);
290 self.shared.metrics.record_pops(count as u64);
291 }
292
293 count
294 }
295
296 pub fn len(&self) -> usize {
298 let tail = self.shared.tail.value.load(Ordering::Relaxed);
299 let head = self.shared.head.value.load(Ordering::Relaxed);
300 tail.wrapping_sub(head) as usize
301 }
302
303 pub fn is_empty(&self) -> bool {
305 self.len() == 0
306 }
307
308 pub fn metrics(&self) -> &BridgeMetrics {
310 &self.shared.metrics
311 }
312}
313
/// Dropping the producer sets the shared disconnect flag, which
/// `Consumer::try_pop` reports as `Disconnected { side: "producer" }`.
impl<T> Drop for Producer<T> {
    fn drop(&mut self) {
        // Release orders this store after every earlier store made by this
        // thread, including the producer's final `tail` update.
        self.shared.disconnected.store(true, Ordering::Release);
    }
}
319
/// Dropping the consumer sets the shared disconnect flag, which
/// `Producer::try_push` reports as `Disconnected { side: "consumer" }`.
impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        // Items still queued are not drained here; they stay in the slots
        // owned by the shared state.
        self.shared.disconnected.store(true, Ordering::Release);
    }
}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Fills a 4-slot buffer, checks that a fifth push is rejected, then
    /// interleaves pops and pushes and verifies FIFO order until empty.
    #[test]
    fn basic_push_pop() {
        let (mut producer, mut consumer) = RingBuffer::channel::<u64>(4);

        for item in 1..=4 {
            producer.try_push(item).unwrap();
        }

        let overflow = producer.try_push(5);
        assert!(matches!(overflow, Err(BridgeError::Full { .. })));

        for expected in 1..=2 {
            assert_eq!(consumer.try_pop().unwrap(), expected);
        }

        // Two slots were freed above, so two more pushes must succeed.
        for item in 5..=6 {
            producer.try_push(item).unwrap();
        }

        for expected in 3..=6 {
            assert_eq!(consumer.try_pop().unwrap(), expected);
        }

        let underflow = consumer.try_pop();
        assert!(matches!(underflow, Err(BridgeError::Empty)));
    }

    /// Requested capacities are rounded up to the next power of two.
    #[test]
    fn power_of_two_rounding() {
        let (producer, _consumer) = RingBuffer::channel::<u64>(3);
        assert_eq!(producer.capacity(), 4);

        let (producer, _consumer) = RingBuffer::channel::<u64>(5);
        assert_eq!(producer.capacity(), 8);

        let (producer, _consumer) = RingBuffer::channel::<u64>(8);
        assert_eq!(producer.capacity(), 8);
    }

    /// `utilization` reports the fill level as a percentage of capacity.
    #[test]
    fn utilization_tracking() {
        let (mut producer, mut consumer) = RingBuffer::channel::<u64>(8);
        assert_eq!(producer.utilization(), 0);

        // 6 of 8 slots => 75 %.
        (0..6).for_each(|item| producer.try_push(item).unwrap());
        assert_eq!(producer.utilization(), 75);

        // 4 of 8 slots => 50 %.
        for _ in 0..2 {
            consumer.try_pop().unwrap();
        }
        assert_eq!(producer.utilization(), 50);
    }

    /// `drain_into` honours `max` and appends to the existing buffer contents.
    #[test]
    fn drain_into_batch() {
        let (mut producer, mut consumer) = RingBuffer::channel::<u64>(16);
        (0..10).for_each(|item| producer.try_push(item).unwrap());

        let mut sink = Vec::new();

        let first_batch = consumer.drain_into(&mut sink, 5);
        assert_eq!(first_batch, 5);
        assert_eq!(sink, vec![0, 1, 2, 3, 4]);

        let second_batch = consumer.drain_into(&mut sink, 100);
        assert_eq!(second_batch, 5);
        assert_eq!(sink, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    /// Popping from an empty buffer after the producer is gone reports the
    /// producer side as disconnected.
    #[test]
    fn disconnect_detection_producer_drops() {
        let (producer, mut consumer) = RingBuffer::channel::<u64>(4);
        drop(producer);

        let outcome = consumer.try_pop();
        assert!(matches!(
            outcome,
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    /// Pushing after the consumer is gone reports the consumer side as
    /// disconnected.
    #[test]
    fn disconnect_detection_consumer_drops() {
        let (mut producer, consumer) = RingBuffer::channel::<u64>(4);
        drop(consumer);

        let outcome = producer.try_push(1);
        assert!(matches!(
            outcome,
            Err(BridgeError::Disconnected { side: "consumer" })
        ));
    }

    /// Repeatedly fills and empties a 4-slot buffer so the positions run far
    /// past the slot count, checking FIFO order throughout.
    #[test]
    fn wrapping_behavior() {
        let (mut producer, mut consumer) = RingBuffer::channel::<u64>(4);

        let mut next_in = 0u64;
        let mut next_out = 0u64;
        for _round in 0..1000 {
            for _ in 0..4 {
                producer.try_push(next_in).unwrap();
                next_in += 1;
            }
            for _ in 0..4 {
                assert_eq!(consumer.try_pop().unwrap(), next_out);
                next_out += 1;
            }
        }
    }

    /// Push and pop counters are visible through either half's `metrics()`.
    #[test]
    fn metrics_counting() {
        let (mut producer, mut consumer) = RingBuffer::channel::<u64>(8);

        (0..5).for_each(|item| producer.try_push(item).unwrap());
        assert_eq!(producer.metrics().pushes(), 5);

        for _ in 0..3 {
            consumer.try_pop().unwrap();
        }
        assert_eq!(consumer.metrics().pops(), 3);
    }
}