nexus_logbuf/queue/spsc.rs
1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producer reads │
9//! │ buffer: *mut u8 │
10//! │ capacity: usize (power of 2) │
11//! │ mask: usize (capacity - 1) │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
15//! │ Producer: │ │ Consumer: │
16//! │ tail: usize (local) │ │ head: usize (local) │
17//! │ cached_head: usize (local) │ │ │
18//! └─────────────────────────────────┘ └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize (8 bytes on 64-bit) │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len] (variable) │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...] (0-7 bytes) │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{BufferFull, LEN_MASK, SKIP_BIT, align8};
53
54/// Header size in bytes — one system word (`usize`).
55///
56/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
57const HEADER_SIZE: usize = std::mem::size_of::<usize>();
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67 assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69 let capacity = capacity.next_power_of_two();
70 let mask = capacity - 1;
71
72 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73 let layout = Layout::from_size_align(capacity, 8)
74 .expect("valid layout: capacity is a power of two >= 16, align is 8");
75 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76 let buffer_ptr = unsafe { alloc_zeroed(layout) };
77 if buffer_ptr.is_null() {
78 handle_alloc_error(layout);
79 }
80
81 let shared = Arc::new(Shared {
82 head: CachePadded::new(AtomicUsize::new(0)),
83 buffer: buffer_ptr,
84 capacity,
85 mask,
86 });
87
88 (
89 Producer {
90 tail: Cell::new(0),
91 cached_head: Cell::new(0),
92 shared: Arc::clone(&shared),
93 },
94 Consumer {
95 head: Cell::new(0),
96 shared,
97 },
98 )
99}
100
101struct Shared {
102 /// Consumer's read position. Updated by consumer, read by producer.
103 head: CachePadded<AtomicUsize>,
104 /// Buffer pointer.
105 buffer: *mut u8,
106 /// Buffer capacity (power of 2).
107 capacity: usize,
108 /// Mask for wrapping (capacity - 1).
109 mask: usize,
110}
111
112// Safety: Buffer is only accessed by one producer and one consumer.
113// The atomic head provides synchronization.
114unsafe impl Send for Shared {}
115unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118 fn drop(&mut self) {
119 let layout = Layout::from_size_align(self.capacity, 8)
120 .expect("valid layout: capacity was validated at construction");
121 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122 // Shared is only dropped once (Arc prevents earlier drops).
123 unsafe { dealloc(self.buffer, layout) };
124 }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
131/// Producer endpoint of the SPSC ring buffer.
132///
133/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
134pub struct Producer {
135 /// Local tail position (free-running).
136 tail: Cell<usize>,
137 /// Cached head position (Rigtorp optimization).
138 cached_head: Cell<usize>,
139 /// Shared state.
140 shared: Arc<Shared>,
141}
142
143// Safety: Producer is only used from one thread.
144unsafe impl Send for Producer {}
145
146impl Producer {
147 /// Attempts to claim space for a record with the given payload length.
148 ///
149 /// Returns a [`WriteClaim`] that can be written to and then committed.
150 ///
151 /// # Errors
152 ///
153 /// Returns [`BufferFull`] if the buffer has no space for the record.
154 ///
155 /// # Panics
156 ///
157 /// Panics if `len == 0`. The wire format reserves `len == 0` as the
158 /// "uncommitted" sentinel — letting it through would silently hang the
159 /// consumer. Aborting a non-zero claim is fully supported (drop the
160 /// [`WriteClaim`] without committing); only claiming zero bytes upfront
161 /// is forbidden.
162 ///
163 /// # Safety Contract
164 ///
165 /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
166 /// (unreachable in practice). On 32-bit, records >2GB could set
167 /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
168 /// On 64-bit this is checked with `debug_assert!` only.
169 #[inline]
170 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, BufferFull> {
171 assert!(len > 0, "payload length must be non-zero");
172 #[cfg(target_pointer_width = "32")]
173 assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
174 #[cfg(not(target_pointer_width = "32"))]
175 debug_assert!(len <= LEN_MASK, "payload too large");
176
177 let record_size = align8(HEADER_SIZE + len);
178
179 // Check if we have space
180 let tail = self.tail.get();
181 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
182
183 if available < record_size {
184 // Reload head from shared state
185 self.cached_head
186 .set(self.shared.head.load(Ordering::Relaxed));
187 fence(Ordering::Acquire);
188
189 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
190 if available < record_size {
191 return Err(BufferFull);
192 }
193 }
194
195 // Check if record fits before buffer end, or needs wrap
196 let offset = tail & self.shared.mask;
197 let space_to_end = self.shared.capacity - offset;
198
199 if space_to_end < record_size {
200 // Need to wrap. First check if we have space for padding + record at start.
201 let total_needed = space_to_end + record_size;
202 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
203
204 if available < total_needed {
205 // Reload and recheck
206 self.cached_head
207 .set(self.shared.head.load(Ordering::Relaxed));
208 fence(Ordering::Acquire);
209
210 let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
211 if available < total_needed {
212 return Err(BufferFull);
213 }
214 }
215
216 // Write padding skip marker
217 let buffer = self.shared.buffer;
218 let skip_len = space_to_end | SKIP_BIT;
219 fence(Ordering::Release);
220 // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
221 // Buffer is valid for capacity bytes. We are the sole producer.
222 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
223 // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
224 // within the buffer. AtomicUsize reference is used for store visibility.
225 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
226
227 // Advance tail past padding
228 self.tail.set(tail.wrapping_add(space_to_end));
229 let new_offset = 0;
230
231 Ok(WriteClaim {
232 producer: self,
233 offset: new_offset,
234 len,
235 record_size,
236 committed: false,
237 })
238 } else {
239 // Fits without wrapping
240 Ok(WriteClaim {
241 producer: self,
242 offset,
243 len,
244 record_size,
245 committed: false,
246 })
247 }
248 }
249
250 /// Returns the capacity of the buffer.
251 #[inline]
252 pub fn capacity(&self) -> usize {
253 self.shared.capacity
254 }
255
256 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
257 ///
258 /// Uses `Arc::strong_count` which is inherently racy — the count can
259 /// change between the check and the caller's next action. Suitable for
260 /// graceful shutdown detection, not for correctness. For reliable
261 /// disconnection detection, use the channel layer (`channel::spsc`)
262 /// which tracks disconnection via dedicated atomic flags.
263 // Decision: No AtomicBool flag at the raw queue level. The channel
264 // layer (channel::spsc) provides reliable detection via dedicated
265 // flags. Adding one here would cost an atomic on every push/pop
266 // for a feature only the channel layer needs.
267 #[inline]
268 pub fn is_disconnected(&self) -> bool {
269 Arc::strong_count(&self.shared) == 1
270 }
271}
272
273impl std::fmt::Debug for Producer {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 f.debug_struct("Producer")
276 .field("capacity", &self.capacity())
277 .finish_non_exhaustive()
278 }
279}
280
281// ============================================================================
282// WriteClaim
283// ============================================================================
284
285/// A claimed region for writing a record.
286///
287/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
288/// when done writing to publish the record. If dropped without committing, a skip
289/// marker is written so the consumer can advance past the dead region.
290///
291/// # Important
292///
293/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
294/// block the consumer at this record's offset. This is not undefined behavior
295/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
296pub struct WriteClaim<'a> {
297 producer: &'a mut Producer,
298 offset: usize,
299 len: usize,
300 record_size: usize,
301 committed: bool,
302}
303
304impl WriteClaim<'_> {
305 /// Commits the record, making it visible to the consumer.
306 #[inline]
307 pub fn commit(mut self) {
308 self.do_commit();
309 self.committed = true;
310 }
311
312 #[inline]
313 fn do_commit(&mut self) {
314 let buffer = self.producer.shared.buffer;
315 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
316 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
317
318 // Release fence: ensures payload writes are visible before len store
319 fence(Ordering::Release);
320 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
321 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
322
323 // Advance tail
324 self.producer
325 .tail
326 .set(self.producer.tail.get().wrapping_add(self.record_size));
327 }
328
329 /// Returns the length of the payload region.
330 #[inline]
331 pub fn len(&self) -> usize {
332 self.len
333 }
334
335 /// Returns `true` if the payload is empty (always false, len must be > 0).
336 #[inline]
337 pub fn is_empty(&self) -> bool {
338 false
339 }
340}
341
342impl Deref for WriteClaim<'_> {
343 type Target = [u8];
344
345 #[inline]
346 fn deref(&self) -> &Self::Target {
347 let buffer = self.producer.shared.buffer;
348 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
349 // exclusive access to this region via &mut Producer borrow.
350 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
351 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
352 // and exclusively owned by this claim. Lifetime tied to &self.
353 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
354 }
355}
356
357impl DerefMut for WriteClaim<'_> {
358 #[inline]
359 fn deref_mut(&mut self) -> &mut Self::Target {
360 let buffer = self.producer.shared.buffer;
361 // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
362 // guaranteed by &mut self (only one WriteClaim exists per try_claim).
363 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
364 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
365 // and exclusively owned by this claim. Lifetime tied to &mut self.
366 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
367 }
368}
369
370impl Drop for WriteClaim<'_> {
371 fn drop(&mut self) {
372 if !self.committed {
373 // Write skip marker so consumer can advance past this region
374 let buffer = self.producer.shared.buffer;
375 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
376 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
377 let skip_len = self.record_size | SKIP_BIT;
378
379 fence(Ordering::Release);
380 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
381 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
382
383 // Advance tail past the dead region
384 self.producer
385 .tail
386 .set(self.producer.tail.get().wrapping_add(self.record_size));
387 }
388 }
389}
390
391// ============================================================================
392// Consumer
393// ============================================================================
394
395/// Consumer endpoint of the SPSC ring buffer.
396///
397/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
398pub struct Consumer {
399 /// Local head position (free-running).
400 head: Cell<usize>,
401 /// Shared state.
402 shared: Arc<Shared>,
403}
404
405// Safety: Consumer is only used from one thread.
406unsafe impl Send for Consumer {}
407
408impl Consumer {
409 /// Attempts to claim the next record for reading.
410 ///
411 /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
412 /// to `&[u8]` for the payload. When dropped, the record region is zeroed
413 /// and the head is advanced.
414 ///
415 /// Returns `None` if no committed record is available.
416 #[inline]
417 pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
418 let buffer = self.shared.buffer;
419
420 loop {
421 let offset = self.head.get() & self.shared.mask;
422 // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
423 // (head advances by align8'd record sizes). Buffer is valid.
424 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
425
426 // Relaxed atomic load, then Acquire fence for payload visibility
427 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
428 let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
429 fence(Ordering::Acquire);
430
431 if len_raw == 0 {
432 // Not committed yet
433 return None;
434 }
435
436 if len_raw & SKIP_BIT != 0 {
437 // Skip marker: zero the region and advance
438 let skip_size = len_raw & LEN_MASK;
439 // Zero payload first, then stamp last (mirrors write path)
440 if skip_size > HEADER_SIZE {
441 // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
442 // the buffer. Consumer has exclusive read access to this region.
443 unsafe {
444 ptr::write_bytes(
445 buffer.add(offset + HEADER_SIZE),
446 0,
447 skip_size - HEADER_SIZE,
448 );
449 }
450 }
451 // Ensure payload zeroing completes before clearing stamp
452 fence(Ordering::Release);
453 // SAFETY: len_ptr is still valid, computed above.
454 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
455
456 self.head.set(self.head.get().wrapping_add(skip_size));
457
458 // Ensure stamp clear completes before head advance
459 fence(Ordering::Release);
460 self.shared.head.store(self.head.get(), Ordering::Relaxed);
461
462 // Continue to check next position
463 continue;
464 }
465
466 // Valid record
467 let len = len_raw;
468 let record_size = align8(HEADER_SIZE + len);
469
470 return Some(ReadClaim {
471 consumer: self,
472 offset,
473 len,
474 record_size,
475 });
476 }
477 }
478
479 /// Returns the capacity of the buffer.
480 #[inline]
481 pub fn capacity(&self) -> usize {
482 self.shared.capacity
483 }
484
485 /// Best-effort hint: returns `true` if the producer has likely been dropped.
486 ///
487 /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
488 #[inline]
489 pub fn is_disconnected(&self) -> bool {
490 Arc::strong_count(&self.shared) == 1
491 }
492}
493
494impl std::fmt::Debug for Consumer {
495 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496 f.debug_struct("Consumer")
497 .field("capacity", &self.capacity())
498 .finish_non_exhaustive()
499 }
500}
501
502// ============================================================================
503// ReadClaim
504// ============================================================================
505
506/// A claimed record for reading.
507///
508/// Dereferences to `&[u8]` for the payload. When dropped, the record region
509/// is zeroed and the head is advanced, freeing space for the producer.
510pub struct ReadClaim<'a> {
511 consumer: &'a mut Consumer,
512 offset: usize,
513 len: usize,
514 record_size: usize,
515}
516
517impl ReadClaim<'_> {
518 /// Returns the length of the payload.
519 #[inline]
520 pub fn len(&self) -> usize {
521 self.len
522 }
523
524 /// Returns `true` if the payload is empty.
525 #[inline]
526 pub fn is_empty(&self) -> bool {
527 self.len == 0
528 }
529}
530
531impl Deref for ReadClaim<'_> {
532 type Target = [u8];
533
534 #[inline]
535 fn deref(&self) -> &Self::Target {
536 let buffer = self.consumer.shared.buffer;
537 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
538 // exclusive read access via &mut Consumer borrow.
539 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
540 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
541 // finished writing (len was non-zero, preceded by Release fence).
542 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
543 }
544}
545
546impl Drop for ReadClaim<'_> {
547 fn drop(&mut self) {
548 let buffer = self.consumer.shared.buffer;
549
550 // Zero payload first, then stamp last (mirrors write path)
551 if self.record_size > HEADER_SIZE {
552 // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
553 // the buffer. Consumer owns this region exclusively.
554 unsafe {
555 ptr::write_bytes(
556 buffer.add(self.offset + HEADER_SIZE),
557 0,
558 self.record_size - HEADER_SIZE,
559 );
560 }
561 }
562 // Ensure payload zeroing completes before clearing stamp
563 fence(Ordering::Release);
564 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
565 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
566 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
567 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
568
569 // Advance head
570 let new_head = self.consumer.head.get().wrapping_add(self.record_size);
571 self.consumer.head.set(new_head);
572
573 // Ensure stamp clear completes before head advance
574 fence(Ordering::Release);
575 self.consumer.shared.head.store(new_head, Ordering::Relaxed);
576 }
577}
578
579// ============================================================================
580// Tests
581// ============================================================================
582
583#[cfg(test)]
584mod tests {
585 use super::*;
586
587 #[test]
588 fn basic_write_read() {
589 let (mut prod, mut cons) = new(1024);
590
591 let payload = b"hello world";
592 let mut claim = prod.try_claim(payload.len()).unwrap();
593 claim.copy_from_slice(payload);
594 claim.commit();
595
596 let record = cons.try_claim().unwrap();
597 assert_eq!(&*record, payload);
598 }
599
600 #[test]
601 fn empty_returns_none() {
602 let (_, mut cons) = new(1024);
603 assert!(cons.try_claim().is_none());
604 }
605
606 #[test]
607 fn multiple_records() {
608 let (mut prod, mut cons) = new(1024);
609
610 for i in 0..10 {
611 let payload = format!("message {}", i);
612 let mut claim = prod.try_claim(payload.len()).unwrap();
613 claim.copy_from_slice(payload.as_bytes());
614 claim.commit();
615 }
616
617 for i in 0..10 {
618 let record = cons.try_claim().unwrap();
619 let expected = format!("message {}", i);
620 assert_eq!(&*record, expected.as_bytes());
621 }
622
623 assert!(cons.try_claim().is_none());
624 }
625
626 #[test]
627 fn aborted_claim_creates_skip() {
628 let (mut prod, mut cons) = new(1024);
629
630 // Claim and drop without committing
631 {
632 let mut claim = prod.try_claim(10).unwrap();
633 claim.copy_from_slice(b"0123456789");
634 // drop without commit
635 }
636
637 // Write another record
638 {
639 let mut claim = prod.try_claim(5).unwrap();
640 claim.copy_from_slice(b"hello");
641 claim.commit();
642 }
643
644 // Consumer should skip the aborted record and read the committed one
645 let record = cons.try_claim().unwrap();
646 assert_eq!(&*record, b"hello");
647 }
648
649 #[test]
650 fn wrap_around() {
651 let (mut prod, mut cons) = new(64);
652
653 // Fill with messages that will cause wrap-around
654 for i in 0..20 {
655 let payload = format!("msg{:02}", i);
656 loop {
657 match prod.try_claim(payload.len()) {
658 Ok(mut claim) => {
659 claim.copy_from_slice(payload.as_bytes());
660 claim.commit();
661 break;
662 }
663 Err(_) => {
664 // Drain some
665 while cons.try_claim().is_some() {}
666 }
667 }
668 }
669 }
670 }
671
672 #[test]
673 fn full_returns_error() {
674 let (mut prod, _cons) = new(64);
675
676 // Fill the buffer
677 let mut count = 0;
678 while let Ok(mut claim) = prod.try_claim(8) {
679 claim.copy_from_slice(b"12345678");
680 claim.commit();
681 count += 1;
682 }
683
684 assert!(count > 0);
685 assert!(prod.try_claim(8).is_err());
686 }
687
688 #[test]
689 fn cross_thread() {
690 use std::thread;
691
692 let (mut prod, mut cons) = new(4096);
693
694 let producer = thread::spawn(move || {
695 for i in 0..10_000u64 {
696 let payload = i.to_le_bytes();
697 loop {
698 match prod.try_claim(payload.len()) {
699 Ok(mut claim) => {
700 claim.copy_from_slice(&payload);
701 claim.commit();
702 break;
703 }
704 Err(_) => std::hint::spin_loop(),
705 }
706 }
707 }
708 });
709
710 let consumer = thread::spawn(move || {
711 let mut received = 0u64;
712 while received < 10_000 {
713 if let Some(record) = cons.try_claim() {
714 let value = u64::from_le_bytes((*record).try_into().unwrap());
715 assert_eq!(value, received);
716 received += 1;
717 } else {
718 std::hint::spin_loop();
719 }
720 }
721 });
722
723 producer.join().unwrap();
724 consumer.join().unwrap();
725 }
726
727 #[test]
728 fn disconnection_detection() {
729 let (prod, cons) = new(1024);
730
731 assert!(!prod.is_disconnected());
732 assert!(!cons.is_disconnected());
733
734 drop(cons);
735 assert!(prod.is_disconnected());
736 }
737
738 #[test]
739 #[should_panic(expected = "capacity must be at least 16")]
740 fn tiny_capacity_panics() {
741 let _ = new(8);
742 }
743
744 #[test]
745 #[should_panic(expected = "payload length must be non-zero")]
746 fn zero_len_panics() {
747 let (mut prod, _) = new(1024);
748 let _ = prod.try_claim(0);
749 }
750
751 #[test]
752 fn capacity_rounds_to_power_of_two() {
753 let (prod, _) = new(100);
754 assert_eq!(prod.capacity(), 128);
755
756 let (prod, _) = new(1000);
757 assert_eq!(prod.capacity(), 1024);
758 }
759
760 #[test]
761 fn variable_length_records() {
762 let (mut prod, mut cons) = new(4096);
763
764 let messages = [
765 "a",
766 "hello",
767 "this is a longer message",
768 "x",
769 "medium length",
770 ];
771
772 for msg in &messages {
773 let mut claim = prod.try_claim(msg.len()).unwrap();
774 claim.copy_from_slice(msg.as_bytes());
775 claim.commit();
776 }
777
778 for msg in &messages {
779 let record = cons.try_claim().unwrap();
780 assert_eq!(&*record, msg.as_bytes());
781 }
782 }
783
784 /// High-volume stress test with variable-length messages.
785 ///
786 /// Tests correctness under sustained load with wrap-around.
787 #[test]
788 fn stress_high_volume() {
789 use std::thread;
790
791 const COUNT: u64 = 1_000_000;
792 const BUFFER_SIZE: usize = 64 * 1024; // 64KB
793
794 let (mut prod, mut cons) = new(BUFFER_SIZE);
795
796 let producer = thread::spawn(move || {
797 for i in 0..COUNT {
798 // Variable length: 8-64 bytes based on sequence
799 let len = 8 + ((i % 8) * 8) as usize;
800 let mut payload = vec![0u8; len];
801 // Write sequence number at start
802 payload[..8].copy_from_slice(&i.to_le_bytes());
803
804 loop {
805 match prod.try_claim(len) {
806 Ok(mut claim) => {
807 claim.copy_from_slice(&payload);
808 claim.commit();
809 break;
810 }
811 Err(_) => std::hint::spin_loop(),
812 }
813 }
814 }
815 });
816
817 let consumer = thread::spawn(move || {
818 let mut received = 0u64;
819 while received < COUNT {
820 if let Some(record) = cons.try_claim() {
821 // Verify sequence number
822 let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
823 assert_eq!(seq, received, "sequence mismatch at {}", received);
824
825 // Verify expected length
826 let expected_len = 8 + ((received % 8) * 8) as usize;
827 assert_eq!(
828 record.len(),
829 expected_len,
830 "length mismatch at {}",
831 received
832 );
833
834 received += 1;
835 } else {
836 std::hint::spin_loop();
837 }
838 }
839 received
840 });
841
842 producer.join().unwrap();
843 let received = consumer.join().unwrap();
844 assert_eq!(received, COUNT);
845 }
846
847 /// Stress test with maximum contention - tiny buffer, high throughput.
848 #[test]
849 fn stress_high_contention() {
850 use std::thread;
851
852 const COUNT: u64 = 100_000;
853 const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around
854
855 let (mut prod, mut cons) = new(BUFFER_SIZE);
856
857 let producer = thread::spawn(move || {
858 for i in 0..COUNT {
859 let payload = i.to_le_bytes();
860 loop {
861 match prod.try_claim(payload.len()) {
862 Ok(mut claim) => {
863 claim.copy_from_slice(&payload);
864 claim.commit();
865 break;
866 }
867 Err(_) => std::hint::spin_loop(),
868 }
869 }
870 }
871 });
872
873 let consumer = thread::spawn(move || {
874 let mut received = 0u64;
875 let mut sum = 0u64;
876 while received < COUNT {
877 if let Some(record) = cons.try_claim() {
878 let value = u64::from_le_bytes((*record).try_into().unwrap());
879 assert_eq!(value, received);
880 sum = sum.wrapping_add(value);
881 received += 1;
882 } else {
883 std::hint::spin_loop();
884 }
885 }
886 sum
887 });
888
889 producer.join().unwrap();
890 let sum = consumer.join().unwrap();
891 // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
892 let expected = COUNT * (COUNT - 1) / 2;
893 assert_eq!(sum, expected);
894 }
895
896 /// Payload pointers must be word-aligned so users can write aligned structs.
897 #[test]
898 fn payload_is_word_aligned() {
899 let (mut prod, mut cons) = new(1024);
900
901 // Test several payload sizes to cover padding edge cases
902 for len in [1, 3, 7, 8, 13, 64, 255] {
903 let mut claim = prod.try_claim(len).unwrap();
904 let ptr = claim.as_mut_ptr();
905 assert_eq!(
906 ptr as usize % std::mem::align_of::<usize>(),
907 0,
908 "WriteClaim payload not word-aligned for len={len}"
909 );
910 claim.commit();
911
912 let record = cons.try_claim().unwrap();
913 let ptr = record.as_ptr();
914 assert_eq!(
915 ptr as usize % std::mem::align_of::<usize>(),
916 0,
917 "ReadClaim payload not word-aligned for len={len}"
918 );
919 }
920 }
921}