Skip to main content

nexus_logbuf/queue/
mpsc.rs

1//! Multi-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producers read     │
9//! │   tail: CachePadded<AtomicUsize>  ← Producers CAS to claim space        │
10//! │   buffer: *mut u8                                                       │
11//! │   capacity: usize                 (power of 2)                          │
12//! │   mask: usize                     (capacity - 1)                        │
13//! └─────────────────────────────────────────────────────────────────────────┘
14//!
15//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
16//! │ Producer (cloneable):           │   │ Consumer:                       │
17//! │   cached_head: usize (local)    │   │   head: usize        (local)    │
18//! │   shared: Arc<Shared>           │   │                                 │
19//! └─────────────────────────────────┘   └─────────────────────────────────┘
20//! ```
21//!
22//! # Differences from SPSC
23//!
24//! - Tail is atomic in shared state (not local to producer)
25//! - Producers use CAS loop to claim space
26//! - Producer is `Clone` - multiple producers allowed
27//! - Synchronization: Relaxed CAS on tail, Release on len commit, Acquire on len read
28//!
29//! # Record Layout
30//!
31//! Same as SPSC - see [`crate::spsc`] for details.
32
33use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
34use std::cell::Cell;
35use std::ops::{Deref, DerefMut};
36use std::ptr;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicUsize, Ordering, fence};
39
40use crossbeam_utils::CachePadded;
41
42use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
43
44/// Header size in bytes — one system word (`usize`).
45///
46/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
47const HEADER_SIZE: usize = std::mem::size_of::<usize>();
48
49/// Creates a bounded MPSC byte ring buffer.
50///
51/// Capacity is rounded up to the next power of two.
52///
53/// # Panics
54///
55/// Panics if `capacity` is zero or less than 16 bytes.
56pub fn new(capacity: usize) -> (Producer, Consumer) {
57    assert!(capacity >= 16, "capacity must be at least 16 bytes");
58
59    let capacity = capacity.next_power_of_two();
60    let mask = capacity - 1;
61
62    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
63    let layout = Layout::from_size_align(capacity, 8)
64        .expect("valid layout: capacity is a power of two >= 16, align is 8");
65    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
66    let buffer_ptr = unsafe { alloc_zeroed(layout) };
67    if buffer_ptr.is_null() {
68        handle_alloc_error(layout);
69    }
70
71    let shared = Arc::new(Shared {
72        head: CachePadded::new(AtomicUsize::new(0)),
73        tail: CachePadded::new(AtomicUsize::new(0)),
74        buffer: buffer_ptr,
75        capacity,
76        mask,
77    });
78
79    (
80        Producer {
81            cached_head: Cell::new(0),
82            shared: Arc::clone(&shared),
83        },
84        Consumer {
85            head: Cell::new(0),
86            shared,
87        },
88    )
89}
90
91struct Shared {
92    /// Consumer's read position. Updated by consumer, read by producers.
93    head: CachePadded<AtomicUsize>,
94    /// Producers' write position. CAS'd by producers.
95    tail: CachePadded<AtomicUsize>,
96    /// Buffer pointer.
97    buffer: *mut u8,
98    /// Buffer capacity (power of 2).
99    capacity: usize,
100    /// Mask for wrapping (capacity - 1).
101    mask: usize,
102}
103
104// Safety: Buffer access is synchronized through atomic head/tail.
105// Multiple producers coordinate via CAS on tail.
106// Single consumer is enforced by API (Consumer is not Clone).
107unsafe impl Send for Shared {}
108unsafe impl Sync for Shared {}
109
110impl Drop for Shared {
111    fn drop(&mut self) {
112        let layout = Layout::from_size_align(self.capacity, 8)
113            .expect("valid layout: capacity was validated at construction");
114        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
115        // Shared is only dropped once (Arc prevents earlier drops).
116        unsafe { dealloc(self.buffer, layout) };
117    }
118}
119
120// ============================================================================
121// Producer
122// ============================================================================
123
124/// Producer endpoint of the MPSC ring buffer.
125///
126/// This type is `Clone` - multiple producers can write concurrently.
127/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
128#[derive(Clone)]
129pub struct Producer {
130    /// Cached head position (Rigtorp-style optimization, per-producer).
131    cached_head: Cell<usize>,
132    /// Shared state.
133    shared: Arc<Shared>,
134}
135
136// Safety: Producer coordinates with other producers via atomic CAS.
137unsafe impl Send for Producer {}
138
139impl Producer {
140    /// Attempts to claim space for a record with the given payload length.
141    ///
142    /// Returns a [`WriteClaim`] that can be written to and then committed.
143    ///
144    /// # Errors
145    ///
146    /// - [`TryClaimError::ZeroLength`] if `len` is zero
147    /// - [`TryClaimError::Full`] if the buffer is full
148    ///
149    /// # Safety Contract
150    ///
151    /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
152    /// (unreachable in practice). On 32-bit, records >2GB could set
153    /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
154    #[inline]
155    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
156        #[cfg(target_pointer_width = "32")]
157        assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
158        #[cfg(not(target_pointer_width = "32"))]
159        debug_assert!(len <= LEN_MASK, "payload too large");
160        if len == 0 {
161            return Err(TryClaimError::ZeroLength);
162        }
163
164        let record_size = align8(HEADER_SIZE + len);
165
166        // CAS loop to claim space
167        loop {
168            let tail = self.shared.tail.load(Ordering::Relaxed);
169
170            // Calculate used space. If cached_head is stale, used can exceed capacity.
171            // saturating_sub handles this gracefully (returns 0 if stale).
172            let used = tail.wrapping_sub(self.cached_head.get());
173            let available = self.shared.capacity.saturating_sub(used);
174
175            if available < record_size {
176                // Reload head from shared state
177                self.cached_head
178                    .set(self.shared.head.load(Ordering::Relaxed));
179                fence(Ordering::Acquire);
180
181                let used = tail.wrapping_sub(self.cached_head.get());
182                if used > self.shared.capacity || self.shared.capacity - used < record_size {
183                    return Err(TryClaimError::Full);
184                }
185            }
186
187            // Check if record fits before buffer end, or needs wrap
188            let offset = tail & self.shared.mask;
189            let space_to_end = self.shared.capacity - offset;
190
191            if space_to_end < record_size {
192                // Need to wrap. Check if we have space for padding + record at start.
193                let total_needed = space_to_end + record_size;
194
195                let used = tail.wrapping_sub(self.cached_head.get());
196                let available = self.shared.capacity.saturating_sub(used);
197
198                if available < total_needed {
199                    // Reload and recheck
200                    self.cached_head
201                        .set(self.shared.head.load(Ordering::Relaxed));
202                    fence(Ordering::Acquire);
203
204                    let used = tail.wrapping_sub(self.cached_head.get());
205                    if used > self.shared.capacity || self.shared.capacity - used < total_needed {
206                        return Err(TryClaimError::Full);
207                    }
208                }
209
210                // Try to claim the padding + record space
211                let new_tail = tail.wrapping_add(total_needed);
212                if self
213                    .shared
214                    .tail
215                    .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
216                    .is_ok()
217                {
218                    // We claimed the space. Write padding skip marker.
219                    let buffer = self.shared.buffer;
220                    let skip_len = space_to_end | SKIP_BIT;
221
222                    // Release fence before writing skip marker
223                    fence(Ordering::Release);
224                    // SAFETY: offset is masked to [0, capacity), 8-byte aligned.
225                    // CAS success guarantees exclusive ownership of this region.
226                    let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
227                    // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
228                    unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
229
230                    return Ok(WriteClaim {
231                        shared: &self.shared,
232                        offset: 0, // Record starts at beginning after wrap
233                        len,
234                        record_size,
235                        committed: false,
236                    });
237                }
238                // CAS failed — another producer claimed first. Pause to
239                // reduce pipeline pressure, then retry with fresh tail.
240                core::hint::spin_loop();
241                continue;
242            }
243
244            // Fits without wrapping
245            let new_tail = tail.wrapping_add(record_size);
246            if self
247                .shared
248                .tail
249                .compare_exchange_weak(tail, new_tail, Ordering::Relaxed, Ordering::Relaxed)
250                .is_ok()
251            {
252                return Ok(WriteClaim {
253                    shared: &self.shared,
254                    offset,
255                    len,
256                    record_size,
257                    committed: false,
258                });
259            }
260            // CAS failed — pause before retry.
261            core::hint::spin_loop();
262        }
263    }
264
265    /// Returns the capacity of the buffer.
266    #[inline]
267    pub fn capacity(&self) -> usize {
268        self.shared.capacity
269    }
270
271    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
272    ///
273    /// Uses `Arc::strong_count` which is inherently racy. For reliable
274    /// disconnection detection, use the channel layer (`channel::mpsc`).
275    #[inline]
276    pub fn is_disconnected(&self) -> bool {
277        Arc::strong_count(&self.shared) == 1
278    }
279}
280
281impl std::fmt::Debug for Producer {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("Producer")
284            .field("capacity", &self.capacity())
285            .finish_non_exhaustive()
286    }
287}
288
289// ============================================================================
290// WriteClaim
291// ============================================================================
292
293/// A claimed region for writing a record.
294///
295/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
296/// when done writing to publish the record. If dropped without committing, a skip
297/// marker is written so the consumer can advance past the dead region.
298///
299/// # Important
300///
301/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
302/// block the consumer at this record's offset. This is not undefined behavior
303/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
304pub struct WriteClaim<'a> {
305    shared: &'a Shared,
306    offset: usize,
307    len: usize,
308    record_size: usize,
309    committed: bool,
310}
311
312impl WriteClaim<'_> {
313    /// Commits the record, making it visible to the consumer.
314    #[inline]
315    pub fn commit(mut self) {
316        self.do_commit();
317        self.committed = true;
318    }
319
320    #[inline]
321    fn do_commit(&mut self) {
322        let buffer = self.shared.buffer;
323        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
324        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
325
326        // Release fence: ensures payload writes are visible before len store
327        fence(Ordering::Release);
328        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
329        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
330    }
331
332    /// Returns the length of the payload region.
333    #[inline]
334    pub fn len(&self) -> usize {
335        self.len
336    }
337
338    /// Returns `true` if the payload is empty (always false, len must be > 0).
339    #[inline]
340    pub fn is_empty(&self) -> bool {
341        false
342    }
343}
344
345impl Deref for WriteClaim<'_> {
346    type Target = [u8];
347
348    #[inline]
349    fn deref(&self) -> &Self::Target {
350        let buffer = self.shared.buffer;
351        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
352        // advances tail past this region before the pointer is constructed,
353        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
354        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
355        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
356        // and exclusively owned by this claim (disjoint CAS regions).
357        // Lifetime tied to &self.
358        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
359    }
360}
361
362impl DerefMut for WriteClaim<'_> {
363    #[inline]
364    fn deref_mut(&mut self) -> &mut Self::Target {
365        let buffer = self.shared.buffer;
366        // SAFETY: offset + HEADER_SIZE is within the buffer. CAS atomically
367        // advances tail past this region before the pointer is constructed,
368        // guaranteeing disjoint regions — no two WriteClaims overlap in the buffer.
369        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
370        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
371        // and exclusively owned by this claim (disjoint CAS regions).
372        // Lifetime tied to &mut self.
373        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
374    }
375}
376
377impl Drop for WriteClaim<'_> {
378    fn drop(&mut self) {
379        if !self.committed {
380            // Write skip marker so consumer can advance past this region
381            let buffer = self.shared.buffer;
382            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
383            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
384            let skip_len = self.record_size | SKIP_BIT;
385
386            fence(Ordering::Release);
387            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
388            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
389        }
390    }
391}
392
393// ============================================================================
394// Consumer
395// ============================================================================
396
397/// Consumer endpoint of the MPSC ring buffer.
398///
399/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
400/// This type is NOT `Clone` - only one consumer is allowed.
401pub struct Consumer {
402    /// Local head position (free-running).
403    head: Cell<usize>,
404    /// Shared state.
405    shared: Arc<Shared>,
406}
407
408// Safety: Consumer is only used from one thread.
409unsafe impl Send for Consumer {}
410
411impl Consumer {
412    /// Attempts to claim the next record for reading.
413    ///
414    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
415    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
416    /// and the head is advanced.
417    ///
418    /// Returns `None` if no committed record is available.
419    #[inline]
420    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
421        let buffer = self.shared.buffer;
422
423        loop {
424            let offset = self.head.get() & self.shared.mask;
425            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
426            // (head advances by align8'd record sizes). Buffer is valid.
427            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
428
429            // Relaxed atomic load, then Acquire fence for payload visibility
430            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
431            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
432            fence(Ordering::Acquire);
433
434            if len_raw == 0 {
435                // Not committed yet
436                return None;
437            }
438
439            if len_raw & SKIP_BIT != 0 {
440                // Skip marker: zero the region and advance
441                let skip_size = len_raw & LEN_MASK;
442                // Zero payload first, then stamp last (mirrors write path)
443                if skip_size > HEADER_SIZE {
444                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
445                    // the buffer. Consumer has exclusive read access to this region.
446                    unsafe {
447                        ptr::write_bytes(
448                            buffer.add(offset + HEADER_SIZE),
449                            0,
450                            skip_size - HEADER_SIZE,
451                        );
452                    }
453                }
454                // Ensure payload zeroing completes before clearing stamp
455                fence(Ordering::Release);
456                // SAFETY: len_ptr is still valid, computed above.
457                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
458
459                self.head.set(self.head.get().wrapping_add(skip_size));
460
461                // Ensure stamp clear completes before head advance
462                fence(Ordering::Release);
463                self.shared.head.store(self.head.get(), Ordering::Relaxed);
464
465                // Continue to check next position
466                continue;
467            }
468
469            // Valid record
470            let len = len_raw;
471            let record_size = align8(HEADER_SIZE + len);
472
473            return Some(ReadClaim {
474                consumer: self,
475                offset,
476                len,
477                record_size,
478            });
479        }
480    }
481
482    /// Returns the capacity of the buffer.
483    #[inline]
484    pub fn capacity(&self) -> usize {
485        self.shared.capacity
486    }
487
488    /// Best-effort hint: returns `true` if all producers have likely been dropped.
489    ///
490    /// See producer's [`is_disconnected`](Producer::is_disconnected) for caveats.
491    #[inline]
492    pub fn is_disconnected(&self) -> bool {
493        Arc::strong_count(&self.shared) == 1
494    }
495}
496
497impl std::fmt::Debug for Consumer {
498    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499        f.debug_struct("Consumer")
500            .field("capacity", &self.capacity())
501            .finish_non_exhaustive()
502    }
503}
504
505// ============================================================================
506// ReadClaim
507// ============================================================================
508
509/// A claimed record for reading.
510///
511/// Dereferences to `&[u8]` for the payload. When dropped, the record region
512/// is zeroed and the head is advanced, freeing space for producers.
513pub struct ReadClaim<'a> {
514    consumer: &'a mut Consumer,
515    offset: usize,
516    len: usize,
517    record_size: usize,
518}
519
520impl ReadClaim<'_> {
521    /// Returns the length of the payload.
522    #[inline]
523    pub fn len(&self) -> usize {
524        self.len
525    }
526
527    /// Returns `true` if the payload is empty.
528    #[inline]
529    pub fn is_empty(&self) -> bool {
530        self.len == 0
531    }
532}
533
534impl Deref for ReadClaim<'_> {
535    type Target = [u8];
536
537    #[inline]
538    fn deref(&self) -> &Self::Target {
539        let buffer = self.consumer.shared.buffer;
540        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
541        // exclusive read access via &mut Consumer borrow.
542        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
543        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
544        // finished writing (len was non-zero, preceded by Release fence).
545        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
546    }
547}
548
549impl Drop for ReadClaim<'_> {
550    fn drop(&mut self) {
551        let buffer = self.consumer.shared.buffer;
552
553        // Zero payload first, then stamp last (mirrors write path)
554        if self.record_size > HEADER_SIZE {
555            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
556            // the buffer. Consumer owns this region exclusively.
557            unsafe {
558                ptr::write_bytes(
559                    buffer.add(self.offset + HEADER_SIZE),
560                    0,
561                    self.record_size - HEADER_SIZE,
562                );
563            }
564        }
565        // Ensure payload zeroing completes before clearing stamp
566        fence(Ordering::Release);
567        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
568        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
569        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
570        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
571
572        // Advance head
573        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
574        self.consumer.head.set(new_head);
575
576        // Ensure stamp clear completes before head advance
577        fence(Ordering::Release);
578        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
579    }
580}
581
582// ============================================================================
583// Tests
584// ============================================================================
585
586#[cfg(test)]
587mod tests {
588    use super::*;
589
590    #[test]
591    fn basic_write_read() {
592        let (mut prod, mut cons) = new(1024);
593
594        let payload = b"hello world";
595        let mut claim = prod.try_claim(payload.len()).unwrap();
596        claim.copy_from_slice(payload);
597        claim.commit();
598
599        let record = cons.try_claim().unwrap();
600        assert_eq!(&*record, payload);
601    }
602
603    #[test]
604    fn empty_returns_none() {
605        let (_, mut cons) = new(1024);
606        assert!(cons.try_claim().is_none());
607    }
608
609    #[test]
610    fn multiple_records() {
611        let (mut prod, mut cons) = new(1024);
612
613        for i in 0..10 {
614            let payload = format!("message {}", i);
615            let mut claim = prod.try_claim(payload.len()).unwrap();
616            claim.copy_from_slice(payload.as_bytes());
617            claim.commit();
618        }
619
620        for i in 0..10 {
621            let record = cons.try_claim().unwrap();
622            let expected = format!("message {}", i);
623            assert_eq!(&*record, expected.as_bytes());
624        }
625
626        assert!(cons.try_claim().is_none());
627    }
628
629    #[test]
630    #[allow(clippy::redundant_clone)]
631    fn producer_is_clone() {
632        let (prod, _cons) = new(1024);
633        let _prod2 = prod.clone();
634    }
635
636    #[test]
637    fn multiple_producers_single_consumer() {
638        use std::thread;
639
640        const PRODUCERS: usize = 4;
641        const MESSAGES_PER_PRODUCER: u64 = 10_000;
642        const TOTAL: u64 = PRODUCERS as u64 * MESSAGES_PER_PRODUCER;
643
644        let (prod, mut cons) = new(64 * 1024);
645
646        let handles: Vec<_> = (0..PRODUCERS)
647            .map(|producer_id| {
648                let mut prod = prod.clone();
649                thread::spawn(move || {
650                    for i in 0..MESSAGES_PER_PRODUCER {
651                        // Encode producer_id and sequence in payload
652                        let mut payload = [0u8; 16];
653                        payload[..8].copy_from_slice(&(producer_id as u64).to_le_bytes());
654                        payload[8..].copy_from_slice(&i.to_le_bytes());
655
656                        loop {
657                            match prod.try_claim(16) {
658                                Ok(mut claim) => {
659                                    claim.copy_from_slice(&payload);
660                                    claim.commit();
661                                    break;
662                                }
663                                Err(_) => std::hint::spin_loop(),
664                            }
665                        }
666                    }
667                })
668            })
669            .collect();
670
671        // Drop original producer
672        drop(prod);
673
674        // Consumer: track per-producer sequence
675        let consumer = thread::spawn(move || {
676            let mut received = 0u64;
677            let mut per_producer = vec![0u64; PRODUCERS];
678
679            while received < TOTAL {
680                if let Some(record) = cons.try_claim() {
681                    let producer_id = u64::from_le_bytes(record[..8].try_into().unwrap()) as usize;
682                    let seq = u64::from_le_bytes(record[8..].try_into().unwrap());
683
684                    // Each producer's messages should arrive in order
685                    assert_eq!(
686                        seq, per_producer[producer_id],
687                        "producer {} out of order",
688                        producer_id
689                    );
690                    per_producer[producer_id] += 1;
691                    received += 1;
692                } else {
693                    std::hint::spin_loop();
694                }
695            }
696
697            per_producer
698        });
699
700        for h in handles {
701            h.join().unwrap();
702        }
703
704        let per_producer = consumer.join().unwrap();
705        for (i, &count) in per_producer.iter().enumerate() {
706            assert_eq!(count, MESSAGES_PER_PRODUCER, "producer {} count", i);
707        }
708    }
709
710    #[test]
711    fn aborted_claim_creates_skip() {
712        let (mut prod, mut cons) = new(1024);
713
714        // Claim and drop without committing
715        {
716            let mut claim = prod.try_claim(10).unwrap();
717            claim.copy_from_slice(b"0123456789");
718            // drop without commit
719        }
720
721        // Write another record
722        {
723            let mut claim = prod.try_claim(5).unwrap();
724            claim.copy_from_slice(b"hello");
725            claim.commit();
726        }
727
728        // Consumer should skip the aborted record and read the committed one
729        let record = cons.try_claim().unwrap();
730        assert_eq!(&*record, b"hello");
731    }
732
733    #[test]
734    fn wrap_around() {
735        let (mut prod, mut cons) = new(64);
736
737        // Fill with messages that will cause wrap-around
738        for i in 0..20 {
739            let payload = format!("msg{:02}", i);
740            loop {
741                match prod.try_claim(payload.len()) {
742                    Ok(mut claim) => {
743                        claim.copy_from_slice(payload.as_bytes());
744                        claim.commit();
745                        break;
746                    }
747                    Err(_) => {
748                        // Drain some
749                        while cons.try_claim().is_some() {}
750                    }
751                }
752            }
753        }
754    }
755
756    #[test]
757    fn full_returns_error() {
758        let (mut prod, _cons) = new(64);
759
760        // Fill the buffer
761        let mut count = 0;
762        while let Ok(mut claim) = prod.try_claim(8) {
763            claim.copy_from_slice(b"12345678");
764            claim.commit();
765            count += 1;
766        }
767
768        assert!(count > 0);
769        assert!(prod.try_claim(8).is_err());
770    }
771
772    #[test]
773    fn disconnection_detection() {
774        let (prod, cons) = new(1024);
775
776        assert!(!prod.is_disconnected());
777        assert!(!cons.is_disconnected());
778
779        drop(cons);
780        assert!(prod.is_disconnected());
781    }
782
783    #[test]
784    #[should_panic(expected = "capacity must be at least 16")]
785    fn tiny_capacity_panics() {
786        let _ = new(8);
787    }
788
789    #[test]
790    fn zero_len_returns_error() {
791        let (mut prod, _) = new(1024);
792        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
793    }
794
795    #[test]
796    fn capacity_rounds_to_power_of_two() {
797        let (prod, _) = new(100);
798        assert_eq!(prod.capacity(), 128);
799
800        let (prod, _) = new(1000);
801        assert_eq!(prod.capacity(), 1024);
802    }
803
804    /// High-volume stress test with multiple producers.
805    #[test]
806    fn stress_multiple_producers() {
807        use std::thread;
808
809        const PRODUCERS: usize = 4;
810        const COUNT_PER_PRODUCER: u64 = 100_000;
811        const TOTAL: u64 = PRODUCERS as u64 * COUNT_PER_PRODUCER;
812        const BUFFER_SIZE: usize = 64 * 1024;
813
814        let (prod, mut cons) = new(BUFFER_SIZE);
815
816        let handles: Vec<_> = (0..PRODUCERS)
817            .map(|_| {
818                let mut prod = prod.clone();
819                thread::spawn(move || {
820                    for i in 0..COUNT_PER_PRODUCER {
821                        let payload = i.to_le_bytes();
822                        loop {
823                            match prod.try_claim(payload.len()) {
824                                Ok(mut claim) => {
825                                    claim.copy_from_slice(&payload);
826                                    claim.commit();
827                                    break;
828                                }
829                                Err(_) => std::hint::spin_loop(),
830                            }
831                        }
832                    }
833                })
834            })
835            .collect();
836
837        drop(prod);
838
839        let consumer = thread::spawn(move || {
840            let mut received = 0u64;
841            let mut sum = 0u64;
842            while received < TOTAL {
843                if let Some(record) = cons.try_claim() {
844                    let value = u64::from_le_bytes((*record).try_into().unwrap());
845                    sum = sum.wrapping_add(value);
846                    received += 1;
847                } else {
848                    std::hint::spin_loop();
849                }
850            }
851            (received, sum)
852        });
853
854        for h in handles {
855            h.join().unwrap();
856        }
857
858        let (received, sum) = consumer.join().unwrap();
859        assert_eq!(received, TOTAL);
860
861        // Each producer sends 0..COUNT_PER_PRODUCER
862        // Sum per producer = COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2
863        let expected_sum = PRODUCERS as u64 * COUNT_PER_PRODUCER * (COUNT_PER_PRODUCER - 1) / 2;
864        assert_eq!(sum, expected_sum);
865    }
866}