Skip to main content

nexus_logbuf/queue/
mpsc.rs

1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producers read     │
9//! │   tail: CachePadded<AtomicUsize>  ← Producers CAS to claim space        │
10//! │   buffer: *mut u8                                                       │
11//! │   capacity: usize                 (power of 2)                          │
12//! │   mask: usize                     (capacity - 1)                        │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
16//! │ Producer (cloneable):           │   │ Consumer:                       │
17//! │   cached_head: usize (local)    │   │   head: usize        (local)    │
18//! │   shared: Arc<Shared>           │   │                                 │
19//! └─────────────────────────────────┘   └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
/// Size in bytes of a record header: exactly one machine word (`usize`).
///
/// This evaluates to 8 on 64-bit targets, so a payload that follows an
/// 8-byte-aligned header also starts on an 8-byte boundary.
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57    assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59    let capacity = capacity.next_power_of_two();
60    let mask = capacity - 1;
61
62    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63    let layout = Layout::from_size_align(capacity, 8)
64        .expect("valid layout: capacity is a power of two >= 16, align is 8");
65    let buffer_ptr = unsafe { alloc_zeroed(layout) };
66    if buffer_ptr.is_null() {
67        handle_alloc_error(layout);
68    }
69
70    let shared = Arc::new(Shared {
71        head: CachePadded::new(AtomicUsize::new(0)),
72        tail: CachePadded::new(AtomicUsize::new(0)),
73        buffer: buffer_ptr,
74        capacity,
75        mask,
76    });
77
78    (
79        Producer {
80            cached_head: Cell::new(0),
81            shared: Arc::clone(&shared),
82        },
83        Consumer {
84            head: Cell::new(0),
85            shared,
86        },
87    )
88}
89
90struct Shared {
91    /// Consumer's read position. Updated by consumer, read by producers.
92    head: CachePadded<AtomicUsize>,
93    /// Producers' write position. CAS'd by producers.
94    tail: CachePadded<AtomicUsize>,
95    /// Buffer pointer.
96    buffer: *mut u8,
97    /// Buffer capacity (power of 2).
98    capacity: usize,
99    /// Mask for wrapping (capacity - 1).
100    mask: usize,
101}
102
103// Safety: Buffer access is synchronized through atomic head/tail.
104// Multiple producers coordinate via CAS on tail.
105// Single consumer is enforced by API (Consumer is not Clone).
106unsafe impl Send for Shared {}
107unsafe impl Sync for Shared {}
108
109impl Drop for Shared {
110    fn drop(&mut self) {
111        // Safety: buffer was allocated with alloc_zeroed using this layout.
112        let layout = Layout::from_size_align(self.capacity, 8)
113            .expect("valid layout: capacity was validated at construction");
114        unsafe { dealloc(self.buffer, layout) };
115    }
116}
117
118// ============================================================================
119// Producer
120// ============================================================================
121
122/// Producer endpoint of the MPSC ring buffer.
123///
124/// This type is `Clone` - multiple producers can write concurrently.
125/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
126#[derive(Clone)]
127pub struct Producer {
128    /// Cached head position (Rigtorp-style optimization, per-producer).
129    cached_head: Cell<usize>,
130    /// Shared state.
131    shared: Arc<Shared>,
132}
133
134// Safety: Producer coordinates with other producers via atomic CAS.
135unsafe impl Send for Producer {}
136
137impl Producer {
138    /// Attempts to claim space for a record with the given payload length.
139    ///
140    /// Returns a [`WriteClaim`] that can be written to and then committed.
141    ///
142    /// # Errors
143    ///
144    /// - [`TryClaimError::ZeroLength`] if `len` is zero
145    /// - [`TryClaimError::Full`] if the buffer is full
146    ///
147    /// # Safety Contract
148    ///
149    /// `len` must not exceed `LEN_MASK`. This is checked with
150    /// `debug_assert!` only.
151    #[inline]
152    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
153        debug_assert!(len <= LEN_MASK, "payload too large");
154        if len == 0 {
155            return Err(TryClaimError::ZeroLength);
156        }
157
158        let record_size = align8(HEADER_SIZE + len);
159
160        // CAS loop to claim space
161        loop {
162            let tail = self.shared.tail.load(Ordering::Relaxed);
163
164            // Calculate used space. If cached_head is stale, used can exceed capacity.
165            // saturating_sub handles this gracefully (returns 0 if stale).
166            let used = tail.wrapping_sub(self.cached_head.get());
167            let available = self.shared.capacity.saturating_sub(used);
168
169            if available < record_size {
170                // Reload head from shared state
171                self.cached_head
172                    .set(self.shared.head.load(Ordering::Relaxed));
173                fence(Ordering::Acquire);
174
175                let used = tail.wrapping_sub(self.cached_head.get());
176                if used > self.shared.capacity || self.shared.capacity - used < record_size {
177                    return Err(TryClaimError::Full);
178                }
179            }
180
181            // Check if record fits before buffer end, or needs wrap
182            let offset = tail & self.shared.mask;
183            let space_to_end = self.shared.capacity - offset;
184
185            if space_to_end < record_size {
186                // Need to wrap. Check if we have space for padding + record at start.
187                let total_needed = space_to_end + record_size;
188
189                let used = tail.wrapping_sub(self.cached_head.get());
190                let available = self.shared.capacity.saturating_sub(used);
191
192                if available < total_needed {
193                    // Reload and recheck
194                    self.cached_head
195                        .set(self.shared.head.load(Ordering::Relaxed));
196                    fence(Ordering::Acquire);
197
198                    let used = tail.wrapping_sub(self.cached_head.get());
199                    if used > self.shared.capacity || self.shared.capacity - used < total_needed {
200                        return Err(TryClaimError::Full);
201                    }
202                }
203
204                // Try to claim the padding + record space
205                let new_tail = tail.wrapping_add(total_needed);
206                if self
207                    .shared
208                    .tail
209                    .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
210                    .is_ok()
211                {
212                    // We claimed the space. Write padding skip marker.
213                    let buffer = self.shared.buffer;
214                    let skip_len = space_to_end | SKIP_BIT;
215
216                    // Release fence before writing skip marker
217                    fence(Ordering::Release);
218                    let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
219                    unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
220
221                    return Ok(WriteClaim {
222                        shared: &self.shared,
223                        offset: 0, // Record starts at beginning after wrap
224                        len,
225                        record_size,
226                        committed: false,
227                    });
228                }
229                // CAS failed, retry
230                continue;
231            }
232
233            // Fits without wrapping
234            let new_tail = tail.wrapping_add(record_size);
235            if self
236                .shared
237                .tail
238                .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
239                .is_ok()
240            {
241                return Ok(WriteClaim {
242                    shared: &self.shared,
243                    offset,
244                    len,
245                    record_size,
246                    committed: false,
247                });
248            }
249            // CAS failed, retry
250        }
251    }
252
253    /// Returns the capacity of the buffer.
254    #[inline]
255    pub fn capacity(&self) -> usize {
256        self.shared.capacity
257    }
258
259    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
260    ///
261    /// Uses `Arc::strong_count` which is inherently racy. For reliable
262    /// disconnection detection, use the channel layer (`channel::mpsc`).
263    #[inline]
264    pub fn is_disconnected(&self) -> bool {
265        Arc::strong_count(&self.shared) == 1
266    }
267}
268
269impl std::fmt::Debug for Producer {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        f.debug_struct("Producer")
272            .field("capacity", &self.capacity())
273            .finish_non_exhaustive()
274    }
275}
276
277// ============================================================================
278// WriteClaim
279// ============================================================================
280
281/// A claimed region for writing a record.
282///
283/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
284/// when done writing to publish the record. If dropped without committing, a skip
285/// marker is written so the consumer can advance past the dead region.
286pub struct WriteClaim<'a> {
287    shared: &'a Shared,
288    offset: usize,
289    len: usize,
290    record_size: usize,
291    committed: bool,
292}
293
294impl WriteClaim<'_> {
295    /// Commits the record, making it visible to the consumer.
296    #[inline]
297    pub fn commit(mut self) {
298        self.do_commit();
299        self.committed = true;
300    }
301
302    #[inline]
303    fn do_commit(&mut self) {
304        let buffer = self.shared.buffer;
305        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
306
307        // Release fence: ensures payload writes are visible before len store
308        fence(Ordering::Release);
309        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
310    }
311
312    /// Returns the length of the payload region.
313    #[inline]
314    pub fn len(&self) -> usize {
315        self.len
316    }
317
318    /// Returns `true` if the payload is empty (always false, len must be > 0).
319    #[inline]
320    pub fn is_empty(&self) -> bool {
321        false
322    }
323}
324
325impl Deref for WriteClaim<'_> {
326    type Target = [u8];
327
328    #[inline]
329    fn deref(&self) -> &Self::Target {
330        let buffer = self.shared.buffer;
331        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
332        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
333    }
334}
335
336impl DerefMut for WriteClaim<'_> {
337    #[inline]
338    fn deref_mut(&mut self) -> &mut Self::Target {
339        let buffer = self.shared.buffer;
340        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
341        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
342    }
343}
344
345impl Drop for WriteClaim<'_> {
346    fn drop(&mut self) {
347        if !self.committed {
348            // Write skip marker so consumer can advance past this region
349            let buffer = self.shared.buffer;
350            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
351            let skip_len = self.record_size | SKIP_BIT;
352
353            fence(Ordering::Release);
354            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
355        }
356    }
357}
358
359// ============================================================================
360// Consumer
361// ============================================================================
362
363/// Consumer endpoint of the MPSC ring buffer.
364///
365/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
366/// This type is NOT `Clone` - only one consumer is allowed.
367pub struct Consumer {
368    /// Local head position (free-running).
369    head: Cell<usize>,
370    /// Shared state.
371    shared: Arc<Shared>,
372}
373
374// Safety: Consumer is only used from one thread.
375unsafe impl Send for Consumer {}
376
377impl Consumer {
378    /// Attempts to claim the next record for reading.
379    ///
380    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
381    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
382    /// and the head is advanced.
383    ///
384    /// Returns `None` if no committed record is available.
385    #[inline]
386    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
387        let buffer = self.shared.buffer;
388
389        loop {
390            let offset = self.head.get() & self.shared.mask;
391            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
392
393            // Relaxed atomic load, then Acquire fence for payload visibility
394            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
395            fence(Ordering::Acquire);
396
397            if len_raw == 0 {
398                // Not committed yet
399                return None;
400            }
401
402            if len_raw & SKIP_BIT != 0 {
403                // Skip marker: zero the region and advance
404                let skip_size = len_raw & LEN_MASK;
405                // Zero payload first, then stamp last (mirrors write path)
406                if skip_size > HEADER_SIZE {
407                    unsafe {
408                        ptr::write_bytes(
409                            buffer.add(offset + HEADER_SIZE),
410                            0,
411                            skip_size - HEADER_SIZE,
412                        );
413                    }
414                }
415                // Ensure payload zeroing completes before clearing stamp
416                fence(Ordering::Release);
417                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
418
419                self.head.set(self.head.get().wrapping_add(skip_size));
420
421                // Ensure stamp clear completes before head advance
422                fence(Ordering::Release);
423                self.shared.head.store(self.head.get(), Ordering::Relaxed);
424
425                // Continue to check next position
426                continue;
427            }
428
429            // Valid record
430            let len = len_raw;
431            let record_size = align8(HEADER_SIZE + len);
432
433            return Some(ReadClaim {
434                consumer: self,
435                offset,
436                len,
437                record_size,
438            });
439        }
440    }
441
442    /// Returns the capacity of the buffer.
443    #[inline]
444    pub fn capacity(&self) -> usize {
445        self.shared.capacity
446    }
447
448    /// Best-effort hint: returns `true` if all producers have likely been dropped.
449    ///
450    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
451    #[inline]
452    pub fn is_disconnected(&self) -> bool {
453        Arc::strong_count(&self.shared) == 1
454    }
455}
456
457impl std::fmt::Debug for Consumer {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        f.debug_struct("Consumer")
460            .field("capacity", &self.capacity())
461            .finish_non_exhaustive()
462    }
463}
464
465// ============================================================================
466// ReadClaim
467// ============================================================================
468
469/// A claimed record for reading.
470///
471/// Dereferences to `&[u8]` for the payload. When dropped, the record region
472/// is zeroed and the head is advanced, freeing space for producers.
473pub struct ReadClaim<'a> {
474    consumer: &'a mut Consumer,
475    offset: usize,
476    len: usize,
477    record_size: usize,
478}
479
480impl ReadClaim<'_> {
481    /// Returns the length of the payload.
482    #[inline]
483    pub fn len(&self) -> usize {
484        self.len
485    }
486
487    /// Returns `true` if the payload is empty.
488    #[inline]
489    pub fn is_empty(&self) -> bool {
490        self.len == 0
491    }
492}
493
494impl Deref for ReadClaim<'_> {
495    type Target = [u8];
496
497    #[inline]
498    fn deref(&self) -> &Self::Target {
499        let buffer = self.consumer.shared.buffer;
500        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
501        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
502    }
503}
504
505impl Drop for ReadClaim<'_> {
506    fn drop(&mut self) {
507        let buffer = self.consumer.shared.buffer;
508
509        // Zero payload first, then stamp last (mirrors write path)
510        if self.record_size > HEADER_SIZE {
511            unsafe {
512                ptr::write_bytes(
513                    buffer.add(self.offset + HEADER_SIZE),
514                    0,
515                    self.record_size - HEADER_SIZE,
516                );
517            }
518        }
519        // Ensure payload zeroing completes before clearing stamp
520        fence(Ordering::Release);
521        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
522        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
523
524        // Advance head
525        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
526        self.consumer.head.set(new_head);
527
528        // Ensure stamp clear completes before head advance
529        fence(Ordering::Release);
530        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
531    }
532}
533
534// ============================================================================
535// Tests
536// ============================================================================
537
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed record can be read back, after which the queue is empty.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(record.len(), payload.len());
        assert_eq!(&*record, payload);
        drop(record);

        // Dropping the read claim releases the record.
        assert!(cons.try_claim().is_none());
    }

    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    #[test]
    // The clone is the point of this test, even though it is never used.
    #[allow(clippy::redundant_clone)]
    fn producer_is_clone() {
        let (prod, _cons) = new(1024);
        let _prod2 = prod.clone();
    }

    #[test]
    fn multiple_producers_single_consumer() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const MESSAGES_PER_PRODUCER: u64 = 10_000;
        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;

        let (prod, mut cons) = new(64 * 1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|producer_id| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..MESSAGES_PER_PRODUCER {
                        // Encode producer_id and sequence in payload
                        let mut payload = [0u8; 16];
                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
                        payload[8..].copy_from_slice(&i.to_le_bytes());

                        loop {
                            match prod.try_claim(16) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        // Drop original producer
        drop(prod);

        // Consumer: track per-producer sequence
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut per_producer = vec![0u64; PRODUCERS];

            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());

                    // Each producer's messages should arrive in order
                    assert_eq!(
                        seq, per_producer[producer_id],
                        "producer {} out of order",
                        producer_id
                    );
                    per_producer[producer_id] += 1;
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }

            per_producer
        });

        for h in handles {
            h.join().unwrap();
        }

        let per_producer = consumer.join().unwrap();
        for (i, &count) in per_producer.iter().enumerate() {
            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
        }
    }

    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
        drop(record);

        assert!(cons.try_claim().is_none());
    }

    /// Records keep their content and FIFO order across buffer wrap-around.
    ///
    /// With an 8-byte header, each 10-byte payload occupies a 24-byte record.
    /// That does not divide the 64-byte buffer evenly, so the producer must
    /// emit padding skip markers at the end of the buffer.
    #[test]
    fn wrap_around() {
        const MESSAGES: usize = 20;

        /// Reads every available record, checking content and order.
        fn drain(cons: &mut Consumer, next_read: &mut usize) {
            while let Some(record) = cons.try_claim() {
                let expected = format!("message-{:02}", *next_read);
                assert_eq!(&*record, expected.as_bytes());
                *next_read += 1;
            }
        }

        let (mut prod, mut cons) = new(64);
        let mut next_read = 0;

        for i in 0..MESSAGES {
            let payload = format!("message-{:02}", i);
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    // Buffer full: make room by consuming what is there.
                    Err(_) => drain(&mut cons, &mut next_read),
                }
            }
        }

        drain(&mut cons, &mut next_read);
        assert_eq!(next_read, MESSAGES);
    }

    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(matches!(prod.try_claim(8), Err(TryClaimError::Full)));
    }

    /// A payload that can never fit in the buffer is reported as `Full`.
    #[test]
    fn oversized_len_returns_full() {
        let (mut prod, _cons) = new(1024);

        // Payload alone already fills the buffer; the header cannot fit.
        assert!(matches!(prod.try_claim(1024), Err(TryClaimError::Full)));
        assert!(matches!(prod.try_claim(4096), Err(TryClaimError::Full)));
    }

    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());

        // Consumer side: disconnected only once *every* producer is gone.
        let (prod, cons) = new(1024);
        let prod2 = prod.clone();

        drop(prod);
        assert!(!cons.is_disconnected());

        drop(prod2);
        assert!(cons.is_disconnected());
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, cons) = new(100);
        assert_eq!(prod.capacity(), 128);
        assert_eq!(cons.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// High-volume stress test with multiple producers.
    #[test]
    fn stress_multiple_producers() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const COUNT_PER_PRODUCER: u64 = 100_000;
        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
        const BUFFER_SIZE: usize = 64 * 1024;

        let (prod, mut cons) = new(BUFFER_SIZE);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..COUNT_PER_PRODUCER {
                        let payload = i.to_le_bytes();
                        loop {
                            match prod.try_claim(payload.len()) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        drop(prod);

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            (received, sum)
        });

        for h in handles {
            h.join().unwrap();
        }

        let (received, sum) = consumer.join().unwrap();
        assert_eq!(received, TOTAL);

        // Each producer sends 0..COUNT_PER_PRODUCER
        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
        assert_eq!(sum, expected_sum);
    }
}