Skip to main content

nexus_logbuf/queue/
mpsc.rs

1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producers read     │
9//! │   tail: CachePadded<AtomicUsize>  ← Producers CAS to claim space        │
10//! │   buffer: *mut u8                                                       │
11//! │   capacity: usize                 (power of 2)                          │
12//! │   mask: usize                     (capacity - 1)                        │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
16//! │ Producer (cloneable):           │   │ Consumer:                       │
17//! │   cached_head: usize (local)    │   │   head: usize        (local)    │
18//! │   shared: Arc<Shared>           │   │                                 │
19//! └─────────────────────────────────┘   └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
44/// Header size in bytes — one system word (`usize`).
45///
46/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
47const HEADER_SIZE: usize = std::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57    assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59    let capacity = capacity.next_power_of_two();
60    let mask = capacity - 1;
61
62    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63    let layout = Layout::from_size_align(capacity, 8)
64        .expect("valid layout: capacity is a power of two >= 16, align is 8");
65    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66    let buffer_ptr = unsafe { alloc_zeroed(layout) };
67    if buffer_ptr.is_null() {
68        handle_alloc_error(layout);
69    }
70
71    let shared = Arc::new(Shared {
72        head: CachePadded::new(AtomicUsize::new(0)),
73        tail: CachePadded::new(AtomicUsize::new(0)),
74        buffer: buffer_ptr,
75        capacity,
76        mask,
77    });
78
79    (
80        Producer {
81            cached_head: Cell::new(0),
82            shared: Arc::clone(&shared),
83        },
84        Consumer {
85            head: Cell::new(0),
86            shared,
87        },
88    )
89}
90
91struct Shared {
92    /// Consumer's read position. Updated by consumer, read by producers.
93    head: CachePadded<AtomicUsize>,
94    /// Producers' write position. CAS'd by producers.
95    tail: CachePadded<AtomicUsize>,
96    /// Buffer pointer.
97    buffer: *mut u8,
98    /// Buffer capacity (power of 2).
99    capacity: usize,
100    /// Mask for wrapping (capacity - 1).
101    mask: usize,
102}
103
104// Safety: Buffer access is synchronized through atomic head/tail.
105// Multiple producers coordinate via CAS on tail.
106// Single consumer is enforced by API (Consumer is not Clone).
107unsafe impl Send for Shared {}
108unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111    fn drop(&mut self) {
112        let layout = Layout::from_size_align(self.capacity, 8)
113            .expect("valid layout: capacity was validated at construction");
114        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115        // Shared is only dropped once (Arc prevents earlier drops).
116        unsafe { dealloc(self.buffer, layout) };
117    }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
124/// Producer endpoint of the MPSC ring buffer.
125///
126/// This type is `Clone` - multiple producers can write concurrently.
127/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
128#[derive(Clone)]
129pub struct Producer {
130    /// Cached head position (Rigtorp-style optimization, per-producer).
131    cached_head: Cell<usize>,
132    /// Shared state.
133    shared: Arc<Shared>,
134}
135
136// Safety: Producer coordinates with other producers via atomic CAS.
137unsafe impl Send for Producer {}
138
139impl Producer {
140    /// Attempts to claim space for a record with the given payload length.
141    ///
142    /// Returns a [`WriteClaim`] that can be written to and then committed.
143    ///
144    /// # Errors
145    ///
146    /// - [`TryClaimError::ZeroLength`] if `len` is zero
147    /// - [`TryClaimError::Full`] if the buffer is full
148    ///
149    /// # Safety Contract
150    ///
151    /// `len` must not exceed `LEN_MASK`. This is checked with
152    /// `debug_assert!` only.
153    #[inline]
154    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
155        debug_assert!(len <= LEN_MASK, "payload too large");
156        if len == 0 {
157            return Err(TryClaimError::ZeroLength);
158        }
159
160        let record_size = align8(HEADER_SIZE + len);
161
162        // CAS loop to claim space
163        loop {
164            let tail = self.shared.tail.load(Ordering::Relaxed);
165
166            // Calculate used space. If cached_head is stale, used can exceed capacity.
167            // saturating_sub handles this gracefully (returns 0 if stale).
168            let used = tail.wrapping_sub(self.cached_head.get());
169            let available = self.shared.capacity.saturating_sub(used);
170
171            if available < record_size {
172                // Reload head from shared state
173                self.cached_head
174                    .set(self.shared.head.load(Ordering::Relaxed));
175                fence(Ordering::Acquire);
176
177                let used = tail.wrapping_sub(self.cached_head.get());
178                if used > self.shared.capacity || self.shared.capacity - used < record_size {
179                    return Err(TryClaimError::Full);
180                }
181            }
182
183            // Check if record fits before buffer end, or needs wrap
184            let offset = tail & self.shared.mask;
185            let space_to_end = self.shared.capacity - offset;
186
187            if space_to_end < record_size {
188                // Need to wrap. Check if we have space for padding + record at start.
189                let total_needed = space_to_end + record_size;
190
191                let used = tail.wrapping_sub(self.cached_head.get());
192                let available = self.shared.capacity.saturating_sub(used);
193
194                if available < total_needed {
195                    // Reload and recheck
196                    self.cached_head
197                        .set(self.shared.head.load(Ordering::Relaxed));
198                    fence(Ordering::Acquire);
199
200                    let used = tail.wrapping_sub(self.cached_head.get());
201                    if used > self.shared.capacity || self.shared.capacity - used < total_needed {
202                        return Err(TryClaimError::Full);
203                    }
204                }
205
206                // Try to claim the padding + record space
207                let new_tail = tail.wrapping_add(total_needed);
208                if self
209                    .shared
210                    .tail
211                    .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
212                    .is_ok()
213                {
214                    // We claimed the space. Write padding skip marker.
215                    let buffer = self.shared.buffer;
216                    let skip_len = space_to_end | SKIP_BIT;
217
218                    // Release fence before writing skip marker
219                    fence(Ordering::Release);
220                    // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
221                    // CAS success guarantees exclusive ownership of this region.
222                    let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
223                    // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
224                    unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
225
226                    return Ok(WriteClaim {
227                        shared: &self.shared,
228                        offset: 0, // Record starts at beginning after wrap
229                        len,
230                        record_size,
231                        committed: false,
232                    });
233                }
234                // CAS failed, retry
235                continue;
236            }
237
238            // Fits without wrapping
239            let new_tail = tail.wrapping_add(record_size);
240            if self
241                .shared
242                .tail
243                .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
244                .is_ok()
245            {
246                return Ok(WriteClaim {
247                    shared: &self.shared,
248                    offset,
249                    len,
250                    record_size,
251                    committed: false,
252                });
253            }
254            // CAS failed, retry
255        }
256    }
257
258    /// Returns the capacity of the buffer.
259    #[inline]
260    pub fn capacity(&self) -> usize {
261        self.shared.capacity
262    }
263
264    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
265    ///
266    /// Uses `Arc::strong_count` which is inherently racy. For reliable
267    /// disconnection detection, use the channel layer (`channel::mpsc`).
268    #[inline]
269    pub fn is_disconnected(&self) -> bool {
270        Arc::strong_count(&self.shared) == 1
271    }
272}
273
274impl std::fmt::Debug for Producer {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct("Producer")
277            .field("capacity", &self.capacity())
278            .finish_non_exhaustive()
279    }
280}
281
282// ============================================================================
283// WriteClaim
284// ============================================================================
285
286/// A claimed region for writing a record.
287///
288/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
289/// when done writing to publish the record. If dropped without committing, a skip
290/// marker is written so the consumer can advance past the dead region.
291pub struct WriteClaim<'a> {
292    shared: &'a Shared,
293    offset: usize,
294    len: usize,
295    record_size: usize,
296    committed: bool,
297}
298
299impl WriteClaim<'_> {
300    /// Commits the record, making it visible to the consumer.
301    #[inline]
302    pub fn commit(mut self) {
303        self.do_commit();
304        self.committed = true;
305    }
306
307    #[inline]
308    fn do_commit(&mut self) {
309        let buffer = self.shared.buffer;
310        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
311        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
312
313        // Release fence: ensures payload writes are visible before len store
314        fence(Ordering::Release);
315        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
316        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
317    }
318
319    /// Returns the length of the payload region.
320    #[inline]
321    pub fn len(&self) -> usize {
322        self.len
323    }
324
325    /// Returns `true` if the payload is empty (always false, len must be > 0).
326    #[inline]
327    pub fn is_empty(&self) -> bool {
328        false
329    }
330}
331
332impl Deref for WriteClaim<'_> {
333    type Target = [u8];
334
335    #[inline]
336    fn deref(&self) -> &Self::Target {
337        let buffer = self.shared.buffer;
338        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
339        // advances tail past this region before the pointer is constructed,
340        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
341        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
342        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
343        // and exclusively owned by this claim (disjoint CAS regions).
344        // Lifetime tied to &self.
345        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
346    }
347}
348
349impl DerefMut for WriteClaim<'_> {
350    #[inline]
351    fn deref_mut(&mut self) -> &mut Self::Target {
352        let buffer = self.shared.buffer;
353        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
354        // advances tail past this region before the pointer is constructed,
355        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
356        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
357        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
358        // and exclusively owned by this claim (disjoint CAS regions).
359        // Lifetime tied to &mut self.
360        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
361    }
362}
363
364impl Drop for WriteClaim<'_> {
365    fn drop(&mut self) {
366        if !self.committed {
367            // Write skip marker so consumer can advance past this region
368            let buffer = self.shared.buffer;
369            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
370            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
371            let skip_len = self.record_size | SKIP_BIT;
372
373            fence(Ordering::Release);
374            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
375            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
376        }
377    }
378}
379
380// ============================================================================
381// Consumer
382// ============================================================================
383
384/// Consumer endpoint of the MPSC ring buffer.
385///
386/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
387/// This type is NOT `Clone` - only one consumer is allowed.
388pub struct Consumer {
389    /// Local head position (free-running).
390    head: Cell<usize>,
391    /// Shared state.
392    shared: Arc<Shared>,
393}
394
395// Safety: Consumer is only used from one thread.
396unsafe impl Send for Consumer {}
397
398impl Consumer {
399    /// Attempts to claim the next record for reading.
400    ///
401    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
402    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
403    /// and the head is advanced.
404    ///
405    /// Returns `None` if no committed record is available.
406    #[inline]
407    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
408        let buffer = self.shared.buffer;
409
410        loop {
411            let offset = self.head.get() & self.shared.mask;
412            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
413            // (head advances by align8'd record sizes). Buffer is valid.
414            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
415
416            // Relaxed atomic load, then Acquire fence for payload visibility
417            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
418            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
419            fence(Ordering::Acquire);
420
421            if len_raw == 0 {
422                // Not committed yet
423                return None;
424            }
425
426            if len_raw & SKIP_BIT != 0 {
427                // Skip marker: zero the region and advance
428                let skip_size = len_raw & LEN_MASK;
429                // Zero payload first, then stamp last (mirrors write path)
430                if skip_size > HEADER_SIZE {
431                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
432                    // the buffer. Consumer has exclusive read access to this region.
433                    unsafe {
434                        ptr::write_bytes(
435                            buffer.add(offset + HEADER_SIZE),
436                            0,
437                            skip_size - HEADER_SIZE,
438                        );
439                    }
440                }
441                // Ensure payload zeroing completes before clearing stamp
442                fence(Ordering::Release);
443                // SAFETY: len_ptr is still valid, computed above.
444                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
445
446                self.head.set(self.head.get().wrapping_add(skip_size));
447
448                // Ensure stamp clear completes before head advance
449                fence(Ordering::Release);
450                self.shared.head.store(self.head.get(), Ordering::Relaxed);
451
452                // Continue to check next position
453                continue;
454            }
455
456            // Valid record
457            let len = len_raw;
458            let record_size = align8(HEADER_SIZE + len);
459
460            return Some(ReadClaim {
461                consumer: self,
462                offset,
463                len,
464                record_size,
465            });
466        }
467    }
468
469    /// Returns the capacity of the buffer.
470    #[inline]
471    pub fn capacity(&self) -> usize {
472        self.shared.capacity
473    }
474
475    /// Best-effort hint: returns `true` if all producers have likely been dropped.
476    ///
477    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
478    #[inline]
479    pub fn is_disconnected(&self) -> bool {
480        Arc::strong_count(&self.shared) == 1
481    }
482}
483
484impl std::fmt::Debug for Consumer {
485    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486        f.debug_struct("Consumer")
487            .field("capacity", &self.capacity())
488            .finish_non_exhaustive()
489    }
490}
491
492// ============================================================================
493// ReadClaim
494// ============================================================================
495
496/// A claimed record for reading.
497///
498/// Dereferences to `&[u8]` for the payload. When dropped, the record region
499/// is zeroed and the head is advanced, freeing space for producers.
500pub struct ReadClaim<'a> {
501    consumer: &'a mut Consumer,
502    offset: usize,
503    len: usize,
504    record_size: usize,
505}
506
507impl ReadClaim<'_> {
508    /// Returns the length of the payload.
509    #[inline]
510    pub fn len(&self) -> usize {
511        self.len
512    }
513
514    /// Returns `true` if the payload is empty.
515    #[inline]
516    pub fn is_empty(&self) -> bool {
517        self.len == 0
518    }
519}
520
521impl Deref for ReadClaim<'_> {
522    type Target = [u8];
523
524    #[inline]
525    fn deref(&self) -> &Self::Target {
526        let buffer = self.consumer.shared.buffer;
527        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
528        // exclusive read access via &mut Consumer borrow.
529        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
530        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
531        // finished writing (len was non-zero, preceded by Release fence).
532        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
533    }
534}
535
536impl Drop for ReadClaim<'_> {
537    fn drop(&mut self) {
538        let buffer = self.consumer.shared.buffer;
539
540        // Zero payload first, then stamp last (mirrors write path)
541        if self.record_size > HEADER_SIZE {
542            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
543            // the buffer. Consumer owns this region exclusively.
544            unsafe {
545                ptr::write_bytes(
546                    buffer.add(self.offset + HEADER_SIZE),
547                    0,
548                    self.record_size - HEADER_SIZE,
549                );
550            }
551        }
552        // Ensure payload zeroing completes before clearing stamp
553        fence(Ordering::Release);
554        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
555        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
556        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
557        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
558
559        // Advance head
560        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
561        self.consumer.head.set(new_head);
562
563        // Ensure stamp clear completes before head advance
564        fence(Ordering::Release);
565        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
566    }
567}
568
569// ============================================================================
570// Tests
571// ============================================================================
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576
577    #[test]
578    fn basic_write_read() {
579        let (mut prod, mut cons) = new(1024);
580
581        let payload = b"hello world";
582        let mut claim = prod.try_claim(payload.len()).unwrap();
583        claim.copy_from_slice(payload);
584        claim.commit();
585
586        let record = cons.try_claim().unwrap();
587        assert_eq!(&*record, payload);
588    }
589
590    #[test]
591    fn empty_returns_none() {
592        let (_, mut cons) = new(1024);
593        assert!(cons.try_claim().is_none());
594    }
595
596    #[test]
597    fn multiple_records() {
598        let (mut prod, mut cons) = new(1024);
599
600        for i in 0..10 {
601            let payload = format!("message {}", i);
602            let mut claim = prod.try_claim(payload.len()).unwrap();
603            claim.copy_from_slice(payload.as_bytes());
604            claim.commit();
605        }
606
607        for i in 0..10 {
608            let record = cons.try_claim().unwrap();
609            let expected = format!("message {}", i);
610            assert_eq!(&*record, expected.as_bytes());
611        }
612
613        assert!(cons.try_claim().is_none());
614    }
615
616    #[test]
617    #[allow(clippy::redundant_clone)]
618    fn producer_is_clone() {
619        let (prod, _cons) = new(1024);
620        let _prod2 = prod.clone();
621    }
622
623    #[test]
624    fn multiple_producers_single_consumer() {
625        use std::thread;
626
627        const PRODUCERS: usize = 4;
628        const MESSAGES_PER_PRODUCER: u64 = 10_000;
629        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;
630
631        let (prod, mut cons) = new(64 * 1024);
632
633        let handles: Vec<_> = (0..PRODUCERS)
634            .map(|producer_id| {
635                let mut prod = prod.clone();
636                thread::spawn(move || {
637                    for i in 0..MESSAGES_PER_PRODUCER {
638                        // Encode producer_id and sequence in payload
639                        let mut payload = [0u8; 16];
640                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
641                        payload[8..].copy_from_slice(&i.to_le_bytes());
642
643                        loop {
644                            match prod.try_claim(16) {
645                                Ok(mut claim) => {
646                                    claim.copy_from_slice(&payload);
647                                    claim.commit();
648                                    break;
649                                }
650                                Err(_) => std::hint::spin_loop(),
651                            }
652                        }
653                    }
654                })
655            })
656            .collect();
657
658        // Drop original producer
659        drop(prod);
660
661        // Consumer: track per-producer sequence
662        let consumer = thread::spawn(move || {
663            let mut received = 0u64;
664            let mut per_producer = vec![0u64; PRODUCERS];
665
666            while received < TOTAL {
667                if let Some(record) = cons.try_claim() {
668                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
669                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());
670
671                    // Each producer's messages should arrive in order
672                    assert_eq!(
673                        seq, per_producer[producer_id],
674                        "producer {} out of order",
675                        producer_id
676                    );
677                    per_producer[producer_id] += 1;
678                    received += 1;
679                } else {
680                    std::hint::spin_loop();
681                }
682            }
683
684            per_producer
685        });
686
687        for h in handles {
688            h.join().unwrap();
689        }
690
691        let per_producer = consumer.join().unwrap();
692        for (i, &count) in per_producer.iter().enumerate() {
693            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
694        }
695    }
696
697    #[test]
698    fn aborted_claim_creates_skip() {
699        let (mut prod, mut cons) = new(1024);
700
701        // Claim and drop without committing
702        {
703            let mut claim = prod.try_claim(10).unwrap();
704            claim.copy_from_slice(b"0123456789");
705            // drop without commit
706        }
707
708        // Write another record
709        {
710            let mut claim = prod.try_claim(5).unwrap();
711            claim.copy_from_slice(b"hello");
712            claim.commit();
713        }
714
715        // Consumer should skip the aborted record and read the committed one
716        let record = cons.try_claim().unwrap();
717        assert_eq!(&*record, b"hello");
718    }
719
720    #[test]
721    fn wrap_around() {
722        let (mut prod, mut cons) = new(64);
723
724        // Fill with messages that will cause wrap-around
725        for i in 0..20 {
726            let payload = format!("msg{:02}", i);
727            loop {
728                match prod.try_claim(payload.len()) {
729                    Ok(mut claim) => {
730                        claim.copy_from_slice(payload.as_bytes());
731                        claim.commit();
732                        break;
733                    }
734                    Err(_) => {
735                        // Drain some
736                        while cons.try_claim().is_some() {}
737                    }
738                }
739            }
740        }
741    }
742
743    #[test]
744    fn full_returns_error() {
745        let (mut prod, _cons) = new(64);
746
747        // Fill the buffer
748        let mut count = 0;
749        while let Ok(mut claim) = prod.try_claim(8) {
750            claim.copy_from_slice(b"12345678");
751            claim.commit();
752            count += 1;
753        }
754
755        assert!(count > 0);
756        assert!(prod.try_claim(8).is_err());
757    }
758
759    #[test]
760    fn disconnection_detection() {
761        let (prod, cons) = new(1024);
762
763        assert!(!prod.is_disconnected());
764        assert!(!cons.is_disconnected());
765
766        drop(cons);
767        assert!(prod.is_disconnected());
768    }
769
770    #[test]
771    #[should_panic(expected = "capacity must be at least 16")]
772    fn tiny_capacity_panics() {
773        let _ = new(8);
774    }
775
776    #[test]
777    fn zero_len_returns_error() {
778        let (mut prod, _) = new(1024);
779        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
780    }
781
782    #[test]
783    fn capacity_rounds_to_power_of_two() {
784        let (prod, _) = new(100);
785        assert_eq!(prod.capacity(), 128);
786
787        let (prod, _) = new(1000);
788        assert_eq!(prod.capacity(), 1024);
789    }
790
791    /// High-volume stress test with multiple producers.
792    #[test]
793    fn stress_multiple_producers() {
794        use std::thread;
795
796        const PRODUCERS: usize = 4;
797        const COUNT_PER_PRODUCER: u64 = 100_000;
798        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
799        const BUFFER_SIZE: usize = 64 * 1024;
800
801        let (prod, mut cons) = new(BUFFER_SIZE);
802
803        let handles: Vec<_> = (0..PRODUCERS)
804            .map(|_| {
805                let mut prod = prod.clone();
806                thread::spawn(move || {
807                    for i in 0..COUNT_PER_PRODUCER {
808                        let payload = i.to_le_bytes();
809                        loop {
810                            match prod.try_claim(payload.len()) {
811                                Ok(mut claim) => {
812                                    claim.copy_from_slice(&payload);
813                                    claim.commit();
814                                    break;
815                                }
816                                Err(_) => std::hint::spin_loop(),
817                            }
818                        }
819                    }
820                })
821            })
822            .collect();
823
824        drop(prod);
825
826        let consumer = thread::spawn(move || {
827            let mut received = 0u64;
828            let mut sum = 0u64;
829            while received < TOTAL {
830                if let Some(record) = cons.try_claim() {
831                    let value = u64::from_le_bytes((*record).try_into().unwrap());
832                    sum = sum.wrapping_add(value);
833                    received += 1;
834                } else {
835                    std::hint::spin_loop();
836                }
837            }
838            (received, sum)
839        });
840
841        for h in handles {
842            h.join().unwrap();
843        }
844
845        let (received, sum) = consumer.join().unwrap();
846        assert_eq!(received, TOTAL);
847
848        // Each producer sends 0..COUNT_PER_PRODUCER
849        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
850        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
851        assert_eq!(sum, expected_sum);
852    }
853}