nexus_logbuf/queue/mpsc.rs
1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producers read │
9//! │ tail: CachePadded<AtomicUsize> ← Producers CAS to claim space │
10//! │ buffer: *mut u8 │
11//! │ capacity: usize (power of 2) │
12//! │ mask: usize (capacity - 1) │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
16//! │ Producer (cloneable): │ │ Consumer: │
17//! │ cached_head: usize (local) │ │ head: usize (local) │
18//! │ shared: Arc<Shared> │ │ │
19//! └─────────────────────────────────┘ └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
/// Size of a record header in bytes: exactly one machine word (`usize`).
///
/// The header holds the record's length stamp. On 64-bit targets this is
/// 8 bytes, so the payload that follows it begins on an 8-byte boundary.
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57 assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59 let capacity = capacity.next_power_of_two();
60 let mask = capacity - 1;
61
62 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63 let layout = Layout::from_size_align(capacity, 8)
64 .expect("valid layout: capacity is a power of two >= 16, align is 8");
65 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66 let buffer_ptr = unsafe { alloc_zeroed(layout) };
67 if buffer_ptr.is_null() {
68 handle_alloc_error(layout);
69 }
70
71 let shared = Arc::new(Shared {
72 head: CachePadded::new(AtomicUsize::new(0)),
73 tail: CachePadded::new(AtomicUsize::new(0)),
74 buffer: buffer_ptr,
75 capacity,
76 mask,
77 });
78
79 (
80 Producer {
81 cached_head: Cell::new(0),
82 shared: Arc::clone(&shared),
83 },
84 Consumer {
85 head: Cell::new(0),
86 shared,
87 },
88 )
89}
90
91struct Shared {
92 /// Consumer's read position. Updated by consumer, read by producers.
93 head: CachePadded<AtomicUsize>,
94 /// Producers' write position. CAS'd by producers.
95 tail: CachePadded<AtomicUsize>,
96 /// Buffer pointer.
97 buffer: *mut u8,
98 /// Buffer capacity (power of 2).
99 capacity: usize,
100 /// Mask for wrapping (capacity - 1).
101 mask: usize,
102}
103
104// Safety: Buffer access is synchronized through atomic head/tail.
105// Multiple producers coordinate via CAS on tail.
106// Single consumer is enforced by API (Consumer is not Clone).
107unsafe impl Send for Shared {}
108unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111 fn drop(&mut self) {
112 let layout = Layout::from_size_align(self.capacity, 8)
113 .expect("valid layout: capacity was validated at construction");
114 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115 // Shared is only dropped once (Arc prevents earlier drops).
116 unsafe { dealloc(self.buffer, layout) };
117 }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
124/// Producer endpoint of the MPSC ring buffer.
125///
126/// This type is `Clone` - multiple producers can write concurrently.
127/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
128#[derive(Clone)]
129pub struct Producer {
130 /// Cached head position (Rigtorp-style optimization, per-producer).
131 cached_head: Cell<usize>,
132 /// Shared state.
133 shared: Arc<Shared>,
134}
135
136// Safety: Producer coordinates with other producers via atomic CAS.
137unsafe impl Send for Producer {}
138
139impl Producer {
140 /// Attempts to claim space for a record with the given payload length.
141 ///
142 /// Returns a [`WriteClaim`] that can be written to and then committed.
143 ///
144 /// # Errors
145 ///
146 /// - [`TryClaimError::ZeroLength`] if `len` is zero
147 /// - [`TryClaimError::Full`] if the buffer is full
148 ///
149 /// # Safety Contract
150 ///
151 /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
152 /// (unreachable in practice). On 32-bit, records >2GB could set
153 /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
154 #[inline]
155 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
156 #[cfg(target_pointer_width = "32")]
157 assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
158 #[cfg(not(target_pointer_width = "32"))]
159 debug_assert!(len <= LEN_MASK, "payload too large");
160 if len == 0 {
161 return Err(TryClaimError::ZeroLength);
162 }
163
164 let record_size = align8(HEADER_SIZE + len);
165
166 // CAS loop to claim space
167 loop {
168 let tail = self.shared.tail.load(Ordering::Relaxed);
169
170 // Calculate used space. If cached_head is stale, used can exceed capacity.
171 // saturating_sub handles this gracefully (returns 0 if stale).
172 let used = tail.wrapping_sub(self.cached_head.get());
173 let available = self.shared.capacity.saturating_sub(used);
174
175 if available < record_size {
176 // Reload head from shared state
177 self.cached_head
178 .set(self.shared.head.load(Ordering::Relaxed));
179 fence(Ordering::Acquire);
180
181 let used = tail.wrapping_sub(self.cached_head.get());
182 if used > self.shared.capacity || self.shared.capacity - used < record_size {
183 return Err(TryClaimError::Full);
184 }
185 }
186
187 // Check if record fits before buffer end, or needs wrap
188 let offset = tail & self.shared.mask;
189 let space_to_end = self.shared.capacity - offset;
190
191 if space_to_end < record_size {
192 // Need to wrap. Check if we have space for padding + record at start.
193 let total_needed = space_to_end + record_size;
194
195 let used = tail.wrapping_sub(self.cached_head.get());
196 let available = self.shared.capacity.saturating_sub(used);
197
198 if available < total_needed {
199 // Reload and recheck
200 self.cached_head
201 .set(self.shared.head.load(Ordering::Relaxed));
202 fence(Ordering::Acquire);
203
204 let used = tail.wrapping_sub(self.cached_head.get());
205 if used > self.shared.capacity || self.shared.capacity - used < total_needed {
206 return Err(TryClaimError::Full);
207 }
208 }
209
210 // Try to claim the padding + record space
211 let new_tail = tail.wrapping_add(total_needed);
212 if self
213 .shared
214 .tail
215 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
216 .is_ok()
217 {
218 // We claimed the space. Write padding skip marker.
219 let buffer = self.shared.buffer;
220 let skip_len = space_to_end | SKIP_BIT;
221
222 // Release fence before writing skip marker
223 fence(Ordering::Release);
224 // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
225 // CAS success guarantees exclusive ownership of this region.
226 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
227 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
228 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
229
230 return Ok(WriteClaim {
231 shared: &self.shared,
232 offset: 0, // Record starts at beginning after wrap
233 len,
234 record_size,
235 committed: false,
236 });
237 }
238 // CAS failed — another producer claimed first. Pause to
239 // reduce pipeline pressure, then retry with fresh tail.
240 core::hint::spin_loop();
241 continue;
242 }
243
244 // Fits without wrapping
245 let new_tail = tail.wrapping_add(record_size);
246 if self
247 .shared
248 .tail
249 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
250 .is_ok()
251 {
252 return Ok(WriteClaim {
253 shared: &self.shared,
254 offset,
255 len,
256 record_size,
257 committed: false,
258 });
259 }
260 // CAS failed — pause before retry.
261 core::hint::spin_loop();
262 }
263 }
264
265 /// Returns the capacity of the buffer.
266 #[inline]
267 pub fn capacity(&self) -> usize {
268 self.shared.capacity
269 }
270
271 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
272 ///
273 /// Uses `Arc::strong_count` which is inherently racy. For reliable
274 /// disconnection detection, use the channel layer (`channel::mpsc`).
275 #[inline]
276 pub fn is_disconnected(&self) -> bool {
277 Arc::strong_count(&self.shared) == 1
278 }
279}
280
281impl std::fmt::Debug for Producer {
282 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283 f.debug_struct("Producer")
284 .field("capacity", &self.capacity())
285 .finish_non_exhaustive()
286 }
287}
288
289// ============================================================================
290// WriteClaim
291// ============================================================================
292
293/// A claimed region for writing a record.
294///
295/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
296/// when done writing to publish the record. If dropped without committing, a skip
297/// marker is written so the consumer can advance past the dead region.
298///
299/// # Important
300///
301/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
302/// block the consumer at this record's offset. This is not undefined behavior
303/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
304pub struct WriteClaim<'a> {
305 shared: &'a Shared,
306 offset: usize,
307 len: usize,
308 record_size: usize,
309 committed: bool,
310}
311
312impl WriteClaim<'_> {
313 /// Commits the record, making it visible to the consumer.
314 #[inline]
315 pub fn commit(mut self) {
316 self.do_commit();
317 self.committed = true;
318 }
319
320 #[inline]
321 fn do_commit(&mut self) {
322 let buffer = self.shared.buffer;
323 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
324 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
325
326 // Release fence: ensures payload writes are visible before len store
327 fence(Ordering::Release);
328 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
329 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
330 }
331
332 /// Returns the length of the payload region.
333 #[inline]
334 pub fn len(&self) -> usize {
335 self.len
336 }
337
338 /// Returns `true` if the payload is empty (always false, len must be > 0).
339 #[inline]
340 pub fn is_empty(&self) -> bool {
341 false
342 }
343}
344
345impl Deref for WriteClaim<'_> {
346 type Target = [u8];
347
348 #[inline]
349 fn deref(&self) -> &Self::Target {
350 let buffer = self.shared.buffer;
351 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
352 // advances tail past this region before the pointer is constructed,
353 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
354 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
355 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
356 // and exclusively owned by this claim (disjoint CAS regions).
357 // Lifetime tied to &self.
358 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
359 }
360}
361
362impl DerefMut for WriteClaim<'_> {
363 #[inline]
364 fn deref_mut(&mut self) -> &mut Self::Target {
365 let buffer = self.shared.buffer;
366 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
367 // advances tail past this region before the pointer is constructed,
368 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
369 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
370 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
371 // and exclusively owned by this claim (disjoint CAS regions).
372 // Lifetime tied to &mut self.
373 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
374 }
375}
376
377impl Drop for WriteClaim<'_> {
378 fn drop(&mut self) {
379 if !self.committed {
380 // Write skip marker so consumer can advance past this region
381 let buffer = self.shared.buffer;
382 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
383 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
384 let skip_len = self.record_size | SKIP_BIT;
385
386 fence(Ordering::Release);
387 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
388 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
389 }
390 }
391}
392
393// ============================================================================
394// Consumer
395// ============================================================================
396
397/// Consumer endpoint of the MPSC ring buffer.
398///
399/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
400/// This type is NOT `Clone` - only one consumer is allowed.
401pub struct Consumer {
402 /// Local head position (free-running).
403 head: Cell<usize>,
404 /// Shared state.
405 shared: Arc<Shared>,
406}
407
408// Safety: Consumer is only used from one thread.
409unsafe impl Send for Consumer {}
410
411impl Consumer {
412 /// Attempts to claim the next record for reading.
413 ///
414 /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
415 /// to `&[u8]` for the payload. When dropped, the record region is zeroed
416 /// and the head is advanced.
417 ///
418 /// Returns `None` if no committed record is available.
419 #[inline]
420 pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
421 let buffer = self.shared.buffer;
422
423 loop {
424 let offset = self.head.get() & self.shared.mask;
425 // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
426 // (head advances by align8'd record sizes). Buffer is valid.
427 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
428
429 // Relaxed atomic load, then Acquire fence for payload visibility
430 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
431 let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
432 fence(Ordering::Acquire);
433
434 if len_raw == 0 {
435 // Not committed yet
436 return None;
437 }
438
439 if len_raw & SKIP_BIT != 0 {
440 // Skip marker: zero the region and advance
441 let skip_size = len_raw & LEN_MASK;
442 // Zero payload first, then stamp last (mirrors write path)
443 if skip_size > HEADER_SIZE {
444 // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
445 // the buffer. Consumer has exclusive read access to this region.
446 unsafe {
447 ptr::write_bytes(
448 buffer.add(offset + HEADER_SIZE),
449 0,
450 skip_size - HEADER_SIZE,
451 );
452 }
453 }
454 // Ensure payload zeroing completes before clearing stamp
455 fence(Ordering::Release);
456 // SAFETY: len_ptr is still valid, computed above.
457 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
458
459 self.head.set(self.head.get().wrapping_add(skip_size));
460
461 // Ensure stamp clear completes before head advance
462 fence(Ordering::Release);
463 self.shared.head.store(self.head.get(), Ordering::Relaxed);
464
465 // Continue to check next position
466 continue;
467 }
468
469 // Valid record
470 let len = len_raw;
471 let record_size = align8(HEADER_SIZE + len);
472
473 return Some(ReadClaim {
474 consumer: self,
475 offset,
476 len,
477 record_size,
478 });
479 }
480 }
481
482 /// Returns the capacity of the buffer.
483 #[inline]
484 pub fn capacity(&self) -> usize {
485 self.shared.capacity
486 }
487
488 /// Best-effort hint: returns `true` if all producers have likely been dropped.
489 ///
490 /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
491 #[inline]
492 pub fn is_disconnected(&self) -> bool {
493 Arc::strong_count(&self.shared) == 1
494 }
495}
496
497impl std::fmt::Debug for Consumer {
498 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499 f.debug_struct("Consumer")
500 .field("capacity", &self.capacity())
501 .finish_non_exhaustive()
502 }
503}
504
505// ============================================================================
506// ReadClaim
507// ============================================================================
508
509/// A claimed record for reading.
510///
511/// Dereferences to `&[u8]` for the payload. When dropped, the record region
512/// is zeroed and the head is advanced, freeing space for producers.
513pub struct ReadClaim<'a> {
514 consumer: &'a mut Consumer,
515 offset: usize,
516 len: usize,
517 record_size: usize,
518}
519
520impl ReadClaim<'_> {
521 /// Returns the length of the payload.
522 #[inline]
523 pub fn len(&self) -> usize {
524 self.len
525 }
526
527 /// Returns `true` if the payload is empty.
528 #[inline]
529 pub fn is_empty(&self) -> bool {
530 self.len == 0
531 }
532}
533
534impl Deref for ReadClaim<'_> {
535 type Target = [u8];
536
537 #[inline]
538 fn deref(&self) -> &Self::Target {
539 let buffer = self.consumer.shared.buffer;
540 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
541 // exclusive read access via &mut Consumer borrow.
542 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
543 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
544 // finished writing (len was non-zero, preceded by Release fence).
545 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
546 }
547}
548
549impl Drop for ReadClaim<'_> {
550 fn drop(&mut self) {
551 let buffer = self.consumer.shared.buffer;
552
553 // Zero payload first, then stamp last (mirrors write path)
554 if self.record_size > HEADER_SIZE {
555 // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
556 // the buffer. Consumer owns this region exclusively.
557 unsafe {
558 ptr::write_bytes(
559 buffer.add(self.offset + HEADER_SIZE),
560 0,
561 self.record_size - HEADER_SIZE,
562 );
563 }
564 }
565 // Ensure payload zeroing completes before clearing stamp
566 fence(Ordering::Release);
567 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
568 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
569 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
570 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
571
572 // Advance head
573 let new_head = self.consumer.head.get().wrapping_add(self.record_size);
574 self.consumer.head.set(new_head);
575
576 // Ensure stamp clear completes before head advance
577 fence(Ordering::Release);
578 self.consumer.shared.head.store(new_head, Ordering::Relaxed);
579 }
580}
581
582// ============================================================================
583// Tests
584// ============================================================================
585
#[cfg(test)]
mod tests {
    use super::*;

    /// Claims `payload.len()` bytes, copies `payload` in, and commits.
    fn publish(prod: &mut Producer, payload: &[u8]) -> Result<(), TryClaimError> {
        let mut claim = prod.try_claim(payload.len())?;
        claim.copy_from_slice(payload);
        claim.commit();
        Ok(())
    }

    /// Retries [`publish`] until it succeeds, spinning while the ring is full.
    fn publish_spinning(prod: &mut Producer, payload: &[u8]) {
        while publish(prod, payload).is_err() {
            std::hint::spin_loop();
        }
    }

    /// Drains every available record, asserting each is `msgNN` for the next
    /// expected sequence number, and bumps `next_expected` accordingly.
    fn drain_in_order(cons: &mut Consumer, next_expected: &mut usize) {
        while let Some(record) = cons.try_claim() {
            let expected = format!("msg{:02}", *next_expected);
            assert_eq!(&*record, expected.as_bytes());
            *next_expected += 1;
        }
    }

    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        publish(&mut prod, payload).unwrap();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
        assert_eq!(record.len(), payload.len());
    }

    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            publish(&mut prod, payload.as_bytes()).unwrap();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    #[test]
    // The clone is the point of the test, even though it is otherwise unused.
    #[allow(clippy::redundant_clone)]
    fn producer_is_clone() {
        let (prod, _cons) = new(1024);
        let _prod2 = prod.clone();
    }

    #[test]
    fn multiple_producers_single_consumer() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const MESSAGES_PER_PRODUCER: u64 = 10_000;
        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;

        let (prod, mut cons) = new(64 * 1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|producer_id| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..MESSAGES_PER_PRODUCER {
                        // Encode producer_id and sequence in payload
                        let mut payload = [0u8; 16];
                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
                        payload[8..].copy_from_slice(&i.to_le_bytes());
                        publish_spinning(&mut prod, &payload);
                    }
                })
            })
            .collect();

        // Drop original producer
        drop(prod);

        // Consumer: track per-producer sequence
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut per_producer = vec![0u64; PRODUCERS];

            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());

                    // Each producer's messages should arrive in order
                    assert_eq!(
                        seq, per_producer[producer_id],
                        "producer {} out of order",
                        producer_id
                    );
                    per_producer[producer_id] += 1;
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }

            per_producer
        });

        for h in handles {
            h.join().unwrap();
        }

        let per_producer = consumer.join().unwrap();
        for (i, &count) in per_producer.iter().enumerate() {
            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
        }
    }

    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        publish(&mut prod, b"hello").unwrap();

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    #[test]
    fn wrap_around() {
        let (mut prod, mut cons) = new(64);
        let mut next_expected = 0usize;

        // Write more data than the ring holds, draining whenever it fills, and
        // verify that every record comes back intact and in order.
        for i in 0..20 {
            let payload = format!("msg{:02}", i);
            while publish(&mut prod, payload.as_bytes()).is_err() {
                drain_in_order(&mut cons, &mut next_expected);
            }
        }

        drain_in_order(&mut cons, &mut next_expected);
        assert_eq!(next_expected, 20);
    }

    #[test]
    fn wrap_inserts_padding_skip() {
        let (mut prod, mut cons) = new(64);

        // Two records leave the write position 16 bytes short of the buffer end.
        let first = [0xAAu8; 24];
        let second = [0xBBu8; 8];
        publish(&mut prod, &first).unwrap();
        publish(&mut prod, &second).unwrap();

        {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, &first[..]);
        }
        {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, &second[..]);
        }
        assert!(cons.try_claim().is_none());

        // This record does not fit in the remaining 16 bytes, so the producer
        // must pad out the tail with a skip marker and place it at offset 0.
        let third = [0xCCu8; 17];
        publish(&mut prod, &third).unwrap();

        {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, &third[..]);
        }
        assert!(cons.try_claim().is_none());
    }

    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while publish(&mut prod, b"12345678").is_ok() {
            count += 1;
        }

        assert!(count > 0);
        assert!(matches!(prod.try_claim(8), Err(TryClaimError::Full)));
    }

    #[test]
    fn oversized_returns_full() {
        let (mut prod, _cons) = new(1024);

        // Larger than the whole buffer: can never fit.
        assert!(matches!(prod.try_claim(2048), Err(TryClaimError::Full)));
        // One byte too many once the header is accounted for.
        assert!(matches!(
            prod.try_claim(1024 - HEADER_SIZE + 1),
            Err(TryClaimError::Full)
        ));
        // Exactly fills the empty buffer.
        assert!(prod.try_claim(1024 - HEADER_SIZE).is_ok());
    }

    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());

        // Consumer side: disconnected only once every producer clone is gone.
        let (prod, cons) = new(1024);
        let prod2 = prod.clone();
        drop(prod);
        assert!(!cons.is_disconnected());
        drop(prod2);
        assert!(cons.is_disconnected());
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// High-volume stress test with multiple producers.
    #[test]
    fn stress_multiple_producers() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const COUNT_PER_PRODUCER: u64 = 100_000;
        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
        const BUFFER_SIZE: usize = 64 * 1024;

        let (prod, mut cons) = new(BUFFER_SIZE);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..COUNT_PER_PRODUCER {
                        publish_spinning(&mut prod, &i.to_le_bytes());
                    }
                })
            })
            .collect();

        drop(prod);

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            (received, sum)
        });

        for h in handles {
            h.join().unwrap();
        }

        let (received, sum) = consumer.join().unwrap();
        assert_eq!(received, TOTAL);

        // Each producer sends 0..COUNT_PER_PRODUCER
        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
        assert_eq!(sum, expected_sum);
    }
}