Skip to main content

nexus_logbuf/queue/
mpsc.rs

1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producers read     │
9//! │   tail: CachePadded<AtomicUsize>  ← Producers CAS to claim space        │
10//! │   buffer: *mut u8                                                       │
11//! │   capacity: usize                 (power of 2)                          │
12//! │   mask: usize                     (capacity - 1)                        │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
16//! │ Producer (cloneable):           │   │ Consumer:                       │
17//! │   cached_head: usize (local)    │   │   head: usize        (local)    │
18//! │   shared: Arc<Shared>           │   │                                 │
19//! └─────────────────────────────────┘   └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
44/// Header size in bytes — one system word (`usize`).
45///
46/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
47const HEADER_SIZE: usize = std::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57    assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59    let capacity = capacity.next_power_of_two();
60    let mask = capacity - 1;
61
62    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63    let layout = Layout::from_size_align(capacity, 8).unwrap();
64    let buffer_ptr = unsafe { alloc_zeroed(layout) };
65    if buffer_ptr.is_null() {
66        handle_alloc_error(layout);
67    }
68
69    let shared = Arc::new(Shared {
70        head: CachePadded::new(AtomicUsize::new(0)),
71        tail: CachePadded::new(AtomicUsize::new(0)),
72        buffer: buffer_ptr,
73        capacity,
74        mask,
75    });
76
77    (
78        Producer {
79            cached_head: Cell::new(0),
80            shared: Arc::clone(&shared),
81        },
82        Consumer {
83            head: Cell::new(0),
84            shared,
85        },
86    )
87}
88
89struct Shared {
90    /// Consumer's read position. Updated by consumer, read by producers.
91    head: CachePadded<AtomicUsize>,
92    /// Producers' write position. CAS'd by producers.
93    tail: CachePadded<AtomicUsize>,
94    /// Buffer pointer.
95    buffer: *mut u8,
96    /// Buffer capacity (power of 2).
97    capacity: usize,
98    /// Mask for wrapping (capacity - 1).
99    mask: usize,
100}
101
102// Safety: Buffer access is synchronized through atomic head/tail.
103// Multiple producers coordinate via CAS on tail.
104// Single consumer is enforced by API (Consumer is not Clone).
105unsafe impl Send for Shared {}
106unsafe impl Sync for Shared {}
107
108impl Drop for Shared {
109    fn drop(&mut self) {
110        // Safety: buffer was allocated with alloc_zeroed using this layout.
111        let layout = Layout::from_size_align(self.capacity, 8).unwrap();
112        unsafe { dealloc(self.buffer, layout) };
113    }
114}
115
116// ============================================================================
117// Producer
118// ============================================================================
119
120/// Producer endpoint of the MPSC ring buffer.
121///
122/// This type is `Clone` - multiple producers can write concurrently.
123/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
124#[derive(Clone)]
125pub struct Producer {
126    /// Cached head position (Rigtorp-style optimization, per-producer).
127    cached_head: Cell<usize>,
128    /// Shared state.
129    shared: Arc<Shared>,
130}
131
132// Safety: Producer coordinates with other producers via atomic CAS.
133unsafe impl Send for Producer {}
134
135impl Producer {
136    /// Attempts to claim space for a record with the given payload length.
137    ///
138    /// Returns a [`WriteClaim`] that can be written to and then committed.
139    ///
140    /// # Errors
141    ///
142    /// - [`TryClaimError::ZeroLength`] if `len` is zero
143    /// - [`TryClaimError::Full`] if the buffer is full
144    ///
145    /// # Safety Contract
146    ///
147    /// `len` must not exceed `LEN_MASK`. This is checked with
148    /// `debug_assert!` only.
149    #[inline]
150    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
151        debug_assert!(len <= LEN_MASK, "payload too large");
152        if len == 0 {
153            return Err(TryClaimError::ZeroLength);
154        }
155
156        let record_size = align8(HEADER_SIZE + len);
157
158        // CAS loop to claim space
159        loop {
160            let tail = self.shared.tail.load(Ordering::Relaxed);
161
162            // Calculate used space. If cached_head is stale, used can exceed capacity.
163            // saturating_sub handles this gracefully (returns 0 if stale).
164            let used = tail.wrapping_sub(self.cached_head.get());
165            let available = self.shared.capacity.saturating_sub(used);
166
167            if available < record_size {
168                // Reload head from shared state
169                self.cached_head
170                    .set(self.shared.head.load(Ordering::Relaxed));
171                fence(Ordering::Acquire);
172
173                let used = tail.wrapping_sub(self.cached_head.get());
174                if used > self.shared.capacity || self.shared.capacity - used < record_size {
175                    return Err(TryClaimError::Full);
176                }
177            }
178
179            // Check if record fits before buffer end, or needs wrap
180            let offset = tail & self.shared.mask;
181            let space_to_end = self.shared.capacity - offset;
182
183            if space_to_end < record_size {
184                // Need to wrap. Check if we have space for padding + record at start.
185                let total_needed = space_to_end + record_size;
186
187                let used = tail.wrapping_sub(self.cached_head.get());
188                let available = self.shared.capacity.saturating_sub(used);
189
190                if available < total_needed {
191                    // Reload and recheck
192                    self.cached_head
193                        .set(self.shared.head.load(Ordering::Relaxed));
194                    fence(Ordering::Acquire);
195
196                    let used = tail.wrapping_sub(self.cached_head.get());
197                    if used > self.shared.capacity || self.shared.capacity - used < total_needed {
198                        return Err(TryClaimError::Full);
199                    }
200                }
201
202                // Try to claim the padding + record space
203                let new_tail = tail.wrapping_add(total_needed);
204                if self
205                    .shared
206                    .tail
207                    .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
208                    .is_ok()
209                {
210                    // We claimed the space. Write padding skip marker.
211                    let buffer = self.shared.buffer;
212                    let skip_len = space_to_end | SKIP_BIT;
213
214                    // Release fence before writing skip marker
215                    fence(Ordering::Release);
216                    let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
217                    unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
218
219                    return Ok(WriteClaim {
220                        shared: &self.shared,
221                        offset: 0, // Record starts at beginning after wrap
222                        len,
223                        record_size,
224                        committed: false,
225                    });
226                }
227                // CAS failed, retry
228                continue;
229            }
230
231            // Fits without wrapping
232            let new_tail = tail.wrapping_add(record_size);
233            if self
234                .shared
235                .tail
236                .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
237                .is_ok()
238            {
239                return Ok(WriteClaim {
240                    shared: &self.shared,
241                    offset,
242                    len,
243                    record_size,
244                    committed: false,
245                });
246            }
247            // CAS failed, retry
248        }
249    }
250
251    /// Returns the capacity of the buffer.
252    #[inline]
253    pub fn capacity(&self) -> usize {
254        self.shared.capacity
255    }
256
257    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
258    ///
259    /// Uses `Arc::strong_count` which is inherently racy. For reliable
260    /// disconnection detection, use the channel layer (`channel::mpsc`).
261    #[inline]
262    pub fn is_disconnected(&self) -> bool {
263        Arc::strong_count(&self.shared) == 1
264    }
265}
266
267impl std::fmt::Debug for Producer {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("Producer")
270            .field("capacity", &self.capacity())
271            .finish_non_exhaustive()
272    }
273}
274
275// ============================================================================
276// WriteClaim
277// ============================================================================
278
279/// A claimed region for writing a record.
280///
281/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
282/// when done writing to publish the record. If dropped without committing, a skip
283/// marker is written so the consumer can advance past the dead region.
284pub struct WriteClaim<'a> {
285    shared: &'a Shared,
286    offset: usize,
287    len: usize,
288    record_size: usize,
289    committed: bool,
290}
291
292impl WriteClaim<'_> {
293    /// Commits the record, making it visible to the consumer.
294    #[inline]
295    pub fn commit(mut self) {
296        self.do_commit();
297        self.committed = true;
298    }
299
300    #[inline]
301    fn do_commit(&mut self) {
302        let buffer = self.shared.buffer;
303        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
304
305        // Release fence: ensures payload writes are visible before len store
306        fence(Ordering::Release);
307        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
308    }
309
310    /// Returns the length of the payload region.
311    #[inline]
312    pub fn len(&self) -> usize {
313        self.len
314    }
315
316    /// Returns `true` if the payload is empty (always false, len must be > 0).
317    #[inline]
318    pub fn is_empty(&self) -> bool {
319        false
320    }
321}
322
323impl Deref for WriteClaim<'_> {
324    type Target = [u8];
325
326    #[inline]
327    fn deref(&self) -> &Self::Target {
328        let buffer = self.shared.buffer;
329        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
330        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
331    }
332}
333
334impl DerefMut for WriteClaim<'_> {
335    #[inline]
336    fn deref_mut(&mut self) -> &mut Self::Target {
337        let buffer = self.shared.buffer;
338        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
339        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
340    }
341}
342
343impl Drop for WriteClaim<'_> {
344    fn drop(&mut self) {
345        if !self.committed {
346            // Write skip marker so consumer can advance past this region
347            let buffer = self.shared.buffer;
348            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
349            let skip_len = self.record_size | SKIP_BIT;
350
351            fence(Ordering::Release);
352            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
353        }
354    }
355}
356
357// ============================================================================
358// Consumer
359// ============================================================================
360
361/// Consumer endpoint of the MPSC ring buffer.
362///
363/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
364/// This type is NOT `Clone` - only one consumer is allowed.
365pub struct Consumer {
366    /// Local head position (free-running).
367    head: Cell<usize>,
368    /// Shared state.
369    shared: Arc<Shared>,
370}
371
372// Safety: Consumer is only used from one thread.
373unsafe impl Send for Consumer {}
374
375impl Consumer {
376    /// Attempts to claim the next record for reading.
377    ///
378    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
379    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
380    /// and the head is advanced.
381    ///
382    /// Returns `None` if no committed record is available.
383    #[inline]
384    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
385        let buffer = self.shared.buffer;
386
387        loop {
388            let offset = self.head.get() & self.shared.mask;
389            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
390
391            // Relaxed atomic load, then Acquire fence for payload visibility
392            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
393            fence(Ordering::Acquire);
394
395            if len_raw == 0 {
396                // Not committed yet
397                return None;
398            }
399
400            if len_raw & SKIP_BIT != 0 {
401                // Skip marker: zero the region and advance
402                let skip_size = len_raw & LEN_MASK;
403                // Zero payload first, then stamp last (mirrors write path)
404                if skip_size > HEADER_SIZE {
405                    unsafe {
406                        ptr::write_bytes(
407                            buffer.add(offset + HEADER_SIZE),
408                            0,
409                            skip_size - HEADER_SIZE,
410                        );
411                    }
412                }
413                // Ensure payload zeroing completes before clearing stamp
414                fence(Ordering::Release);
415                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
416
417                self.head.set(self.head.get().wrapping_add(skip_size));
418
419                // Ensure stamp clear completes before head advance
420                fence(Ordering::Release);
421                self.shared.head.store(self.head.get(), Ordering::Relaxed);
422
423                // Continue to check next position
424                continue;
425            }
426
427            // Valid record
428            let len = len_raw;
429            let record_size = align8(HEADER_SIZE + len);
430
431            return Some(ReadClaim {
432                consumer: self,
433                offset,
434                len,
435                record_size,
436            });
437        }
438    }
439
440    /// Returns the capacity of the buffer.
441    #[inline]
442    pub fn capacity(&self) -> usize {
443        self.shared.capacity
444    }
445
446    /// Best-effort hint: returns `true` if all producers have likely been dropped.
447    ///
448    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
449    #[inline]
450    pub fn is_disconnected(&self) -> bool {
451        Arc::strong_count(&self.shared) == 1
452    }
453}
454
455impl std::fmt::Debug for Consumer {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        f.debug_struct("Consumer")
458            .field("capacity", &self.capacity())
459            .finish_non_exhaustive()
460    }
461}
462
463// ============================================================================
464// ReadClaim
465// ============================================================================
466
467/// A claimed record for reading.
468///
469/// Dereferences to `&[u8]` for the payload. When dropped, the record region
470/// is zeroed and the head is advanced, freeing space for producers.
471pub struct ReadClaim<'a> {
472    consumer: &'a mut Consumer,
473    offset: usize,
474    len: usize,
475    record_size: usize,
476}
477
478impl ReadClaim<'_> {
479    /// Returns the length of the payload.
480    #[inline]
481    pub fn len(&self) -> usize {
482        self.len
483    }
484
485    /// Returns `true` if the payload is empty.
486    #[inline]
487    pub fn is_empty(&self) -> bool {
488        self.len == 0
489    }
490}
491
492impl Deref for ReadClaim<'_> {
493    type Target = [u8];
494
495    #[inline]
496    fn deref(&self) -> &Self::Target {
497        let buffer = self.consumer.shared.buffer;
498        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
499        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
500    }
501}
502
503impl Drop for ReadClaim<'_> {
504    fn drop(&mut self) {
505        let buffer = self.consumer.shared.buffer;
506
507        // Zero payload first, then stamp last (mirrors write path)
508        if self.record_size > HEADER_SIZE {
509            unsafe {
510                ptr::write_bytes(
511                    buffer.add(self.offset + HEADER_SIZE),
512                    0,
513                    self.record_size - HEADER_SIZE,
514                );
515            }
516        }
517        // Ensure payload zeroing completes before clearing stamp
518        fence(Ordering::Release);
519        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
520        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
521
522        // Advance head
523        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
524        self.consumer.head.set(new_head);
525
526        // Ensure stamp clear completes before head advance
527        fence(Ordering::Release);
528        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
529    }
530}
531
532// ============================================================================
533// Tests
534// ============================================================================
535
536#[cfg(test)]
537mod tests {
538    use super::*;
539
540    #[test]
541    fn basic_write_read() {
542        let (mut prod, mut cons) = new(1024);
543
544        let payload = b"hello world";
545        let mut claim = prod.try_claim(payload.len()).unwrap();
546        claim.copy_from_slice(payload);
547        claim.commit();
548
549        let record = cons.try_claim().unwrap();
550        assert_eq!(&*record, payload);
551    }
552
553    #[test]
554    fn empty_returns_none() {
555        let (_, mut cons) = new(1024);
556        assert!(cons.try_claim().is_none());
557    }
558
559    #[test]
560    fn multiple_records() {
561        let (mut prod, mut cons) = new(1024);
562
563        for i in 0..10 {
564            let payload = format!("message {}", i);
565            let mut claim = prod.try_claim(payload.len()).unwrap();
566            claim.copy_from_slice(payload.as_bytes());
567            claim.commit();
568        }
569
570        for i in 0..10 {
571            let record = cons.try_claim().unwrap();
572            let expected = format!("message {}", i);
573            assert_eq!(&*record, expected.as_bytes());
574        }
575
576        assert!(cons.try_claim().is_none());
577    }
578
579    #[test]
580    #[allow(clippy::redundant_clone)]
581    fn producer_is_clone() {
582        let (prod, _cons) = new(1024);
583        let _prod2 = prod.clone();
584    }
585
586    #[test]
587    fn multiple_producers_single_consumer() {
588        use std::thread;
589
590        const PRODUCERS: usize = 4;
591        const MESSAGES_PER_PRODUCER: u64 = 10_000;
592        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;
593
594        let (prod, mut cons) = new(64 * 1024);
595
596        let handles: Vec<_> = (0..PRODUCERS)
597            .map(|producer_id| {
598                let mut prod = prod.clone();
599                thread::spawn(move || {
600                    for i in 0..MESSAGES_PER_PRODUCER {
601                        // Encode producer_id and sequence in payload
602                        let mut payload = [0u8; 16];
603                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
604                        payload[8..].copy_from_slice(&i.to_le_bytes());
605
606                        loop {
607                            match prod.try_claim(16) {
608                                Ok(mut claim) => {
609                                    claim.copy_from_slice(&payload);
610                                    claim.commit();
611                                    break;
612                                }
613                                Err(_) => std::hint::spin_loop(),
614                            }
615                        }
616                    }
617                })
618            })
619            .collect();
620
621        // Drop original producer
622        drop(prod);
623
624        // Consumer: track per-producer sequence
625        let consumer = thread::spawn(move || {
626            let mut received = 0u64;
627            let mut per_producer = vec![0u64; PRODUCERS];
628
629            while received < TOTAL {
630                if let Some(record) = cons.try_claim() {
631                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
632                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());
633
634                    // Each producer's messages should arrive in order
635                    assert_eq!(
636                        seq, per_producer[producer_id],
637                        "producer {} out of order",
638                        producer_id
639                    );
640                    per_producer[producer_id] += 1;
641                    received += 1;
642                } else {
643                    std::hint::spin_loop();
644                }
645            }
646
647            per_producer
648        });
649
650        for h in handles {
651            h.join().unwrap();
652        }
653
654        let per_producer = consumer.join().unwrap();
655        for (i, &count) in per_producer.iter().enumerate() {
656            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
657        }
658    }
659
660    #[test]
661    fn aborted_claim_creates_skip() {
662        let (mut prod, mut cons) = new(1024);
663
664        // Claim and drop without committing
665        {
666            let mut claim = prod.try_claim(10).unwrap();
667            claim.copy_from_slice(b"0123456789");
668            // drop without commit
669        }
670
671        // Write another record
672        {
673            let mut claim = prod.try_claim(5).unwrap();
674            claim.copy_from_slice(b"hello");
675            claim.commit();
676        }
677
678        // Consumer should skip the aborted record and read the committed one
679        let record = cons.try_claim().unwrap();
680        assert_eq!(&*record, b"hello");
681    }
682
683    #[test]
684    fn wrap_around() {
685        let (mut prod, mut cons) = new(64);
686
687        // Fill with messages that will cause wrap-around
688        for i in 0..20 {
689            let payload = format!("msg{:02}", i);
690            loop {
691                match prod.try_claim(payload.len()) {
692                    Ok(mut claim) => {
693                        claim.copy_from_slice(payload.as_bytes());
694                        claim.commit();
695                        break;
696                    }
697                    Err(_) => {
698                        // Drain some
699                        while cons.try_claim().is_some() {}
700                    }
701                }
702            }
703        }
704    }
705
706    #[test]
707    fn full_returns_error() {
708        let (mut prod, _cons) = new(64);
709
710        // Fill the buffer
711        let mut count = 0;
712        while let Ok(mut claim) = prod.try_claim(8) {
713            claim.copy_from_slice(b"12345678");
714            claim.commit();
715            count += 1;
716        }
717
718        assert!(count > 0);
719        assert!(prod.try_claim(8).is_err());
720    }
721
722    #[test]
723    fn disconnection_detection() {
724        let (prod, cons) = new(1024);
725
726        assert!(!prod.is_disconnected());
727        assert!(!cons.is_disconnected());
728
729        drop(cons);
730        assert!(prod.is_disconnected());
731    }
732
733    #[test]
734    #[should_panic(expected = "capacity must be at least 16")]
735    fn tiny_capacity_panics() {
736        let _ = new(8);
737    }
738
739    #[test]
740    fn zero_len_returns_error() {
741        let (mut prod, _) = new(1024);
742        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
743    }
744
745    #[test]
746    fn capacity_rounds_to_power_of_two() {
747        let (prod, _) = new(100);
748        assert_eq!(prod.capacity(), 128);
749
750        let (prod, _) = new(1000);
751        assert_eq!(prod.capacity(), 1024);
752    }
753
754    /// High-volume stress test with multiple producers.
755    #[test]
756    fn stress_multiple_producers() {
757        use std::thread;
758
759        const PRODUCERS: usize = 4;
760        const COUNT_PER_PRODUCER: u64 = 100_000;
761        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
762        const BUFFER_SIZE: usize = 64 * 1024;
763
764        let (prod, mut cons) = new(BUFFER_SIZE);
765
766        let handles: Vec<_> = (0..PRODUCERS)
767            .map(|_| {
768                let mut prod = prod.clone();
769                thread::spawn(move || {
770                    for i in 0..COUNT_PER_PRODUCER {
771                        let payload = i.to_le_bytes();
772                        loop {
773                            match prod.try_claim(payload.len()) {
774                                Ok(mut claim) => {
775                                    claim.copy_from_slice(&payload);
776                                    claim.commit();
777                                    break;
778                                }
779                                Err(_) => std::hint::spin_loop(),
780                            }
781                        }
782                    }
783                })
784            })
785            .collect();
786
787        drop(prod);
788
789        let consumer = thread::spawn(move || {
790            let mut received = 0u64;
791            let mut sum = 0u64;
792            while received < TOTAL {
793                if let Some(record) = cons.try_claim() {
794                    let value = u64::from_le_bytes((*record).try_into().unwrap());
795                    sum = sum.wrapping_add(value);
796                    received += 1;
797                } else {
798                    std::hint::spin_loop();
799                }
800            }
801            (received, sum)
802        });
803
804        for h in handles {
805            h.join().unwrap();
806        }
807
808        let (received, sum) = consumer.join().unwrap();
809        assert_eq!(received, TOTAL);
810
811        // Each producer sends 0..COUNT_PER_PRODUCER
812        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
813        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
814        assert_eq!(sum, expected_sum);
815    }
816}