
laminar_core/streaming/ring_buffer.rs

//! Lock-free ring buffer for streaming channels.
//!
//! This module provides a heap-allocated ring buffer optimized for
//! single-producer single-consumer (SPSC) scenarios with support for
//! MPSC mode via atomic slot claiming.
//!
//! ## Design
//!
//! - Heap-allocated with runtime capacity (unlike const-generic `alloc::RingBuffer`)
//! - Power-of-2 capacity with bitmask indexing for fast modulo
//! - Cache-padded head/tail indices prevent false sharing
//! - Acquire/Release memory ordering for lock-free operation
//! - Separate `claim_counter` for MPSC slot claiming
//!
//! ## Performance
//!
//! Target: < 20ns per push/pop operation in SPSC mode.
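//!
//! ## Examples
//!
//! A minimal single-threaded SPSC sketch. The import path is an assumption based
//! on this file's location, so the doc-test is marked `ignore`:
//!
//! ```ignore
//! use laminar_core::streaming::ring_buffer::RingBuffer;
//!
//! // Requested capacity is clamped and rounded up to a power of two.
//! let buf: RingBuffer<u32> = RingBuffer::new(8);
//!
//! // Producer side: `push` hands the item back if the buffer is full.
//! buf.push(1).unwrap();
//! buf.push(2).unwrap();
//!
//! // Consumer side: `pop` returns `None` once the buffer is empty.
//! assert_eq!(buf.pop(), Some(1));
//! assert_eq!(buf.pop(), Some(2));
//! assert_eq!(buf.pop(), None);
//! ```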

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

use crate::tpc::CachePadded;

use super::config::{MAX_BUFFER_SIZE, MIN_BUFFER_SIZE};

/// A lock-free ring buffer with runtime-specified capacity.
///
/// This buffer supports both SPSC and MPSC modes:
/// - SPSC: Single producer uses `push`, single consumer uses `pop`
/// - MPSC: Multiple producers use `claim_slot`/`write_slot`/`try_advance_tail`,
///   single consumer uses `pop`
///
/// # Safety
///
/// The buffer is safe to use from multiple threads as long as:
/// - In SPSC mode: Exactly one producer and one consumer thread
/// - In MPSC mode: Any number of producers, exactly one consumer thread
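///
/// # Examples
///
/// A threaded SPSC sketch; the public module path is an assumption, so the
/// doc-test is marked `ignore`:
///
/// ```ignore
/// use std::sync::Arc;
/// use std::thread;
/// use laminar_core::streaming::ring_buffer::RingBuffer;
///
/// let buf = Arc::new(RingBuffer::<u64>::new(64));
///
/// // Exactly one producer thread calls `push`.
/// let producer = {
///     let buf = Arc::clone(&buf);
///     thread::spawn(move || {
///         for i in 0..100 {
///             while buf.push(i).is_err() {
///                 thread::yield_now();
///             }
///         }
///     })
/// };
///
/// // Exactly one consumer thread (here, the main thread) calls `pop`.
/// let mut seen = 0;
/// while seen < 100 {
///     if buf.pop().is_some() {
///         seen += 1;
///     }
/// }
/// producer.join().unwrap();
/// ```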
pub struct RingBuffer<T> {
    /// Ring buffer storage.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,

    /// Head index (consumer reads from here).
    /// Cache-padded to prevent false sharing with tail.
    head: CachePadded<AtomicUsize>,

    /// Tail index (producer writes here in SPSC mode).
    /// Cache-padded to prevent false sharing with head.
    tail: CachePadded<AtomicUsize>,

    /// Claim counter for MPSC mode.
    /// Producers atomically increment this to claim slots.
    claim_counter: CachePadded<AtomicU64>,

    /// Capacity mask for fast modulo (capacity - 1).
    capacity_mask: usize,
}

// SAFETY: RingBuffer can be sent between threads as long as T is Send
unsafe impl<T: Send> Send for RingBuffer<T> {}

// SAFETY: RingBuffer can be shared between threads (with appropriate
// producer/consumer constraints) as long as T is Send
unsafe impl<T: Send> Sync for RingBuffer<T> {}

impl<T> RingBuffer<T> {
    /// Creates a new ring buffer with the given capacity.
    ///
    /// The capacity will be clamped to `[MIN_BUFFER_SIZE, MAX_BUFFER_SIZE]`
    /// and rounded up to the next power of 2 for efficiency.
    ///
    /// # Panics
    ///
    /// Panics if capacity is 0.
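    ///
    /// # Examples
    ///
    /// A small sketch of the rounding behaviour, assuming the requested size lies
    /// within `[MIN_BUFFER_SIZE, MAX_BUFFER_SIZE]` (marked `ignore` because the
    /// public module path is assumed):
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// // 100 rounds up to the next power of two.
    /// let buf: RingBuffer<u8> = RingBuffer::new(100);
    /// assert_eq!(buf.capacity(), 128);
    /// assert!(buf.capacity().is_power_of_two());
    /// ```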
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");

        // Clamp and round up to next power of 2
        let capacity = capacity
            .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
            .next_power_of_two();

        // Allocate the buffer
        let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();

        Self {
            buffer: buffer.into_boxed_slice(),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            claim_counter: CachePadded::new(AtomicU64::new(0)),
            capacity_mask: capacity - 1,
        }
    }

    /// Returns the capacity of the buffer.
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity_mask + 1
    }

    /// Returns true if the buffer is empty.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
    /// Uses Relaxed ordering since this is an approximate query.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        head == tail
    }

    /// Returns true if the buffer is full.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
    /// Uses Relaxed ordering since this is an approximate query.
    #[inline]
    #[must_use]
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        self.next_index(tail) == head
    }

    /// Returns the current number of items in the buffer.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
    /// Uses Relaxed ordering since this is an approximate query.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        tail.wrapping_sub(head) & self.capacity_mask
    }

    /// Returns the number of free slots in the buffer.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
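    ///
    /// # Examples
    ///
    /// A small sketch (`ignore`d; public module path assumed) showing the slot
    /// that is always reserved to distinguish full from empty:
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<u8> = RingBuffer::new(8);
    /// // An empty buffer exposes capacity - 1 usable slots.
    /// assert_eq!(buf.free_slots(), buf.capacity() - 1);
    /// ```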
    #[inline]
    #[must_use]
    pub fn free_slots(&self) -> usize {
        // One slot is always reserved to distinguish full from empty
        self.capacity() - 1 - self.len()
    }

    /// Pushes an item to the buffer (SPSC mode).
    ///
    /// Returns `Ok(())` if successful, or `Err(item)` if the buffer is full.
    ///
    /// # Errors
    ///
    /// Returns the item back if the buffer is full.
    ///
    /// # Safety
    ///
    /// In SPSC mode, this must only be called by the single producer thread.
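    ///
    /// # Examples
    ///
    /// A minimal sketch of the full-buffer behaviour (marked `ignore` because the
    /// public module path is assumed):
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<&str> = RingBuffer::new(8);
    /// assert!(buf.push("a").is_ok());
    ///
    /// // Once the buffer is full, the rejected item is handed back to the caller.
    /// while buf.push("filler").is_ok() {}
    /// assert_eq!(buf.push("b"), Err("b"));
    /// ```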
    #[inline]
    pub fn push(&self, item: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let next_tail = self.next_index(tail);

        // Check if buffer is full
        if next_tail == self.head.load(Ordering::Acquire) {
            return Err(item);
        }

        // SAFETY: We have exclusive write access to this slot because:
        // 1. We are the only producer (SPSC constraint)
        // 2. The consumer only reads slots where head < tail
        // 3. We haven't published this slot yet (tail not updated)
        unsafe {
            (*self.buffer[tail].get()).write(item);
        }

        // Publish the item by updating tail
        self.tail.store(next_tail, Ordering::Release);

        Ok(())
    }

    /// Pops an item from the buffer.
    ///
    /// Returns `Some(item)` if successful, or `None` if the buffer is empty.
    ///
    /// # Safety
    ///
    /// This must only be called by the single consumer thread.
    #[inline]
    #[must_use]
    pub fn pop(&self) -> Option<T> {
        let head = self.head.load(Ordering::Relaxed);

        // Check if buffer is empty
        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: We have exclusive read access to this slot because:
        // 1. We are the only consumer
        // 2. The producer only writes to slots where tail > head
        // 3. This slot has been published (we checked tail > head)
        let item = unsafe { (*self.buffer[head].get()).assume_init_read() };

        // Consume the item by updating head
        self.head.store(self.next_index(head), Ordering::Release);

        Some(item)
    }

    /// Peeks at the next item without removing it.
    ///
    /// Returns `None` if the buffer is empty.
    ///
    /// # Safety
    ///
    /// This must only be called by the single consumer thread.
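    ///
    /// # Examples
    ///
    /// A minimal sketch (`ignore`d; public module path assumed):
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<i32> = RingBuffer::new(8);
    /// buf.push(42).unwrap();
    ///
    /// // Peeking does not consume the item.
    /// assert_eq!(buf.peek(), Some(&42));
    /// assert_eq!(buf.pop(), Some(42));
    /// assert!(buf.peek().is_none());
    /// ```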
    #[inline]
    #[must_use]
    pub fn peek(&self) -> Option<&T> {
        let head = self.head.load(Ordering::Relaxed);

        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: Same reasoning as pop()
        unsafe { Some((*self.buffer[head].get()).assume_init_ref()) }
    }

    /// Claims a slot for writing (MPSC mode).
    ///
    /// Returns the slot index if successful, or `None` if the buffer is full.
    /// The caller must then write to the slot via `write_slot` and publish it
    /// via `try_advance_tail`.
    ///
    /// This uses atomic increment on `claim_counter` to ensure each producer
    /// gets a unique slot.
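    ///
    /// # Examples
    ///
    /// A single-producer sketch of the claim/write/publish flow (marked `ignore`
    /// because the public module path is assumed). `try_advance_tail` expects the
    /// index one past the slot that was written:
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<u32> = RingBuffer::new(8);
    /// if let Some(slot) = buf.claim_slot() {
    ///     // SAFETY: the slot was just claimed and has not been written yet.
    ///     unsafe { buf.write_slot(slot, 7) };
    ///     // Publish by advancing the tail past the slot we wrote.
    ///     assert!(buf.try_advance_tail((slot + 1) % buf.capacity()));
    /// }
    /// assert_eq!(buf.pop(), Some(7));
    /// ```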
    #[inline]
    pub fn claim_slot(&self) -> Option<usize> {
        // Atomic claim - each caller gets a unique slot
        let claim = self.claim_counter.fetch_add(1, Ordering::AcqRel);

        // Convert claim to slot index
        // We use u64 for claim_counter to avoid wrap-around issues
        #[allow(clippy::cast_possible_truncation)]
        let slot = usize::try_from(claim).unwrap_or(usize::MAX) & self.capacity_mask;

        // Calculate how many slots are claimed but not yet published
        let tail = self.tail.load(Ordering::Acquire);
        let pending = claim.saturating_sub(tail as u64);

        // If we've claimed more than capacity - 1 slots ahead, we're full
        if pending >= (self.capacity() - 1) as u64 {
            // Undo the claim (best effort - another producer may have claimed)
            // This is safe because we haven't written anything yet
            self.claim_counter.fetch_sub(1, Ordering::AcqRel);
            return None;
        }

        Some(slot)
    }

    /// Writes to a claimed slot (MPSC mode).
    ///
    /// # Safety
    ///
    /// The slot must have been obtained from `claim_slot` and not yet written to.
    /// After calling this, you must call `try_advance_tail` to make the item visible.
    #[inline]
    pub unsafe fn write_slot(&self, slot: usize, item: T) {
        debug_assert!(slot < self.capacity());
        (*self.buffer[slot].get()).write(item);
    }

    /// Attempts to publish written slots up to the given claim (MPSC mode).
    ///
    /// This advances the tail to `target_tail` if it is the next slot after the
    /// current tail. Returns true if the tail was advanced.
    ///
    /// In MPSC mode, producers write out-of-order but the consumer sees
    /// items in order. This function is typically called by each producer
    /// after writing, but only succeeds when all prior slots are also written.
    ///
    /// For simplicity, this implementation uses a "publish barrier" approach:
    /// producers write to their slots, then the tail is advanced by whichever
    /// producer happens to have the lowest claimed slot.
    #[inline]
    pub fn try_advance_tail(&self, target_tail: usize) -> bool {
        let current_tail = self.tail.load(Ordering::Acquire);

        // Only advance if target is the next slot after current tail
        if target_tail == self.next_index(current_tail) {
            // Try to advance tail
            self.tail
                .compare_exchange(
                    current_tail,
                    target_tail,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
        } else {
            false
        }
    }

    /// Pushes multiple items to the buffer.
    ///
    /// Returns the number of items successfully pushed.
    ///
    /// # Safety
    ///
    /// In SPSC mode, this must only be called by the single producer thread.
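    ///
    /// # Examples
    ///
    /// A small sketch of a partial batch push, assuming the requested capacity of 8
    /// is not clamped (marked `ignore` because the public module path is assumed):
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<i32> = RingBuffer::new(8);
    /// // Only capacity - 1 slots are usable, so the last item is rejected.
    /// let pushed = buf.push_batch(vec![1, 2, 3, 4, 5, 6, 7, 8]);
    /// assert_eq!(pushed, 7);
    /// ```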
    #[inline]
    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
        let mut count = 0;
        for item in items {
            if self.push(item).is_err() {
                break;
            }
            count += 1;
        }
        count
    }

    /// Pops multiple items from the buffer.
    ///
    /// Returns a vector of up to `max_count` items.
    ///
    /// # Safety
    ///
    /// This must only be called by the single consumer thread.
    ///
    /// # Performance Warning
    ///
    /// **This method allocates a `Vec` on every call.** Do not use on hot paths
    /// where allocation overhead matters. For zero-allocation consumption, use
    /// [`pop_each`](Self::pop_each) or [`pop_batch_into`](Self::pop_batch_into).
    #[cold]
    #[must_use]
    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_count.min(self.len()));
        for _ in 0..max_count {
            if let Some(item) = self.pop() {
                items.push(item);
            } else {
                break;
            }
        }
        items
    }

    /// Pops items and calls a callback for each (zero-allocation).
    ///
    /// Processing stops when:
    /// - `max_count` items have been processed
    /// - The buffer becomes empty
    /// - The callback returns `false` (the item passed to that call has already
    ///   been consumed and is counted)
    ///
    /// Returns the number of items processed.
    ///
    /// # Safety
    ///
    /// This must only be called by the single consumer thread.
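    ///
    /// # Examples
    ///
    /// A minimal sketch (`ignore`d; public module path assumed). The item that makes
    /// the callback return `false` has already been consumed and is counted:
    ///
    /// ```ignore
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<i32> = RingBuffer::new(8);
    /// buf.push_batch(vec![1, 2, 3, 4, 5]);
    ///
    /// let mut sum = 0;
    /// let popped = buf.pop_each(10, |item| {
    ///     sum += item;
    ///     item < 3 // stop after consuming 3
    /// });
    /// assert_eq!(popped, 3);
    /// assert_eq!(sum, 6);
    /// assert_eq!(buf.len(), 2);
    /// ```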
    #[inline]
    pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
    where
        F: FnMut(T) -> bool,
    {
        if max_count == 0 {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        // Calculate available items
        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let to_pop = available.min(max_count);

        if to_pop == 0 {
            return 0;
        }

        let mut popped = 0;
        for _ in 0..to_pop {
            // SAFETY: We have exclusive read access
            let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };

            popped += 1;
            current_head = self.next_index(current_head);

            if !f(item) {
                break;
            }
        }

        if popped > 0 {
            self.head.store(current_head, Ordering::Release);
        }

        popped
    }

    /// Pops items into a caller-provided buffer (zero-allocation).
    ///
    /// Returns the number of items popped.
    ///
    /// # Safety
    ///
    /// This must only be called by the single consumer thread.
    /// After this method returns `n`, the first `n` elements of `buffer`
    /// are initialized.
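    ///
    /// # Examples
    ///
    /// A minimal sketch of draining into a stack buffer (marked `ignore` because the
    /// public module path is assumed):
    ///
    /// ```ignore
    /// use std::mem::MaybeUninit;
    /// use laminar_core::streaming::ring_buffer::RingBuffer;
    ///
    /// let buf: RingBuffer<i32> = RingBuffer::new(8);
    /// buf.push_batch(vec![10, 20, 30]);
    ///
    /// let mut dest: [MaybeUninit<i32>; 4] = [MaybeUninit::uninit(); 4];
    /// let n = buf.pop_batch_into(&mut dest);
    /// assert_eq!(n, 3);
    ///
    /// // SAFETY: the first `n` elements were initialized by `pop_batch_into`.
    /// let drained: Vec<i32> = dest[..n]
    ///     .iter()
    ///     .map(|slot| unsafe { slot.assume_init_read() })
    ///     .collect();
    /// assert_eq!(drained, vec![10, 20, 30]);
    /// ```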
    #[inline]
    pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
        if buffer.is_empty() {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let count = available.min(buffer.len());

        if count == 0 {
            return 0;
        }

        for slot in buffer.iter_mut().take(count) {
            // SAFETY: We have exclusive read access
            unsafe {
                let src = (*self.buffer[current_head].get()).assume_init_read();
                slot.write(src);
            }
            current_head = self.next_index(current_head);
        }

        self.head.store(current_head, Ordering::Release);
        count
    }

    /// Calculate the next index with wrap-around.
    #[inline]
    const fn next_index(&self, index: usize) -> usize {
        (index + 1) & self.capacity_mask
    }

    /// Resets the buffer to empty state.
    ///
    /// # Safety
    ///
    /// This must only be called when no other threads are accessing the buffer.
    pub fn reset(&mut self) {
        // Drop any remaining items
        while self.pop().is_some() {}

        // Reset indices
        self.head.store(0, Ordering::Release);
        self.tail.store(0, Ordering::Release);
        self.claim_counter.store(0, Ordering::Release);
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        // Drop any remaining items
        while self.pop().is_some() {}
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for RingBuffer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RingBuffer")
            .field("capacity", &self.capacity())
            .field("len", &self.len())
            .field("is_empty", &self.is_empty())
            .field("is_full", &self.is_full())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_new_buffer() {
        let buffer: RingBuffer<i32> = RingBuffer::new(100);
        // Should round up to 128
        assert_eq!(buffer.capacity(), 128);
        assert!(buffer.is_empty());
        assert!(!buffer.is_full());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_push_pop() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        assert!(buffer.push(1).is_ok());
        assert!(buffer.push(2).is_ok());
        assert!(buffer.push(3).is_ok());
        // Capacity 4 means 3 usable slots
        assert!(buffer.is_full());
        assert!(buffer.push(4).is_err());

        assert_eq!(buffer.pop(), Some(1));
        assert_eq!(buffer.pop(), Some(2));
        assert_eq!(buffer.pop(), Some(3));
        assert_eq!(buffer.pop(), None);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        for i in 0..10 {
            assert!(buffer.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(buffer.pop(), Some(i));
        }
    }

    #[test]
    fn test_wrap_around() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        // Fill and empty multiple times to test wrap-around
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(buffer.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(buffer.pop(), Some(iteration * 10 + i));
            }
        }
    }

    #[test]
    fn test_peek() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        assert!(buffer.peek().is_none());

        buffer.push(42).unwrap();
        assert_eq!(buffer.peek(), Some(&42));
        assert_eq!(buffer.peek(), Some(&42)); // Still there

        assert_eq!(buffer.pop(), Some(42));
        assert!(buffer.peek().is_none());
    }

    #[test]
    fn test_push_batch() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        let pushed = buffer.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(buffer.len(), 5);

        // Try to push more than capacity
        let pushed = buffer.push_batch(vec![6, 7, 8, 9, 10]);
        assert_eq!(pushed, 2); // Only 2 more fit
    }

    #[test]
    fn test_pop_batch() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let items = buffer.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(buffer.len(), 2);

        let items = buffer.pop_batch(10);
        assert_eq!(items, vec![4, 5]);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_pop_each() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let mut sum = 0;
        let count = buffer.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(sum, 15);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_pop_each_early_stop() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let mut items = Vec::new();
        let count = buffer.pop_each(10, |item| {
            items.push(item);
            item < 3
        });

        assert_eq!(count, 3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_pop_batch_into() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3]);

        let mut dest: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = buffer.pop_batch_into(&mut dest);

        assert_eq!(count, 3);
        unsafe {
            assert_eq!(dest[0].assume_init(), 1);
            assert_eq!(dest[1].assume_init(), 2);
            assert_eq!(dest[2].assume_init(), 3);
        }
    }

    #[test]
    fn test_free_slots() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        assert_eq!(buffer.free_slots(), 7); // capacity - 1

        buffer.push(1).unwrap();
        buffer.push(2).unwrap();
        assert_eq!(buffer.free_slots(), 5);

        let _ = buffer.pop();
        assert_eq!(buffer.free_slots(), 6);
    }

    #[test]
    fn test_concurrent_spsc() {
        const ITEMS: i32 = 10_000;
        let buffer = Arc::new(RingBuffer::<i32>::new(1024));
        let producer_buffer = Arc::clone(&buffer);
        let consumer_buffer = Arc::clone(&buffer);

        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while producer_buffer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = consumer_buffer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(item, i32::try_from(i).unwrap());
        }
    }

    #[test]
    fn test_drop_items() {
        use std::sync::atomic::AtomicUsize;

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        {
            let buffer: RingBuffer<DropCounter> = RingBuffer::new(8);
            for _ in 0..5 {
                buffer.push(DropCounter).unwrap();
            }
            let _ = buffer.pop();
            let _ = buffer.pop();
            // 3 remaining, 2 dropped via pop
        }

        // All 5 should be dropped (2 via pop, 3 via buffer drop)
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_reset() {
        let mut buffer: RingBuffer<i32> = RingBuffer::new(8);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(buffer.len(), 5);

        buffer.reset();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_debug() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);
        buffer.push(1).unwrap();
        buffer.push(2).unwrap();

        let debug_str = format!("{buffer:?}");
        assert!(debug_str.contains("RingBuffer"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: RingBuffer<i32> = RingBuffer::new(0);
    }

    #[test]
    fn test_capacity_clamping() {
        // Very small
        let buffer: RingBuffer<i32> = RingBuffer::new(1);
        assert!(buffer.capacity() >= MIN_BUFFER_SIZE);

        // Very large
        let buffer: RingBuffer<i32> = RingBuffer::new(usize::MAX / 2);
        assert!(buffer.capacity() <= MAX_BUFFER_SIZE.next_power_of_two());
    }
}