Skip to main content

nexus_logbuf/queue/
mpsc.rs

1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producers read     │
9//! │   tail: CachePadded<AtomicUsize>  ← Producers CAS to claim space        │
10//! │   buffer: *mut u8                                                       │
11//! │   capacity: usize                 (power of 2)                          │
12//! │   mask: usize                     (capacity - 1)                        │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
16//! │ Producer (cloneable):           │   │ Consumer:                       │
17//! │   cached_head: usize (local)    │   │   head: usize        (local)    │
18//! │   shared: Arc<Shared>           │   │                                 │
19//! └─────────────────────────────────┘   └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{BufferFull, LEN_MASK, SKIP_BIT, align8};
43
/// Size in bytes of the per-record header: one system word (`usize`).
///
/// The header holds the record's length stamp, which this module reads and
/// writes through an `AtomicUsize`. On 64-bit targets this is 8 bytes, so a
/// payload placed right after an 8-byte-aligned header also starts at an
/// 8-byte-aligned address.
const HEADER_SIZE: usize = std::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57    assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59    let capacity = capacity.next_power_of_two();
60    let mask = capacity - 1;
61
62    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63    let layout = Layout::from_size_align(capacity, 8)
64        .expect("valid layout: capacity is a power of two >= 16, align is 8");
65    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66    let buffer_ptr = unsafe { alloc_zeroed(layout) };
67    if buffer_ptr.is_null() {
68        handle_alloc_error(layout);
69    }
70
71    let shared = Arc::new(Shared {
72        head: CachePadded::new(AtomicUsize::new(0)),
73        tail: CachePadded::new(AtomicUsize::new(0)),
74        buffer: buffer_ptr,
75        capacity,
76        mask,
77    });
78
79    (
80        Producer {
81            cached_head: Cell::new(0),
82            shared: Arc::clone(&shared),
83        },
84        Consumer {
85            head: Cell::new(0),
86            shared,
87        },
88    )
89}
90
/// State shared by every producer and the consumer (each endpoint holds it
/// behind an `Arc`).
struct Shared {
    /// Consumer's read position. Updated by consumer, read by producers.
    head: CachePadded<AtomicUsize>,
    /// Producers' write position. CAS'd by producers.
    tail: CachePadded<AtomicUsize>,
    /// Buffer pointer: start of the heap allocation that backs the ring.
    buffer: *mut u8,
    /// Buffer capacity in bytes (power of 2).
    capacity: usize,
    /// Mask for wrapping a position into the buffer (capacity - 1).
    mask: usize,
}

// Safety: the raw `buffer` pointer is what keeps `Shared` from being
// `Send`/`Sync` automatically, hence the manual impls.
// Buffer access is synchronized through atomic head/tail.
// Multiple producers coordinate via CAS on tail.
// Single consumer is enforced by API (Consumer is not Clone).
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111    fn drop(&mut self) {
112        let layout = Layout::from_size_align(self.capacity, 8)
113            .expect("valid layout: capacity was validated at construction");
114        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115        // Shared is only dropped once (Arc prevents earlier drops).
116        unsafe { dealloc(self.buffer, layout) };
117    }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
/// Producer endpoint of the MPSC ring buffer.
///
/// This type is `Clone` - multiple producers can write concurrently.
/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
///
/// A clone starts with a copy of the original's cached head and shares the
/// same underlying buffer.
#[derive(Clone)]
pub struct Producer {
    /// Cached head position (Rigtorp-style optimization, per-producer).
    /// Refreshed from the shared head only when the cached value suggests
    /// there is not enough room.
    cached_head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// Safety: Producer coordinates with other producers via atomic CAS.
unsafe impl Send for Producer {}
138
139impl Producer {
140    /// Attempts to claim space for a record with the given payload length.
141    ///
142    /// Returns a [`WriteClaim`] that can be written to and then committed.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`BufferFull`] if the buffer has no space for the record.
147    ///
148    /// # Panics
149    ///
150    /// Panics if `len == 0`. The wire format reserves `len == 0` as the
151    /// "uncommitted" sentinel — letting it through would silently hang the
152    /// consumer. Aborting a non-zero claim is fully supported (drop the
153    /// [`WriteClaim`] without committing); only claiming zero bytes upfront
154    /// is forbidden.
155    ///
156    /// # Safety Contract
157    ///
158    /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
159    /// (unreachable in practice). On 32-bit, records >2GB could set
160    /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
161    #[inline]
162    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, BufferFull> {
163        assert!(len > 0, "payload length must be non-zero");
164        #[cfg(target_pointer_width = "32")]
165        assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
166        #[cfg(not(target_pointer_width = "32"))]
167        debug_assert!(len <= LEN_MASK, "payload too large");
168
169        let record_size = align8(HEADER_SIZE + len);
170
171        // CAS loop to claim space
172        loop {
173            let tail = self.shared.tail.load(Ordering::Relaxed);
174
175            // Calculate used space. If cached_head is stale, used can exceed capacity.
176            // saturating_sub handles this gracefully (returns 0 if stale).
177            let used = tail.wrapping_sub(self.cached_head.get());
178            let available = self.shared.capacity.saturating_sub(used);
179
180            if available < record_size {
181                // Reload head from shared state
182                self.cached_head
183                    .set(self.shared.head.load(Ordering::Relaxed));
184                fence(Ordering::Acquire);
185
186                let used = tail.wrapping_sub(self.cached_head.get());
187                if used > self.shared.capacity || self.shared.capacity - used < record_size {
188                    return Err(BufferFull);
189                }
190            }
191
192            // Check if record fits before buffer end, or needs wrap
193            let offset = tail & self.shared.mask;
194            let space_to_end = self.shared.capacity - offset;
195
196            if space_to_end < record_size {
197                // Need to wrap. Check if we have space for padding + record at start.
198                let total_needed = space_to_end + record_size;
199
200                let used = tail.wrapping_sub(self.cached_head.get());
201                let available = self.shared.capacity.saturating_sub(used);
202
203                if available < total_needed {
204                    // Reload and recheck
205                    self.cached_head
206                        .set(self.shared.head.load(Ordering::Relaxed));
207                    fence(Ordering::Acquire);
208
209                    let used = tail.wrapping_sub(self.cached_head.get());
210                    if used > self.shared.capacity || self.shared.capacity - used < total_needed {
211                        return Err(BufferFull);
212                    }
213                }
214
215                // Try to claim the padding + record space
216                let new_tail = tail.wrapping_add(total_needed);
217                if self
218                    .shared
219                    .tail
220                    .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
221                    .is_ok()
222                {
223                    // We claimed the space. Write padding skip marker.
224                    let buffer = self.shared.buffer;
225                    let skip_len = space_to_end | SKIP_BIT;
226
227                    // Release fence before writing skip marker
228                    fence(Ordering::Release);
229                    // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
230                    // CAS success guarantees exclusive ownership of this region.
231                    let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
232                    // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
233                    unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
234
235                    return Ok(WriteClaim {
236                        shared: &self.shared,
237                        offset: 0, // Record starts at beginning after wrap
238                        len,
239                        record_size,
240                        committed: false,
241                    });
242                }
243                // CAS failed — another producer claimed first. Pause to
244                // reduce pipeline pressure, then retry with fresh tail.
245                core::hint::spin_loop();
246                continue;
247            }
248
249            // Fits without wrapping
250            let new_tail = tail.wrapping_add(record_size);
251            if self
252                .shared
253                .tail
254                .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
255                .is_ok()
256            {
257                return Ok(WriteClaim {
258                    shared: &self.shared,
259                    offset,
260                    len,
261                    record_size,
262                    committed: false,
263                });
264            }
265            // CAS failed — pause before retry.
266            core::hint::spin_loop();
267        }
268    }
269
270    /// Returns the capacity of the buffer.
271    #[inline]
272    pub fn capacity(&self) -> usize {
273        self.shared.capacity
274    }
275
276    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
277    ///
278    /// Uses `Arc::strong_count` which is inherently racy. For reliable
279    /// disconnection detection, use the channel layer (`channel::mpsc`).
280    #[inline]
281    pub fn is_disconnected(&self) -> bool {
282        Arc::strong_count(&self.shared) == 1
283    }
284}
285
286impl std::fmt::Debug for Producer {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("Producer")
289            .field("capacity", &self.capacity())
290            .finish_non_exhaustive()
291    }
292}
293
294// ============================================================================
295// WriteClaim
296// ============================================================================
297
/// A claimed region for writing a record.
///
/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
/// when done writing to publish the record. If dropped without committing, a skip
/// marker is written so the consumer can advance past the dead region.
///
/// # Important
///
/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
/// block the consumer at this record's offset. This is not undefined behavior
/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
pub struct WriteClaim<'a> {
    /// Shared ring state, borrowed from the producer that made the claim.
    shared: &'a Shared,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes (never zero).
    len: usize,
    /// Total claimed size of the record: header plus payload, rounded by `align8`.
    record_size: usize,
    /// Set once the record has been published; suppresses the skip marker on drop.
    committed: bool,
}
316
317impl WriteClaim<'_> {
318    /// Commits the record, making it visible to the consumer.
319    #[inline]
320    pub fn commit(mut self) {
321        self.do_commit();
322        self.committed = true;
323    }
324
325    #[inline]
326    fn do_commit(&mut self) {
327        let buffer = self.shared.buffer;
328        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
329        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
330
331        // Release fence: ensures payload writes are visible before len store
332        fence(Ordering::Release);
333        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
334        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
335    }
336
337    /// Returns the length of the payload region.
338    #[inline]
339    pub fn len(&self) -> usize {
340        self.len
341    }
342
343    /// Returns `true` if the payload is empty (always false, len must be > 0).
344    #[inline]
345    pub fn is_empty(&self) -> bool {
346        false
347    }
348}
349
350impl Deref for WriteClaim<'_> {
351    type Target = [u8];
352
353    #[inline]
354    fn deref(&self) -> &Self::Target {
355        let buffer = self.shared.buffer;
356        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
357        // advances tail past this region before the pointer is constructed,
358        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
359        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
360        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
361        // and exclusively owned by this claim (disjoint CAS regions).
362        // Lifetime tied to &self.
363        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
364    }
365}
366
367impl DerefMut for WriteClaim<'_> {
368    #[inline]
369    fn deref_mut(&mut self) -> &mut Self::Target {
370        let buffer = self.shared.buffer;
371        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
372        // advances tail past this region before the pointer is constructed,
373        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
374        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
375        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
376        // and exclusively owned by this claim (disjoint CAS regions).
377        // Lifetime tied to &mut self.
378        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
379    }
380}
381
382impl Drop for WriteClaim<'_> {
383    fn drop(&mut self) {
384        if !self.committed {
385            // Write skip marker so consumer can advance past this region
386            let buffer = self.shared.buffer;
387            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
388            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
389            let skip_len = self.record_size | SKIP_BIT;
390
391            fence(Ordering::Release);
392            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
393            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
394        }
395    }
396}
397
398// ============================================================================
399// Consumer
400// ============================================================================
401
/// Consumer endpoint of the MPSC ring buffer.
///
/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
/// This type is NOT `Clone` - only one consumer is allowed.
pub struct Consumer {
    /// Local head position (free-running; masked with the buffer mask to get
    /// an offset). Mirrored into the shared head whenever it advances.
    head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}

// Safety: Consumer is only used from one thread.
unsafe impl Send for Consumer {}
415
416impl Consumer {
417    /// Attempts to claim the next record for reading.
418    ///
419    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
420    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
421    /// and the head is advanced.
422    ///
423    /// Returns `None` if no committed record is available.
424    #[inline]
425    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
426        let buffer = self.shared.buffer;
427
428        loop {
429            let offset = self.head.get() & self.shared.mask;
430            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
431            // (head advances by align8'd record sizes). Buffer is valid.
432            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
433
434            // Relaxed atomic load, then Acquire fence for payload visibility
435            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
436            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
437            fence(Ordering::Acquire);
438
439            if len_raw == 0 {
440                // Not committed yet
441                return None;
442            }
443
444            if len_raw & SKIP_BIT != 0 {
445                // Skip marker: zero the region and advance
446                let skip_size = len_raw & LEN_MASK;
447                // Zero payload first, then stamp last (mirrors write path)
448                if skip_size > HEADER_SIZE {
449                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
450                    // the buffer. Consumer has exclusive read access to this region.
451                    unsafe {
452                        ptr::write_bytes(
453                            buffer.add(offset + HEADER_SIZE),
454                            0,
455                            skip_size - HEADER_SIZE,
456                        );
457                    }
458                }
459                // Ensure payload zeroing completes before clearing stamp
460                fence(Ordering::Release);
461                // SAFETY: len_ptr is still valid, computed above.
462                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
463
464                self.head.set(self.head.get().wrapping_add(skip_size));
465
466                // Ensure stamp clear completes before head advance
467                fence(Ordering::Release);
468                self.shared.head.store(self.head.get(), Ordering::Relaxed);
469
470                // Continue to check next position
471                continue;
472            }
473
474            // Valid record
475            let len = len_raw;
476            let record_size = align8(HEADER_SIZE + len);
477
478            return Some(ReadClaim {
479                consumer: self,
480                offset,
481                len,
482                record_size,
483            });
484        }
485    }
486
487    /// Returns the capacity of the buffer.
488    #[inline]
489    pub fn capacity(&self) -> usize {
490        self.shared.capacity
491    }
492
493    /// Best-effort hint: returns `true` if all producers have likely been dropped.
494    ///
495    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
496    #[inline]
497    pub fn is_disconnected(&self) -> bool {
498        Arc::strong_count(&self.shared) == 1
499    }
500}
501
502impl std::fmt::Debug for Consumer {
503    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504        f.debug_struct("Consumer")
505            .field("capacity", &self.capacity())
506            .finish_non_exhaustive()
507    }
508}
509
510// ============================================================================
511// ReadClaim
512// ============================================================================
513
/// A claimed record for reading.
///
/// Dereferences to `&[u8]` for the payload. When dropped, the record region
/// is zeroed and the head is advanced, freeing space for producers.
pub struct ReadClaim<'a> {
    /// The consumer this claim came from; held mutably so only one claim can
    /// exist at a time.
    consumer: &'a mut Consumer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes, as read from the header stamp.
    len: usize,
    /// Total size of the record: header plus payload, rounded by `align8`.
    record_size: usize,
}
524
525impl ReadClaim<'_> {
526    /// Returns the length of the payload.
527    #[inline]
528    pub fn len(&self) -> usize {
529        self.len
530    }
531
532    /// Returns `true` if the payload is empty.
533    #[inline]
534    pub fn is_empty(&self) -> bool {
535        self.len == 0
536    }
537}
538
539impl Deref for ReadClaim<'_> {
540    type Target = [u8];
541
542    #[inline]
543    fn deref(&self) -> &Self::Target {
544        let buffer = self.consumer.shared.buffer;
545        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
546        // exclusive read access via &mut Consumer borrow.
547        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
548        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
549        // finished writing (len was non-zero, preceded by Release fence).
550        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
551    }
552}
553
554impl Drop for ReadClaim<'_> {
555    fn drop(&mut self) {
556        let buffer = self.consumer.shared.buffer;
557
558        // Zero payload first, then stamp last (mirrors write path)
559        if self.record_size > HEADER_SIZE {
560            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
561            // the buffer. Consumer owns this region exclusively.
562            unsafe {
563                ptr::write_bytes(
564                    buffer.add(self.offset + HEADER_SIZE),
565                    0,
566                    self.record_size - HEADER_SIZE,
567                );
568            }
569        }
570        // Ensure payload zeroing completes before clearing stamp
571        fence(Ordering::Release);
572        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
573        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
574        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
575        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
576
577        // Advance head
578        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
579        self.consumer.head.set(new_head);
580
581        // Ensure stamp clear completes before head advance
582        fence(Ordering::Release);
583        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
584    }
585}
586
587// ============================================================================
588// Tests
589// ============================================================================
590
#[cfg(test)]
mod tests {
    use super::*;

    /// A single committed record is read back byte-for-byte.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records from one producer come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// Compile-time check that `Producer` implements `Clone`.
    #[test]
    // The clone is the point of the test, even though clippy sees it as unused.
    #[allow(clippy::redundant_clone)]
    fn producer_is_clone() {
        let (prod, _cons) = new(1024);
        let _prod2 = prod.clone();
    }

    /// Several producer threads feed one consumer; each producer's records
    /// must arrive in its own send order.
    #[test]
    fn multiple_producers_single_consumer() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const MESSAGES_PER_PRODUCER: u64 = 10_000;
        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;

        let (prod, mut cons) = new(64 * 1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|producer_id| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..MESSAGES_PER_PRODUCER {
                        // Encode producer_id and sequence in payload
                        let mut payload = [0u8; 16];
                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
                        payload[8..].copy_from_slice(&i.to_le_bytes());

                        loop {
                            match prod.try_claim(16) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        // Drop original producer
        drop(prod);

        // Consumer: track per-producer sequence
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut per_producer = vec![0u64; PRODUCERS];

            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());

                    // Each producer's messages should arrive in order
                    assert_eq!(
                        seq, per_producer[producer_id],
                        "producer {} out of order",
                        producer_id
                    );
                    per_producer[producer_id] += 1;
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }

            per_producer
        });

        for h in handles {
            h.join().unwrap();
        }

        let per_producer = consumer.join().unwrap();
        for (i, &count) in per_producer.iter().enumerate() {
            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
        }
    }

    /// Dropping a claim without committing leaves a skip marker that the
    /// consumer steps over.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Records that do not evenly divide the buffer force the padded-wrap
    /// path; every payload must still come out intact and in order.
    #[test]
    fn wrap_around() {
        const MESSAGES: usize = 20;

        /// 13-byte payload. With a 4- or 8-byte header this is expected to
        /// round to a 24-byte record (assuming `align8` rounds up to a
        /// multiple of 8), which does not divide the 64-byte buffer — so the
        /// producer has to pad the end of the buffer and wrap.
        fn message(i: usize) -> String {
            format!("wrap-msg-{:04}", i)
        }

        /// Reads every available record, checking content and order.
        /// Returns how many records were drained.
        fn drain(cons: &mut Consumer, next_expected: &mut usize) -> usize {
            let mut drained = 0;
            while let Some(record) = cons.try_claim() {
                assert_eq!(&*record, message(*next_expected).as_bytes());
                *next_expected += 1;
                drained += 1;
            }
            drained
        }

        let (mut prod, mut cons) = new(64);
        let mut next_expected = 0;

        for i in 0..MESSAGES {
            let payload = message(i);
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    Err(_) => {
                        // Buffer full: make room. If nothing could be
                        // drained the retry would spin forever, so fail.
                        let drained = drain(&mut cons, &mut next_expected);
                        assert!(drained > 0, "buffer full but nothing to drain");
                    }
                }
            }
        }

        // Read whatever is still queued and make sure nothing was lost.
        drain(&mut cons, &mut next_expected);
        assert_eq!(next_expected, MESSAGES);
        assert!(cons.try_claim().is_none());
    }

    /// Once the buffer is full, further claims fail with an error.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// With a single producer, dropping the consumer flips `is_disconnected`.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());
    }

    /// Capacities below the 16-byte minimum are rejected.
    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    /// Zero-length claims are rejected (zero is the "uncommitted" sentinel).
    #[test]
    #[should_panic(expected = "payload length must be non-zero")]
    fn zero_len_panics() {
        let (mut prod, _) = new(1024);
        let _ = prod.try_claim(0);
    }

    /// Requested capacities are rounded up to the next power of two.
    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// High-volume stress test with multiple producers.
    #[test]
    fn stress_multiple_producers() {
        use std::thread;

        const PRODUCERS: usize = 4;
        const COUNT_PER_PRODUCER: u64 = 100_000;
        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
        const BUFFER_SIZE: usize = 64 * 1024;

        let (prod, mut cons) = new(BUFFER_SIZE);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let mut prod = prod.clone();
                thread::spawn(move || {
                    for i in 0..COUNT_PER_PRODUCER {
                        let payload = i.to_le_bytes();
                        loop {
                            match prod.try_claim(payload.len()) {
                                Ok(mut claim) => {
                                    claim.copy_from_slice(&payload);
                                    claim.commit();
                                    break;
                                }
                                Err(_) => std::hint::spin_loop(),
                            }
                        }
                    }
                })
            })
            .collect();

        drop(prod);

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < TOTAL {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            (received, sum)
        });

        for h in handles {
            h.join().unwrap();
        }

        let (received, sum) = consumer.join().unwrap();
        assert_eq!(received, TOTAL);

        // Each producer sends 0..COUNT_PER_PRODUCER
        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
        assert_eq!(sum, expected_sum);
    }
}