nexus_queue/lib.rs

//! Single-Producer Single-Consumer (SPSC) ring buffer with per-slot sequencing.
//!
//! This module provides a high-performance SPSC queue that outperforms traditional
//! implementations like [`rtrb`](https://docs.rs/rtrb) on both latency and throughput.
//!
//! # Performance Summary
//!
//! Benchmarked on an Intel Core Ultra 7 155H @ 4.8 GHz, threads pinned to cores 0 and 2:
//!
//! | Metric | nexus spsc | rtrb | Improvement |
//! |--------|------------|------|-------------|
//! | Latency p50 | ~152 cycles | ~202 cycles | **25% faster** |
//! | Latency p99 | ~255 cycles | ~300 cycles | **15% faster** |
//! | Throughput | 449 ms / 10M ops | 587 ms / 10M ops | **31% more ops/s** |
//! | IPC | 0.19 | 0.14 | **36% better** |
//!
//! At 4.8 GHz, 152 cycles ≈ **32 nanoseconds** one-way latency.
//!
//! # Design: Per-Slot Sequencing
//!
//! Traditional SPSC queues (like rtrb) use separate atomic head/tail indices with
//! cached copies to reduce atomic operations:
//!
//! ```text
//! Traditional (rtrb-style):
//! ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
//! │ head (atomic)   │  │ tail (atomic)   │  │ buffer[N]       │
//! │ cached_tail     │  │ cached_head     │  │ (just data)     │
//! └─────────────────┘  └─────────────────┘  └─────────────────┘
//!    Cache line 1         Cache line 2         Cache line 3+
//! ```
//!
//! This implementation uses per-slot lap counters instead:
//!
//! ```text
//! Per-slot sequencing (nexus):
//! ┌──────────────────────────────────────────────────────────┐
//! │ buffer[0]: { lap: AtomicUsize, data: T }                 │
//! │ buffer[1]: { lap: AtomicUsize, data: T }                 │
//! │ ...                                                      │
//! └──────────────────────────────────────────────────────────┘
//!    Lap counter + data on SAME cache line
//! ```
//!
//! # Why Per-Slot Sequencing Wins
//!
//! ## 1. Cache Locality
//!
//! When checking whether a slot is ready, we load `slot.lap`. The subsequent
//! read of `slot.data` is on the same cache line, so it is already in L1.
//! Traditional designs fetch from three separate cache lines (head, tail, data).
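//!
//! For example, with `T = u64` and the `repr(C)` slot layout below, each slot
//! is 16 bytes on a 64-bit target (8-byte lap counter + 8-byte payload), so a
//! single 64-byte cache line holds four complete slots:
//!
//! ```text
//! cache line: | lap0 data0 | lap1 data1 | lap2 data2 | lap3 data3 |
//! ```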
//!
//! ## 2. No Stale Cache Problem
//!
//! Traditional designs cache the remote index to avoid atomic loads:
//!
//! ```text
//! // Traditional push (rtrb-style)
//! if head - cached_tail >= capacity {
//!     cached_tail = tail.load();  // "Slow path" refresh
//!     if head - cached_tail >= capacity {
//!         return Full;
//!     }
//! }
//! ```
//!
//! In ping-pong benchmarks (1 message in flight), the cache is *always* stale,
//! so every operation hits the "slow path". Per-slot sequencing has no cache
//! to refresh: it just checks the slot directly.
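//!
//! By contrast, the per-slot push has no cached index to refresh; it checks
//! the slot it is about to write (a simplified view of [`Producer::push`]
//! below):
//!
//! ```text
//! // Per-slot push (nexus-style)
//! if slot.lap != 0 {
//!     return Full;    // consumer hasn't emptied this slot yet
//! }
//! slot.data = value;
//! slot.lap = lap;     // publish
//! ```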
//!
//! ## 3. Simpler Control Flow
//!
//! Fewer branches mean better branch prediction. Our branch miss *rate* (~1.6%)
//! is similar to rtrb's, but with fewer branches executed per operation, the
//! absolute number of misses is lower.
//!
//! # Optimization Journey
//!
//! We started by cloning rtrb's design, then systematically tested changes.
//! Using rtrb as baseline (p50 ≈ 200 cycles):
//!
//! | Change | Result | Cycles | Notes |
//! |--------|--------|--------|-------|
//! | Clone rtrb exactly | Baseline | ~200 | Starting point |
//! | Per-slot lap counters | **+25%** | ~150 | Biggest win |
//! | Division → bit shift | **+15%** | ~150→~130 | `tail/cap` → `tail>>shift` |
//! | `repr(C)` field ordering | +5% | - | Hot fields first |
//! | Manual fencing | ~0% | - | No change on x86, helps ARM |
//! | Const generics | **-20%** | - | Surprisingly worse! |
//! | CachePadded slots | ~0% | - | Not needed for our layout |
//!
//! ## What Worked
//!
//! **Per-slot sequencing**: The single biggest win. Eliminated the stale cache
//! problem entirely and improved cache locality.
//!
//! **Bit shift for lap calculation**: Division is 20-90 cycles on x86. Since
//! capacity is always a power of two, `tail / capacity` becomes `tail >> shift`.
//! Saved ~20-50 cycles per operation.
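//!
//! For a 1024-slot buffer (`shift = 10`), the lap for position `tail` is:
//!
//! ```text
//! lap = (tail >> 10) + 1    // same result as tail / 1024 + 1, without the div
//! ```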
//!
//! **Field ordering with `repr(C)`**: Placing hot-path fields (local_tail, buffer,
//! mask) first improves prefetching. ~5% throughput improvement.
//!
//! **Instruction ordering**: Updating `local_tail` *after* the atomic store
//! gives cache coherency a head start, reducing latency variance.
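//!
//! In `push`, that ordering looks like:
//!
//! ```text
//! slot.lap.store(lap, Relaxed);            // publish to the consumer first
//! self.local_tail = tail.wrapping_add(1);  // local bookkeeping afterwards
//! ```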
//!
//! ## What Didn't Work
//!
//! **Const generics for capacity**: We expected `const CAP: usize` to help by
//! making the mask a compile-time constant. Instead, throughput regressed 20%!
//! Likely causes: monomorphization code bloat, different inlining decisions.
//!
//! **CachePadded lap counters**: With small `T`, multiple slots share a cache
//! line. We tested padding each slot to 64 bytes. No improvement - the true
//! sharing on the active slot dominates, not false sharing between slots.
//!
//! **Cached indices**: The traditional "optimization" that rtrb uses. In our
//! testing, it's actually slower for latency-sensitive workloads because the
//! cache is always stale in ping-pong scenarios.
//!
//! # Example
//!
//! ```
//! use nexus_queue::ring_buffer;
//!
//! let (mut producer, mut consumer) = ring_buffer::<u64>(1024);
//!
//! // Bounded push - returns an error if the buffer is full
//! producer.push(1).unwrap();
//! producer.push(2).unwrap();
//!
//! assert_eq!(consumer.pop(), Some(1));
//! assert_eq!(consumer.pop(), Some(2));
//! ```
//!
//! # Benchmarking Methodology
//!
//! Latency benchmarks use ping-pong between two threads pinned to separate
//! physical cores (avoiding hyperthreading). We measure round-trip time with
//! `rdtscp` and divide by 2 for one-way latency. 10,000 warmup iterations
//! followed by 100,000 measured samples.
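//!
//! A sketch of one measured iteration (illustrative names; the echo returns
//! over a second queue going the other way):
//!
//! ```text
//! let start = rdtscp();            // serializing timestamp counter read
//! ping_prod.push(i).unwrap();      // core 0 -> core 2
//! while pong_cons.pop().is_none() {
//!     spin_loop();                 // busy-wait for the echo
//! }
//! let round_trip = rdtscp() - start;
//! record(round_trip / 2);          // halve for one-way latency
//! ```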
//!
//! Throughput benchmarks measure time to transfer 10 million messages through
//! a 1024-slot buffer with producer and consumer on separate cores.
//!
//! All benchmarks run with turbo boost disabled for consistent results:
//!
//! ```bash
//! echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
//! sudo taskset -c 0,2 ./target/release/deps/perf_spsc_bounded_latency-*
//! ```
//!
//! # Memory Ordering
//!
//! The implementation uses manual fences for clarity and portability. Both
//! sides pair a `Relaxed` access to the lap counter with an explicit fence:
//!
//! - `fence(Acquire)` after loading the lap counter, before touching the data
//! - `fence(Release)` after touching the data, before storing the lap counter
//!
//! On x86 these fences compile to no instructions (the hardware memory model
//! is already strong enough), but they are required for correctness on ARM
//! and other weakly-ordered architectures.
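//!
//! The pairing, as it appears in `push` and `pop`:
//!
//! ```text
//! Producer (push)                     Consumer (pop)
//! ---------------                     --------------
//! load slot.lap (Relaxed)             load slot.lap (Relaxed)
//! fence(Acquire)                      fence(Acquire)
//! write slot.data                     read slot.data
//! fence(Release)                      fence(Release)
//! store slot.lap = lap (Relaxed)      store slot.lap = 0 (Relaxed)
//! ```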
//!
//! # When to Use This vs Other Queues
//!
//! Use `nexus_queue` when:
//! - You have exactly one producer and one consumer
//! - You need the lowest possible latency
//! - You want both bounded and overwriting modes in one queue
//!
//! Consider alternatives when:
//! - Multiple producers → use MPSC
//! - Multiple consumers → use SPMC or MPMC
//! - You need async/await integration → use `tokio::sync::mpsc`

use std::cell::UnsafeCell;
use std::fmt;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering, fence};

/// Creates a new SPSC ring buffer with the given capacity.
///
/// Returns a `(Producer, Consumer)` pair.
///
/// The actual capacity will be rounded up to the next power of two.
///
/// # Panics
///
/// Panics if `capacity` is 0.
pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    assert!(capacity > 0, "capacity must be non-zero");

    let capacity = capacity.next_power_of_two();
    let mask = capacity - 1;
    let shift = capacity.trailing_zeros() as usize;

    // Allocate the slot buffer. The Vec is wrapped in ManuallyDrop because
    // ownership of the allocation is transferred to `Inner`, which frees it
    // in its Drop impl.
    let mut slots = ManuallyDrop::new(Vec::<Slot<T>>::with_capacity(capacity));
    for _ in 0..capacity {
        slots.push(Slot {
            lap: AtomicUsize::new(0),
            data: UnsafeCell::new(MaybeUninit::uninit()),
        });
    }
    let buffer = slots.as_mut_ptr();

    let inner = Arc::new(Inner {
        buffer,
        capacity,
        mask,
    });

    (
        Producer {
            local_tail: 0,
            buffer,
            mask,
            shift,
            inner: Arc::clone(&inner),
        },
        Consumer {
            local_head: 0,
            buffer,
            mask,
            shift,
            inner,
        },
    )
}

/// A slot in the ring buffer with a lap counter.
///
/// The lap counter serves as the synchronization mechanism:
/// - `lap == 0`: slot is empty/consumed
/// - `lap == n`: slot contains data written during lap `n` (1-indexed)
#[repr(C)]
struct Slot<T> {
    lap: AtomicUsize,
    data: UnsafeCell<MaybeUninit<T>>,
}

/// Shared state between producer and consumer.
#[repr(C)]
struct Inner<T> {
    buffer: *mut Slot<T>,
    capacity: usize,
    mask: usize,
}

// `Inner` holds a raw pointer, so these impls are not derived automatically.
// Sharing is sound because producer and consumer never access the same slot
// concurrently: the lap counter hands each slot back and forth between them.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Drop any value still sitting in a slot (lap > 0 means occupied).
        for i in 0..self.capacity {
            let slot = unsafe { &*self.buffer.add(i) };
            let lap = slot.lap.load(Ordering::Relaxed);
            if lap > 0 {
                unsafe { (*slot.data.get()).assume_init_drop() };
            }
        }

        // Reconstitute the Vec (same length and capacity it was allocated
        // with) so its Drop frees the slot allocation.
        unsafe {
            let _ = Vec::from_raw_parts(self.buffer, self.capacity, self.capacity);
        }
    }
}

/// The producer half of an SPSC ring buffer.
///
/// Takes `&mut self` to statically ensure single-producer access.
#[repr(C)]
pub struct Producer<T> {
    // === Hot path fields ===
    local_tail: usize,
    buffer: *mut Slot<T>,
    mask: usize,
    shift: usize,

    // === Cold path fields ===
    inner: Arc<Inner<T>>,
}

unsafe impl<T: Send> Send for Producer<T> {}

impl<T> Producer<T> {
    /// Attempts to push a value into the ring buffer.
    ///
    /// Returns `Err(Full(value))` if the buffer is full, giving the value back.
    ///
    /// # Example
    ///
    /// ```
    /// use nexus_queue::ring_buffer;
    ///
    /// let (mut producer, _consumer) = ring_buffer::<u32>(2);
    ///
    /// assert!(producer.push(1).is_ok());
    /// assert!(producer.push(2).is_ok());
    /// assert!(producer.push(3).is_err()); // Full
    /// ```
    #[inline]
    #[must_use = "push returns Err if full, which should be handled"]
    pub fn push(&mut self, value: T) -> Result<(), Full<T>> {
        let tail = self.local_tail;
        let slot = unsafe { &*self.buffer.add(tail & self.mask) };

        // Check if the slot is occupied (lap > 0 means data present).
        let slot_lap = slot.lap.load(Ordering::Relaxed);
        // Pairs with the consumer's Release fence: its read of the data must
        // complete before we overwrite the slot.
        fence(Ordering::Acquire);

        if slot_lap != 0 {
            return Err(Full(value));
        }

        // Write new value
        unsafe { (*slot.data.get()).write(value) };

        // Publish with the new lap. The Release fence pairs with the
        // consumer's Acquire fence, so the data write is visible before the
        // lap counter update.
        let lap = (tail >> self.shift) + 1;
        fence(Ordering::Release);
        slot.lap.store(lap, Ordering::Relaxed);
        self.local_tail = tail.wrapping_add(1);

        Ok(())
    }

    /// Returns the capacity of the ring buffer.
    #[inline]
    pub const fn capacity(&self) -> usize {
        1 << self.shift
    }

    /// Returns `true` if the consumer has been dropped.
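    ///
    /// # Example
    ///
    /// ```
    /// let (producer, consumer) = nexus_queue::ring_buffer::<u8>(4);
    ///
    /// assert!(!producer.is_disconnected());
    /// drop(consumer);
    /// assert!(producer.is_disconnected());
    /// ```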
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }
}

impl<T> fmt::Debug for Producer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("capacity", &self.capacity())
            .finish_non_exhaustive()
    }
}

/// The consumer half of an SPSC ring buffer.
///
/// Use [`pop`](Consumer::pop) to remove elements. Takes `&mut self` to
/// statically ensure single-consumer access.
#[repr(C)]
pub struct Consumer<T> {
    // === Hot path fields ===
    local_head: usize,
    buffer: *mut Slot<T>,
    mask: usize,
    shift: usize,

    // === Cold path fields ===
    inner: Arc<Inner<T>>,
}

unsafe impl<T: Send> Send for Consumer<T> {}

impl<T> Consumer<T> {
    /// Attempts to pop a value from the ring buffer.
    ///
    /// Returns `Some(value)` if data is available, `None` if the buffer is empty.
    /// Values are returned in FIFO order.
    ///
    /// # Example
    ///
    /// ```
    /// use nexus_queue::ring_buffer;
    ///
    /// let (mut producer, mut consumer) = ring_buffer::<u32>(8);
    ///
    /// assert_eq!(consumer.pop(), None); // Empty
    ///
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.pop(), Some(42));
    /// ```
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        let head = self.local_head;
        let slot = unsafe { &*self.buffer.add(head & self.mask) };
        let expected_lap = (head >> self.shift) + 1;

        let slot_lap = slot.lap.load(Ordering::Relaxed);
        // Pairs with the producer's Release fence: the data write must be
        // visible before we read it.
        fence(Ordering::Acquire);

        if slot_lap != expected_lap {
            return None;
        }

        let value = unsafe { (*slot.data.get()).assume_init_read() };

        // Pairs with the producer's Acquire fence: our read of the data must
        // complete before the slot is marked empty for reuse.
        fence(Ordering::Release);
        slot.lap.store(0, Ordering::Relaxed);

        self.local_head = head.wrapping_add(1);

        Some(value)
    }

    /// Returns the capacity of the ring buffer.
    #[inline]
    pub const fn capacity(&self) -> usize {
        1 << self.shift
    }

    /// Returns `true` if the producer has been dropped.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }
}

impl<T> fmt::Debug for Consumer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Consumer")
            .field("capacity", &self.capacity())
            .finish_non_exhaustive()
    }
}

/// Error returned when the ring buffer is full.
///
/// Contains the value that could not be pushed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Full<T>(pub T);

impl<T> Full<T> {
    /// Returns the value that could not be pushed.
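    ///
    /// # Example
    ///
    /// ```
    /// let (mut producer, _consumer) = nexus_queue::ring_buffer::<u32>(1);
    ///
    /// producer.push(1).unwrap();
    /// let err = producer.push(2).unwrap_err();
    /// assert_eq!(err.into_inner(), 2);
    /// ```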
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T> fmt::Display for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ring buffer is full")
    }
}

impl<T: fmt::Debug> std::error::Error for Full<T> {}

#[cfg(test)]
mod tests {
    use super::*;

    // ============================================================================
    // Basic Operations
    // ============================================================================

    #[test]
    fn basic_push_pop() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_ok());
        assert!(prod.push(3).is_ok());

        assert_eq!(cons.pop(), Some(1));
        assert_eq!(cons.pop(), Some(2));
        assert_eq!(cons.pop(), Some(3));
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn empty_pop_returns_none() {
        let (_, mut cons) = ring_buffer::<u64>(4);
        assert_eq!(cons.pop(), None);
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn fill_then_drain() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        for i in 0..4 {
            assert!(prod.push(i).is_ok());
        }

        for i in 0..4 {
            assert_eq!(cons.pop(), Some(i));
        }

        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn push_returns_error_when_full() {
        let (mut prod, _cons) = ring_buffer::<u64>(4);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_ok());
        assert!(prod.push(3).is_ok());
        assert!(prod.push(4).is_ok());

        let err = prod.push(5).unwrap_err();
        assert_eq!(err.into_inner(), 5);
    }

    // ============================================================================
    // Interleaved Operations
    // ============================================================================

    #[test]
    fn interleaved_no_overwrite() {
        let (mut prod, mut cons) = ring_buffer::<u64>(8);

        for i in 0..1000 {
            assert!(prod.push(i).is_ok());
            assert_eq!(cons.pop(), Some(i));
        }
    }

    #[test]
    fn partial_fill_drain_cycles() {
        let (mut prod, mut cons) = ring_buffer::<u64>(8);

        for round in 0..100 {
            for i in 0..4 {
                assert!(prod.push(round * 4 + i).is_ok());
            }

            for i in 0..4 {
                assert_eq!(cons.pop(), Some(round * 4 + i));
            }
        }
    }

    // ============================================================================
    // Single Slot
    // ============================================================================

    #[test]
    fn single_slot_bounded() {
        let (mut prod, mut cons) = ring_buffer::<u64>(1);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_err());

        assert_eq!(cons.pop(), Some(1));
        assert!(prod.push(2).is_ok());
    }

    // ============================================================================
    // Disconnection
    // ============================================================================

    #[test]
    fn producer_disconnected() {
        let (prod, cons) = ring_buffer::<u64>(4);

        assert!(!cons.is_disconnected());
        drop(prod);
        assert!(cons.is_disconnected());
    }

    #[test]
    fn consumer_disconnected() {
        let (prod, cons) = ring_buffer::<u64>(4);

        assert!(!prod.is_disconnected());
        drop(cons);
        assert!(prod.is_disconnected());
    }

    // ============================================================================
    // Drop Behavior
    // ============================================================================

    #[test]
    fn drop_cleans_up_remaining() {
        use std::sync::atomic::AtomicUsize;

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        let (mut prod, cons) = ring_buffer::<DropCounter>(4);

        let _ = prod.push(DropCounter);
        let _ = prod.push(DropCounter);
        let _ = prod.push(DropCounter);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        drop(prod);
        drop(cons);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    // ============================================================================
    // Cross-Thread
    // ============================================================================

    #[test]
    fn cross_thread_bounded() {
        use std::thread;

        let (mut prod, mut cons) = ring_buffer::<u64>(64);

        let producer = thread::spawn(move || {
            for i in 0..10_000 {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                if cons.pop().is_some() {
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, 10_000);
    }

    // ============================================================================
    // Special Types
    // ============================================================================

    #[test]
    fn zero_sized_type() {
        let (mut prod, mut cons) = ring_buffer::<()>(8);

        let _ = prod.push(());
        let _ = prod.push(());

        assert_eq!(cons.pop(), Some(()));
        assert_eq!(cons.pop(), Some(()));
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn string_type() {
        let (mut prod, mut cons) = ring_buffer::<String>(4);

        let _ = prod.push("hello".to_string());
        let _ = prod.push("world".to_string());

        assert_eq!(cons.pop(), Some("hello".to_string()));
        assert_eq!(cons.pop(), Some("world".to_string()));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = ring_buffer::<u64>(0);
    }

    #[test]
    fn large_message_type() {
        #[repr(C, align(64))]
        struct LargeMessage {
            data: [u8; 256],
        }

        let (mut prod, mut cons) = ring_buffer::<LargeMessage>(8);

        let msg = LargeMessage { data: [42u8; 256] };
        assert!(prod.push(msg).is_ok());

        let received = cons.pop().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[255], 42);
    }

    #[test]
    fn multiple_laps() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        // 10 full laps through 4-slot buffer
        for i in 0..40 {
            assert!(prod.push(i).is_ok());
            assert_eq!(cons.pop(), Some(i));
        }
    }

    #[test]
    fn fifo_order_cross_thread() {
        use std::thread;

        let (mut prod, mut cons) = ring_buffer::<u64>(64);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut expected = 0u64;
            while expected < 10_000 {
                if let Some(val) = cons.pop() {
                    assert_eq!(val, expected, "FIFO order violated");
                    expected += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    #[test]
    fn stress_high_volume() {
        use std::thread;

        const COUNT: u64 = 1_000_000;

        let (mut prod, mut cons) = ring_buffer::<u64>(1024);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            let mut received = 0u64;
            while received < COUNT {
                if let Some(val) = cons.pop() {
                    sum = sum.wrapping_add(val);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = ring_buffer::<u64>(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = ring_buffer::<u64>(1000);
        assert_eq!(prod.capacity(), 1024);
    }
}