Skip to main content

nexus_logbuf/queue/
spsc.rs

//! Single-producer single-consumer byte ring buffer.
//!
//! # Design
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────────┐
//! │ Shared:                                                                 │
//! │   head: CachePadded<AtomicUsize>  ← Consumer writes, producer reads     │
//! │   buffer: *mut u8                                                       │
//! │   capacity: usize                 (power of 2)                          │
//! │   mask: usize                     (capacity - 1)                        │
//! └─────────────────────────────────────────────────────────────────────────┘
//!
//! ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
//! │ Producer:                       │   │ Consumer:                       │
//! │   tail: usize        (local)    │   │   head: usize        (local)    │
//! │   cached_head: usize (local)    │   │                                 │
//! └─────────────────────────────────┘   └─────────────────────────────────┘
//! ```
//!
//! # Record Layout
//!
//! ```text
//! ┌──────────────────────────────────────────────┐
//! │ len: usize              (8 bytes on 64-bit)  │ ← payload length / commit marker
//! ├──────────────────────────────────────────────┤
//! │ payload: [u8; len]      (variable)           │ ← raw bytes
//! ├──────────────────────────────────────────────┤
//! │ padding: [u8; ...]      (0-7 bytes)          │ ← align to 8-byte boundary
//! └──────────────────────────────────────────────┘
//! ```
//!
//! Records are packed contiguously. Total record size is
//! `align8(size_of::<usize>() + len)`. Using `usize` for the header ensures
//! the payload starts at a word-aligned offset.
//!
//! # Len Field Encoding
//!
//! - `len == 0`: Not committed, consumer waits
//! - `len > 0, high bit clear`: Committed record, payload is `len` bytes
//! - `len high bit set`: Skip marker, advance by `len & LEN_MASK` bytes
42
43use std::alloc::{Layout, alloc_zeroed, dealloc, handle_alloc_error};
44use std::cell::Cell;
45use std::ops::{Deref, DerefMut};
46use std::ptr;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicUsize, Ordering, fence};
49
50use crossbeam_utils::CachePadded;
51
52use crate::{LEN_MASK, SKIP_BIT, TryClaimError, align8};
53
/// Size in bytes of the per-record header: exactly one machine word (`usize`).
///
/// That is 8 bytes on a 64-bit target, so a payload placed directly after an
/// 8-byte-aligned header also starts on an 8-byte boundary.
const HEADER_SIZE: usize = (usize::BITS / 8) as usize;
58
59/// Creates a bounded SPSC byte ring buffer.
60///
61/// Capacity is rounded up to the next power of two.
62///
63/// # Panics
64///
65/// Panics if `capacity` is zero or less than 16 bytes.
66pub fn new(capacity: usize) -> (Producer, Consumer) {
67    assert!(capacity >= 16, "capacity must be at least 16 bytes");
68
69    let capacity = capacity.next_power_of_two();
70    let mask = capacity - 1;
71
72    // Allocate buffer, zero-initialized, 8-byte aligned for atomic len stamps
73    let layout = Layout::from_size_align(capacity, 8)
74        .expect("valid layout: capacity is a power of two >= 16, align is 8");
75    let buffer_ptr = unsafe { alloc_zeroed(layout) };
76    if buffer_ptr.is_null() {
77        handle_alloc_error(layout);
78    }
79
80    let shared = Arc::new(Shared {
81        head: CachePadded::new(AtomicUsize::new(0)),
82        buffer: buffer_ptr,
83        capacity,
84        mask,
85    });
86
87    (
88        Producer {
89            tail: Cell::new(0),
90            cached_head: Cell::new(0),
91            shared: Arc::clone(&shared),
92        },
93        Consumer {
94            head: Cell::new(0),
95            shared,
96        },
97    )
98}
99
/// State shared between the producer and consumer endpoints.
struct Shared {
    /// Consumer's read position. Updated by consumer, read by producer.
    head: CachePadded<AtomicUsize>,
    /// Pointer to the start of the ring's backing allocation.
    buffer: *mut u8,
    /// Buffer capacity in bytes (power of 2).
    capacity: usize,
    /// Mask for wrapping a free-running position into the buffer (capacity - 1).
    mask: usize,
}
110
// SAFETY: `Shared` holds a raw pointer, so `Send`/`Sync` are not derived
// automatically. The buffer is only accessed by one producer and one
// consumer, and the atomic head provides synchronization.
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}
115
116impl Drop for Shared {
117    fn drop(&mut self) {
118        // Safety: buffer was allocated with alloc_zeroed using this layout.
119        let layout = Layout::from_size_align(self.capacity, 8)
120            .expect("valid layout: capacity was validated at construction");
121        unsafe { dealloc(self.buffer, layout) };
122    }
123}
124
125// ============================================================================
126// Producer
127// ============================================================================
128
/// Producer endpoint of the SPSC ring buffer.
///
/// Use [`try_claim`](Producer::try_claim) to claim space for writing.
pub struct Producer {
    /// Local tail position (free-running; masked with `shared.mask` to get a
    /// buffer offset).
    tail: Cell<usize>,
    /// Cached head position (Rigtorp optimization): refreshed from the shared
    /// head only when the cached value suggests there is not enough space.
    cached_head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}
140
// SAFETY: Producer is only used from one thread at a time. Its `Cell` fields
// keep it `!Sync`, so moving it to another thread transfers exclusive use.
unsafe impl Send for Producer {}
143
144impl Producer {
145    /// Attempts to claim space for a record with the given payload length.
146    ///
147    /// Returns a [`WriteClaim`] that can be written to and then committed.
148    ///
149    /// # Errors
150    ///
151    /// - [`TryClaimError::ZeroLength`] if `len` is zero
152    /// - [`TryClaimError::Full`] if the buffer is full
153    ///
154    /// # Safety Contract
155    ///
156    /// `len` must not exceed `LEN_MASK`. This is checked with
157    /// `debug_assert!` only.
158    #[inline]
159    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, TryClaimError> {
160        debug_assert!(len <= LEN_MASK, "payload too large");
161        if len == 0 {
162            return Err(TryClaimError::ZeroLength);
163        }
164
165        let record_size = align8(HEADER_SIZE + len);
166
167        // Check if we have space
168        let tail = self.tail.get();
169        let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
170
171        if available < record_size {
172            // Reload head from shared state
173            self.cached_head
174                .set(self.shared.head.load(Ordering::Relaxed));
175            fence(Ordering::Acquire);
176
177            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
178            if available < record_size {
179                return Err(TryClaimError::Full);
180            }
181        }
182
183        // Check if record fits before buffer end, or needs wrap
184        let offset = tail & self.shared.mask;
185        let space_to_end = self.shared.capacity - offset;
186
187        if space_to_end < record_size {
188            // Need to wrap. First check if we have space for padding + record at start.
189            let total_needed = space_to_end + record_size;
190            let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
191
192            if available < total_needed {
193                // Reload and recheck
194                self.cached_head
195                    .set(self.shared.head.load(Ordering::Relaxed));
196                fence(Ordering::Acquire);
197
198                let available = self.shared.capacity - (tail.wrapping_sub(self.cached_head.get()));
199                if available < total_needed {
200                    return Err(TryClaimError::Full);
201                }
202            }
203
204            // Write padding skip marker
205            let buffer = self.shared.buffer;
206            let skip_len = space_to_end | SKIP_BIT;
207            fence(Ordering::Release);
208            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();
209            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
210
211            // Advance tail past padding
212            self.tail.set(tail.wrapping_add(space_to_end));
213            let new_offset = 0;
214
215            Ok(WriteClaim {
216                producer: self,
217                offset: new_offset,
218                len,
219                record_size,
220                committed: false,
221            })
222        } else {
223            // Fits without wrapping
224            Ok(WriteClaim {
225                producer: self,
226                offset,
227                len,
228                record_size,
229                committed: false,
230            })
231        }
232    }
233
234    /// Returns the capacity of the buffer.
235    #[inline]
236    pub fn capacity(&self) -> usize {
237        self.shared.capacity
238    }
239
240    /// Best-effort hint: returns `true` if the consumer has likely been dropped.
241    ///
242    /// Uses `Arc::strong_count` which is inherently racy — the count can
243    /// change between the check and the caller's next action. Suitable for
244    /// graceful shutdown detection, not for correctness. For reliable
245    /// disconnection detection, use the channel layer (`channel::spsc`)
246    /// which tracks disconnection via dedicated atomic flags.
247    // TODO: consider adding an AtomicBool flag if reliable detection is
248    // needed at the raw queue level.
249    #[inline]
250    pub fn is_disconnected(&self) -> bool {
251        Arc::strong_count(&self.shared) == 1
252    }
253}
254
255impl std::fmt::Debug for Producer {
256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257        f.debug_struct("Producer")
258            .field("capacity", &self.capacity())
259            .finish_non_exhaustive()
260    }
261}
262
263// ============================================================================
264// WriteClaim
265// ============================================================================
266
/// A claimed region for writing a record.
///
/// Dereferences to `&mut [u8]` for the payload region. Call [`commit`](WriteClaim::commit)
/// when done writing to publish the record. If dropped without committing, a skip
/// marker is written so the consumer can advance past the dead region.
pub struct WriteClaim<'a> {
    /// Producer that issued the claim; borrowed exclusively for its lifetime.
    producer: &'a mut Producer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes.
    len: usize,
    /// Total bytes the record occupies; the tail advances by this amount.
    record_size: usize,
    /// Set by `commit`; when still `false` at drop, a skip marker is written.
    committed: bool,
}
279
280impl WriteClaim<'_> {
281    /// Commits the record, making it visible to the consumer.
282    #[inline]
283    pub fn commit(mut self) {
284        self.do_commit();
285        self.committed = true;
286    }
287
288    #[inline]
289    fn do_commit(&mut self) {
290        let buffer = self.producer.shared.buffer;
291        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
292
293        // Release fence: ensures payload writes are visible before len store
294        fence(Ordering::Release);
295        unsafe { &*len_ptr }.store(self.len, Ordering::Relaxed);
296
297        // Advance tail
298        self.producer
299            .tail
300            .set(self.producer.tail.get().wrapping_add(self.record_size));
301    }
302
303    /// Returns the length of the payload region.
304    #[inline]
305    pub fn len(&self) -> usize {
306        self.len
307    }
308
309    /// Returns `true` if the payload is empty (always false, len must be > 0).
310    #[inline]
311    pub fn is_empty(&self) -> bool {
312        false
313    }
314}
315
316impl Deref for WriteClaim<'_> {
317    type Target = [u8];
318
319    #[inline]
320    fn deref(&self) -> &Self::Target {
321        let buffer = self.producer.shared.buffer;
322        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
323        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
324    }
325}
326
327impl DerefMut for WriteClaim<'_> {
328    #[inline]
329    fn deref_mut(&mut self) -> &mut Self::Target {
330        let buffer = self.producer.shared.buffer;
331        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
332        unsafe { std::slice::from_raw_parts_mut(payload_ptr, self.len) }
333    }
334}
335
336impl Drop for WriteClaim<'_> {
337    fn drop(&mut self) {
338        if !self.committed {
339            // Write skip marker so consumer can advance past this region
340            let buffer = self.producer.shared.buffer;
341            let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
342            let skip_len = self.record_size | SKIP_BIT;
343
344            fence(Ordering::Release);
345            unsafe { &*len_ptr }.store(skip_len, Ordering::Relaxed);
346
347            // Advance tail past the dead region
348            self.producer
349                .tail
350                .set(self.producer.tail.get().wrapping_add(self.record_size));
351        }
352    }
353}
354
355// ============================================================================
356// Consumer
357// ============================================================================
358
/// Consumer endpoint of the SPSC ring buffer.
///
/// Use [`try_claim`](Consumer::try_claim) to claim the next record for reading.
pub struct Consumer {
    /// Local head position (free-running; masked with `shared.mask` to get a
    /// buffer offset).
    head: Cell<usize>,
    /// Shared state.
    shared: Arc<Shared>,
}
368
// SAFETY: Consumer is only used from one thread at a time. Its `Cell` field
// keeps it `!Sync`, so moving it to another thread transfers exclusive use.
unsafe impl Send for Consumer {}
371
impl Consumer {
    /// Attempts to claim the next record for reading.
    ///
    /// Returns a [`ReadClaim`] if a record is available. The claim dereferences
    /// to `&[u8]` for the payload. When dropped, the record region is zeroed
    /// and the head is advanced.
    ///
    /// Skip markers found at the head are consumed transparently: their
    /// region is zeroed, the head is advanced past them, and the search
    /// continues at the next position.
    ///
    /// Returns `None` if no committed record is available.
    #[inline]
    pub fn try_claim(&mut self) -> Option<ReadClaim<'_>> {
        let buffer = self.shared.buffer;

        loop {
            let offset = self.head.get() & self.shared.mask;
            // SAFETY: `offset` is masked with `capacity - 1`, so it lies
            // inside the buffer.
            let len_ptr = unsafe { buffer.add(offset) }.cast::<AtomicUsize>();

            // Relaxed atomic load, then Acquire fence for payload visibility
            let len_raw = unsafe { &*len_ptr }.load(Ordering::Relaxed);
            fence(Ordering::Acquire);

            if len_raw == 0 {
                // Not committed yet
                return None;
            }

            if len_raw & SKIP_BIT != 0 {
                // Skip marker: zero the region and advance
                let skip_size = len_raw & LEN_MASK;
                // Zero payload first, then stamp last (mirrors write path)
                if skip_size > HEADER_SIZE {
                    // SAFETY: assumes the producer only writes skip sizes that
                    // stay inside the buffer, as `Producer::try_claim` and
                    // `WriteClaim::drop` do.
                    unsafe {
                        ptr::write_bytes(
                            buffer.add(offset + HEADER_SIZE),
                            0,
                            skip_size - HEADER_SIZE,
                        );
                    }
                }
                // Ensure payload zeroing completes before clearing stamp
                fence(Ordering::Release);
                unsafe { &*len_ptr }.store(0, Ordering::Relaxed);

                self.head.set(self.head.get().wrapping_add(skip_size));

                // Ensure stamp clear completes before head advance
                fence(Ordering::Release);
                self.shared.head.store(self.head.get(), Ordering::Relaxed);

                // Continue to check next position
                continue;
            }

            // Valid record: the stamp is the payload length.
            let len = len_raw;
            let record_size = align8(HEADER_SIZE + len);

            // The head is not advanced here; `ReadClaim`'s drop releases the
            // region and publishes the new head.
            return Some(ReadClaim {
                consumer: self,
                offset,
                len,
                record_size,
            });
        }
    }

    /// Returns the capacity of the buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.shared.capacity
    }

    /// Best-effort hint: returns `true` if the producer has likely been dropped.
    ///
    /// See [`Producer::is_disconnected`] for caveats — uses `Arc::strong_count`.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        Arc::strong_count(&self.shared) == 1
    }
}
451
452impl std::fmt::Debug for Consumer {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        f.debug_struct("Consumer")
455            .field("capacity", &self.capacity())
456            .finish_non_exhaustive()
457    }
458}
459
460// ============================================================================
461// ReadClaim
462// ============================================================================
463
/// A claimed record for reading.
///
/// Dereferences to `&[u8]` for the payload. When dropped, the record region
/// is zeroed and the head is advanced, freeing space for the producer.
pub struct ReadClaim<'a> {
    /// Consumer that issued the claim; borrowed exclusively for its lifetime.
    consumer: &'a mut Consumer,
    /// Byte offset of the record header within the buffer.
    offset: usize,
    /// Payload length in bytes, as read from the record header.
    len: usize,
    /// Total bytes the record occupies; the head advances by this amount on drop.
    record_size: usize,
}
474
475impl ReadClaim<'_> {
476    /// Returns the length of the payload.
477    #[inline]
478    pub fn len(&self) -> usize {
479        self.len
480    }
481
482    /// Returns `true` if the payload is empty.
483    #[inline]
484    pub fn is_empty(&self) -> bool {
485        self.len == 0
486    }
487}
488
489impl Deref for ReadClaim<'_> {
490    type Target = [u8];
491
492    #[inline]
493    fn deref(&self) -> &Self::Target {
494        let buffer = self.consumer.shared.buffer;
495        let payload_ptr = unsafe { buffer.add(self.offset + HEADER_SIZE) };
496        unsafe { std::slice::from_raw_parts(payload_ptr, self.len) }
497    }
498}
499
500impl Drop for ReadClaim<'_> {
501    fn drop(&mut self) {
502        let buffer = self.consumer.shared.buffer;
503
504        // Zero payload first, then stamp last (mirrors write path)
505        if self.record_size > HEADER_SIZE {
506            unsafe {
507                ptr::write_bytes(
508                    buffer.add(self.offset + HEADER_SIZE),
509                    0,
510                    self.record_size - HEADER_SIZE,
511                );
512            }
513        }
514        // Ensure payload zeroing completes before clearing stamp
515        fence(Ordering::Release);
516        let len_ptr = unsafe { buffer.add(self.offset) }.cast::<AtomicUsize>();
517        unsafe { &*len_ptr }.store(0, Ordering::Relaxed);
518
519        // Advance head
520        let new_head = self.consumer.head.get().wrapping_add(self.record_size);
521        self.consumer.head.set(new_head);
522
523        // Ensure stamp clear completes before head advance
524        fence(Ordering::Release);
525        self.consumer.shared.head.store(new_head, Ordering::Relaxed);
526    }
527}
528
529// ============================================================================
530// Tests
531// ============================================================================
532
// Unit tests for the SPSC byte ring: single-threaded API behavior first,
// then cross-thread ordering and stress tests.
#[cfg(test)]
mod tests {
    use super::*;

    /// A single committed record is read back unchanged.
    #[test]
    fn basic_write_read() {
        let (mut prod, mut cons) = new(1024);

        let payload = b"hello world";
        let mut claim = prod.try_claim(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, payload);
    }

    /// A fresh buffer has nothing to read.
    #[test]
    fn empty_returns_none() {
        let (_, mut cons) = new(1024);
        assert!(cons.try_claim().is_none());
    }

    /// Records are delivered in the order they were committed.
    #[test]
    fn multiple_records() {
        let (mut prod, mut cons) = new(1024);

        for i in 0..10 {
            let payload = format!("message {}", i);
            let mut claim = prod.try_claim(payload.len()).unwrap();
            claim.copy_from_slice(payload.as_bytes());
            claim.commit();
        }

        for i in 0..10 {
            let record = cons.try_claim().unwrap();
            let expected = format!("message {}", i);
            assert_eq!(&*record, expected.as_bytes());
        }

        assert!(cons.try_claim().is_none());
    }

    /// A claim dropped without `commit` is skipped by the consumer.
    #[test]
    fn aborted_claim_creates_skip() {
        let (mut prod, mut cons) = new(1024);

        // Claim and drop without committing
        {
            let mut claim = prod.try_claim(10).unwrap();
            claim.copy_from_slice(b"0123456789");
            // drop without commit
        }

        // Write another record
        {
            let mut claim = prod.try_claim(5).unwrap();
            claim.copy_from_slice(b"hello");
            claim.commit();
        }

        // Consumer should skip the aborted record and read the committed one
        let record = cons.try_claim().unwrap();
        assert_eq!(&*record, b"hello");
    }

    /// Writes more data than the buffer holds, draining on `Err`, so the
    /// producer has to wrap. Only checks that every claim eventually
    /// succeeds; record contents are not verified here.
    #[test]
    fn wrap_around() {
        let (mut prod, mut cons) = new(64);

        // Fill with messages that will cause wrap-around
        for i in 0..20 {
            let payload = format!("msg{:02}", i);
            loop {
                match prod.try_claim(payload.len()) {
                    Ok(mut claim) => {
                        claim.copy_from_slice(payload.as_bytes());
                        claim.commit();
                        break;
                    }
                    Err(_) => {
                        // Drain some
                        while cons.try_claim().is_some() {}
                    }
                }
            }
        }
    }

    /// With no consumer progress, claims eventually fail.
    #[test]
    fn full_returns_error() {
        // `_cons` is kept alive but never drains the buffer.
        let (mut prod, _cons) = new(64);

        // Fill the buffer
        let mut count = 0;
        while let Ok(mut claim) = prod.try_claim(8) {
            claim.copy_from_slice(b"12345678");
            claim.commit();
            count += 1;
        }

        assert!(count > 0);
        assert!(prod.try_claim(8).is_err());
    }

    /// Producer and consumer on separate threads see values in order.
    #[test]
    fn cross_thread() {
        use std::thread;

        let (mut prod, mut cons) = new(4096);

        let producer = thread::spawn(move || {
            for i in 0..10_000u64 {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < 10_000 {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    /// Dropping the consumer is observable from the producer.
    #[test]
    fn disconnection_detection() {
        let (prod, cons) = new(1024);

        assert!(!prod.is_disconnected());
        assert!(!cons.is_disconnected());

        drop(cons);
        assert!(prod.is_disconnected());
    }

    /// Capacities below 16 bytes are rejected.
    #[test]
    #[should_panic(expected = "capacity must be at least 16")]
    fn tiny_capacity_panics() {
        let _ = new(8);
    }

    /// Zero-length claims are rejected with `ZeroLength`.
    #[test]
    fn zero_len_returns_error() {
        let (mut prod, _) = new(1024);
        assert!(matches!(prod.try_claim(0), Err(TryClaimError::ZeroLength)));
    }

    /// Requested capacity is rounded up to a power of two.
    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (prod, _) = new(100);
        assert_eq!(prod.capacity(), 128);

        let (prod, _) = new(1000);
        assert_eq!(prod.capacity(), 1024);
    }

    /// Records of different lengths round-trip in order.
    #[test]
    fn variable_length_records() {
        let (mut prod, mut cons) = new(4096);

        let messages = [
            "a",
            "hello",
            "this is a longer message",
            "x",
            "medium length",
        ];

        for msg in &messages {
            let mut claim = prod.try_claim(msg.len()).unwrap();
            claim.copy_from_slice(msg.as_bytes());
            claim.commit();
        }

        for msg in &messages {
            let record = cons.try_claim().unwrap();
            assert_eq!(&*record, msg.as_bytes());
        }
    }

    /// High-volume stress test with variable-length messages.
    ///
    /// Tests correctness under sustained load with wrap-around.
    #[test]
    fn stress_high_volume() {
        use std::thread;

        const COUNT: u64 = 1_000_000;
        const BUFFER_SIZE: usize = 64 * 1024; // 64KB

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                // Variable length: 8-64 bytes based on sequence
                let len = 8 + ((i % 8) * 8) as usize;
                let mut payload = vec![0u8; len];
                // Write sequence number at start
                payload[..8].copy_from_slice(&i.to_le_bytes());

                loop {
                    match prod.try_claim(len) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    // Verify sequence number
                    let seq = u64::from_le_bytes(record[..8].try_into().unwrap());
                    assert_eq!(seq, received, "sequence mismatch at {}", received);

                    // Verify expected length
                    let expected_len = 8 + ((received % 8) * 8) as usize;
                    assert_eq!(
                        record.len(),
                        expected_len,
                        "length mismatch at {}",
                        received
                    );

                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, COUNT);
    }

    /// Stress test with maximum contention - tiny buffer, high throughput.
    #[test]
    fn stress_high_contention() {
        use std::thread;

        const COUNT: u64 = 100_000;
        const BUFFER_SIZE: usize = 256; // Tiny buffer forces constant wrap-around

        let (mut prod, mut cons) = new(BUFFER_SIZE);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                let payload = i.to_le_bytes();
                loop {
                    match prod.try_claim(payload.len()) {
                        Ok(mut claim) => {
                            claim.copy_from_slice(&payload);
                            claim.commit();
                            break;
                        }
                        Err(_) => std::hint::spin_loop(),
                    }
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            let mut sum = 0u64;
            while received < COUNT {
                if let Some(record) = cons.try_claim() {
                    let value = u64::from_le_bytes((*record).try_into().unwrap());
                    assert_eq!(value, received);
                    sum = sum.wrapping_add(value);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        // Sum of 0..COUNT = COUNT * (COUNT-1) / 2
        let expected = COUNT * (COUNT - 1) / 2;
        assert_eq!(sum, expected);
    }

    /// Payload pointers must be word-aligned so users can write aligned structs.
    #[test]
    fn payload_is_word_aligned() {
        let (mut prod, mut cons) = new(1024);

        // Test several payload sizes to cover padding edge cases
        for len in [1, 3, 7, 8, 13, 64, 255] {
            let mut claim = prod.try_claim(len).unwrap();
            let ptr = claim.as_mut_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "WriteClaim payload not word-aligned for len={len}"
            );
            claim.commit();

            let record = cons.try_claim().unwrap();
            let ptr = record.as_ptr();
            assert_eq!(
                ptr as usize % std::mem::align_of::<usize>(),
                0,
                "ReadClaim payload not word-aligned for len={len}"
            );
        }
    }
}