nexus_queue/lib.rs
//! This implementation uses per-slot lap counters instead:
//!
//! ```text
//! Per-slot sequencing (nexus):
//! ┌────────────────────────────────────────────┐
//! │ buffer[0]: { lap: AtomicUsize, data: T }   │
//! │ buffer[1]: { lap: AtomicUsize, data: T }   │
//! │ ...                                        │
//! └────────────────────────────────────────────┘
//! Lap counter + data on SAME cache line
//! ```
//!
//! # Why Per-Slot Sequencing Wins
//!
//! ## 1. Cache Locality
//!
//! When checking whether a slot is ready, we load `slot.lap`. The subsequent read
//! of `slot.data` is on the same cache line, so it is already in L1. Traditional
//! designs require fetching from three separate cache lines (head, tail, data).
//!
//! ## 2. No Stale Cache Problem
//!
//! Traditional designs cache the remote index to avoid atomic loads:
//!
//! ```text
//! // Traditional push (rtrb-style)
//! if head - cached_tail >= capacity {
//!     cached_tail = tail.load(); // "Slow path" refresh
//!     if head - cached_tail >= capacity {
//!         return Full;
//!     }
//! }
//! ```
//!
//! In ping-pong benchmarks (one message in flight), the cache is *always* stale,
//! so every operation hits the "slow path". Per-slot sequencing has no cache to
//! refresh: we just check the slot directly.
//!
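//! For contrast, the corresponding check in this crate touches only the slot
//! itself (a simplified sketch of the real `push` defined below):
//!
//! ```text
//! // Per-slot push (nexus, simplified)
//! if slot.lap != 0 {
//!     return Full;        // previous value not yet consumed
//! }
//! slot.data = value;
//! slot.lap = lap;         // publish
//! ```
//!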
//! ## 3. Simpler Control Flow
//!
//! Fewer branches mean better branch prediction. Our branch miss rate is ~1.6%,
//! comparable to rtrb's, but spread over fewer total branches per operation.
//!
//! # Optimization Journey
//!
//! We started by cloning rtrb's design, then systematically tested changes.
//! Using rtrb as baseline (p50 ≈ 200 cycles):
//!
//! | Change | Result | Cycles | Notes |
//! |--------|--------|--------|-------|
//! | Clone rtrb exactly | Baseline | ~200 | Starting point |
//! | Per-slot lap counters | **+25%** | ~150 | Biggest win |
//! | Division → bit shift | **+15%** | ~150→~130 | `tail/cap` → `tail>>shift` |
//! | `repr(C)` field ordering | +5% | - | Hot fields first |
//! | Manual fencing | ~0% | - | No change on x86, helps ARM |
//! | Const generics | **-20%** | - | Surprisingly worse! |
//! | CachePadded slots | ~0% | - | Not needed for our layout |
//!
//! ## What Worked
//!
//! **Per-slot sequencing**: The single biggest win. Eliminated the stale cache
//! problem entirely and improved cache locality.
//!
//! **Bit shift for lap calculation**: Division is 20-90 cycles on x86. Since
//! capacity is always a power of two, `tail / capacity` becomes `tail >> shift`.
//! Saved ~20-50 cycles per operation.
//!
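//! As a quick sanity check of the equivalence (illustrative only):
//!
//! ```
//! let capacity = 1024_usize;
//! let shift = capacity.trailing_zeros() as usize;
//! let tail = 5_000_usize;
//! assert_eq!(tail / capacity, tail >> shift); // both are 4
//! ```
//!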
//! **Field ordering with `repr(C)`**: Placing hot-path fields (local_tail, buffer,
//! mask) first improves prefetching. ~5% throughput improvement.
//!
//! **Instruction ordering**: Updating `local_tail` *after* the atomic store
//! gives cache coherency a head start, reducing latency variance.
//!
//! ## What Didn't Work
//!
//! **Const generics for capacity**: We expected `const CAP: usize` to help by
//! making the mask a compile-time constant. Instead, throughput regressed 20%!
//! Likely causes: monomorphization code bloat and different inlining decisions.
//!
//! **CachePadded lap counters**: With small `T`, multiple slots share a cache
//! line. We tested padding each slot to 64 bytes. No improvement - the true
//! sharing on the active slot dominates, not false sharing between slots.
//!
//! **Cached indices**: The traditional "optimization" that rtrb uses. In our
//! testing, it is actually slower for latency-sensitive workloads because the
//! cache is always stale in ping-pong scenarios.
//!
//! # Example
//!
//! ```
//! use nexus_queue;
//!
//! let (mut producer, mut consumer) = nexus_queue::ring_buffer::<u64>(1024);
//!
//! // Bounded push - returns error if full
//! producer.push(1).unwrap();
//! producer.push(2).unwrap();
//!
//! assert_eq!(consumer.pop(), Some(1));
//! assert_eq!(consumer.pop(), Some(2));
//! ```
//!
//! # Benchmarking Methodology
//!
//! Latency benchmarks use ping-pong between two threads pinned to separate
//! physical cores (avoiding hyperthreading). We measure round-trip time with
//! `rdtscp` and divide by 2 for one-way latency, running 10,000 warmup
//! iterations followed by 100,000 measured samples.
//!
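//! A minimal sketch of the per-sample timestamp read (assumes x86_64; the
//! surrounding benchmark harness is not part of this crate and is not shown):
//!
//! ```no_run
//! #[cfg(target_arch = "x86_64")]
//! fn timestamp() -> u64 {
//!     let mut aux = 0u32;
//!     // SAFETY: assumes the CPU supports the `rdtscp` instruction.
//!     unsafe { core::arch::x86_64::__rdtscp(&mut aux) }
//! }
//! ```
//!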
//! Throughput benchmarks measure the time to transfer 10 million messages
//! through a 1024-slot buffer with the producer and consumer on separate cores.
//!
//! All benchmarks run with turbo boost disabled for consistent results:
//!
//! ```bash
//! echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
//! sudo taskset -c 0,2 ./target/release/deps/perf_spsc_bounded_latency-*
//! ```
//!
//! # Memory Ordering
//!
//! The implementation uses manual fencing for clarity and portability:
//!
//! - **Producer**: `fence(Release)` before storing the lap counter
//! - **Consumer**: `fence(Acquire)` after loading the lap counter
//!
//! On x86, these compile to no instructions (strong memory model), but they're
//! required for correctness on ARM and other weakly-ordered architectures.
//!
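//! In terms of the `push`/`pop` code below, the fences pair up like this:
//!
//! ```text
//! producer: data.write(value);  fence(Release);  lap.store(lap, Relaxed);
//! consumer: lap.load(Relaxed);  fence(Acquire);  data.assume_init_read();
//! ```
//!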
//! # When to Use This vs Other Queues
//!
//! Use `nexus::spsc` when:
//! - You have exactly one producer and one consumer
//! - You need the lowest possible latency
//! - You want both bounded and overwriting modes in one queue
//!
//! Consider alternatives when:
//! - Multiple producers → use MPSC
//! - Multiple consumers → use SPMC or MPMC
//! - You need async/await integration → use `tokio::sync::mpsc`

use std::cell::UnsafeCell;
use std::fmt;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering, fence};

/// Creates a new SPSC ring buffer with the given capacity.
///
/// Returns a `(Producer, Consumer)` pair.
///
/// The actual capacity will be rounded up to the next power of two.
///
/// # Panics
///
/// Panics if `capacity` is 0.
pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    assert!(capacity > 0, "capacity must be non-zero");

    let capacity = capacity.next_power_of_two();
    let mask = capacity - 1;
    let shift = capacity.trailing_zeros() as usize;

    // Allocate the slot buffer. The Vec is wrapped in ManuallyDrop because
    // ownership of the allocation is handed to `Inner`, which rebuilds and
    // frees it in its Drop impl.
    let mut slots = ManuallyDrop::new(Vec::<Slot<T>>::with_capacity(capacity));
    for _ in 0..capacity {
        slots.push(Slot {
            lap: AtomicUsize::new(0),
            data: UnsafeCell::new(MaybeUninit::uninit()),
        });
    }
    let buffer = slots.as_mut_ptr();

    let inner = Arc::new(Inner {
        buffer,
        capacity,
        mask,
    });

    (
        Producer {
            local_tail: 0,
            buffer,
            mask,
            shift,
            inner: Arc::clone(&inner),
        },
        Consumer {
            local_head: 0,
            buffer,
            mask,
            shift,
            inner,
        },
    )
}

/// A slot in the ring buffer with a lap counter.
///
/// The lap counter serves as the synchronization mechanism:
/// - `lap == 0`: slot is empty/consumed
/// - `lap == n`: slot contains data written during lap `n` (1-indexed)
#[repr(C)]
struct Slot<T> {
    lap: AtomicUsize,
    data: UnsafeCell<MaybeUninit<T>>,
}

/// Shared state between producer and consumer.
#[repr(C)]
struct Inner<T> {
    buffer: *mut Slot<T>,
    capacity: usize,
    mask: usize,
}

unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Drop any remaining data in slots
        for i in 0..self.capacity {
            let slot = unsafe { &*self.buffer.add(i) };
            let lap = slot.lap.load(Ordering::Relaxed);
            if lap > 0 {
                unsafe { (*slot.data.get()).assume_init_drop() };
            }
        }

        // Free the Vec
        unsafe {
            let _ = Vec::from_raw_parts(self.buffer, self.capacity, self.capacity);
        }
    }
}

/// The producer half of an SPSC ring buffer.
///
/// Takes `&mut self` to statically ensure single-producer access.
#[repr(C)]
pub struct Producer<T> {
    // === Hot path fields ===
    local_tail: usize,
    buffer: *mut Slot<T>,
    mask: usize,
    shift: usize,

    // === Cold path fields ===
    inner: Arc<Inner<T>>,
}

unsafe impl<T: Send> Send for Producer<T> {}

impl<T> Producer<T> {
    /// Attempts to push a value into the ring buffer.
    ///
    /// Returns `Err(Full(value))` if the buffer is full, giving the value back.
    ///
    /// # Example
    ///
    /// ```
    /// use nexus_queue;
    ///
    /// let (mut producer, mut consumer) = nexus_queue::ring_buffer::<u32>(2);
    ///
    /// assert!(producer.push(1).is_ok());
    /// assert!(producer.push(2).is_ok());
    /// assert!(producer.push(3).is_err()); // Full
    /// ```
    #[inline]
    #[must_use = "push returns Err if full, which should be handled"]
    pub fn push(&mut self, value: T) -> Result<(), Full<T>> {
        let tail = self.local_tail;
        let slot = unsafe { &*self.buffer.add(tail & self.mask) };

        // Check if slot is occupied (lap > 0 means data present)
        let slot_lap = slot.lap.load(Ordering::Relaxed);
        fence(Ordering::Acquire);

        if slot_lap != 0 {
            return Err(Full(value));
        }

        // Write new value
        unsafe { (*slot.data.get()).write(value) };

        // Publish with new lap
        let lap = (tail >> self.shift) + 1;
        fence(Ordering::Release);
        slot.lap.store(lap, Ordering::Relaxed);
        self.local_tail = tail.wrapping_add(1);

        Ok(())
    }

    /// Returns the capacity of the ring buffer.
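    ///
    /// This is the rounded-up power-of-two capacity, which may be larger than
    /// the value passed to [`ring_buffer`]:
    ///
    /// ```
    /// let (producer, _consumer) = nexus_queue::ring_buffer::<u64>(100);
    /// assert_eq!(producer.capacity(), 128);
    /// ```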
    #[inline]
    pub fn capacity(&self) -> usize {
        1 << self.shift
    }

    /// Returns `true` if the consumer has been dropped.
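    ///
    /// ```
    /// let (producer, consumer) = nexus_queue::ring_buffer::<u8>(4);
    /// assert!(!producer.is_disconnected());
    /// drop(consumer);
    /// assert!(producer.is_disconnected());
    /// ```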
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }
}

impl<T> fmt::Debug for Producer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("capacity", &self.capacity())
            .finish_non_exhaustive()
    }
}

/// The consumer half of an SPSC ring buffer.
///
/// Use [`pop`](Consumer::pop) to remove elements. Takes `&mut self` to
/// statically ensure single-consumer access.
#[repr(C)]
pub struct Consumer<T> {
    // === Hot path fields ===
    local_head: usize,
    buffer: *mut Slot<T>,
    mask: usize,
    shift: usize,

    // === Cold path fields ===
    inner: Arc<Inner<T>>,
}

unsafe impl<T: Send> Send for Consumer<T> {}

impl<T> Consumer<T> {
    /// Attempts to pop a value from the ring buffer.
    ///
    /// Returns `Some(value)` if data is available, `None` if the buffer is empty.
    /// Values are returned in FIFO order.
    ///
    /// # Example
    ///
    /// ```
    /// use nexus_queue;
    ///
    /// let (mut producer, mut consumer) = nexus_queue::ring_buffer::<u32>(8);
    ///
    /// assert_eq!(consumer.pop(), None); // Empty
    ///
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.pop(), Some(42));
    /// ```
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        let head = self.local_head;
        let slot = unsafe { &*self.buffer.add(head & self.mask) };
        let expected_lap = (head >> self.shift) + 1;

        let slot_lap = slot.lap.load(Ordering::Relaxed);
        fence(Ordering::Acquire);

        if slot_lap != expected_lap {
            return None;
        }

        let value = unsafe { (*slot.data.get()).assume_init_read() };

        fence(Ordering::Release);
        slot.lap.store(0, Ordering::Relaxed);

        self.local_head = head.wrapping_add(1);

        Some(value)
    }

    /// Returns the capacity of the ring buffer.
    #[inline]
    pub const fn capacity(&self) -> usize {
        1 << self.shift
    }

    /// Returns `true` if the producer has been dropped.
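    ///
    /// ```
    /// let (producer, consumer) = nexus_queue::ring_buffer::<u8>(4);
    /// assert!(!consumer.is_disconnected());
    /// drop(producer);
    /// assert!(consumer.is_disconnected());
    /// ```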
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }
}

impl<T> fmt::Debug for Consumer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Consumer")
            .field("capacity", &self.capacity())
            .finish_non_exhaustive()
    }
}

/// Error returned when the ring buffer is full.
///
/// Contains the value that could not be pushed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Full<T>(pub T);

impl<T> Full<T> {
    /// Returns the value that could not be pushed.
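    ///
    /// ```
    /// let (mut producer, _consumer) = nexus_queue::ring_buffer::<u8>(1);
    /// producer.push(1).unwrap();
    /// let err = producer.push(2).unwrap_err();
    /// assert_eq!(err.into_inner(), 2);
    /// ```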
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T> fmt::Display for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ring buffer is full")
    }
}

impl<T: fmt::Debug> std::error::Error for Full<T> {}

#[cfg(test)]
mod tests {
    use super::*;

    // ============================================================================
    // Basic Operations
    // ============================================================================

    #[test]
    fn basic_push_pop() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_ok());
        assert!(prod.push(3).is_ok());

        assert_eq!(cons.pop(), Some(1));
        assert_eq!(cons.pop(), Some(2));
        assert_eq!(cons.pop(), Some(3));
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn empty_pop_returns_none() {
        let (_, mut cons) = ring_buffer::<u64>(4);
        assert_eq!(cons.pop(), None);
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn fill_then_drain() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        for i in 0..4 {
            assert!(prod.push(i).is_ok());
        }

        for i in 0..4 {
            assert_eq!(cons.pop(), Some(i));
        }

        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn push_returns_error_when_full() {
        let (mut prod, _cons) = ring_buffer::<u64>(4);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_ok());
        assert!(prod.push(3).is_ok());
        assert!(prod.push(4).is_ok());

        let err = prod.push(5).unwrap_err();
        assert_eq!(err.into_inner(), 5);
    }

    // ============================================================================
    // Interleaved Operations
    // ============================================================================

    #[test]
    fn interleaved_no_overwrite() {
        let (mut prod, mut cons) = ring_buffer::<u64>(8);

        for i in 0..1000 {
            assert!(prod.push(i).is_ok());
            assert_eq!(cons.pop(), Some(i));
        }
    }

    #[test]
    fn partial_fill_drain_cycles() {
        let (mut prod, mut cons) = ring_buffer::<u64>(8);

        for round in 0..100 {
            for i in 0..4 {
                assert!(prod.push(round * 4 + i).is_ok());
            }

            for i in 0..4 {
                assert_eq!(cons.pop(), Some(round * 4 + i));
            }
        }
    }

    // ============================================================================
    // Single Slot
    // ============================================================================

    #[test]
    fn single_slot_bounded() {
        let (mut prod, mut cons) = ring_buffer::<u64>(1);

        assert!(prod.push(1).is_ok());
        assert!(prod.push(2).is_err());

        assert_eq!(cons.pop(), Some(1));
        assert!(prod.push(2).is_ok());
    }

    // ============================================================================
    // Disconnection
    // ============================================================================

    #[test]
    fn producer_disconnected() {
        let (prod, cons) = ring_buffer::<u64>(4);

        assert!(!cons.is_disconnected());
        drop(prod);
        assert!(cons.is_disconnected());
    }

    #[test]
    fn consumer_disconnected() {
        let (prod, cons) = ring_buffer::<u64>(4);

        assert!(!prod.is_disconnected());
        drop(cons);
        assert!(prod.is_disconnected());
    }

    // ============================================================================
    // Drop Behavior
    // ============================================================================

    #[test]
    fn drop_cleans_up_remaining() {
        use std::sync::atomic::AtomicUsize;

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        let (mut prod, cons) = ring_buffer::<DropCounter>(4);

        let _ = prod.push(DropCounter);
        let _ = prod.push(DropCounter);
        let _ = prod.push(DropCounter);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        drop(prod);
        drop(cons);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    // ============================================================================
    // Cross-Thread
    // ============================================================================

    #[test]
    fn cross_thread_bounded() {
        use std::thread;

        let (mut prod, mut cons) = ring_buffer::<u64>(64);

        let producer = thread::spawn(move || {
            for i in 0..10_000 {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                if cons.pop().is_some() {
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, 10_000);
    }

    // ============================================================================
    // Special Types
    // ============================================================================

    #[test]
    fn zero_sized_type() {
        let (mut prod, mut cons) = ring_buffer::<()>(8);

        let _ = prod.push(());
        let _ = prod.push(());

        assert_eq!(cons.pop(), Some(()));
        assert_eq!(cons.pop(), Some(()));
        assert_eq!(cons.pop(), None);
    }

    #[test]
    fn string_type() {
        let (mut prod, mut cons) = ring_buffer::<String>(4);

        let _ = prod.push("hello".to_string());
        let _ = prod.push("world".to_string());

        assert_eq!(cons.pop(), Some("hello".to_string()));
        assert_eq!(cons.pop(), Some("world".to_string()));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = ring_buffer::<u64>(0);
    }

    #[test]
    fn large_message_type() {
        #[repr(C, align(64))]
        struct LargeMessage {
            data: [u8; 256],
        }

        let (mut prod, mut cons) = ring_buffer::<LargeMessage>(8);

        let msg = LargeMessage { data: [42u8; 256] };
        assert!(prod.push(msg).is_ok());

        let received = cons.pop().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[255], 42);
    }

    #[test]
    fn multiple_laps() {
        let (mut prod, mut cons) = ring_buffer::<u64>(4);

        // 10 full laps through a 4-slot buffer
        for i in 0..40 {
            assert!(prod.push(i).is_ok());
            assert_eq!(cons.pop(), Some(i));
        }
    }

    #[test]
    fn fifo_order_cross_thread() {
        use std::thread;

        let (mut prod, mut cons) = ring_buffer::<u64>(64);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut expected = 0u64;
            while expected < 10_000 {
                if let Some(val) = cons.pop() {
                    assert_eq!(val, expected, "FIFO order violated");
                    expected += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    #[test]
    fn stress_high_volume() {
        use std::thread;

        const COUNT: u64 = 1_000_000;

        let (mut prod, mut cons) = ring_buffer::<u64>(1024);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                while prod.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            let mut received = 0u64;
            while received < COUNT {
                if let Some(val) = cons.pop() {
                    sum = sum.wrapping_add(val);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = ring_buffer::<u64>(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = ring_buffer::<u64>(1000);
        assert_eq!(prod.capacity(), 1024);
    }
}