nexus_logbuf/queue/mpsc.rs
1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producers read │
9//! │ tail: CachePadded<AtomicUsize> ← Producers CAS to claim space │
10//! │ buffer: *mut u8 │
11//! │ capacity: usize (power of 2) │
12//! │ mask: usize (capacity - 1) │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
16//! │ Producer (cloneable): │ │ Consumer: │
17//! │ cached_head: usize (local) │ │ head: usize (local) │
18//! │ shared: Arc<Shared> │ │ │
19//! └─────────────────────────────────┘ └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
//! - Synchronization: Relaxed CAS on tail; Release fence before the len store (commit), Acquire fence after the len load (read)
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
/// Header size in bytes — one system word (`usize`).
///
/// Every record begins with a header of this size that holds the length
/// stamp; the stamp is read and written through an `AtomicUsize`.
///
/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
/// On 32-bit targets it is 4 bytes, so the payload starts 4 bytes into the
/// record.
const HEADER_SIZE: usize = std::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57 assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59 let capacity = capacity.next_power_of_two();
60 let mask = capacity - 1;
61
62 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63 let layout = Layout::from_size_align(capacity, 8)
64 .expect("valid layout: capacity is a power of two >= 16, align is 8");
65 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66 let buffer_ptr = unsafe { alloc_zeroed(layout) };
67 if buffer_ptr.is_null() {
68 handle_alloc_error(layout);
69 }
70
71 let shared = Arc::new(Shared {
72 head: CachePadded::new(AtomicUsize::new(0)),
73 tail: CachePadded::new(AtomicUsize::new(0)),
74 buffer: buffer_ptr,
75 capacity,
76 mask,
77 });
78
79 (
80 Producer {
81 cached_head: Cell::new(0),
82 shared: Arc::clone(&shared),
83 },
84 Consumer {
85 head: Cell::new(0),
86 shared,
87 },
88 )
89}
90
/// State shared between all producers and the consumer.
///
/// The buffer is a raw allocation of `capacity` bytes that is freed when the
/// last handle drops this struct.
struct Shared {
    /// Consumer's read position. Updated by consumer, read by producers.
    head: CachePadded<AtomicUsize>,
    /// Producers' write position. CAS'd by producers.
    tail: CachePadded<AtomicUsize>,
    /// Buffer pointer (start of the `capacity`-byte allocation).
    buffer: *mut u8,
    /// Buffer capacity (power of 2).
    capacity: usize,
    /// Mask for wrapping (capacity - 1).
    mask: usize,
}

// SAFETY: Buffer access is synchronized through atomic head/tail.
// Multiple producers coordinate via CAS on tail.
// Single consumer is enforced by API (Consumer is not Clone).
// The manual impls are needed because the raw `*mut u8` field suppresses the
// automatic `Send`/`Sync` implementations.
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111 fn drop(&mut self) {
112 let layout = Layout::from_size_align(self.capacity, 8)
113 .expect("valid layout: capacity was validated at construction");
114 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115 // Shared is only dropped once (Arc prevents earlier drops).
116 unsafe { dealloc(self.buffer, layout) };
117 }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
/// Producer endpoint of the MPSC ring buffer.
///
/// This type is `Clone` - multiple producers can write concurrently.
/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
///
/// Cloning copies the current `cached_head` value; each clone then maintains
/// its own cache independently. Because of the `Cell` field the type is not
/// `Sync`, so one `Producer` value cannot be shared by reference across
/// threads — clone it instead.
#[derive(Clone)]
pub struct Producer {
    /// Cached head position (Rigtorp-style optimization, per-producer).
    /// Refreshed from the shared head only when the cached value suggests
    /// there is not enough room.
    cached_head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// SAFETY: Producer coordinates with other producers via atomic CAS.
// NOTE(review): both fields (`Cell<usize>`, `Arc<Shared>`) are already `Send`,
// so this impl looks redundant with the auto trait — confirm before removing.
unsafe impl Send for Producer {}
138
139impl Producer {
140 /// Attempts to claim space for a record with the given payload length.
141 ///
142 /// Returns a [`WriteClaim`] that can be written to and then committed.
143 ///
144 /// # Errors
145 ///
146 /// - [`TryClaimError::ZeroLength`] if `len` is zero
147 /// - [`TryClaimError::Full`] if the buffer is full
148 ///
149 /// # Safety Contract
150 ///
151 /// `len` must not exceed `LEN_MASK`. This is checked with
152 /// `debug_assert!` only.
153 #[inline]
154 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
155 debug_assert!(len <= LEN_MASK, "payload too large");
156 if len == 0 {
157 return Err(TryClaimError::ZeroLength);
158 }
159
160 let record_size = align8(HEADER_SIZE + len);
161
162 // CAS loop to claim space
163 loop {
164 let tail = self.shared.tail.load(Ordering::Relaxed);
165
166 // Calculate used space. If cached_head is stale, used can exceed capacity.
167 // saturating_sub handles this gracefully (returns 0 if stale).
168 let used = tail.wrapping_sub(self.cached_head.get());
169 let available = self.shared.capacity.saturating_sub(used);
170
171 if available < record_size {
172 // Reload head from shared state
173 self.cached_head
174 .set(self.shared.head.load(Ordering::Relaxed));
175 fence(Ordering::Acquire);
176
177 let used = tail.wrapping_sub(self.cached_head.get());
178 if used > self.shared.capacity || self.shared.capacity - used < record_size {
179 return Err(TryClaimError::Full);
180 }
181 }
182
183 // Check if record fits before buffer end, or needs wrap
184 let offset = tail & self.shared.mask;
185 let space_to_end = self.shared.capacity - offset;
186
187 if space_to_end < record_size {
188 // Need to wrap. Check if we have space for padding + record at start.
189 let total_needed = space_to_end + record_size;
190
191 let used = tail.wrapping_sub(self.cached_head.get());
192 let available = self.shared.capacity.saturating_sub(used);
193
194 if available < total_needed {
195 // Reload and recheck
196 self.cached_head
197 .set(self.shared.head.load(Ordering::Relaxed));
198 fence(Ordering::Acquire);
199
200 let used = tail.wrapping_sub(self.cached_head.get());
201 if used > self.shared.capacity || self.shared.capacity - used < total_needed {
202 return Err(TryClaimError::Full);
203 }
204 }
205
206 // Try to claim the padding + record space
207 let new_tail = tail.wrapping_add(total_needed);
208 if self
209 .shared
210 .tail
211 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
212 .is_ok()
213 {
214 // We claimed the space. Write padding skip marker.
215 let buffer = self.shared.buffer;
216 let skip_len = space_to_end | SKIP_BIT;
217
218 // Release fence before writing skip marker
219 fence(Ordering::Release);
220 // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
221 // CAS success guarantees exclusive ownership of this region.
222 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
223 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
224 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
225
226 return Ok(WriteClaim {
227 shared: &self.shared,
228 offset: 0, // Record starts at beginning after wrap
229 len,
230 record_size,
231 committed: false,
232 });
233 }
234 // CAS failed, retry
235 continue;
236 }
237
238 // Fits without wrapping
239 let new_tail = tail.wrapping_add(record_size);
240 if self
241 .shared
242 .tail
243 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
244 .is_ok()
245 {
246 return Ok(WriteClaim {
247 shared: &self.shared,
248 offset,
249 len,
250 record_size,
251 committed: false,
252 });
253 }
254 // CAS failed, retry
255 }
256 }
257
258 /// Returns the capacity of the buffer.
259 #[inline]
260 pub fn capacity(&self) -> usize {
261 self.shared.capacity
262 }
263
264 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
265 ///
266 /// Uses `Arc::strong_count` which is inherently racy. For reliable
267 /// disconnection detection, use the channel layer (`channel::mpsc`).
268 #[inline]
269 pub fn is_disconnected(&self) -> bool {
270 Arc::strong_count(&self.shared) == 1
271 }
272}
273
274impl std::fmt::Debug for Producer {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct("Producer")
277 .field("capacity", &self.capacity())
278 .finish_non_exhaustive()
279 }
280}
281
282// ============================================================================
283// WriteClaim
284// ============================================================================
285
/// A claimed region for writing a record.
///
/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
/// when done writing to publish the record. If dropped without committing, a skip
/// marker is written so the consumer can advance past the dead region.
pub struct WriteClaim<'a> {
    /// Shared ring state the claim writes into.
    shared: &'a Shared,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes (the value stamped into the header on commit).
    len: usize,
    /// Total bytes reserved for the record (header + payload, 8-byte aligned).
    record_size: usize,
    /// Set by `commit`; tells `Drop` not to overwrite the stamp with a skip marker.
    committed: bool,
}
298
299impl WriteClaim<'_> {
300 /// Commits the record, making it visible to the consumer.
301 #[inline]
302 pub fn commit(mut self) {
303 self.do_commit();
304 self.committed = true;
305 }
306
307 #[inline]
308 fn do_commit(&mut self) {
309 let buffer = self.shared.buffer;
310 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
311 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
312
313 // Release fence: ensures payload writes are visible before len store
314 fence(Ordering::Release);
315 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
316 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
317 }
318
319 /// Returns the length of the payload region.
320 #[inline]
321 pub fn len(&self) -> usize {
322 self.len
323 }
324
325 /// Returns `true` if the payload is empty (always false, len must be > 0).
326 #[inline]
327 pub fn is_empty(&self) -> bool {
328 false
329 }
330}
331
332impl Deref for WriteClaim<'_> {
333 type Target = [u8];
334
335 #[inline]
336 fn deref(&self) -> &Self::Target {
337 let buffer = self.shared.buffer;
338 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
339 // advances tail past this region before the pointer is constructed,
340 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
341 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
342 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
343 // and exclusively owned by this claim (disjoint CAS regions).
344 // Lifetime tied to &self.
345 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
346 }
347}
348
349impl DerefMut for WriteClaim<'_> {
350 #[inline]
351 fn deref_mut(&mut self) -> &mut Self::Target {
352 let buffer = self.shared.buffer;
353 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
354 // advances tail past this region before the pointer is constructed,
355 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
356 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
357 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
358 // and exclusively owned by this claim (disjoint CAS regions).
359 // Lifetime tied to &mut self.
360 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
361 }
362}
363
364impl Drop for WriteClaim<'_> {
365 fn drop(&mut self) {
366 if !self.committed {
367 // Write skip marker so consumer can advance past this region
368 let buffer = self.shared.buffer;
369 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
370 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
371 let skip_len = self.record_size | SKIP_BIT;
372
373 fence(Ordering::Release);
374 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
375 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
376 }
377 }
378}
379
380// ============================================================================
381// Consumer
382// ============================================================================
383
/// Consumer endpoint of the MPSC ring buffer.
///
/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
/// This type is NOT `Clone` - only one consumer is allowed.
pub struct Consumer {
    /// Local head position (free-running; masked with `mask` to get a buffer
    /// offset). Published to `shared.head` whenever it advances.
    head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// SAFETY: Consumer is only used from one thread at a time.
// NOTE(review): both fields (`Cell<usize>`, `Arc<Shared>`) are already `Send`,
// so this impl looks redundant with the auto trait — confirm before removing.
unsafe impl Send for Consumer {}
397
impl Consumer {
    /// Attempts to claim the next record for reading.
    ///
    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
    /// and the head is advanced.
    ///
    /// Skip markers (wrap padding and aborted claims) found at the head are
    /// consumed transparently: their region is zeroed, the head is advanced
    /// past them, and the scan continues with the next position.
    ///
    /// Returns `None` if no committed record is available.
    #[inline]
    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
        let buffer = self.shared.buffer;

        loop {
            let offset = self.head.get() & self.shared.mask;
            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
            // (head advances by align8'd record sizes). Buffer is valid.
            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();

            // Relaxed atomic load, then Acquire fence for payload visibility
            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
            fence(Ordering::Acquire);

            if len_raw == 0 {
                // Not committed yet (the region is zero until a producer
                // stamps it).
                return None;
            }

            if len_raw & SKIP_BIT != 0 {
                // Skip marker: zero the region and advance
                let skip_size = len_raw & LEN_MASK;
                // Zero payload first, then stamp last (mirrors write path)
                if skip_size > HEADER_SIZE {
                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
                    // the buffer. Consumer has exclusive read access to this region.
                    unsafe {
                        ptr::write_bytes(
                            buffer.add(offset + HEADER_SIZE),
                            0,
                            skip_size - HEADER_SIZE,
                        );
                    }
                }
                // Ensure payload zeroing completes before clearing stamp
                fence(Ordering::Release);
                // SAFETY: len_ptr is still valid, computed above.
                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);

                self.head.set(self.head.get().wrapping_add(skip_size));

                // Ensure stamp clear completes before head advance, so a
                // producer that observes the new head also observes the
                // zeroed region.
                fence(Ordering::Release);
                self.shared.head.store(self.head.get(), Ordering::Relaxed);

                // Continue to check next position
                continue;
            }

            // Valid record: the stamp is the payload length; the full record
            // size is recomputed the same way the producer computed it.
            let len = len_raw;
            let record_size = align8(HEADER_SIZE + len);

            return Some(ReadClaim {
                consumer: self,
                offset,
                len,
                record_size,
            });
        }
    }

    /// Returns the capacity of the buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.shared.capacity
    }

    /// Best-effort hint: returns `true` if all producers have likely been dropped.
    ///
    /// Implemented as `Arc::strong_count == 1`, i.e. the consumer holds the
    /// only remaining handle to the shared state.
    ///
    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.shared) == 1
    }
}
483
484impl std::fmt::Debug for Consumer {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 f.debug_struct("Consumer")
487 .field("capacity", &self.capacity())
488 .finish_non_exhaustive()
489 }
490}
491
492// ============================================================================
493// ReadClaim
494// ============================================================================
495
/// A claimed record for reading.
///
/// Dereferences to `&[u8]` for the payload. When dropped, the record region
/// is zeroed and the head is advanced, freeing space for producers.
pub struct ReadClaim<'a> {
    /// Exclusive borrow of the consumer; prevents claiming another record
    /// while this one is alive.
    consumer: &'a mut Consumer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes, as read from the header stamp.
    len: usize,
    /// Total bytes occupied by the record (header + payload, 8-byte aligned).
    record_size: usize,
}
506
507impl ReadClaim<'_> {
508 /// Returns the length of the payload.
509 #[inline]
510 pub fn len(&self) -> usize {
511 self.len
512 }
513
514 /// Returns `true` if the payload is empty.
515 #[inline]
516 pub fn is_empty(&self) -> bool {
517 self.len == 0
518 }
519}
520
521impl Deref for ReadClaim<'_> {
522 type Target = [u8];
523
524 #[inline]
525 fn deref(&self) -> &Self::Target {
526 let buffer = self.consumer.shared.buffer;
527 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
528 // exclusive read access via &mut Consumer borrow.
529 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
530 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
531 // finished writing (len was non-zero, preceded by Release fence).
532 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
533 }
534}
535
impl Drop for ReadClaim<'_> {
    /// Releases the record: zeroes its bytes, clears the header stamp, and
    /// publishes the advanced head so producers can reuse the space.
    fn drop(&mut self) {
        let buffer = self.consumer.shared.buffer;

        // Zero payload first, then stamp last (mirrors write path)
        if self.record_size > HEADER_SIZE {
            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
            // the buffer. Consumer owns this region exclusively.
            unsafe {
                ptr::write_bytes(
                    buffer.add(self.offset + HEADER_SIZE),
                    0,
                    self.record_size - HEADER_SIZE,
                );
            }
        }
        // Ensure payload zeroing completes before clearing stamp
        fence(Ordering::Release);
        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);

        // Advance head (local copy first, then the shared one below)
        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
        self.consumer.head.set(new_head);

        // Ensure stamp clear completes before head advance, so a producer
        // that observes the new head also observes the zeroed region.
        fence(Ordering::Release);
        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
    }
}
568
569// ============================================================================
570// Tests
571// ============================================================================
572
#[cfg(test)]
mod tests {
    use super::*;

    /// A single committed record is read back byte-for-byte.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer yields no record.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records from one producer come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// Compile-time check that `Producer` implements `Clone`.
    #[test]
    #[allow(clippy::redundant_clone)]
    fn producer_is_clone() {
        let (prod, _cons) = new(1024);
        let _prod2 = prod.clone();
    }

    /// Several producer threads write concurrently; the consumer must see
    /// every message, and each producer's messages in sequence order.
    #[test]
    fn multiple_producers_single_consumer() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const MESSAGES_PER_PRODUCER: u64 = 10_000;
        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;

        let (prod, mut cons) = new(64 * 1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|producer_id| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..MESSAGES_PER_PRODUCER {
                        // Encode producer_id and sequence in payload
                        let mut payload = [0u8; 16];
                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
                        payload[8..].copy_from_slice(&i.to_le_bytes());

                        // Spin until the buffer has room for the record
                        loop {
                            match prod.try_claim(16) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        // Drop original producer
        drop(prod);

        // Consumer: track per-producer sequence
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut per_producer = vec![0u64; PRODUCERS];

            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());

                    // Each producer's messages should arrive in order
                    assert_eq!(
                        seq, per_producer[producer_id],
                        "producer {} out of order",
                        producer_id
                    );
                    per_producer[producer_id] += 1;
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }

            per_producer
        });

        for h in handles {
            h.join().unwrap();
        }

        let per_producer = consumer.join().unwrap();
        for (i, &count) in per_producer.iter().enumerate() {
            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
        }
    }

    /// A claim dropped without `commit` is skipped by the consumer.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Writing more data than the buffer holds (draining when full) exercises
    /// the wrap path; the test passes if nothing panics or hangs.
    #[test]
    fn wrap_around() {
        let (mut prod, mut cons) = new(64);

        // Fill with messages that will cause wrap-around
        for i in 0..20 {
            let payload = format!("msg{:02}", i);
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    Err(_) => {
                        // Drain some
                        while cons.try_claim().is_some() {}
                    }
                }
            }
        }
    }

    /// Once the buffer is filled without draining, further claims fail.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// With a single producer, dropping the consumer is reported by
    /// `Producer::is_disconnected`.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());
    }

    /// Capacities below 16 bytes are rejected at construction.
    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    /// Zero-length claims are rejected with `ZeroLength`.
    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    /// Requested capacities are rounded up to the next power of two.
    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// High-volume stress test with multiple producers.
    ///
    /// Verifies the total message count and the sum of all payload values.
    #[test]
    fn stress_multiple_producers() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const COUNT_PER_PRODUCER: u64 = 100_000;
        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
        const BUFFER_SIZE: usize = 64 * 1024;

        let (prod, mut cons) = new(BUFFER_SIZE);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..COUNT_PER_PRODUCER {
                        let payload = i.to_le_bytes();
                        // Spin until the buffer has room for the record
                        loop {
                            match prod.try_claim(payload.len()) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        drop(prod);

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            (received, sum)
        });

        for h in handles {
            h.join().unwrap();
        }

        let (received, sum) = consumer.join().unwrap();
        assert_eq!(received, TOTAL);

        // Each producer sends 0..COUNT_PER_PRODUCER
        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
        assert_eq!(sum, expected_sum);
    }
}
853}