Skip to main content

nexus_logbuf/queue/
spsc.rs

1//! Single-producer single-consumer byte ring buffer.
2//!
3//! # Design
4//!
5//! ```text
6//! ┌─────────────────────────────────────────────────────────────────────────┐
7//! │ Shared:                                                                 │
8//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producer reads     │
9//! │   buffer: *mut u8                                                       │
10//! │   capacity: usize                 (power of 2)                          │
11//! │   mask: usize                     (capacity - 1)                        │
12//! └─────────────────────────────────────────────────────────────────────────┘
13//!
14//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
15//! │ Producer:                       │   │ Consumer:                       │
16//! │   tail: usize        (local)    │   │   head: usize        (local)    │
17//! │   cached_head: usize (local)    │   │                                 │
18//! └─────────────────────────────────┘   └─────────────────────────────────┘
19//! ```
20//!
21//! # Record Layout
22//!
23//! ```text
24//! ┌──────────────────────────────────────────────┐
25//! │ len: usize              (8 bytes on 64-bit)   │ ← payload length / commit marker
26//! ├──────────────────────────────────────────────┤
27//! │ payload: [u8; len]      (variable)           │ ← raw bytes
28//! ├──────────────────────────────────────────────┤
29//! │ padding: [u8; ...]      (0-7 bytes)          │ ← align to 8-byte boundary
30//! └──────────────────────────────────────────────┘
31//! ```
32//!
33//! Records are packed contiguously. Total record size is
34//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
35//! the payload starts at a word-aligned offset.
36//!
37//! # Len Field Encoding
38//!
39//! - `len == 0`: Not committed, consumer waits
40//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
41//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
53
/// Size in bytes of the per-record length header: exactly one machine word.
///
/// Because the header is a `usize`, the payload that follows it begins on a
/// word boundary (8 bytes on 64-bit targets).
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67    assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69    let capacity = capacity.next_power_of_two();
70    let mask = capacity - 1;
71
72    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73    let layout = Layout::from_size_align(capacity, 8)
74        .expect("valid layout: capacity is a power of two >= 16, align is 8");
75    // SAFETY: Layout is valid — capacity >= 16 (power of two), align is 8.
76    let buffer_ptr = unsafe { alloc_zeroed(layout) };
77    if buffer_ptr.is_null() {
78        handle_alloc_error(layout);
79    }
80
81    let shared = Arc::new(Shared {
82        head: CachePadded::new(AtomicUsize::new(0)),
83        buffer: buffer_ptr,
84        capacity,
85        mask,
86    });
87
88    (
89        Producer {
90            tail: Cell::new(0),
91            cached_head: Cell::new(0),
92            shared: Arc::clone(&shared),
93        },
94        Consumer {
95            head: Cell::new(0),
96            shared,
97        },
98    )
99}
100
101struct Shared {
102    /// Consumer's read position. Updated by consumer, read by producer.
103    head: CachePadded<AtomicUsize>,
104    /// Buffer pointer.
105    buffer: *mut u8,
106    /// Buffer capacity (power of 2).
107    capacity: usize,
108    /// Mask for wrapping (capacity - 1).
109    mask: usize,
110}
111
112// Safety: Buffer is only accessed by one producer and one consumer.
113// The atomic head provides synchronization.
114unsafe impl Send for Shared {}
115unsafe impl Sync for Shared {}
116
117impl Drop for Shared {
118    fn drop(&mut self) {
119        let layout = Layout::from_size_align(self.capacity, 8)
120            .expect("valid layout: capacity was validated at construction");
121        // SAFETY: buffer was allocated with alloc_zeroed using this exact layout.
122        // Shared is only dropped once (Arc prevents earlier drops).
123        unsafe { dealloc(self.buffer, layout) };
124    }
125}
126
127// ============================================================================
128// Producer
129// ============================================================================
130
131/// Producer endpoint of the SPSC ring buffer.
132///
133/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
134pub struct Producer {
135    /// Local tail position (free-running).
136    tail: Cell<usize>,
137    /// Cached head position (Rigtorp optimization).
138    cached_head: Cell<usize>,
139    /// Shared state.
140    shared: Arc<Shared>,
141}
142
143// Safety: Producer is only used from one thread.
144unsafe impl Send for Producer {}
145
146impl Producer {
147    /// Attempts to claim space for a record with the given payload length.
148    ///
149    /// Returns a [`WriteClaim`] that can be written to and then committed.
150    ///
151    /// # Errors
152    ///
153    /// - [`TryClaimError::ZeroLength`] if `len` is zero
154    /// - [`TryClaimError::Full`] if the buffer is full
155    ///
156    /// # Safety Contract
157    ///
158    /// `len` must not exceed `LEN_MASK`. This is checked with
159    /// `debug_assert!` only.
160    #[inline]
161    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
162        debug_assert!(len <= LEN_MASK, "payload too large");
163        if len == 0 {
164            return Err(TryClaimError::ZeroLength);
165        }
166
167        let record_size = align8(HEADER_SIZE + len);
168
169        // Check if we have space
170        let tail = self.tail.get();
171        let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
172
173        if available < record_size {
174            // Reload head from shared state
175            self.cached_head
176                .set(self.shared.head.load(Ordering::Relaxed));
177            fence(Ordering::Acquire);
178
179            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
180            if available < record_size {
181                return Err(TryClaimError::Full);
182            }
183        }
184
185        // Check if record fits before buffer end, or needs wrap
186        let offset = tail & self.shared.mask;
187        let space_to_end = self.shared.capacity - offset;
188
189        if space_to_end < record_size {
190            // Need to wrap. First check if we have space for padding + record at start.
191            let total_needed = space_to_end + record_size;
192            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
193
194            if available < total_needed {
195                // Reload and recheck
196                self.cached_head
197                    .set(self.shared.head.load(Ordering::Relaxed));
198                fence(Ordering::Acquire);
199
200                let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
201                if available < total_needed {
202                    return Err(TryClaimError::Full);
203                }
204            }
205
206            // Write padding skip marker
207            let buffer = self.shared.buffer;
208            let skip_len = space_to_end | SKIP_BIT;
209            fence(Ordering::Release);
210            // SAFETY: offset is masked to [0, capacity) and 8-byte aligned.
211            // Buffer is valid for capacity bytes. We are the sole producer.
212            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
213            // SAFETY: len_ptr points to a valid, aligned, zero-initialized usize
214            // within the buffer. AtomicUsize reference is used for store visibility.
215            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
216
217            // Advance tail past padding
218            self.tail.set(tail.wrapping_add(space_to_end));
219            let new_offset = 0;
220
221            Ok(WriteClaim {
222                producer: self,
223                offset: new_offset,
224                len,
225                record_size,
226                committed: false,
227            })
228        } else {
229            // Fits without wrapping
230            Ok(WriteClaim {
231                producer: self,
232                offset,
233                len,
234                record_size,
235                committed: false,
236            })
237        }
238    }
239
240    /// Returns the capacity of the buffer.
241    #[inline]
242    pub fn capacity(&self) -> usize {
243        self.shared.capacity
244    }
245
246    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
247    ///
248    /// Uses `Arc::strong_count` which is inherently racy — the count can
249    /// change between the check and the caller's next action. Suitable for
250    /// graceful shutdown detection, not for correctness. For reliable
251    /// disconnection detection, use the channel layer (`channel::spsc`)
252    /// which tracks disconnection via dedicated atomic flags.
253    // TODO: consider adding an AtomicBool flag if reliable detection is
254    // needed at the raw queue level.
255    #[inline]
256    pub fn is_disconnected(&self) -> bool {
257        Arc::strong_count(&self.shared) == 1
258    }
259}
260
261impl std::fmt::Debug for Producer {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        f.debug_struct("Producer")
264            .field("capacity", &self.capacity())
265            .finish_non_exhaustive()
266    }
267}
268
269// ============================================================================
270// WriteClaim
271// ============================================================================
272
273/// A claimed region for writing a record.
274///
275/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
276/// when done writing to publish the record. If dropped without committing, a skip
277/// marker is written so the consumer can advance past the dead region.
278pub struct WriteClaim<'a> {
279    producer: &'a mut Producer,
280    offset: usize,
281    len: usize,
282    record_size: usize,
283    committed: bool,
284}
285
286impl WriteClaim<'_> {
287    /// Commits the record, making it visible to the consumer.
288    #[inline]
289    pub fn commit(mut self) {
290        self.do_commit();
291        self.committed = true;
292    }
293
294    #[inline]
295    fn do_commit(&mut self) {
296        let buffer = self.producer.shared.buffer;
297        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
298        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
299
300        // Release fence: ensures payload writes are visible before len store
301        fence(Ordering::Release);
302        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
303        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
304
305        // Advance tail
306        self.producer
307            .tail
308            .set(self.producer.tail.get().wrapping_add(self.record_size));
309    }
310
311    /// Returns the length of the payload region.
312    #[inline]
313    pub fn len(&self) -> usize {
314        self.len
315    }
316
317    /// Returns `true` if the payload is empty (always false, len must be > 0).
318    #[inline]
319    pub fn is_empty(&self) -> bool {
320        false
321    }
322}
323
324impl Deref for WriteClaim<'_> {
325    type Target = [u8];
326
327    #[inline]
328    fn deref(&self) -> &Self::Target {
329        let buffer = self.producer.shared.buffer;
330        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
331        // exclusive access to this region via &mut Producer borrow.
332        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
333        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
334        // and exclusively owned by this claim. Lifetime tied to &self.
335        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
336    }
337}
338
339impl DerefMut for WriteClaim<'_> {
340    #[inline]
341    fn deref_mut(&mut self) -> &mut Self::Target {
342        let buffer = self.producer.shared.buffer;
343        // SAFETY: offset + HEADER_SIZE is within the buffer. Exclusive access
344        // guaranteed by &mut self (only one WriteClaim exists per try_claim).
345        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
346        // SAFETY: payload_ptr is valid for self.len bytes, word-aligned,
347        // and exclusively owned by this claim. Lifetime tied to &mut self.
348        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
349    }
350}
351
352impl Drop for WriteClaim<'_> {
353    fn drop(&mut self) {
354        if !self.committed {
355            // Write skip marker so consumer can advance past this region
356            let buffer = self.producer.shared.buffer;
357            // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
358            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
359            let skip_len = self.record_size | SKIP_BIT;
360
361            fence(Ordering::Release);
362            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
363            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
364
365            // Advance tail past the dead region
366            self.producer
367                .tail
368                .set(self.producer.tail.get().wrapping_add(self.record_size));
369        }
370    }
371}
372
373// ============================================================================
374// Consumer
375// ============================================================================
376
377/// Consumer endpoint of the SPSC ring buffer.
378///
379/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
380pub struct Consumer {
381    /// Local head position (free-running).
382    head: Cell<usize>,
383    /// Shared state.
384    shared: Arc<Shared>,
385}
386
387// Safety: Consumer is only used from one thread.
388unsafe impl Send for Consumer {}
389
390impl Consumer {
391    /// Attempts to claim the next record for reading.
392    ///
393    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
394    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
395    /// and the head is advanced.
396    ///
397    /// Returns `None` if no committed record is available.
398    #[inline]
399    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
400        let buffer = self.shared.buffer;
401
402        loop {
403            let offset = self.head.get() & self.shared.mask;
404            // SAFETY: offset is masked to [0, capacity), always 8-byte aligned
405            // (head advances by align8'd record sizes). Buffer is valid.
406            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
407
408            // Relaxed atomic load, then Acquire fence for payload visibility
409            // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
410            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
411            fence(Ordering::Acquire);
412
413            if len_raw == 0 {
414                // Not committed yet
415                return None;
416            }
417
418            if len_raw & SKIP_BIT != 0 {
419                // Skip marker: zero the region and advance
420                let skip_size = len_raw & LEN_MASK;
421                // Zero payload first, then stamp last (mirrors write path)
422                if skip_size > HEADER_SIZE {
423                    // SAFETY: offset + HEADER_SIZE .. offset + skip_size is within
424                    // the buffer. Consumer has exclusive read access to this region.
425                    unsafe {
426                        ptr::write_bytes(
427                            buffer.add(offset + HEADER_SIZE),
428                            0,
429                            skip_size - HEADER_SIZE,
430                        );
431                    }
432                }
433                // Ensure payload zeroing completes before clearing stamp
434                fence(Ordering::Release);
435                // SAFETY: len_ptr is still valid, computed above.
436                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
437
438                self.head.set(self.head.get().wrapping_add(skip_size));
439
440                // Ensure stamp clear completes before head advance
441                fence(Ordering::Release);
442                self.shared.head.store(self.head.get(), Ordering::Relaxed);
443
444                // Continue to check next position
445                continue;
446            }
447
448            // Valid record
449            let len = len_raw;
450            let record_size = align8(HEADER_SIZE + len);
451
452            return Some(ReadClaim {
453                consumer: self,
454                offset,
455                len,
456                record_size,
457            });
458        }
459    }
460
461    /// Returns the capacity of the buffer.
462    #[inline]
463    pub fn capacity(&self) -> usize {
464        self.shared.capacity
465    }
466
467    /// Best-effort hint: returns `true` if the producer has likely been dropped.
468    ///
469    /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
470    #[inline]
471    pub fn is_disconnected(&self) -> bool {
472        Arc::strong_count(&self.shared) == 1
473    }
474}
475
476impl std::fmt::Debug for Consumer {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        f.debug_struct("Consumer")
479            .field("capacity", &self.capacity())
480            .finish_non_exhaustive()
481    }
482}
483
484// ============================================================================
485// ReadClaim
486// ============================================================================
487
488/// A claimed record for reading.
489///
490/// Dereferences to `&[u8]` for the payload. When dropped, the record region
491/// is zeroed and the head is advanced, freeing space for the producer.
492pub struct ReadClaim<'a> {
493    consumer: &'a mut Consumer,
494    offset: usize,
495    len: usize,
496    record_size: usize,
497}
498
499impl ReadClaim<'_> {
500    /// Returns the length of the payload.
501    #[inline]
502    pub fn len(&self) -> usize {
503        self.len
504    }
505
506    /// Returns `true` if the payload is empty.
507    #[inline]
508    pub fn is_empty(&self) -> bool {
509        self.len == 0
510    }
511}
512
513impl Deref for ReadClaim<'_> {
514    type Target = [u8];
515
516    #[inline]
517    fn deref(&self) -> &Self::Target {
518        let buffer = self.consumer.shared.buffer;
519        // SAFETY: offset + HEADER_SIZE is within the buffer. The claim owns
520        // exclusive read access via &mut Consumer borrow.
521        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
522        // SAFETY: payload_ptr is valid for self.len bytes. The producer has
523        // finished writing (len was non-zero, preceded by Release fence).
524        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
525    }
526}
527
528impl Drop for ReadClaim<'_> {
529    fn drop(&mut self) {
530        let buffer = self.consumer.shared.buffer;
531
532        // Zero payload first, then stamp last (mirrors write path)
533        if self.record_size > HEADER_SIZE {
534            // SAFETY: offset + HEADER_SIZE .. offset + record_size is within
535            // the buffer. Consumer owns this region exclusively.
536            unsafe {
537                ptr::write_bytes(
538                    buffer.add(self.offset + HEADER_SIZE),
539                    0,
540                    self.record_size - HEADER_SIZE,
541                );
542            }
543        }
544        // Ensure payload zeroing completes before clearing stamp
545        fence(Ordering::Release);
546        // SAFETY: offset is within [0, capacity), 8-byte aligned. Buffer is valid.
547        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
548        // SAFETY: len_ptr points to a valid, aligned usize within the buffer.
549        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
550
551        // Advance head
552        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
553        self.consumer.head.set(new_head);
554
555        // Ensure stamp clear completes before head advance
556        fence(Ordering::Release);
557        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
558    }
559}
560
561// ============================================================================
562// Tests
563// ============================================================================
564
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Claims space for `bytes`, copies them in and commits.
    ///
    /// Panics if the claim fails.
    fn publish(prod: &mut Producer, bytes: &[u8]) {
        let mut claim = prod.try_claim(bytes.len()).unwrap();
        claim.copy_from_slice(bytes);
        claim.commit();
    }

    /// Like [`publish`], but spins until the claim succeeds.
    fn publish_spinning(prod: &mut Producer, bytes: &[u8]) {
        loop {
            if let Ok(mut claim) = prod.try_claim(bytes.len()) {
                claim.copy_from_slice(bytes);
                claim.commit();
                return;
            }
            std::hint::spin_loop();
        }
    }

    /// A single committed record is read back unchanged.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        publish(&mut prod, payload);

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh ring has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records come out in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            publish(&mut prod, payload.as_bytes());
        }

        for i in 0..10 {
            let expected = format!("message {}", i);
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// A claim dropped without commit is skipped by the consumer.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim, write, then abandon without committing.
        let mut abandoned = prod.try_claim(10).unwrap();
        abandoned.copy_from_slice(b"0123456789");
        drop(abandoned);

        // Write another record, this time committed.
        publish(&mut prod, b"hello");

        // Consumer should skip the aborted record and read the committed one.
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Writing more than the capacity in total forces wrap-around.
    #[test]
    fn wrap_around() {
        let (mut prod, mut cons) = new(64);

        for i in 0..20 {
            let payload = format!("msg{:02}", i);
            loop {
                if let Ok(mut claim) = prod.try_claim(payload.len()) {
                    claim.copy_from_slice(payload.as_bytes());
                    claim.commit();
                    break;
                }
                // No room: drain everything readable, then retry.
                while cons.try_claim().is_some() {}
            }
        }
    }

    /// Claims fail once the ring is full and nothing is consumed.
    #[test]
    fn full_returns_error() {
        let (mut prod, _cons) = new(64);

        // Fill the buffer.
        let mut written = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            written += 1;
        }

        assert!(written > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// Producer and consumer on separate threads see values in order.
    #[test]
    fn cross_thread() {
        let (mut prod, mut cons) = new(4096);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                publish_spinning(&mut prod, &i.to_le_bytes());
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                match cons.try_claim() {
                    Some(record) => {
                        let value = u64::from_le_bytes((*record).try_into().unwrap());
                        assert_eq!(value, received);
                        received += 1;
                    }
                    None => std::hint::spin_loop(),
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    /// Dropping one endpoint is observable from the other.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());
    }

    /// Capacities below the 16-byte minimum are rejected.
    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    /// Zero-length claims are rejected with a dedicated error.
    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    /// Requested capacities are rounded up to a power of two.
    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// Records of different lengths round-trip unchanged.
    #[test]
    fn variable_length_records() {
        let (mut prod, mut cons) = new(4096);

        let messages = [
            "a",
            "hello",
            "this is a longer message",
            "x",
            "medium length",
        ];

        for msg in &messages {
            publish(&mut prod, msg.as_bytes());
        }

        for msg in &messages {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, msg.as_bytes());
        }
    }

    /// High-volume stress test with variable-length messages.
    ///
    /// Tests correctness under sustained load with wrap-around.
    #[test]
    fn stress_high_volume() {
        const COUNT: u64 = 1_000_000;
        const BUFFER_SIZE: usize = 64 * 1024; // 64KB

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for seq in 0..COUNT {
                // Variable length: 8-64 bytes based on sequence.
                let len = 8 + ((seq % 8) * 8) as usize;
                let mut payload = vec![0u8; len];
                // Sequence number goes at the start of the payload.
                payload[..8].copy_from_slice(&seq.to_le_bytes());

                publish_spinning(&mut prod, &payload);
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < COUNT {
                match cons.try_claim() {
                    Some(record) => {
                        // Verify sequence number.
                        let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
                        assert_eq!(seq, received, "sequence mismatch at {}", received);

                        // Verify expected length.
                        let expected_len = 8 + ((received % 8) * 8) as usize;
                        assert_eq!(
                            record.len(),
                            expected_len,
                            "length mismatch at {}",
                            received
                        );

                        received += 1;
                    }
                    None => std::hint::spin_loop(),
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, COUNT);
    }

    /// Stress test with maximum contention - tiny buffer, high throughput.
    #[test]
    fn stress_high_contention() {
        const COUNT: u64 = 100_000;
        const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                publish_spinning(&mut prod, &i.to_le_bytes());
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < COUNT {
                match cons.try_claim() {
                    Some(record) => {
                        let value = u64::from_le_bytes((*record).try_into().unwrap());
                        assert_eq!(value, received);
                        sum = sum.wrapping_add(value);
                        received += 1;
                    }
                    None => std::hint::spin_loop(),
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
        let expected = COUNT * (COUNT - 1) / 2;
        assert_eq!(sum, expected);
    }

    /// Payload pointers must be word-aligned so users can write aligned structs.
    #[test]
    fn payload_is_word_aligned() {
        let word = std::mem::align_of::<usize>();
        let (mut prod, mut cons) = new(1024);

        // Test several payload sizes to cover padding edge cases.
        for len in [1, 3, 7, 8, 13, 64, 255] {
            let mut claim = prod.try_claim(len).unwrap();
            let write_ptr = claim.as_mut_ptr();
            assert_eq!(
                write_ptr as usize % word,
                0,
                "WriteClaim payload not word-aligned for len={len}"
            );
            claim.commit();

            let record = cons.try_claim().unwrap();
            let read_ptr = record.as_ptr();
            assert_eq!(
                read_ptr as usize % word,
                0,
                "ReadClaim payload not word-aligned for len={len}"
            );
        }
    }
}