Skip to main content

nexus_logbuf/queue/
spsc.rs

1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producer reads     │
9//! │   buffer: *mut u8                                                       │
10//! │   capacity: usize                 (power of 2)                          │
11//! │   mask: usize                     (capacity - 1)                        │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
15//! │ Producer:                       │   │ Consumer:                       │
16//! │   tail: usize        (local)    │   │   head: usize        (local)    │
17//! │   cached_head: usize (local)    │   │                                 │
18//! └─────────────────────────────────┘   └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize              (8 bytes on 64-bit)   │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len]      (variable)           │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...]      (0-7 bytes)          │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{BufferFull, LEN_MASK, SKIP_BIT, align8};
53
/// Size in bytes of the per-record header: exactly one machine word (`usize`).
///
/// On 64-bit targets this is 8 bytes, so the payload that follows the header
/// starts at an 8-byte aligned offset.
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67    assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69    let capacity = capacity.next_power_of_two();
70    let mask = capacity - 1;
71
72    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73    let layout = Layout::from_size_align(capacity, 8)
74        .expect("valid layout: capacity is a power of two >= 16, align is 8");
75    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76    let buffer_ptr = unsafe { alloc_zeroed(layout) };
77    if buffer_ptr.is_null() {
78        handle_alloc_error(layout);
79    }
80
81    let shared = Arc::new(Shared {
82        head: CachePadded::new(AtomicUsize::new(0)),
83        buffer: buffer_ptr,
84        capacity,
85        mask,
86    });
87
88    (
89        Producer {
90            tail: Cell::new(0),
91            cached_head: Cell::new(0),
92            shared: Arc::clone(&shared),
93        },
94        Consumer {
95            head: Cell::new(0),
96            shared,
97        },
98    )
99}
100
101struct Shared {
102    /// Consumer's read position. Updated by consumer, read by producer.
103    head: CachePadded<AtomicUsize>,
104    /// Buffer pointer.
105    buffer: *mut u8,
106    /// Buffer capacity (power of 2).
107    capacity: usize,
108    /// Mask for wrapping (capacity - 1).
109    mask: usize,
110}
111
112// Safety: Buffer is only accessed by one producer and one consumer.
113// The atomic head provides synchronization.
114unsafe impl Send for Shared {}
115unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118    fn drop(&mut self) {
119        let layout = Layout::from_size_align(self.capacity, 8)
120            .expect("valid layout: capacity was validated at construction");
121        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122        // Shared is only dropped once (Arc prevents earlier drops).
123        unsafe { dealloc(self.buffer, layout) };
124    }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
131/// Producer endpoint of the SPSC ring buffer.
132///
133/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
134pub struct Producer {
135    /// Local tail position (free-running).
136    tail: Cell<usize>,
137    /// Cached head position (Rigtorp optimization).
138    cached_head: Cell<usize>,
139    /// Shared state.
140    shared: Arc<Shared>,
141}
142
143// Safety: Producer is only used from one thread.
144unsafe impl Send for Producer {}
145
146impl Producer {
147    /// Attempts to claim space for a record with the given payload length.
148    ///
149    /// Returns a [`WriteClaim`] that can be written to and then committed.
150    ///
151    /// # Errors
152    ///
153    /// Returns [`BufferFull`] if the buffer has no space for the record.
154    ///
155    /// # Panics
156    ///
157    /// Panics if `len == 0`. The wire format reserves `len == 0` as the
158    /// "uncommitted" sentinel — letting it through would silently hang the
159    /// consumer. Aborting a non-zero claim is fully supported (drop the
160    /// [`WriteClaim`] without committing); only claiming zero bytes upfront
161    /// is forbidden.
162    ///
163    /// # Safety Contract
164    ///
165    /// `len` must not exceed `LEN_MASK`. On 64-bit this is ~9.2 exabytes
166    /// (unreachable in practice). On 32-bit, records >2GB could set
167    /// `SKIP_BIT` and corrupt the stream — enforced with `assert!`.
168    /// On 64-bit this is checked with `debug_assert!` only.
169    #[inline]
170    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, BufferFull> {
171        assert!(len > 0, "payload length must be non-zero");
172        #[cfg(target_pointer_width = "32")]
173        assert!(len <= LEN_MASK, "payload too large for 32-bit logbuf");
174        #[cfg(not(target_pointer_width = "32"))]
175        debug_assert!(len <= LEN_MASK, "payload too large");
176
177        let record_size = align8(HEADER_SIZE + len);
178
179        // Check if we have space
180        let tail = self.tail.get();
181        let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
182
183        if available < record_size {
184            // Reload head from shared state
185            self.cached_head
186                .set(self.shared.head.load(Ordering::Relaxed));
187            fence(Ordering::Acquire);
188
189            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
190            if available < record_size {
191                return Err(BufferFull);
192            }
193        }
194
195        // Check if record fits before buffer end, or needs wrap
196        let offset = tail & self.shared.mask;
197        let space_to_end = self.shared.capacity - offset;
198
199        if space_to_end < record_size {
200            // Need to wrap. First check if we have space for padding + record at start.
201            let total_needed = space_to_end + record_size;
202            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
203
204            if available < total_needed {
205                // Reload and recheck
206                self.cached_head
207                    .set(self.shared.head.load(Ordering::Relaxed));
208                fence(Ordering::Acquire);
209
210                let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
211                if available < total_needed {
212                    return Err(BufferFull);
213                }
214            }
215
216            // Write padding skip marker
217            let buffer = self.shared.buffer;
218            let skip_len = space_to_end | SKIP_BIT;
219            fence(Ordering::Release);
220            // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
221            // Buffer is valid for capacity bytes. We are the sole producer.
222            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
223            // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
224            // within the buffer. AtomicUsize reference is used for store visibility.
225            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
226
227            // Advance tail past padding
228            self.tail.set(tail.wrapping_add(space_to_end));
229            let new_offset = 0;
230
231            Ok(WriteClaim {
232                producer: self,
233                offset: new_offset,
234                len,
235                record_size,
236                committed: false,
237            })
238        } else {
239            // Fits without wrapping
240            Ok(WriteClaim {
241                producer: self,
242                offset,
243                len,
244                record_size,
245                committed: false,
246            })
247        }
248    }
249
250    /// Returns the capacity of the buffer.
251    #[inline]
252    pub fn capacity(&self) -> usize {
253        self.shared.capacity
254    }
255
256    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
257    ///
258    /// Uses `Arc::strong_count` which is inherently racy — the count can
259    /// change between the check and the caller's next action. Suitable for
260    /// graceful shutdown detection, not for correctness. For reliable
261    /// disconnection detection, use the channel layer (`channel::spsc`)
262    /// which tracks disconnection via dedicated atomic flags.
263    // Decision: No AtomicBool flag at the raw queue level. The channel
264    // layer (channel::spsc) provides reliable detection via dedicated
265    // flags. Adding one here would cost an atomic on every push/pop
266    // for a feature only the channel layer needs.
267    #[inline]
268    pub fn is_disconnected(&self) -> bool {
269        Arc::strong_count(&self.shared) == 1
270    }
271}
272
273impl std::fmt::Debug for Producer {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        f.debug_struct("Producer")
276            .field("capacity", &self.capacity())
277            .finish_non_exhaustive()
278    }
279}
280
281// ============================================================================
282// WriteClaim
283// ============================================================================
284
285/// A claimed region for writing a record.
286///
287/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
288/// when done writing to publish the record. If dropped without committing, a skip
289/// marker is written so the consumer can advance past the dead region.
290///
291/// # Important
292///
293/// Leaking a `WriteClaim` via [`mem::forget`](std::mem::forget) will permanently
294/// block the consumer at this record's offset. This is not undefined behavior
295/// but causes an unrecoverable deadlock. Always drop or explicitly abort claims.
296pub struct WriteClaim<'a> {
297    producer: &'a mut Producer,
298    offset: usize,
299    len: usize,
300    record_size: usize,
301    committed: bool,
302}
303
304impl WriteClaim<'_> {
305    /// Commits the record, making it visible to the consumer.
306    #[inline]
307    pub fn commit(mut self) {
308        self.do_commit();
309        self.committed = true;
310    }
311
312    #[inline]
313    fn do_commit(&mut self) {
314        let buffer = self.producer.shared.buffer;
315        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
316        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
317
318        // Release fence: ensures payload writes are visible before len store
319        fence(Ordering::Release);
320        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
321        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
322
323        // Advance tail
324        self.producer
325            .tail
326            .set(self.producer.tail.get().wrapping_add(self.record_size));
327    }
328
329    /// Returns the length of the payload region.
330    #[inline]
331    pub fn len(&self) -> usize {
332        self.len
333    }
334
335    /// Returns `true` if the payload is empty (always false, len must be > 0).
336    #[inline]
337    pub fn is_empty(&self) -> bool {
338        false
339    }
340}
341
342impl Deref for WriteClaim<'_> {
343    type Target = [u8];
344
345    #[inline]
346    fn deref(&self) -> &Self::Target {
347        let buffer = self.producer.shared.buffer;
348        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
349        // exclusive access to this region via &mut Producer borrow.
350        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
351        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
352        // and exclusively owned by this claim. Lifetime tied to &self.
353        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
354    }
355}
356
357impl DerefMut for WriteClaim<'_> {
358    #[inline]
359    fn deref_mut(&mut self) -> &mut Self::Target {
360        let buffer = self.producer.shared.buffer;
361        // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
362        // guaranteed by &mut self (only one WriteClaim exists per try_claim).
363        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
364        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
365        // and exclusively owned by this claim. Lifetime tied to &mut self.
366        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
367    }
368}
369
370impl Drop for WriteClaim<'_> {
371    fn drop(&mut self) {
372        if !self.committed {
373            // Write skip marker so consumer can advance past this region
374            let buffer = self.producer.shared.buffer;
375            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
376            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
377            let skip_len = self.record_size | SKIP_BIT;
378
379            fence(Ordering::Release);
380            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
381            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
382
383            // Advance tail past the dead region
384            self.producer
385                .tail
386                .set(self.producer.tail.get().wrapping_add(self.record_size));
387        }
388    }
389}
390
391// ============================================================================
392// Consumer
393// ============================================================================
394
395/// Consumer endpoint of the SPSC ring buffer.
396///
397/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
398pub struct Consumer {
399    /// Local head position (free-running).
400    head: Cell<usize>,
401    /// Shared state.
402    shared: Arc<Shared>,
403}
404
405// Safety: Consumer is only used from one thread.
406unsafe impl Send for Consumer {}
407
408impl Consumer {
409    /// Attempts to claim the next record for reading.
410    ///
411    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
412    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
413    /// and the head is advanced.
414    ///
415    /// Returns `None` if no committed record is available.
416    #[inline]
417    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
418        let buffer = self.shared.buffer;
419
420        loop {
421            let offset = self.head.get() & self.shared.mask;
422            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
423            // (head advances by align8'd record sizes). Buffer is valid.
424            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
425
426            // Relaxed atomic load, then Acquire fence for payload visibility
427            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
428            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
429            fence(Ordering::Acquire);
430
431            if len_raw == 0 {
432                // Not committed yet
433                return None;
434            }
435
436            if len_raw & SKIP_BIT != 0 {
437                // Skip marker: zero the region and advance
438                let skip_size = len_raw & LEN_MASK;
439                // Zero payload first, then stamp last (mirrors write path)
440                if skip_size > HEADER_SIZE {
441                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
442                    // the buffer. Consumer has exclusive read access to this region.
443                    unsafe {
444                        ptr::write_bytes(
445                            buffer.add(offset + HEADER_SIZE),
446                            0,
447                            skip_size - HEADER_SIZE,
448                        );
449                    }
450                }
451                // Ensure payload zeroing completes before clearing stamp
452                fence(Ordering::Release);
453                // SAFETY: len_ptr is still valid, computed above.
454                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
455
456                self.head.set(self.head.get().wrapping_add(skip_size));
457
458                // Ensure stamp clear completes before head advance
459                fence(Ordering::Release);
460                self.shared.head.store(self.head.get(), Ordering::Relaxed);
461
462                // Continue to check next position
463                continue;
464            }
465
466            // Valid record
467            let len = len_raw;
468            let record_size = align8(HEADER_SIZE + len);
469
470            return Some(ReadClaim {
471                consumer: self,
472                offset,
473                len,
474                record_size,
475            });
476        }
477    }
478
479    /// Returns the capacity of the buffer.
480    #[inline]
481    pub fn capacity(&self) -> usize {
482        self.shared.capacity
483    }
484
485    /// Best-effort hint: returns `true` if the producer has likely been dropped.
486    ///
487    /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
488    #[inline]
489    pub fn is_disconnected(&self) -> bool {
490        Arc::strong_count(&self.shared) == 1
491    }
492}
493
494impl std::fmt::Debug for Consumer {
495    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496        f.debug_struct("Consumer")
497            .field("capacity", &self.capacity())
498            .finish_non_exhaustive()
499    }
500}
501
502// ============================================================================
503// ReadClaim
504// ============================================================================
505
506/// A claimed record for reading.
507///
508/// Dereferences to `&[u8]` for the payload. When dropped, the record region
509/// is zeroed and the head is advanced, freeing space for the producer.
510pub struct ReadClaim<'a> {
511    consumer: &'a mut Consumer,
512    offset: usize,
513    len: usize,
514    record_size: usize,
515}
516
517impl ReadClaim<'_> {
518    /// Returns the length of the payload.
519    #[inline]
520    pub fn len(&self) -> usize {
521        self.len
522    }
523
524    /// Returns `true` if the payload is empty.
525    #[inline]
526    pub fn is_empty(&self) -> bool {
527        self.len == 0
528    }
529}
530
531impl Deref for ReadClaim<'_> {
532    type Target = [u8];
533
534    #[inline]
535    fn deref(&self) -> &Self::Target {
536        let buffer = self.consumer.shared.buffer;
537        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
538        // exclusive read access via &mut Consumer borrow.
539        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
540        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
541        // finished writing (len was non-zero, preceded by Release fence).
542        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
543    }
544}
545
546impl Drop for ReadClaim<'_> {
547    fn drop(&mut self) {
548        let buffer = self.consumer.shared.buffer;
549
550        // Zero payload first, then stamp last (mirrors write path)
551        if self.record_size > HEADER_SIZE {
552            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
553            // the buffer. Consumer owns this region exclusively.
554            unsafe {
555                ptr::write_bytes(
556                    buffer.add(self.offset + HEADER_SIZE),
557                    0,
558                    self.record_size - HEADER_SIZE,
559                );
560            }
561        }
562        // Ensure payload zeroing completes before clearing stamp
563        fence(Ordering::Release);
564        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
565        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
566        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
567        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
568
569        // Advance head
570        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
571        self.consumer.head.set(new_head);
572
573        // Ensure stamp clear completes before head advance
574        fence(Ordering::Release);
575        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
576    }
577}
578
579// ============================================================================
580// Tests
581// ============================================================================
582
#[cfg(test)]
mod tests {
    use super::*;

    /// A single committed record is read back unchanged.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// A claim dropped without commit becomes a skip marker the consumer ignores.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Records that do not tile the buffer force end-of-buffer padding, and
    /// every record must still arrive intact and in order.
    #[test]
    fn wrap_around() {
        let (mut prod, mut cons) = new(64);

        // 10-byte payloads occupy 24 bytes per record on 64-bit targets, which
        // does not divide the 64-byte buffer evenly, so the producer has to
        // pad out the end of the buffer and restart at offset 0.
        let expected: Vec<String> = (0..20).map(|i| format!("message-{i:02}")).collect();
        let mut received: Vec<Vec<u8>> = Vec::new();

        for payload in &expected {
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    Err(_) => {
                        // Buffer full: drain everything published so far.
                        while let Some(record) = cons.try_claim() {
                            received.push(record.to_vec());
                        }
                    }
                }
            }
        }

        // Drain whatever is still in flight.
        while let Some(record) = cons.try_claim() {
            received.push(record.to_vec());
        }

        assert_eq!(received.len(), expected.len());
        for (got, want) in received.iter().zip(&expected) {
            assert_eq!(got.as_slice(), want.as_bytes());
        }
    }

    /// Without a draining consumer the producer eventually gets `BufferFull`.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// Producer and consumer on separate threads preserve order and content.
    #[test]
    fn cross_thread() {
        use std::thread;

        let (mut prod, mut cons) = new(4096);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    /// Each endpoint reports disconnection once its peer has been dropped.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());

        // Same check from the consumer's side.
        let (prod, cons) = new(1024);
        drop(prod);
        assert!(cons.is_disconnected());
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    #[test]
    #[should_panic(expected = "payload length must be non-zero")]
    fn zero_len_panics() {
        let (mut prod, _) = new(1024);
        let _ = prod.try_claim(0);
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// Records of different lengths can be mixed freely.
    #[test]
    fn variable_length_records() {
        let (mut prod, mut cons) = new(4096);

        let messages = [
            "a",
            "hello",
            "this is a longer message",
            "x",
            "medium length",
        ];

        for msg in &messages {
            let mut claim = prod.try_claim(msg.len()).unwrap();
            claim.copy_from_slice(msg.as_bytes());
            claim.commit();
        }

        for msg in &messages {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, msg.as_bytes());
        }
    }

    /// High-volume stress test with variable-length messages.
    ///
    /// Tests correctness under sustained load with wrap-around.
    #[test]
    fn stress_high_volume() {
        use std::thread;

        const COUNT: u64 = 1_000_000;
        const BUFFER_SIZE: usize = 64 * 1024; // 64KB

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                // Variable length: 8-64 bytes based on sequence
                let len = 8 + ((i % 8) * 8) as usize;
                let mut payload = vec![0u8; len];
                // Write sequence number at start
                payload[..8].copy_from_slice(&i.to_le_bytes());

                loop {
                    match prod.try_claim(len) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    // Verify sequence number
                    let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
                    assert_eq!(seq, received, "sequence mismatch at {}", received);

                    // Verify expected length
                    let expected_len = 8 + ((received % 8) * 8) as usize;
                    assert_eq!(
                        record.len(),
                        expected_len,
                        "length mismatch at {}",
                        received
                    );

                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, COUNT);
    }

    /// Stress test with maximum contention - tiny buffer, high throughput.
    #[test]
    fn stress_high_contention() {
        use std::thread;

        const COUNT: u64 = 100_000;
        const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
        let expected = COUNT * (COUNT - 1) / 2;
        assert_eq!(sum, expected);
    }

    /// Payload pointers must be word-aligned so users can write aligned structs.
    #[test]
    fn payload_is_word_aligned() {
        let (mut prod, mut cons) = new(1024);

        // Test several payload sizes to cover padding edge cases
        for len in [1, 3, 7, 8, 13, 64, 255] {
            let mut claim = prod.try_claim(len).unwrap();
            let ptr = claim.as_mut_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "WriteClaim payload not word-aligned for len={len}"
            );
            claim.commit();

            let record = cons.try_claim().unwrap();
            let ptr = record.as_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "ReadClaim payload not word-aligned for len={len}"
            );
        }
    }
}