nexus_logbuf/queue/spsc.rs
1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producer reads │
9//! │ buffer: *mut u8 │
10//! │ capacity: usize (power of 2) │
11//! │ mask: usize (capacity - 1) │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
15//! │ Producer: │ │ Consumer: │
16//! │ tail: usize (local) │ │ head: usize (local) │
17//! │ cached_head: usize (local) │ │ │
18//! └─────────────────────────────────┘ └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize (8 bytes on 64-bit) │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len] (variable) │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...] (0-7 bytes) │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
53
54/// Header size in bytes — one system word (`usize`).
55///
56/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
57const HEADER_SIZE: usize = std::mem::size_of::<usize>();
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67 assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69 let capacity = capacity.next_power_of_two();
70 let mask = capacity - 1;
71
72 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73 let layout = Layout::from_size_align(capacity, 8)
74 .expect("valid layout: capacity is a power of two >= 16, align is 8");
75 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76 let buffer_ptr = unsafe { alloc_zeroed(layout) };
77 if buffer_ptr.is_null() {
78 handle_alloc_error(layout);
79 }
80
81 let shared = Arc::new(Shared {
82 head: CachePadded::new(AtomicUsize::new(0)),
83 buffer: buffer_ptr,
84 capacity,
85 mask,
86 });
87
88 (
89 Producer {
90 tail: Cell::new(0),
91 cached_head: Cell::new(0),
92 shared: Arc::clone(&shared),
93 },
94 Consumer {
95 head: Cell::new(0),
96 shared,
97 },
98 )
99}
100
101struct Shared {
102 /// Consumer's read position. Updated by consumer, read by producer.
103 head: CachePadded<AtomicUsize>,
104 /// Buffer pointer.
105 buffer: *mut u8,
106 /// Buffer capacity (power of 2).
107 capacity: usize,
108 /// Mask for wrapping (capacity - 1).
109 mask: usize,
110}
111
112// Safety: Buffer is only accessed by one producer and one consumer.
113// The atomic head provides synchronization.
114unsafe impl Send for Shared {}
115unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118 fn drop(&mut self) {
119 let layout = Layout::from_size_align(self.capacity, 8)
120 .expect("valid layout: capacity was validated at construction");
121 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122 // Shared is only dropped once (Arc prevents earlier drops).
123 unsafe { dealloc(self.buffer, layout) };
124 }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
131/// Producer endpoint of the SPSC ring buffer.
132///
133/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
134pub struct Producer {
135 /// Local tail position (free-running).
136 tail: Cell<usize>,
137 /// Cached head position (Rigtorp optimization).
138 cached_head: Cell<usize>,
139 /// Shared state.
140 shared: Arc<Shared>,
141}
142
143// Safety: Producer is only used from one thread.
144unsafe impl Send for Producer {}
145
146impl Producer {
147 /// Attempts to claim space for a record with the given payload length.
148 ///
149 /// Returns a [`WriteClaim`] that can be written to and then committed.
150 ///
151 /// # Errors
152 ///
153 /// - [`TryClaimError::ZeroLength`] if `len` is zero
154 /// - [`TryClaimError::Full`] if the buffer is full
155 ///
156 /// # Safety Contract
157 ///
158 /// `len` must not exceed `LEN_MASK`. This is checked with
159 /// `debug_assert!` only.
160 #[inline]
161 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
162 debug_assert!(len <= LEN_MASK, "payload too large");
163 if len == 0 {
164 return Err(TryClaimError::ZeroLength);
165 }
166
167 let record_size = align8(HEADER_SIZE + len);
168
169 // Check if we have space
170 let tail = self.tail.get();
171 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
172
173 if available < record_size {
174 // Reload head from shared state
175 self.cached_head
176 .set(self.shared.head.load(Ordering::Relaxed));
177 fence(Ordering::Acquire);
178
179 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
180 if available < record_size {
181 return Err(TryClaimError::Full);
182 }
183 }
184
185 // Check if record fits before buffer end, or needs wrap
186 let offset = tail & self.shared.mask;
187 let space_to_end = self.shared.capacity - offset;
188
189 if space_to_end < record_size {
190 // Need to wrap. First check if we have space for padding + record at start.
191 let total_needed = space_to_end + record_size;
192 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
193
194 if available < total_needed {
195 // Reload and recheck
196 self.cached_head
197 .set(self.shared.head.load(Ordering::Relaxed));
198 fence(Ordering::Acquire);
199
200 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
201 if available < total_needed {
202 return Err(TryClaimError::Full);
203 }
204 }
205
206 // Write padding skip marker
207 let buffer = self.shared.buffer;
208 let skip_len = space_to_end | SKIP_BIT;
209 fence(Ordering::Release);
210 // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
211 // Buffer is valid for capacity bytes. We are the sole producer.
212 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
213 // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
214 // within the buffer. AtomicUsize reference is used for store visibility.
215 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
216
217 // Advance tail past padding
218 self.tail.set(tail.wrapping_add(space_to_end));
219 let new_offset = 0;
220
221 Ok(WriteClaim {
222 producer: self,
223 offset: new_offset,
224 len,
225 record_size,
226 committed: false,
227 })
228 } else {
229 // Fits without wrapping
230 Ok(WriteClaim {
231 producer: self,
232 offset,
233 len,
234 record_size,
235 committed: false,
236 })
237 }
238 }
239
240 /// Returns the capacity of the buffer.
241 #[inline]
242 pub fn capacity(&self) -> usize {
243 self.shared.capacity
244 }
245
246 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
247 ///
248 /// Uses `Arc::strong_count` which is inherently racy — the count can
249 /// change between the check and the caller's next action. Suitable for
250 /// graceful shutdown detection, not for correctness. For reliable
251 /// disconnection detection, use the channel layer (`channel::spsc`)
252 /// which tracks disconnection via dedicated atomic flags.
253 // TODO: consider adding an AtomicBool flag if reliable detection is
254 // needed at the raw queue level.
255 #[inline]
256 pub fn is_disconnected(&self) -> bool {
257 Arc::strong_count(&self.shared) == 1
258 }
259}
260
261impl std::fmt::Debug for Producer {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 f.debug_struct("Producer")
264 .field("capacity", &self.capacity())
265 .finish_non_exhaustive()
266 }
267}
268
269// ============================================================================
270// WriteClaim
271// ============================================================================
272
273/// A claimed region for writing a record.
274///
275/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
276/// when done writing to publish the record. If dropped without committing, a skip
277/// marker is written so the consumer can advance past the dead region.
278pub struct WriteClaim<'a> {
279 producer: &'a mut Producer,
280 offset: usize,
281 len: usize,
282 record_size: usize,
283 committed: bool,
284}
285
286impl WriteClaim<'_> {
287 /// Commits the record, making it visible to the consumer.
288 #[inline]
289 pub fn commit(mut self) {
290 self.do_commit();
291 self.committed = true;
292 }
293
294 #[inline]
295 fn do_commit(&mut self) {
296 let buffer = self.producer.shared.buffer;
297 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
298 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
299
300 // Release fence: ensures payload writes are visible before len store
301 fence(Ordering::Release);
302 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
303 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
304
305 // Advance tail
306 self.producer
307 .tail
308 .set(self.producer.tail.get().wrapping_add(self.record_size));
309 }
310
311 /// Returns the length of the payload region.
312 #[inline]
313 pub fn len(&self) -> usize {
314 self.len
315 }
316
317 /// Returns `true` if the payload is empty (always false, len must be > 0).
318 #[inline]
319 pub fn is_empty(&self) -> bool {
320 false
321 }
322}
323
324impl Deref for WriteClaim<'_> {
325 type Target = [u8];
326
327 #[inline]
328 fn deref(&self) -> &Self::Target {
329 let buffer = self.producer.shared.buffer;
330 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
331 // exclusive access to this region via &mut Producer borrow.
332 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
333 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
334 // and exclusively owned by this claim. Lifetime tied to &self.
335 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
336 }
337}
338
339impl DerefMut for WriteClaim<'_> {
340 #[inline]
341 fn deref_mut(&mut self) -> &mut Self::Target {
342 let buffer = self.producer.shared.buffer;
343 // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
344 // guaranteed by &mut self (only one WriteClaim exists per try_claim).
345 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
346 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
347 // and exclusively owned by this claim. Lifetime tied to &mut self.
348 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
349 }
350}
351
352impl Drop for WriteClaim<'_> {
353 fn drop(&mut self) {
354 if !self.committed {
355 // Write skip marker so consumer can advance past this region
356 let buffer = self.producer.shared.buffer;
357 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
358 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
359 let skip_len = self.record_size | SKIP_BIT;
360
361 fence(Ordering::Release);
362 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
363 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
364
365 // Advance tail past the dead region
366 self.producer
367 .tail
368 .set(self.producer.tail.get().wrapping_add(self.record_size));
369 }
370 }
371}
372
373// ============================================================================
374// Consumer
375// ============================================================================
376
377/// Consumer endpoint of the SPSC ring buffer.
378///
379/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
380pub struct Consumer {
381 /// Local head position (free-running).
382 head: Cell<usize>,
383 /// Shared state.
384 shared: Arc<Shared>,
385}
386
387// Safety: Consumer is only used from one thread.
388unsafe impl Send for Consumer {}
389
390impl Consumer {
391 /// Attempts to claim the next record for reading.
392 ///
393 /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
394 /// to `&[u8]` for the payload. When dropped, the record region is zeroed
395 /// and the head is advanced.
396 ///
397 /// Returns `None` if no committed record is available.
398 #[inline]
399 pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
400 let buffer = self.shared.buffer;
401
402 loop {
403 let offset = self.head.get() & self.shared.mask;
404 // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
405 // (head advances by align8'd record sizes). Buffer is valid.
406 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
407
408 // Relaxed atomic load, then Acquire fence for payload visibility
409 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
410 let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
411 fence(Ordering::Acquire);
412
413 if len_raw == 0 {
414 // Not committed yet
415 return None;
416 }
417
418 if len_raw & SKIP_BIT != 0 {
419 // Skip marker: zero the region and advance
420 let skip_size = len_raw & LEN_MASK;
421 // Zero payload first, then stamp last (mirrors write path)
422 if skip_size > HEADER_SIZE {
423 // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
424 // the buffer. Consumer has exclusive read access to this region.
425 unsafe {
426 ptr::write_bytes(
427 buffer.add(offset + HEADER_SIZE),
428 0,
429 skip_size - HEADER_SIZE,
430 );
431 }
432 }
433 // Ensure payload zeroing completes before clearing stamp
434 fence(Ordering::Release);
435 // SAFETY: len_ptr is still valid, computed above.
436 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
437
438 self.head.set(self.head.get().wrapping_add(skip_size));
439
440 // Ensure stamp clear completes before head advance
441 fence(Ordering::Release);
442 self.shared.head.store(self.head.get(), Ordering::Relaxed);
443
444 // Continue to check next position
445 continue;
446 }
447
448 // Valid record
449 let len = len_raw;
450 let record_size = align8(HEADER_SIZE + len);
451
452 return Some(ReadClaim {
453 consumer: self,
454 offset,
455 len,
456 record_size,
457 });
458 }
459 }
460
461 /// Returns the capacity of the buffer.
462 #[inline]
463 pub fn capacity(&self) -> usize {
464 self.shared.capacity
465 }
466
467 /// Best-effort hint: returns `true` if the producer has likely been dropped.
468 ///
469 /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
470 #[inline]
471 pub fn is_disconnected(&self) -> bool {
472 Arc::strong_count(&self.shared) == 1
473 }
474}
475
476impl std::fmt::Debug for Consumer {
477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478 f.debug_struct("Consumer")
479 .field("capacity", &self.capacity())
480 .finish_non_exhaustive()
481 }
482}
483
484// ============================================================================
485// ReadClaim
486// ============================================================================
487
488/// A claimed record for reading.
489///
490/// Dereferences to `&[u8]` for the payload. When dropped, the record region
491/// is zeroed and the head is advanced, freeing space for the producer.
492pub struct ReadClaim<'a> {
493 consumer: &'a mut Consumer,
494 offset: usize,
495 len: usize,
496 record_size: usize,
497}
498
499impl ReadClaim<'_> {
500 /// Returns the length of the payload.
501 #[inline]
502 pub fn len(&self) -> usize {
503 self.len
504 }
505
506 /// Returns `true` if the payload is empty.
507 #[inline]
508 pub fn is_empty(&self) -> bool {
509 self.len == 0
510 }
511}
512
513impl Deref for ReadClaim<'_> {
514 type Target = [u8];
515
516 #[inline]
517 fn deref(&self) -> &Self::Target {
518 let buffer = self.consumer.shared.buffer;
519 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
520 // exclusive read access via &mut Consumer borrow.
521 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
522 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
523 // finished writing (len was non-zero, preceded by Release fence).
524 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
525 }
526}
527
528impl Drop for ReadClaim<'_> {
529 fn drop(&mut self) {
530 let buffer = self.consumer.shared.buffer;
531
532 // Zero payload first, then stamp last (mirrors write path)
533 if self.record_size > HEADER_SIZE {
534 // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
535 // the buffer. Consumer owns this region exclusively.
536 unsafe {
537 ptr::write_bytes(
538 buffer.add(self.offset + HEADER_SIZE),
539 0,
540 self.record_size - HEADER_SIZE,
541 );
542 }
543 }
544 // Ensure payload zeroing completes before clearing stamp
545 fence(Ordering::Release);
546 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
547 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
548 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
549 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
550
551 // Advance head
552 let new_head = self.consumer.head.get().wrapping_add(self.record_size);
553 self.consumer.head.set(new_head);
554
555 // Ensure stamp clear completes before head advance
556 fence(Ordering::Release);
557 self.consumer.shared.head.store(new_head, Ordering::Relaxed);
558 }
559}
560
561// ============================================================================
562// Tests
563// ============================================================================
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568
569 #[test]
570 fn basic_write_read() {
571 let (mut prod, mut cons) = new(1024);
572
573 let payload = b"hello world";
574 let mut claim = prod.try_claim(payload.len()).unwrap();
575 claim.copy_from_slice(payload);
576 claim.commit();
577
578 let record = cons.try_claim().unwrap();
579 assert_eq!(&*record, payload);
580 }
581
582 #[test]
583 fn empty_returns_none() {
584 let (_, mut cons) = new(1024);
585 assert!(cons.try_claim().is_none());
586 }
587
588 #[test]
589 fn multiple_records() {
590 let (mut prod, mut cons) = new(1024);
591
592 for i in 0..10 {
593 let payload = format!("message {}", i);
594 let mut claim = prod.try_claim(payload.len()).unwrap();
595 claim.copy_from_slice(payload.as_bytes());
596 claim.commit();
597 }
598
599 for i in 0..10 {
600 let record = cons.try_claim().unwrap();
601 let expected = format!("message {}", i);
602 assert_eq!(&*record, expected.as_bytes());
603 }
604
605 assert!(cons.try_claim().is_none());
606 }
607
608 #[test]
609 fn aborted_claim_creates_skip() {
610 let (mut prod, mut cons) = new(1024);
611
612 // Claim and drop without committing
613 {
614 let mut claim = prod.try_claim(10).unwrap();
615 claim.copy_from_slice(b"0123456789");
616 // drop without commit
617 }
618
619 // Write another record
620 {
621 let mut claim = prod.try_claim(5).unwrap();
622 claim.copy_from_slice(b"hello");
623 claim.commit();
624 }
625
626 // Consumer should skip the aborted record and read the committed one
627 let record = cons.try_claim().unwrap();
628 assert_eq!(&*record, b"hello");
629 }
630
631 #[test]
632 fn wrap_around() {
633 let (mut prod, mut cons) = new(64);
634
635 // Fill with messages that will cause wrap-around
636 for i in 0..20 {
637 let payload = format!("msg{:02}", i);
638 loop {
639 match prod.try_claim(payload.len()) {
640 Ok(mut claim) => {
641 claim.copy_from_slice(payload.as_bytes());
642 claim.commit();
643 break;
644 }
645 Err(_) => {
646 // Drain some
647 while cons.try_claim().is_some() {}
648 }
649 }
650 }
651 }
652 }
653
654 #[test]
655 fn full_returns_error() {
656 let (mut prod, _cons) = new(64);
657
658 // Fill the buffer
659 let mut count = 0;
660 while let Ok(mut claim) = prod.try_claim(8) {
661 claim.copy_from_slice(b"12345678");
662 claim.commit();
663 count += 1;
664 }
665
666 assert!(count > 0);
667 assert!(prod.try_claim(8).is_err());
668 }
669
670 #[test]
671 fn cross_thread() {
672 use std::thread;
673
674 let (mut prod, mut cons) = new(4096);
675
676 let producer = thread::spawn(move || {
677 for i in 0..10_000u64 {
678 let payload = i.to_le_bytes();
679 loop {
680 match prod.try_claim(payload.len()) {
681 Ok(mut claim) => {
682 claim.copy_from_slice(&payload);
683 claim.commit();
684 break;
685 }
686 Err(_) => std::hint::spin_loop(),
687 }
688 }
689 }
690 });
691
692 let consumer = thread::spawn(move || {
693 let mut received = 0u64;
694 while received < 10_000 {
695 if let Some(record) = cons.try_claim() {
696 let value = u64::from_le_bytes((*record).try_into().unwrap());
697 assert_eq!(value, received);
698 received += 1;
699 } else {
700 std::hint::spin_loop();
701 }
702 }
703 });
704
705 producer.join().unwrap();
706 consumer.join().unwrap();
707 }
708
709 #[test]
710 fn disconnection_detection() {
711 let (prod, cons) = new(1024);
712
713 assert!(!prod.is_disconnected());
714 assert!(!cons.is_disconnected());
715
716 drop(cons);
717 assert!(prod.is_disconnected());
718 }
719
720 #[test]
721 #[should_panic(expected = "capacity must be at least 16")]
722 fn tiny_capacity_panics() {
723 let _ = new(8);
724 }
725
726 #[test]
727 fn zero_len_returns_error() {
728 let (mut prod, _) = new(1024);
729 assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
730 }
731
732 #[test]
733 fn capacity_rounds_to_power_of_two() {
734 let (prod, _) = new(100);
735 assert_eq!(prod.capacity(), 128);
736
737 let (prod, _) = new(1000);
738 assert_eq!(prod.capacity(), 1024);
739 }
740
741 #[test]
742 fn variable_length_records() {
743 let (mut prod, mut cons) = new(4096);
744
745 let messages = [
746 "a",
747 "hello",
748 "this is a longer message",
749 "x",
750 "medium length",
751 ];
752
753 for msg in &messages {
754 let mut claim = prod.try_claim(msg.len()).unwrap();
755 claim.copy_from_slice(msg.as_bytes());
756 claim.commit();
757 }
758
759 for msg in &messages {
760 let record = cons.try_claim().unwrap();
761 assert_eq!(&*record, msg.as_bytes());
762 }
763 }
764
765 /// High-volume stress test with variable-length messages.
766 ///
767 /// Tests correctness under sustained load with wrap-around.
768 #[test]
769 fn stress_high_volume() {
770 use std::thread;
771
772 const COUNT: u64 = 1_000_000;
773 const BUFFER_SIZE: usize = 64 * 1024; // 64KB
774
775 let (mut prod, mut cons) = new(BUFFER_SIZE);
776
777 let producer = thread::spawn(move || {
778 for i in 0..COUNT {
779 // Variable length: 8-64 bytes based on sequence
780 let len = 8 + ((i % 8) * 8) as usize;
781 let mut payload = vec![0u8; len];
782 // Write sequence number at start
783 payload[..8].copy_from_slice(&i.to_le_bytes());
784
785 loop {
786 match prod.try_claim(len) {
787 Ok(mut claim) => {
788 claim.copy_from_slice(&payload);
789 claim.commit();
790 break;
791 }
792 Err(_) => std::hint::spin_loop(),
793 }
794 }
795 }
796 });
797
798 let consumer = thread::spawn(move || {
799 let mut received = 0u64;
800 while received < COUNT {
801 if let Some(record) = cons.try_claim() {
802 // Verify sequence number
803 let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
804 assert_eq!(seq, received, "sequence mismatch at {}", received);
805
806 // Verify expected length
807 let expected_len = 8 + ((received % 8) * 8) as usize;
808 assert_eq!(
809 record.len(),
810 expected_len,
811 "length mismatch at {}",
812 received
813 );
814
815 received += 1;
816 } else {
817 std::hint::spin_loop();
818 }
819 }
820 received
821 });
822
823 producer.join().unwrap();
824 let received = consumer.join().unwrap();
825 assert_eq!(received, COUNT);
826 }
827
828 /// Stress test with maximum contention - tiny buffer, high throughput.
829 #[test]
830 fn stress_high_contention() {
831 use std::thread;
832
833 const COUNT: u64 = 100_000;
834 const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around
835
836 let (mut prod, mut cons) = new(BUFFER_SIZE);
837
838 let producer = thread::spawn(move || {
839 for i in 0..COUNT {
840 let payload = i.to_le_bytes();
841 loop {
842 match prod.try_claim(payload.len()) {
843 Ok(mut claim) => {
844 claim.copy_from_slice(&payload);
845 claim.commit();
846 break;
847 }
848 Err(_) => std::hint::spin_loop(),
849 }
850 }
851 }
852 });
853
854 let consumer = thread::spawn(move || {
855 let mut received = 0u64;
856 let mut sum = 0u64;
857 while received < COUNT {
858 if let Some(record) = cons.try_claim() {
859 let value = u64::from_le_bytes((*record).try_into().unwrap());
860 assert_eq!(value, received);
861 sum = sum.wrapping_add(value);
862 received += 1;
863 } else {
864 std::hint::spin_loop();
865 }
866 }
867 sum
868 });
869
870 producer.join().unwrap();
871 let sum = consumer.join().unwrap();
872 // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
873 let expected = COUNT * (COUNT - 1) / 2;
874 assert_eq!(sum, expected);
875 }
876
877 /// Payload pointers must be word-aligned so users can write aligned structs.
878 #[test]
879 fn payload_is_word_aligned() {
880 let (mut prod, mut cons) = new(1024);
881
882 // Test several payload sizes to cover padding edge cases
883 for len in [1, 3, 7, 8, 13, 64, 255] {
884 let mut claim = prod.try_claim(len).unwrap();
885 let ptr = claim.as_mut_ptr();
886 assert_eq!(
887 ptr as usize % std::mem::align_of::<usize>(),
888 0,
889 "WriteClaim payload not word-aligned for len={len}"
890 );
891 claim.commit();
892
893 let record = cons.try_claim().unwrap();
894 let ptr = record.as_ptr();
895 assert_eq!(
896 ptr as usize % std::mem::align_of::<usize>(),
897 0,
898 "ReadClaim payload not word-aligned for len={len}"
899 );
900 }
901 }
902}