Skip to main content

nexus_logbuf/queue/
spsc.rs

1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producer reads     │
9//! │   buffer: *mut u8                                                       │
10//! │   capacity: usize                 (power of 2)                          │
11//! │   mask: usize                     (capacity - 1)                        │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
15//! │ Producer:                       │   │ Consumer:                       │
16//! │   tail: usize        (local)    │   │   head: usize        (local)    │
17//! │   cached_head: usize (local)    │   │                                 │
18//! └─────────────────────────────────┘   └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize              (8 bytes on 64-bit)   │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len]      (variable)           │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...]      (0-7 bytes)          │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
53
54/// Header size in bytes — one system word (`usize`).
55///
56/// On 64-bit this is 8 bytes, ensuring the payload starts at 8-byte alignment.
57const HEADER_SIZE: usize = std::mem::size_of::<usize>();
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67    assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69    let capacity = capacity.next_power_of_two();
70    let mask = capacity - 1;
71
72    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73    let layout = Layout::from_size_align(capacity, 8)
74        .expect("valid layout: capacity is a power of two >= 16, align is 8");
75    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76    let buffer_ptr = unsafe { alloc_zeroed(layout) };
77    if buffer_ptr.is_null() {
78        handle_alloc_error(layout);
79    }
80
81    let shared = Arc::new(Shared {
82        head: CachePadded::new(AtomicUsize::new(0)),
83        buffer: buffer_ptr,
84        capacity,
85        mask,
86    });
87
88    (
89        Producer {
90            tail: Cell::new(0),
91            cached_head: Cell::new(0),
92            shared: Arc::clone(&shared),
93        },
94        Consumer {
95            head: Cell::new(0),
96            shared,
97        },
98    )
99}
100
101struct Shared {
102    /// Consumer's read position. Updated by consumer, read by producer.
103    head: CachePadded<AtomicUsize>,
104    /// Buffer pointer.
105    buffer: *mut u8,
106    /// Buffer capacity (power of 2).
107    capacity: usize,
108    /// Mask for wrapping (capacity - 1).
109    mask: usize,
110}
111
112// Safety: Buffer is only accessed by one producer and one consumer.
113// The atomic head provides synchronization.
114unsafe impl Send for Shared {}
115unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118    fn drop(&mut self) {
119        let layout = Layout::from_size_align(self.capacity, 8)
120            .expect("valid layout: capacity was validated at construction");
121        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122        // Shared is only dropped once (Arc prevents earlier drops).
123        unsafe { dealloc(self.buffer, layout) };
124    }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
131/// Producer endpoint of the SPSC ring buffer.
132///
133/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
134pub struct Producer {
135    /// Local tail position (free-running).
136    tail: Cell<usize>,
137    /// Cached head position (Rigtorp optimization).
138    cached_head: Cell<usize>,
139    /// Shared state.
140    shared: Arc<Shared>,
141}
142
143// Safety: Producer is only used from one thread.
144unsafe impl Send for Producer {}
145
146impl Producer {
147    /// Attempts to claim space for a record with the given payload length.
148    ///
149    /// Returns a [`WriteClaim`] that can be written to and then committed.
150    ///
151    /// # Errors
152    ///
153    /// - [`TryClaimError::ZeroLength`] if `len` is zero
154    /// - [`TryClaimError::Full`] if the buffer is full
155    ///
156    /// # Safety Contract
157    ///
158    /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
159    /// (unreachable in practice). On 32-bit, records >2GB could set
160    /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
161    /// This is checked with
162    /// `debug_assert!` only.
163    #[inline]
164    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
165        #[cfg(target_pointer_width = "32")]
166        assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
167        #[cfg(not(target_pointer_width = "32"))]
168        debug_assert!(len <= LEN_MASK, "payload too large");
169        if len == 0 {
170            return Err(TryClaimError::ZeroLength);
171        }
172
173        let record_size = align8(HEADER_SIZE + len);
174
175        // Check if we have space
176        let tail = self.tail.get();
177        let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
178
179        if available < record_size {
180            // Reload head from shared state
181            self.cached_head
182                .set(self.shared.head.load(Ordering::Relaxed));
183            fence(Ordering::Acquire);
184
185            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
186            if available < record_size {
187                return Err(TryClaimError::Full);
188            }
189        }
190
191        // Check if record fits before buffer end, or needs wrap
192        let offset = tail & self.shared.mask;
193        let space_to_end = self.shared.capacity - offset;
194
195        if space_to_end < record_size {
196            // Need to wrap. First check if we have space for padding + record at start.
197            let total_needed = space_to_end + record_size;
198            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
199
200            if available < total_needed {
201                // Reload and recheck
202                self.cached_head
203                    .set(self.shared.head.load(Ordering::Relaxed));
204                fence(Ordering::Acquire);
205
206                let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
207                if available < total_needed {
208                    return Err(TryClaimError::Full);
209                }
210            }
211
212            // Write padding skip marker
213            let buffer = self.shared.buffer;
214            let skip_len = space_to_end | SKIP_BIT;
215            fence(Ordering::Release);
216            // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
217            // Buffer is valid for capacity bytes. We are the sole producer.
218            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
219            // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
220            // within the buffer. AtomicUsize reference is used for store visibility.
221            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
222
223            // Advance tail past padding
224            self.tail.set(tail.wrapping_add(space_to_end));
225            let new_offset = 0;
226
227            Ok(WriteClaim {
228                producer: self,
229                offset: new_offset,
230                len,
231                record_size,
232                committed: false,
233            })
234        } else {
235            // Fits without wrapping
236            Ok(WriteClaim {
237                producer: self,
238                offset,
239                len,
240                record_size,
241                committed: false,
242            })
243        }
244    }
245
246    /// Returns the capacity of the buffer.
247    #[inline]
248    pub fn capacity(&self) -> usize {
249        self.shared.capacity
250    }
251
252    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
253    ///
254    /// Uses `Arc::strong_count` which is inherently racy — the count can
255    /// change between the check and the caller's next action. Suitable for
256    /// graceful shutdown detection, not for correctness. For reliable
257    /// disconnection detection, use the channel layer (`channel::spsc`)
258    /// which tracks disconnection via dedicated atomic flags.
259    // Decision: No AtomicBool flag at the raw queue level. The channel
260    // layer (channel::spsc) provides reliable detection via dedicated
261    // flags. Adding one here would cost an atomic on every push/pop
262    // for a feature only the channel layer needs.
263    #[inline]
264    pub fn is_disconnected(&self) -> bool {
265        Arc::strong_count(&self.shared) == 1
266    }
267}
268
269impl std::fmt::Debug for Producer {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        f.debug_struct("Producer")
272            .field("capacity", &self.capacity())
273            .finish_non_exhaustive()
274    }
275}
276
277// ============================================================================
278// WriteClaim
279// ============================================================================
280
281/// A claimed region for writing a record.
282///
283/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
284/// when done writing to publish the record. If dropped without committing, a skip
285/// marker is written so the consumer can advance past the dead region.
286///
287/// # Important
288///
289/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
290/// block the consumer at this record's offset. This is not undefined behavior
291/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
292pub struct WriteClaim<'a> {
293    producer: &'a mut Producer,
294    offset: usize,
295    len: usize,
296    record_size: usize,
297    committed: bool,
298}
299
300impl WriteClaim<'_> {
301    /// Commits the record, making it visible to the consumer.
302    #[inline]
303    pub fn commit(mut self) {
304        self.do_commit();
305        self.committed = true;
306    }
307
308    #[inline]
309    fn do_commit(&mut self) {
310        let buffer = self.producer.shared.buffer;
311        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
312        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
313
314        // Release fence: ensures payload writes are visible before len store
315        fence(Ordering::Release);
316        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
317        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
318
319        // Advance tail
320        self.producer
321            .tail
322            .set(self.producer.tail.get().wrapping_add(self.record_size));
323    }
324
325    /// Returns the length of the payload region.
326    #[inline]
327    pub fn len(&self) -> usize {
328        self.len
329    }
330
331    /// Returns `true` if the payload is empty (always false, len must be > 0).
332    #[inline]
333    pub fn is_empty(&self) -> bool {
334        false
335    }
336}
337
338impl Deref for WriteClaim<'_> {
339    type Target = [u8];
340
341    #[inline]
342    fn deref(&self) -> &Self::Target {
343        let buffer = self.producer.shared.buffer;
344        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
345        // exclusive access to this region via &mut Producer borrow.
346        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
347        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
348        // and exclusively owned by this claim. Lifetime tied to &self.
349        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
350    }
351}
352
353impl DerefMut for WriteClaim<'_> {
354    #[inline]
355    fn deref_mut(&mut self) -> &mut Self::Target {
356        let buffer = self.producer.shared.buffer;
357        // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
358        // guaranteed by &mut self (only one WriteClaim exists per try_claim).
359        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
360        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
361        // and exclusively owned by this claim. Lifetime tied to &mut self.
362        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
363    }
364}
365
366impl Drop for WriteClaim<'_> {
367    fn drop(&mut self) {
368        if !self.committed {
369            // Write skip marker so consumer can advance past this region
370            let buffer = self.producer.shared.buffer;
371            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
372            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
373            let skip_len = self.record_size | SKIP_BIT;
374
375            fence(Ordering::Release);
376            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
377            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
378
379            // Advance tail past the dead region
380            self.producer
381                .tail
382                .set(self.producer.tail.get().wrapping_add(self.record_size));
383        }
384    }
385}
386
387// ============================================================================
388// Consumer
389// ============================================================================
390
391/// Consumer endpoint of the SPSC ring buffer.
392///
393/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
394pub struct Consumer {
395    /// Local head position (free-running).
396    head: Cell<usize>,
397    /// Shared state.
398    shared: Arc<Shared>,
399}
400
401// Safety: Consumer is only used from one thread.
402unsafe impl Send for Consumer {}
403
404impl Consumer {
405    /// Attempts to claim the next record for reading.
406    ///
407    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
408    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
409    /// and the head is advanced.
410    ///
411    /// Returns `None` if no committed record is available.
412    #[inline]
413    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
414        let buffer = self.shared.buffer;
415
416        loop {
417            let offset = self.head.get() & self.shared.mask;
418            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
419            // (head advances by align8'd record sizes). Buffer is valid.
420            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
421
422            // Relaxed atomic load, then Acquire fence for payload visibility
423            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
424            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
425            fence(Ordering::Acquire);
426
427            if len_raw == 0 {
428                // Not committed yet
429                return None;
430            }
431
432            if len_raw & SKIP_BIT != 0 {
433                // Skip marker: zero the region and advance
434                let skip_size = len_raw & LEN_MASK;
435                // Zero payload first, then stamp last (mirrors write path)
436                if skip_size > HEADER_SIZE {
437                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
438                    // the buffer. Consumer has exclusive read access to this region.
439                    unsafe {
440                        ptr::write_bytes(
441                            buffer.add(offset + HEADER_SIZE),
442                            0,
443                            skip_size - HEADER_SIZE,
444                        );
445                    }
446                }
447                // Ensure payload zeroing completes before clearing stamp
448                fence(Ordering::Release);
449                // SAFETY: len_ptr is still valid, computed above.
450                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
451
452                self.head.set(self.head.get().wrapping_add(skip_size));
453
454                // Ensure stamp clear completes before head advance
455                fence(Ordering::Release);
456                self.shared.head.store(self.head.get(), Ordering::Relaxed);
457
458                // Continue to check next position
459                continue;
460            }
461
462            // Valid record
463            let len = len_raw;
464            let record_size = align8(HEADER_SIZE + len);
465
466            return Some(ReadClaim {
467                consumer: self,
468                offset,
469                len,
470                record_size,
471            });
472        }
473    }
474
475    /// Returns the capacity of the buffer.
476    #[inline]
477    pub fn capacity(&self) -> usize {
478        self.shared.capacity
479    }
480
481    /// Best-effort hint: returns `true` if the producer has likely been dropped.
482    ///
483    /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
484    #[inline]
485    pub fn is_disconnected(&self) -> bool {
486        Arc::strong_count(&self.shared) == 1
487    }
488}
489
490impl std::fmt::Debug for Consumer {
491    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492        f.debug_struct("Consumer")
493            .field("capacity", &self.capacity())
494            .finish_non_exhaustive()
495    }
496}
497
498// ============================================================================
499// ReadClaim
500// ============================================================================
501
502/// A claimed record for reading.
503///
504/// Dereferences to `&[u8]` for the payload. When dropped, the record region
505/// is zeroed and the head is advanced, freeing space for the producer.
506pub struct ReadClaim<'a> {
507    consumer: &'a mut Consumer,
508    offset: usize,
509    len: usize,
510    record_size: usize,
511}
512
513impl ReadClaim<'_> {
514    /// Returns the length of the payload.
515    #[inline]
516    pub fn len(&self) -> usize {
517        self.len
518    }
519
520    /// Returns `true` if the payload is empty.
521    #[inline]
522    pub fn is_empty(&self) -> bool {
523        self.len == 0
524    }
525}
526
527impl Deref for ReadClaim<'_> {
528    type Target = [u8];
529
530    #[inline]
531    fn deref(&self) -> &Self::Target {
532        let buffer = self.consumer.shared.buffer;
533        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
534        // exclusive read access via &mut Consumer borrow.
535        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
536        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
537        // finished writing (len was non-zero, preceded by Release fence).
538        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
539    }
540}
541
542impl Drop for ReadClaim<'_> {
543    fn drop(&mut self) {
544        let buffer = self.consumer.shared.buffer;
545
546        // Zero payload first, then stamp last (mirrors write path)
547        if self.record_size > HEADER_SIZE {
548            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
549            // the buffer. Consumer owns this region exclusively.
550            unsafe {
551                ptr::write_bytes(
552                    buffer.add(self.offset + HEADER_SIZE),
553                    0,
554                    self.record_size - HEADER_SIZE,
555                );
556            }
557        }
558        // Ensure payload zeroing completes before clearing stamp
559        fence(Ordering::Release);
560        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
561        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
562        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
563        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
564
565        // Advance head
566        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
567        self.consumer.head.set(new_head);
568
569        // Ensure stamp clear completes before head advance
570        fence(Ordering::Release);
571        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
572    }
573}
574
575// ============================================================================
576// Tests
577// ============================================================================
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582
583    #[test]
584    fn basic_write_read() {
585        let (mut prod, mut cons) = new(1024);
586
587        let payload = b"hello world";
588        let mut claim = prod.try_claim(payload.len()).unwrap();
589        claim.copy_from_slice(payload);
590        claim.commit();
591
592        let record = cons.try_claim().unwrap();
593        assert_eq!(&*record, payload);
594    }
595
596    #[test]
597    fn empty_returns_none() {
598        let (_, mut cons) = new(1024);
599        assert!(cons.try_claim().is_none());
600    }
601
602    #[test]
603    fn multiple_records() {
604        let (mut prod, mut cons) = new(1024);
605
606        for i in 0..10 {
607            let payload = format!("message {}", i);
608            let mut claim = prod.try_claim(payload.len()).unwrap();
609            claim.copy_from_slice(payload.as_bytes());
610            claim.commit();
611        }
612
613        for i in 0..10 {
614            let record = cons.try_claim().unwrap();
615            let expected = format!("message {}", i);
616            assert_eq!(&*record, expected.as_bytes());
617        }
618
619        assert!(cons.try_claim().is_none());
620    }
621
622    #[test]
623    fn aborted_claim_creates_skip() {
624        let (mut prod, mut cons) = new(1024);
625
626        // Claim and drop without committing
627        {
628            let mut claim = prod.try_claim(10).unwrap();
629            claim.copy_from_slice(b"0123456789");
630            // drop without commit
631        }
632
633        // Write another record
634        {
635            let mut claim = prod.try_claim(5).unwrap();
636            claim.copy_from_slice(b"hello");
637            claim.commit();
638        }
639
640        // Consumer should skip the aborted record and read the committed one
641        let record = cons.try_claim().unwrap();
642        assert_eq!(&*record, b"hello");
643    }
644
645    #[test]
646    fn wrap_around() {
647        let (mut prod, mut cons) = new(64);
648
649        // Fill with messages that will cause wrap-around
650        for i in 0..20 {
651            let payload = format!("msg{:02}", i);
652            loop {
653                match prod.try_claim(payload.len()) {
654                    Ok(mut claim) => {
655                        claim.copy_from_slice(payload.as_bytes());
656                        claim.commit();
657                        break;
658                    }
659                    Err(_) => {
660                        // Drain some
661                        while cons.try_claim().is_some() {}
662                    }
663                }
664            }
665        }
666    }
667
668    #[test]
669    fn full_returns_error() {
670        let (mut prod, _cons) = new(64);
671
672        // Fill the buffer
673        let mut count = 0;
674        while let Ok(mut claim) = prod.try_claim(8) {
675            claim.copy_from_slice(b"12345678");
676            claim.commit();
677            count += 1;
678        }
679
680        assert!(count > 0);
681        assert!(prod.try_claim(8).is_err());
682    }
683
684    #[test]
685    fn cross_thread() {
686        use std::thread;
687
688        let (mut prod, mut cons) = new(4096);
689
690        let producer = thread::spawn(move || {
691            for i in 0..10_000u64 {
692                let payload = i.to_le_bytes();
693                loop {
694                    match prod.try_claim(payload.len()) {
695                        Ok(mut claim) => {
696                            claim.copy_from_slice(&payload);
697                            claim.commit();
698                            break;
699                        }
700                        Err(_) => std::hint::spin_loop(),
701                    }
702                }
703            }
704        });
705
706        let consumer = thread::spawn(move || {
707            let mut received = 0u64;
708            while received < 10_000 {
709                if let Some(record) = cons.try_claim() {
710                    let value = u64::from_le_bytes((*record).try_into().unwrap());
711                    assert_eq!(value, received);
712                    received += 1;
713                } else {
714                    std::hint::spin_loop();
715                }
716            }
717        });
718
719        producer.join().unwrap();
720        consumer.join().unwrap();
721    }
722
723    #[test]
724    fn disconnection_detection() {
725        let (prod, cons) = new(1024);
726
727        assert!(!prod.is_disconnected());
728        assert!(!cons.is_disconnected());
729
730        drop(cons);
731        assert!(prod.is_disconnected());
732    }
733
734    #[test]
735    #[should_panic(expected = "capacity must be at least 16")]
736    fn tiny_capacity_panics() {
737        let _ = new(8);
738    }
739
740    #[test]
741    fn zero_len_returns_error() {
742        let (mut prod, _) = new(1024);
743        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
744    }
745
746    #[test]
747    fn capacity_rounds_to_power_of_two() {
748        let (prod, _) = new(100);
749        assert_eq!(prod.capacity(), 128);
750
751        let (prod, _) = new(1000);
752        assert_eq!(prod.capacity(), 1024);
753    }
754
755    #[test]
756    fn variable_length_records() {
757        let (mut prod, mut cons) = new(4096);
758
759        let messages = [
760            "a",
761            "hello",
762            "this is a longer message",
763            "x",
764            "medium length",
765        ];
766
767        for msg in &messages {
768            let mut claim = prod.try_claim(msg.len()).unwrap();
769            claim.copy_from_slice(msg.as_bytes());
770            claim.commit();
771        }
772
773        for msg in &messages {
774            let record = cons.try_claim().unwrap();
775            assert_eq!(&*record, msg.as_bytes());
776        }
777    }
778
779    /// High-volume stress test with variable-length messages.
780    ///
781    /// Tests correctness under sustained load with wrap-around.
782    #[test]
783    fn stress_high_volume() {
784        use std::thread;
785
786        const COUNT: u64 = 1_000_000;
787        const BUFFER_SIZE: usize = 64 * 1024; // 64KB
788
789        let (mut prod, mut cons) = new(BUFFER_SIZE);
790
791        let producer = thread::spawn(move || {
792            for i in 0..COUNT {
793                // Variable length: 8-64 bytes based on sequence
794                let len = 8 + ((i % 8) * 8) as usize;
795                let mut payload = vec![0u8; len];
796                // Write sequence number at start
797                payload[..8].copy_from_slice(&i.to_le_bytes());
798
799                loop {
800                    match prod.try_claim(len) {
801                        Ok(mut claim) => {
802                            claim.copy_from_slice(&payload);
803                            claim.commit();
804                            break;
805                        }
806                        Err(_) => std::hint::spin_loop(),
807                    }
808                }
809            }
810        });
811
812        let consumer = thread::spawn(move || {
813            let mut received = 0u64;
814            while received < COUNT {
815                if let Some(record) = cons.try_claim() {
816                    // Verify sequence number
817                    let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
818                    assert_eq!(seq, received, "sequence mismatch at {}", received);
819
820                    // Verify expected length
821                    let expected_len = 8 + ((received % 8) * 8) as usize;
822                    assert_eq!(
823                        record.len(),
824                        expected_len,
825                        "length mismatch at {}",
826                        received
827                    );
828
829                    received += 1;
830                } else {
831                    std::hint::spin_loop();
832                }
833            }
834            received
835        });
836
837        producer.join().unwrap();
838        let received = consumer.join().unwrap();
839        assert_eq!(received, COUNT);
840    }
841
842    /// Stress test with maximum contention - tiny buffer, high throughput.
843    #[test]
844    fn stress_high_contention() {
845        use std::thread;
846
847        const COUNT: u64 = 100_000;
848        const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around
849
850        let (mut prod, mut cons) = new(BUFFER_SIZE);
851
852        let producer = thread::spawn(move || {
853            for i in 0..COUNT {
854                let payload = i.to_le_bytes();
855                loop {
856                    match prod.try_claim(payload.len()) {
857                        Ok(mut claim) => {
858                            claim.copy_from_slice(&payload);
859                            claim.commit();
860                            break;
861                        }
862                        Err(_) => std::hint::spin_loop(),
863                    }
864                }
865            }
866        });
867
868        let consumer = thread::spawn(move || {
869            let mut received = 0u64;
870            let mut sum = 0u64;
871            while received < COUNT {
872                if let Some(record) = cons.try_claim() {
873                    let value = u64::from_le_bytes((*record).try_into().unwrap());
874                    assert_eq!(value, received);
875                    sum = sum.wrapping_add(value);
876                    received += 1;
877                } else {
878                    std::hint::spin_loop();
879                }
880            }
881            sum
882        });
883
884        producer.join().unwrap();
885        let sum = consumer.join().unwrap();
886        // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
887        let expected = COUNT * (COUNT - 1) / 2;
888        assert_eq!(sum, expected);
889    }
890
891    /// Payload pointers must be word-aligned so users can write aligned structs.
892    #[test]
893    fn payload_is_word_aligned() {
894        let (mut prod, mut cons) = new(1024);
895
896        // Test several payload sizes to cover padding edge cases
897        for len in [1, 3, 7, 8, 13, 64, 255] {
898            let mut claim = prod.try_claim(len).unwrap();
899            let ptr = claim.as_mut_ptr();
900            assert_eq!(
901                ptr as usize % std::mem::align_of::<usize>(),
902                0,
903                "WriteClaim payload not word-aligned for len={len}"
904            );
905            claim.commit();
906
907            let record = cons.try_claim().unwrap();
908            let ptr = record.as_ptr();
909            assert_eq!(
910                ptr as usize % std::mem::align_of::<usize>(),
911                0,
912                "ReadClaim payload not word-aligned for len={len}"
913            );
914        }
915    }
916}