nexus_logbuf/queue/mpsc.rs
1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared: │
8//! │ head: CachePadded<AtomicUsize> ← Consumer writes, producers read │
9//! │ tail: CachePadded<AtomicUsize> ← Producers CAS to claim space │
10//! │ buffer: *mut u8 │
11//! │ capacity: usize (power of 2) │
12//! │ mask: usize (capacity - 1) │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
16//! │ Producer (cloneable): │ │ Consumer: │
17//! │ cached_head: usize (local) │ │ head: usize (local) │
18//! │ shared: Arc<Shared> │ │ │
19//! └─────────────────────────────────┘ └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{BufferFull, LEN_MASK, SKIP_BIT, align8};
43
/// Size of the per-record length header, in bytes: exactly one `usize`.
///
/// On 64-bit targets this is 8 bytes, so a payload placed directly after an
/// 8-byte-aligned header is itself 8-byte aligned.
const HEADER_SIZE: usize = core::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57 assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59 let capacity = capacity.next_power_of_two();
60 let mask = capacity - 1;
61
62 // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63 let layout = Layout::from_size_align(capacity, 8)
64 .expect("valid layout: capacity is a power of two >= 16, align is 8");
65 // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66 let buffer_ptr = unsafe { alloc_zeroed(layout) };
67 if buffer_ptr.is_null() {
68 handle_alloc_error(layout);
69 }
70
71 let shared = Arc::new(Shared {
72 head: CachePadded::new(AtomicUsize::new(0)),
73 tail: CachePadded::new(AtomicUsize::new(0)),
74 buffer: buffer_ptr,
75 capacity,
76 mask,
77 });
78
79 (
80 Producer {
81 cached_head: Cell::new(0),
82 shared: Arc::clone(&shared),
83 },
84 Consumer {
85 head: Cell::new(0),
86 shared,
87 },
88 )
89}
90
91struct Shared {
92 /// Consumer's read position. Updated by consumer, read by producers.
93 head: CachePadded<AtomicUsize>,
94 /// Producers' write position. CAS'd by producers.
95 tail: CachePadded<AtomicUsize>,
96 /// Buffer pointer.
97 buffer: *mut u8,
98 /// Buffer capacity (power of 2).
99 capacity: usize,
100 /// Mask for wrapping (capacity - 1).
101 mask: usize,
102}
103
104// Safety: Buffer access is synchronized through atomic head/tail.
105// Multiple producers coordinate via CAS on tail.
106// Single consumer is enforced by API (Consumer is not Clone).
107unsafe impl Send for Shared {}
108unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111 fn drop(&mut self) {
112 let layout = Layout::from_size_align(self.capacity, 8)
113 .expect("valid layout: capacity was validated at construction");
114 // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115 // Shared is only dropped once (Arc prevents earlier drops).
116 unsafe { dealloc(self.buffer, layout) };
117 }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
124/// Producer endpoint of the MPSC ring buffer.
125///
126/// This type is `Clone` - multiple producers can write concurrently.
127/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
128#[derive(Clone)]
129pub struct Producer {
130 /// Cached head position (Rigtorp-style optimization, per-producer).
131 cached_head: Cell<usize>,
132 /// Shared state.
133 shared: Arc<Shared>,
134}
135
136// Safety: Producer coordinates with other producers via atomic CAS.
137unsafe impl Send for Producer {}
138
139impl Producer {
140 /// Attempts to claim space for a record with the given payload length.
141 ///
142 /// Returns a [`WriteClaim`] that can be written to and then committed.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`BufferFull`] if the buffer has no space for the record.
147 ///
148 /// # Panics
149 ///
150 /// Panics if `len == 0`. The wire format reserves `len == 0` as the
151 /// "uncommitted" sentinel — letting it through would silently hang the
152 /// consumer. Aborting a non-zero claim is fully supported (drop the
153 /// [`WriteClaim`] without committing); only claiming zero bytes upfront
154 /// is forbidden.
155 ///
156 /// # Safety Contract
157 ///
158 /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
159 /// (unreachable in practice). On 32-bit, records >2GB could set
160 /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
161 #[inline]
162 pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, BufferFull> {
163 assert!(len > 0, "payload length must be non-zero");
164 #[cfg(target_pointer_width = "32")]
165 assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
166 #[cfg(not(target_pointer_width = "32"))]
167 debug_assert!(len <= LEN_MASK, "payload too large");
168
169 let record_size = align8(HEADER_SIZE + len);
170
171 // CAS loop to claim space
172 loop {
173 let tail = self.shared.tail.load(Ordering::Relaxed);
174
175 // Calculate used space. If cached_head is stale, used can exceed capacity.
176 // saturating_sub handles this gracefully (returns 0 if stale).
177 let used = tail.wrapping_sub(self.cached_head.get());
178 let available = self.shared.capacity.saturating_sub(used);
179
180 if available < record_size {
181 // Reload head from shared state
182 self.cached_head
183 .set(self.shared.head.load(Ordering::Relaxed));
184 fence(Ordering::Acquire);
185
186 let used = tail.wrapping_sub(self.cached_head.get());
187 if used > self.shared.capacity || self.shared.capacity - used < record_size {
188 return Err(BufferFull);
189 }
190 }
191
192 // Check if record fits before buffer end, or needs wrap
193 let offset = tail & self.shared.mask;
194 let space_to_end = self.shared.capacity - offset;
195
196 if space_to_end < record_size {
197 // Need to wrap. Check if we have space for padding + record at start.
198 let total_needed = space_to_end + record_size;
199
200 let used = tail.wrapping_sub(self.cached_head.get());
201 let available = self.shared.capacity.saturating_sub(used);
202
203 if available < total_needed {
204 // Reload and recheck
205 self.cached_head
206 .set(self.shared.head.load(Ordering::Relaxed));
207 fence(Ordering::Acquire);
208
209 let used = tail.wrapping_sub(self.cached_head.get());
210 if used > self.shared.capacity || self.shared.capacity - used < total_needed {
211 return Err(BufferFull);
212 }
213 }
214
215 // Try to claim the padding + record space
216 let new_tail = tail.wrapping_add(total_needed);
217 if self
218 .shared
219 .tail
220 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
221 .is_ok()
222 {
223 // We claimed the space. Write padding skip marker.
224 let buffer = self.shared.buffer;
225 let skip_len = space_to_end | SKIP_BIT;
226
227 // Release fence before writing skip marker
228 fence(Ordering::Release);
229 // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
230 // CAS success guarantees exclusive ownership of this region.
231 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
232 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
233 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
234
235 return Ok(WriteClaim {
236 shared: &self.shared,
237 offset: 0, // Record starts at beginning after wrap
238 len,
239 record_size,
240 committed: false,
241 });
242 }
243 // CAS failed — another producer claimed first. Pause to
244 // reduce pipeline pressure, then retry with fresh tail.
245 core::hint::spin_loop();
246 continue;
247 }
248
249 // Fits without wrapping
250 let new_tail = tail.wrapping_add(record_size);
251 if self
252 .shared
253 .tail
254 .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
255 .is_ok()
256 {
257 return Ok(WriteClaim {
258 shared: &self.shared,
259 offset,
260 len,
261 record_size,
262 committed: false,
263 });
264 }
265 // CAS failed — pause before retry.
266 core::hint::spin_loop();
267 }
268 }
269
270 /// Returns the capacity of the buffer.
271 #[inline]
272 pub fn capacity(&self) -> usize {
273 self.shared.capacity
274 }
275
276 /// Best-effort hint: returns `true` if the consumer has likely been dropped.
277 ///
278 /// Uses `Arc::strong_count` which is inherently racy. For reliable
279 /// disconnection detection, use the channel layer (`channel::mpsc`).
280 #[inline]
281 pub fn is_disconnected(&self) -> bool {
282 Arc::strong_count(&self.shared) == 1
283 }
284}
285
286impl std::fmt::Debug for Producer {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 f.debug_struct("Producer")
289 .field("capacity", &self.capacity())
290 .finish_non_exhaustive()
291 }
292}
293
294// ============================================================================
295// WriteClaim
296// ============================================================================
297
298/// A claimed region for writing a record.
299///
300/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
301/// when done writing to publish the record. If dropped without committing, a skip
302/// marker is written so the consumer can advance past the dead region.
303///
304/// # Important
305///
306/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
307/// block the consumer at this record's offset. This is not undefined behavior
308/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
309pub struct WriteClaim<'a> {
310 shared: &'a Shared,
311 offset: usize,
312 len: usize,
313 record_size: usize,
314 committed: bool,
315}
316
317impl WriteClaim<'_> {
318 /// Commits the record, making it visible to the consumer.
319 #[inline]
320 pub fn commit(mut self) {
321 self.do_commit();
322 self.committed = true;
323 }
324
325 #[inline]
326 fn do_commit(&mut self) {
327 let buffer = self.shared.buffer;
328 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
329 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
330
331 // Release fence: ensures payload writes are visible before len store
332 fence(Ordering::Release);
333 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
334 unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
335 }
336
337 /// Returns the length of the payload region.
338 #[inline]
339 pub fn len(&self) -> usize {
340 self.len
341 }
342
343 /// Returns `true` if the payload is empty (always false, len must be > 0).
344 #[inline]
345 pub fn is_empty(&self) -> bool {
346 false
347 }
348}
349
350impl Deref for WriteClaim<'_> {
351 type Target = [u8];
352
353 #[inline]
354 fn deref(&self) -> &Self::Target {
355 let buffer = self.shared.buffer;
356 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
357 // advances tail past this region before the pointer is constructed,
358 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
359 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
360 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
361 // and exclusively owned by this claim (disjoint CAS regions).
362 // Lifetime tied to &self.
363 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
364 }
365}
366
367impl DerefMut for WriteClaim<'_> {
368 #[inline]
369 fn deref_mut(&mut self) -> &mut Self::Target {
370 let buffer = self.shared.buffer;
371 // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
372 // advances tail past this region before the pointer is constructed,
373 // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
374 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
375 // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
376 // and exclusively owned by this claim (disjoint CAS regions).
377 // Lifetime tied to &mut self.
378 unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
379 }
380}
381
382impl Drop for WriteClaim<'_> {
383 fn drop(&mut self) {
384 if !self.committed {
385 // Write skip marker so consumer can advance past this region
386 let buffer = self.shared.buffer;
387 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
388 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
389 let skip_len = self.record_size | SKIP_BIT;
390
391 fence(Ordering::Release);
392 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
393 unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
394 }
395 }
396}
397
398// ============================================================================
399// Consumer
400// ============================================================================
401
402/// Consumer endpoint of the MPSC ring buffer.
403///
404/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
405/// This type is NOT `Clone` - only one consumer is allowed.
406pub struct Consumer {
407 /// Local head position (free-running).
408 head: Cell<usize>,
409 /// Shared state.
410 shared: Arc<Shared>,
411}
412
413// Safety: Consumer is only used from one thread.
414unsafe impl Send for Consumer {}
415
416impl Consumer {
417 /// Attempts to claim the next record for reading.
418 ///
419 /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
420 /// to `&[u8]` for the payload. When dropped, the record region is zeroed
421 /// and the head is advanced.
422 ///
423 /// Returns `None` if no committed record is available.
424 #[inline]
425 pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
426 let buffer = self.shared.buffer;
427
428 loop {
429 let offset = self.head.get() & self.shared.mask;
430 // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
431 // (head advances by align8'd record sizes). Buffer is valid.
432 let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
433
434 // Relaxed atomic load, then Acquire fence for payload visibility
435 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
436 let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
437 fence(Ordering::Acquire);
438
439 if len_raw == 0 {
440 // Not committed yet
441 return None;
442 }
443
444 if len_raw & SKIP_BIT != 0 {
445 // Skip marker: zero the region and advance
446 let skip_size = len_raw & LEN_MASK;
447 // Zero payload first, then stamp last (mirrors write path)
448 if skip_size > HEADER_SIZE {
449 // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
450 // the buffer. Consumer has exclusive read access to this region.
451 unsafe {
452 ptr::write_bytes(
453 buffer.add(offset + HEADER_SIZE),
454 0,
455 skip_size - HEADER_SIZE,
456 );
457 }
458 }
459 // Ensure payload zeroing completes before clearing stamp
460 fence(Ordering::Release);
461 // SAFETY: len_ptr is still valid, computed above.
462 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
463
464 self.head.set(self.head.get().wrapping_add(skip_size));
465
466 // Ensure stamp clear completes before head advance
467 fence(Ordering::Release);
468 self.shared.head.store(self.head.get(), Ordering::Relaxed);
469
470 // Continue to check next position
471 continue;
472 }
473
474 // Valid record
475 let len = len_raw;
476 let record_size = align8(HEADER_SIZE + len);
477
478 return Some(ReadClaim {
479 consumer: self,
480 offset,
481 len,
482 record_size,
483 });
484 }
485 }
486
487 /// Returns the capacity of the buffer.
488 #[inline]
489 pub fn capacity(&self) -> usize {
490 self.shared.capacity
491 }
492
493 /// Best-effort hint: returns `true` if all producers have likely been dropped.
494 ///
495 /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
496 #[inline]
497 pub fn is_disconnected(&self) -> bool {
498 Arc::strong_count(&self.shared) == 1
499 }
500}
501
502impl std::fmt::Debug for Consumer {
503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504 f.debug_struct("Consumer")
505 .field("capacity", &self.capacity())
506 .finish_non_exhaustive()
507 }
508}
509
510// ============================================================================
511// ReadClaim
512// ============================================================================
513
514/// A claimed record for reading.
515///
516/// Dereferences to `&[u8]` for the payload. When dropped, the record region
517/// is zeroed and the head is advanced, freeing space for producers.
518pub struct ReadClaim<'a> {
519 consumer: &'a mut Consumer,
520 offset: usize,
521 len: usize,
522 record_size: usize,
523}
524
525impl ReadClaim<'_> {
526 /// Returns the length of the payload.
527 #[inline]
528 pub fn len(&self) -> usize {
529 self.len
530 }
531
532 /// Returns `true` if the payload is empty.
533 #[inline]
534 pub fn is_empty(&self) -> bool {
535 self.len == 0
536 }
537}
538
539impl Deref for ReadClaim<'_> {
540 type Target = [u8];
541
542 #[inline]
543 fn deref(&self) -> &Self::Target {
544 let buffer = self.consumer.shared.buffer;
545 // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
546 // exclusive read access via &mut Consumer borrow.
547 let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
548 // SAFETY: payload_ptr is valid for self.len bytes. The producer has
549 // finished writing (len was non-zero, preceded by Release fence).
550 unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
551 }
552}
553
554impl Drop for ReadClaim<'_> {
555 fn drop(&mut self) {
556 let buffer = self.consumer.shared.buffer;
557
558 // Zero payload first, then stamp last (mirrors write path)
559 if self.record_size > HEADER_SIZE {
560 // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
561 // the buffer. Consumer owns this region exclusively.
562 unsafe {
563 ptr::write_bytes(
564 buffer.add(self.offset + HEADER_SIZE),
565 0,
566 self.record_size - HEADER_SIZE,
567 );
568 }
569 }
570 // Ensure payload zeroing completes before clearing stamp
571 fence(Ordering::Release);
572 // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
573 let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
574 // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
575 unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
576
577 // Advance head
578 let new_head = self.consumer.head.get().wrapping_add(self.record_size);
579 self.consumer.head.set(new_head);
580
581 // Ensure stamp clear completes before head advance
582 fence(Ordering::Release);
583 self.consumer.shared.head.store(new_head, Ordering::Relaxed);
584 }
585}
586
587// ============================================================================
588// Tests
589// ============================================================================
590
#[cfg(test)]
mod tests {
    use super::*;

    /// A single committed record is read back unchanged.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records from one producer come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// `Producer` implements `Clone`.
    #[test]
    #[allow(clippy::redundant_clone)]
    fn producer_is_clone() {
        let (prod, _cons) = new(1024);
        let _prod2 = prod.clone();
    }

    /// Concurrent producers: every message arrives, in order per producer.
    #[test]
    fn multiple_producers_single_consumer() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const MESSAGES_PER_PRODUCER: u64 = 10_000;
        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;

        let (prod, mut cons) = new(64 * 1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|producer_id| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..MESSAGES_PER_PRODUCER {
                        // Encode producer_id and sequence in payload
                        let mut payload = [0u8; 16];
                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
                        payload[8..].copy_from_slice(&i.to_le_bytes());

                        loop {
                            match prod.try_claim(16) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        // Drop original producer
        drop(prod);

        // Consumer: track per-producer sequence
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut per_producer = vec![0u64; PRODUCERS];

            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());

                    // Each producer's messages should arrive in order
                    assert_eq!(
                        seq, per_producer[producer_id],
                        "producer {} out of order",
                        producer_id
                    );
                    per_producer[producer_id] += 1;
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }

            per_producer
        });

        for h in handles {
            h.join().unwrap();
        }

        let per_producer = consumer.join().unwrap();
        for (i, &count) in per_producer.iter().enumerate() {
            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
        }
    }

    /// An aborted claim is skipped; the next committed record is delivered.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Records of mixed sizes in a tiny buffer force wrap-around with padding;
    /// every record must still come out intact and in order.
    #[test]
    fn wrap_around() {
        const MESSAGES: usize = 20;

        // Payload lengths cycle through 5, 10 and 20 bytes so that record
        // boundaries do not line up with the end of the 64-byte buffer.
        fn payload_for(i: usize) -> String {
            format!("msg{:02}", i).repeat(1 << (i % 3))
        }

        let (mut prod, mut cons) = new(64);
        let mut next_expected = 0usize;

        for i in 0..MESSAGES {
            let payload = payload_for(i);
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    Err(_) => {
                        // Buffer full: drain everything, checking order and content.
                        while let Some(record) = cons.try_claim() {
                            assert_eq!(&*record, payload_for(next_expected).as_bytes());
                            next_expected += 1;
                        }
                    }
                }
            }
        }

        // Drain whatever is still queued.
        while let Some(record) = cons.try_claim() {
            assert_eq!(&*record, payload_for(next_expected).as_bytes());
            next_expected += 1;
        }

        assert_eq!(next_expected, MESSAGES);
    }

    /// Claims fail with an error once the buffer is full.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// Dropping the consumer is visible to the producer.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    #[test]
    #[should_panic(expected = "payload length must be non-zero")]
    fn zero_len_panics() {
        let (mut prod, _) = new(1024);
        let _ = prod.try_claim(0);
    }

    /// Requested capacities are rounded up to the next power of two.
    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// High-volume stress test with multiple producers.
    #[test]
    fn stress_multiple_producers() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const COUNT_PER_PRODUCER: u64 = 100_000;
        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
        const BUFFER_SIZE: usize = 64 * 1024;

        let (prod, mut cons) = new(BUFFER_SIZE);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..COUNT_PER_PRODUCER {
                        let payload = i.to_le_bytes();
                        loop {
                            match prod.try_claim(payload.len()) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        drop(prod);

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            (received, sum)
        });

        for h in handles {
            h.join().unwrap();
        }

        let (received, sum) = consumer.join().unwrap();
        assert_eq!(received, TOTAL);

        // Each producer sends 0..COUNT_PER_PRODUCER
        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
        assert_eq!(sum, expected_sum);
    }
}