nexus_logbuf/queue/spsc.rs
1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producer reads │
9//! │ buffer: *mut u8 │
10//! │ capacity: usize (power of 2) │
11//! │ mask: usize (capacity - 1) │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
15//! │ Producer: │ │ Consumer: │
16//! │ tail: usize (local) │ │ head: usize (local) │
17//! │ cached_head: usize (local) │ │ │
18//! └─────────────────────────────────┘ └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize (8 bytes on 64-bit) │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len] (variable) │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...] (0-7 bytes) │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
53
/// Size of the per-record length header in bytes: exactly one `usize`.
///
/// That is 8 bytes on 64-bit targets, so a payload placed right after the
/// header of an 8-byte-aligned record is itself 8-byte aligned.
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67 assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69 let capacity = capacity.next_power_of_two();
70 let mask = capacity - 1;
71
72 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73 let layout = Layout::from_size_align(capacity, 8)
74 .expect("valid layout: capacity is a power of two >= 16, align is 8");
75 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76 let buffer_ptr = unsafe { alloc_zeroed(layout) };
77 if buffer_ptr.is_null() {
78 handle_alloc_error(layout);
79 }
80
81 let shared = Arc::new(Shared {
82 head: CachePadded::new(AtomicUsize::new(0)),
83 buffer: buffer_ptr,
84 capacity,
85 mask,
86 });
87
88 (
89 Producer {
90 tail: Cell::new(0),
91 cached_head: Cell::new(0),
92 shared: Arc::clone(&shared),
93 },
94 Consumer {
95 head: Cell::new(0),
96 shared,
97 },
98 )
99}
100
/// State shared by the producer and consumer endpoints through an `Arc`.
///
/// Holds the raw pointer to the ring's backing allocation together with the
/// size information needed to index into it and to free it on drop.
struct Shared {
    /// Consumer's read position. Updated by consumer, read by producer.
    head: CachePadded<AtomicUsize>,
    /// Buffer pointer (start of the `capacity`-byte allocation).
    buffer: *mut u8,
    /// Buffer capacity (power of 2).
    capacity: usize,
    /// Mask for wrapping (capacity - 1).
    mask: usize,
}

// Safety: Buffer is only accessed by one producer and one consumer.
// The atomic head provides synchronization.
// The raw `buffer` pointer is what prevents these impls from being derived
// automatically; they are required so `Arc<Shared>` can cross threads.
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118 fn drop(&mut self) {
119 let layout = Layout::from_size_align(self.capacity, 8)
120 .expect("valid layout: capacity was validated at construction");
121 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122 // Shared is only dropped once (Arc prevents earlier drops).
123 unsafe { dealloc(self.buffer, layout) };
124 }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
/// Producer endpoint of the SPSC ring buffer.
///
/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
///
/// The positions are kept in `Cell`s so that a [`WriteClaim`] holding a
/// reference to the producer can advance the tail when it is committed or
/// dropped.
pub struct Producer {
    /// Local tail position (free-running).
    tail: Cell<usize>,
    /// Cached head position (Rigtorp optimization).
    /// Refreshed from the shared head only when the cached value does not
    /// show enough free space.
    cached_head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// Safety: Producer is only used from one thread.
unsafe impl Send for Producer {}
145
146impl Producer {
147 /// Attempts to claim space for a record with the given payload length.
148 ///
149 /// Returns a [`WriteClaim`] that can be written to and then committed.
150 ///
151 /// # Errors
152 ///
153 /// - [`TryClaimError::ZeroLength`] if `len` is zero
154 /// - [`TryClaimError::Full`] if the buffer is full
155 ///
156 /// # Safety Contract
157 ///
158 /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
159 /// (unreachable in practice). On 32-bit, records >2GB could set
160 /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
161 /// This is checked with
162 /// `debug_assert!` only.
163 #[inline]
164 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
165 #[cfg(target_pointer_width = "32")]
166 assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
167 #[cfg(not(target_pointer_width = "32"))]
168 debug_assert!(len <= LEN_MASK, "payload too large");
169 if len == 0 {
170 return Err(TryClaimError::ZeroLength);
171 }
172
173 let record_size = align8(HEADER_SIZE + len);
174
175 // Check if we have space
176 let tail = self.tail.get();
177 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
178
179 if available < record_size {
180 // Reload head from shared state
181 self.cached_head
182 .set(self.shared.head.load(Ordering::Relaxed));
183 fence(Ordering::Acquire);
184
185 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
186 if available < record_size {
187 return Err(TryClaimError::Full);
188 }
189 }
190
191 // Check if record fits before buffer end, or needs wrap
192 let offset = tail & self.shared.mask;
193 let space_to_end = self.shared.capacity - offset;
194
195 if space_to_end < record_size {
196 // Need to wrap. First check if we have space for padding + record at start.
197 let total_needed = space_to_end + record_size;
198 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
199
200 if available < total_needed {
201 // Reload and recheck
202 self.cached_head
203 .set(self.shared.head.load(Ordering::Relaxed));
204 fence(Ordering::Acquire);
205
206 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
207 if available < total_needed {
208 return Err(TryClaimError::Full);
209 }
210 }
211
212 // Write padding skip marker
213 let buffer = self.shared.buffer;
214 let skip_len = space_to_end | SKIP_BIT;
215 fence(Ordering::Release);
216 // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
217 // Buffer is valid for capacity bytes. We are the sole producer.
218 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
219 // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
220 // within the buffer. AtomicUsize reference is used for store visibility.
221 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
222
223 // Advance tail past padding
224 self.tail.set(tail.wrapping_add(space_to_end));
225 let new_offset = 0;
226
227 Ok(WriteClaim {
228 producer: self,
229 offset: new_offset,
230 len,
231 record_size,
232 committed: false,
233 })
234 } else {
235 // Fits without wrapping
236 Ok(WriteClaim {
237 producer: self,
238 offset,
239 len,
240 record_size,
241 committed: false,
242 })
243 }
244 }
245
246 /// Returns the capacity of the buffer.
247 #[inline]
248 pub fn capacity(&self) -> usize {
249 self.shared.capacity
250 }
251
252 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
253 ///
254 /// Uses `Arc::strong_count` which is inherently racy — the count can
255 /// change between the check and the caller's next action. Suitable for
256 /// graceful shutdown detection, not for correctness. For reliable
257 /// disconnection detection, use the channel layer (`channel::spsc`)
258 /// which tracks disconnection via dedicated atomic flags.
259 // Decision: No AtomicBool flag at the raw queue level. The channel
260 // layer (channel::spsc) provides reliable detection via dedicated
261 // flags. Adding one here would cost an atomic on every push/pop
262 // for a feature only the channel layer needs.
263 #[inline]
264 pub fn is_disconnected(&self) -> bool {
265 Arc::strong_count(&self.shared) == 1
266 }
267}
268
269impl std::fmt::Debug for Producer {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 f.debug_struct("Producer")
272 .field("capacity", &self.capacity())
273 .finish_non_exhaustive()
274 }
275}
276
277// ============================================================================
278// WriteClaim
279// ============================================================================
280
/// A claimed region for writing a record.
///
/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
/// when done writing to publish the record. If dropped without committing, a skip
/// marker is written so the consumer can advance past the dead region.
///
/// # Important
///
/// Do not leak a `WriteClaim` via [`mem::forget`](std::mem::forget). A leaked
/// claim never stamps its header, so the region is neither published as a
/// record nor turned into a skip marker. Always commit or drop claims.
///
/// NOTE(review): the tail is advanced only by `commit` and by `Drop`, so after
/// a leak the next `Producer::try_claim` starts from the same tail position,
/// while bytes already written through the leaked claim stay in the buffer
/// without being zeroed. The consumer interprets any non-zero header word as
/// a committed record or skip marker — confirm that stale payload bytes can
/// never end up at a later header position.
pub struct WriteClaim<'a> {
    /// Producer whose tail is advanced on commit or drop.
    producer: &'a mut Producer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes (stored into the header on commit).
    len: usize,
    /// Total bytes occupied by the record: header + payload, rounded by `align8`.
    record_size: usize,
    /// Set by `commit` so that `Drop` does not write a skip marker.
    committed: bool,
}
299
300impl WriteClaim<'_> {
301 /// Commits the record, making it visible to the consumer.
302 #[inline]
303 pub fn commit(mut self) {
304 self.do_commit();
305 self.committed = true;
306 }
307
308 #[inline]
309 fn do_commit(&mut self) {
310 let buffer = self.producer.shared.buffer;
311 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
312 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
313
314 // Release fence: ensures payload writes are visible before len store
315 fence(Ordering::Release);
316 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
317 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
318
319 // Advance tail
320 self.producer
321 .tail
322 .set(self.producer.tail.get().wrapping_add(self.record_size));
323 }
324
325 /// Returns the length of the payload region.
326 #[inline]
327 pub fn len(&self) -> usize {
328 self.len
329 }
330
331 /// Returns `true` if the payload is empty (always false, len must be > 0).
332 #[inline]
333 pub fn is_empty(&self) -> bool {
334 false
335 }
336}
337
338impl Deref for WriteClaim<'_> {
339 type Target = [u8];
340
341 #[inline]
342 fn deref(&self) -> &Self::Target {
343 let buffer = self.producer.shared.buffer;
344 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
345 // exclusive access to this region via &mut Producer borrow.
346 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
347 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
348 // and exclusively owned by this claim. Lifetime tied to &self.
349 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
350 }
351}
352
353impl DerefMut for WriteClaim<'_> {
354 #[inline]
355 fn deref_mut(&mut self) -> &mut Self::Target {
356 let buffer = self.producer.shared.buffer;
357 // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
358 // guaranteed by &mut self (only one WriteClaim exists per try_claim).
359 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
360 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
361 // and exclusively owned by this claim. Lifetime tied to &mut self.
362 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
363 }
364}
365
366impl Drop for WriteClaim<'_> {
367 fn drop(&mut self) {
368 if !self.committed {
369 // Write skip marker so consumer can advance past this region
370 let buffer = self.producer.shared.buffer;
371 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
372 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
373 let skip_len = self.record_size | SKIP_BIT;
374
375 fence(Ordering::Release);
376 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
377 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
378
379 // Advance tail past the dead region
380 self.producer
381 .tail
382 .set(self.producer.tail.get().wrapping_add(self.record_size));
383 }
384 }
385}
386
387// ============================================================================
388// Consumer
389// ============================================================================
390
/// Consumer endpoint of the SPSC ring buffer.
///
/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
///
/// The head is kept in a `Cell` so that a [`ReadClaim`] holding a reference
/// to the consumer can advance it when the claim is dropped.
pub struct Consumer {
    /// Local head position (free-running).
    /// Mirrored into `shared.head` each time it advances.
    head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// Safety: Consumer is only used from one thread.
unsafe impl Send for Consumer {}
403
impl Consumer {
    /// Attempts to claim the next record for reading.
    ///
    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
    /// and the head is advanced.
    ///
    /// Skip markers found at the head are consumed on the way: their region
    /// is zeroed, the head is advanced and published, and the search
    /// continues at the next position.
    ///
    /// Returns `None` if no committed record is available.
    #[inline]
    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
        let buffer = self.shared.buffer;

        loop {
            let offset = self.head.get() & self.shared.mask;
            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
            // (head advances by align8'd record sizes). Buffer is valid.
            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();

            // Relaxed atomic load, then Acquire fence for payload visibility
            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
            fence(Ordering::Acquire);

            if len_raw == 0 {
                // Not committed yet
                return None;
            }

            if len_raw & SKIP_BIT != 0 {
                // Skip marker: zero the region and advance
                let skip_size = len_raw & LEN_MASK;
                // Zero payload first, then stamp last (mirrors write path)
                if skip_size > HEADER_SIZE {
                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
                    // the buffer. Consumer has exclusive read access to this region.
                    unsafe {
                        ptr::write_bytes(
                            buffer.add(offset + HEADER_SIZE),
                            0,
                            skip_size - HEADER_SIZE,
                        );
                    }
                }
                // Ensure payload zeroing completes before clearing stamp
                fence(Ordering::Release);
                // SAFETY: len_ptr is still valid, computed above.
                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);

                self.head.set(self.head.get().wrapping_add(skip_size));

                // Ensure stamp clear completes before head advance
                fence(Ordering::Release);
                // Publishing the head hands the skipped bytes back to the producer.
                self.shared.head.store(self.head.get(), Ordering::Relaxed);

                // Continue to check next position
                continue;
            }

            // Valid record
            let len = len_raw;
            // Same size computation as the producer's claim, so the head
            // advances by exactly what the producer reserved.
            let record_size = align8(HEADER_SIZE + len);

            // The head is not advanced here; that happens when the claim is dropped.
            return Some(ReadClaim {
                consumer: self,
                offset,
                len,
                record_size,
            });
        }
    }

    /// Returns the capacity of the buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.shared.capacity
    }

    /// Best-effort hint: returns `true` if the producer has likely been dropped.
    ///
    /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.shared) == 1
    }
}
489
490impl std::fmt::Debug for Consumer {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 f.debug_struct("Consumer")
493 .field("capacity", &self.capacity())
494 .finish_non_exhaustive()
495 }
496}
497
498// ============================================================================
499// ReadClaim
500// ============================================================================
501
/// A claimed record for reading.
///
/// Dereferences to `&[u8]` for the payload. When dropped, the record region
/// is zeroed and the head is advanced, freeing space for the producer.
pub struct ReadClaim<'a> {
    /// Consumer whose head is advanced when the claim is dropped.
    consumer: &'a mut Consumer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes, as read from the record header.
    len: usize,
    /// Total bytes occupied by the record: header + payload, rounded by `align8`.
    record_size: usize,
}
512
513impl ReadClaim<'_> {
514 /// Returns the length of the payload.
515 #[inline]
516 pub fn len(&self) -> usize {
517 self.len
518 }
519
520 /// Returns `true` if the payload is empty.
521 #[inline]
522 pub fn is_empty(&self) -> bool {
523 self.len == 0
524 }
525}
526
527impl Deref for ReadClaim<'_> {
528 type Target = [u8];
529
530 #[inline]
531 fn deref(&self) -> &Self::Target {
532 let buffer = self.consumer.shared.buffer;
533 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
534 // exclusive read access via &mut Consumer borrow.
535 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
536 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
537 // finished writing (len was non-zero, preceded by Release fence).
538 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
539 }
540}
541
542impl Drop for ReadClaim<'_> {
543 fn drop(&mut self) {
544 let buffer = self.consumer.shared.buffer;
545
546 // Zero payload first, then stamp last (mirrors write path)
547 if self.record_size > HEADER_SIZE {
548 // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
549 // the buffer. Consumer owns this region exclusively.
550 unsafe {
551 ptr::write_bytes(
552 buffer.add(self.offset + HEADER_SIZE),
553 0,
554 self.record_size - HEADER_SIZE,
555 );
556 }
557 }
558 // Ensure payload zeroing completes before clearing stamp
559 fence(Ordering::Release);
560 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
561 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
562 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
563 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
564
565 // Advance head
566 let new_head = self.consumer.head.get().wrapping_add(self.record_size);
567 self.consumer.head.set(new_head);
568
569 // Ensure stamp clear completes before head advance
570 fence(Ordering::Release);
571 self.consumer.shared.head.store(new_head, Ordering::Relaxed);
572 }
573}
574
575// ============================================================================
576// Tests
577// ============================================================================
578
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed record is read back byte-for-byte.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// Dropping a claim without committing leaves a skip marker behind.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Records of mixed sizes survive wrap-around in FIFO order.
    ///
    /// Payload lengths cycle through 5, 13 and 21 bytes, i.e. record sizes of
    /// 16, 24 and 32 bytes. In a 64-byte ring those sizes do not divide the
    /// capacity evenly, so the producer has to pad out the end of the buffer
    /// with skip markers. The largest possible padding (24) plus the largest
    /// record (32) still fits in the ring, so every claim eventually succeeds.
    #[test]
    fn wrap_around() {
        const COUNT: u8 = 20;

        /// Payload length of message `i`.
        fn payload_len(i: u8) -> usize {
            5 + usize::from(i % 3) * 8
        }

        /// Reads everything available, checking order and content.
        fn drain(cons: &mut Consumer, next: &mut u8) {
            while let Some(record) = cons.try_claim() {
                let expected = vec![*next; payload_len(*next)];
                assert_eq!(&*record, expected.as_slice());
                *next += 1;
            }
        }

        let (mut prod, mut cons) = new(64);
        let mut next_read = 0u8;

        for i in 0..COUNT {
            let payload = vec![i; payload_len(i)];
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(&payload);
                        claim.commit();
                        break;
                    }
                    // Buffer full: make room by consuming what is there.
                    Err(_) => drain(&mut cons, &mut next_read),
                }
            }
        }

        drain(&mut cons, &mut next_read);
        assert_eq!(next_read, COUNT);
        assert!(cons.try_claim().is_none());
    }

    /// Claims fail once the buffer is full and nothing is consumed.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// Producer and consumer on separate threads preserve order.
    #[test]
    fn cross_thread() {
        use std::thread;

        let (mut prod, mut cons) = new(4096);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    /// Each endpoint notices when the other one has been dropped.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());

        // Same check from the consumer's side.
        let (prod, cons) = new(1024);
        assert!(!cons.is_disconnected());

        drop(prod);
        assert!(cons.is_disconnected());
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    #[test]
    fn variable_length_records() {
        let (mut prod, mut cons) = new(4096);

        let messages = [
            "a",
            "hello",
            "this is a longer message",
            "x",
            "medium length",
        ];

        for msg in &messages {
            let mut claim = prod.try_claim(msg.len()).unwrap();
            claim.copy_from_slice(msg.as_bytes());
            claim.commit();
        }

        for msg in &messages {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, msg.as_bytes());
        }
    }

    /// High-volume stress test with variable-length messages.
    ///
    /// Tests correctness under sustained load with wrap-around.
    #[test]
    fn stress_high_volume() {
        use std::thread;

        const COUNT: u64 = 1_000_000;
        const BUFFER_SIZE: usize = 64 * 1024; // 64KB

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                // Variable length: 8-64 bytes based on sequence
                let len = 8 + ((i % 8) * 8) as usize;
                let mut payload = vec![0u8; len];
                // Write sequence number at start
                payload[..8].copy_from_slice(&i.to_le_bytes());

                loop {
                    match prod.try_claim(len) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    // Verify sequence number
                    let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
                    assert_eq!(seq, received, "sequence mismatch at {}", received);

                    // Verify expected length
                    let expected_len = 8 + ((received % 8) * 8) as usize;
                    assert_eq!(
                        record.len(),
                        expected_len,
                        "length mismatch at {}",
                        received
                    );

                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, COUNT);
    }

    /// Stress test with maximum contention - tiny buffer, high throughput.
    #[test]
    fn stress_high_contention() {
        use std::thread;

        const COUNT: u64 = 100_000;
        const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
        let expected = COUNT * (COUNT - 1) / 2;
        assert_eq!(sum, expected);
    }

    /// Payload pointers must be word-aligned so users can write aligned structs.
    #[test]
    fn payload_is_word_aligned() {
        let (mut prod, mut cons) = new(1024);

        // Test several payload sizes to cover padding edge cases
        for len in [1, 3, 7, 8, 13, 64, 255] {
            let mut claim = prod.try_claim(len).unwrap();
            let ptr = claim.as_mut_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "WriteClaim payload not word-aligned for len={len}"
            );
            claim.commit();

            let record = cons.try_claim().unwrap();
            let ptr = record.as_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "ReadClaim payload not word-aligned for len={len}"
            );
        }
    }
}