Skip to main content

nexus_async_rt/channel/
mpsc_bytes.rs

1//! Bounded cross-thread MPSC byte channel.
2//!
3//! Variable-length messages over `nexus_logbuf::mpsc`. Multiple senders
4//! can write `&[u8]` into claim regions and commit. The single consumer
5//! reads `ReadClaim` references that deref to `&[u8]`.
6//!
7//! Zero allocation on the send/recv hot path. Must be created inside
8//! [`Runtime::block_on`](crate::Runtime::block_on).
9//!
10//! ```ignore
11//! use nexus_async_rt::channel::mpsc_bytes;
12//!
13//! let (tx, mut rx) = mpsc_bytes::channel(64 * 1024);
14//!
15//! // Clone sender for multiple producers
16//! let tx2 = tx.clone();
17//!
18//! // Claim, write, commit (zero-copy)
19//! let mut claim = tx.claim(5).await?;
20//! claim.copy_from_slice(b"hello");
21//! claim.commit();
22//!
23//! // Or from another sender
24//! tx2.send(b"world").await?;
25//!
26//! // Receive
27//! let msg = rx.recv().await?;
28//! assert_eq!(&*msg, b"hello");
29//! drop(msg);  // advances consumer head
30//! ```
31
32use std::cell::UnsafeCell;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use std::task::{Poll, Waker};
36
37use std::ops::{Deref, DerefMut};
38
39use crate::cross_wake::{FallbackWaker, TaskWakerSlot};
40
41// =============================================================================
42// Sender waiter list (intrusive, same pattern as mpsc typed)
43// =============================================================================
44
45struct SenderWakerNode {
46    waker: UnsafeCell<Option<Waker>>,
47    next: std::sync::atomic::AtomicPtr<SenderWakerNode>,
48    queued: AtomicBool,
49    /// Set when the Sender is dropped while node is in the list.
50    /// wake_one skips cancelled nodes.
51    cancelled: AtomicBool,
52}
53
54unsafe impl Send for SenderWakerNode {}
55unsafe impl Sync for SenderWakerNode {}
56
57impl SenderWakerNode {
58    fn new() -> Self {
59        Self {
60            waker: UnsafeCell::new(None),
61            next: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
62            queued: AtomicBool::new(false),
63            cancelled: AtomicBool::new(false),
64        }
65    }
66}
67
68/// Atomic head pointer for the sender waiter list.
69/// Senders CAS-push their node. Receiver pops one and wakes it.
70///
71/// Each node in the list has its Arc refcount bumped on push and
72/// decremented on pop, ensuring the node memory stays valid even
73/// if the Sender is dropped while queued.
74struct SenderWaitList {
75    head: std::sync::atomic::AtomicPtr<SenderWakerNode>,
76}
77
78impl SenderWaitList {
79    fn new() -> Self {
80        Self {
81            head: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
82        }
83    }
84
85    /// Push a sender's waker node onto the list. Thread-safe.
86    ///
87    /// Clones the Arc (bumps refcount) to keep the node alive in the list
88    /// independently of the Sender's lifetime.
89    fn push(&self, node: &Arc<SenderWakerNode>) {
90        let ptr = Arc::as_ptr(node).cast_mut();
91        // Bump refcount: the list now holds a reference.
92        std::mem::forget(Arc::clone(node));
93
94        unsafe { (*ptr).queued.store(true, Ordering::Relaxed) };
95        loop {
96            let head = self.head.load(Ordering::Acquire);
97            unsafe { (*ptr).next.store(head, Ordering::Relaxed) };
98            if self
99                .head
100                .compare_exchange_weak(head, ptr, Ordering::AcqRel, Ordering::Relaxed)
101                .is_ok()
102            {
103                break;
104            }
105        }
106    }
107
108    /// Pop one node and wake it. Called by receiver (single thread).
109    /// Skips cancelled nodes (senders that were dropped while queued).
110    /// Returns true if a sender was woken.
111    fn wake_one(&self) -> bool {
112        let head = self.head.swap(std::ptr::null_mut(), Ordering::AcqRel);
113        if head.is_null() {
114            return false;
115        }
116
117        let mut cursor = head;
118        let mut woken = false;
119        while !cursor.is_null() {
120            let next = unsafe { (*cursor).next.load(Ordering::Acquire) };
121            let cancelled = unsafe { (*cursor).cancelled.load(Ordering::Acquire) };
122
123            unsafe {
124                (*cursor).queued.store(false, Ordering::Release);
125                (*cursor)
126                    .next
127                    .store(std::ptr::null_mut(), Ordering::Relaxed);
128            }
129
130            if !cancelled && !woken {
131                let waker = unsafe { (*cursor).waker.get().read() };
132                unsafe { (*cursor).waker.get().write(None) };
133                // Drop the list's Arc refcount for this node.
134                unsafe { Arc::decrement_strong_count(cursor) };
135                if let Some(w) = waker {
136                    w.wake();
137                    woken = true;
138                }
139            } else if !cancelled {
140                // Non-cancelled but already woke one -- re-push.
141                // Keep the refcount (list still owns it).
142                loop {
143                    let cur_head = self.head.load(Ordering::Acquire);
144                    unsafe { (*cursor).next.store(cur_head, Ordering::Relaxed) };
145                    unsafe { (*cursor).queued.store(true, Ordering::Relaxed) };
146                    if self
147                        .head
148                        .compare_exchange_weak(
149                            cur_head,
150                            cursor,
151                            Ordering::AcqRel,
152                            Ordering::Relaxed,
153                        )
154                        .is_ok()
155                    {
156                        break;
157                    }
158                }
159            } else {
160                // Cancelled: drop the list's Arc refcount.
161                unsafe { Arc::decrement_strong_count(cursor) };
162            }
163
164            cursor = next;
165        }
166
167        woken
168    }
169
170    fn has_waiters(&self) -> bool {
171        !self.head.load(Ordering::Acquire).is_null()
172    }
173
174    /// Wake all waiters. Called when receiver drops.
175    fn wake_all(&self) {
176        let mut node = self.head.swap(std::ptr::null_mut(), Ordering::AcqRel);
177        while !node.is_null() {
178            let next = unsafe { (*node).next.load(Ordering::Acquire) };
179            let cancelled = unsafe { (*node).cancelled.load(Ordering::Acquire) };
180            unsafe {
181                (*node).next.store(std::ptr::null_mut(), Ordering::Relaxed);
182                (*node).queued.store(false, Ordering::Release);
183            }
184            if !cancelled {
185                let waker = unsafe { (*node).waker.get().read() };
186                unsafe { (*node).waker.get().write(None) };
187                if let Some(w) = waker {
188                    w.wake();
189                }
190            }
191            // Drop the list's Arc refcount.
192            unsafe { Arc::decrement_strong_count(node) };
193            node = next;
194        }
195    }
196}
197
198// =============================================================================
199// Shared state
200// =============================================================================
201
202struct Inner {
203    rx_slot: TaskWakerSlot,
204    rx_fallback: FallbackWaker,
205    tx_waiters: SenderWaitList,
206    _cross_wake_owner: Arc<crate::cross_wake::CrossWakeContext>,
207    sender_count: AtomicUsize,
208    rx_closed: AtomicBool,
209}
210
211unsafe impl Send for Inner {}
212unsafe impl Sync for Inner {}
213
214impl Inner {
215    fn wake_rx(&self) {
216        if !self.rx_slot.wake() {
217            self.rx_fallback.wake();
218        }
219    }
220
221    fn has_rx_waker(&self) -> bool {
222        self.rx_slot.has_waker() || self.rx_fallback.has_waker()
223    }
224}
225
226// =============================================================================
227// WriteClaim wrapper -- auto-notifies receiver on commit
228// =============================================================================
229
230/// A claimed write region in the byte channel. Dereferences to `&mut [u8]`.
231///
232/// Call [`.commit()`](WriteClaim::commit) to publish the record and
233/// wake the receiver. Dropping without commit writes a skip marker (abort).
234pub struct WriteClaim<'a> {
235    inner: nexus_logbuf::queue::mpsc::WriteClaim<'a>,
236    notify: &'a Inner,
237}
238
239impl WriteClaim<'_> {
240    /// Commit the record, making it visible to the receiver.
241    /// Automatically wakes the receiver if it's parked.
242    pub fn commit(self) {
243        let notify = self.notify;
244        self.inner.commit();
245        if notify.has_rx_waker() {
246            notify.wake_rx();
247        }
248    }
249
250    /// Payload length in bytes.
251    pub fn len(&self) -> usize {
252        self.inner.len()
253    }
254
255    /// Always false (claims must have len > 0).
256    pub fn is_empty(&self) -> bool {
257        self.inner.is_empty()
258    }
259}
260
261impl Deref for WriteClaim<'_> {
262    type Target = [u8];
263    fn deref(&self) -> &[u8] {
264        &self.inner
265    }
266}
267
268impl DerefMut for WriteClaim<'_> {
269    fn deref_mut(&mut self) -> &mut [u8] {
270        &mut self.inner
271    }
272}
273
274// =============================================================================
275// ReadClaim wrapper -- auto-wakes sender on drop (frees space)
276// =============================================================================
277
278/// A received message from the byte channel. Dereferences to `&[u8]`.
279///
280/// When dropped, the record region is freed (consumer head advances)
281/// and a sender is woken if it was parked on a full buffer.
282pub struct ReadClaim<'a> {
283    inner: nexus_logbuf::queue::mpsc::ReadClaim<'a>,
284    notify: &'a Inner,
285}
286
287impl ReadClaim<'_> {
288    /// Payload length in bytes.
289    pub fn len(&self) -> usize {
290        self.inner.len()
291    }
292
293    /// Always false.
294    pub fn is_empty(&self) -> bool {
295        self.inner.is_empty()
296    }
297}
298
299impl Deref for ReadClaim<'_> {
300    type Target = [u8];
301    fn deref(&self) -> &[u8] {
302        &self.inner
303    }
304}
305
306impl Drop for ReadClaim<'_> {
307    fn drop(&mut self) {
308        // The inner ReadClaim drops after this impl runs (field drop order),
309        // which advances the consumer head and frees space. We wake a
310        // sender BEFORE inner drops -- the sender will re-try and see space
311        // once inner's drop completes. This ordering is acceptable because
312        // the sender's try_claim will simply fail and re-park if the space
313        // isn't freed yet. On the next poll it succeeds.
314        if self.notify.tx_waiters.has_waiters() {
315            self.notify.tx_waiters.wake_one();
316        }
317    }
318}
319
320// =============================================================================
321// Error types
322// =============================================================================
323
324/// Claim failed.
325///
326/// `len == 0` is not a runtime error — it's a precondition violation and
327/// panics in [`nexus_logbuf::queue::mpsc::Producer::try_claim`].
328#[derive(Debug)]
329#[non_exhaustive]
330pub enum ClaimError {
331    /// All receivers were dropped.
332    Closed,
333    /// Requested length exceeds buffer capacity (can never succeed).
334    TooLarge,
335}
336
337impl std::fmt::Display for ClaimError {
338    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339        match self {
340            Self::Closed => f.write_str("byte channel closed"),
341            Self::TooLarge => f.write_str("message exceeds buffer capacity"),
342        }
343    }
344}
345
346impl std::error::Error for ClaimError {}
347
348/// Receive failed -- all senders dropped and buffer empty.
349#[derive(Debug)]
350pub struct RecvError;
351
352impl std::fmt::Display for RecvError {
353    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354        f.write_str("byte channel closed")
355    }
356}
357
358impl std::error::Error for RecvError {}
359
360// =============================================================================
361// channel()
362// =============================================================================
363
364/// Create a bounded cross-thread MPSC byte channel.
365///
366/// `capacity` is the ring buffer size in bytes (rounded up to next power of two).
367///
368/// `Sender` is `Clone + Send` -- multiple producers allowed.
369/// `Receiver` is `Send` -- single consumer.
370///
371/// # Panics
372///
373/// - Panics if called outside [`Runtime::block_on`](crate::Runtime::block_on).
374pub fn channel(capacity: usize) -> (Sender, Receiver) {
375    crate::context::assert_in_runtime("mpsc_bytes::channel() called outside Runtime::block_on");
376
377    let cross_ctx = crate::cross_wake::cross_wake_context()
378        .expect("mpsc_bytes::channel() requires runtime context");
379
380    let (producer, consumer) = nexus_logbuf::queue::mpsc::new(capacity);
381    let rx_slot = TaskWakerSlot::new(Arc::as_ptr(&cross_ctx));
382
383    let inner = Arc::new(Inner {
384        rx_slot,
385        rx_fallback: FallbackWaker::new(),
386        tx_waiters: SenderWaitList::new(),
387        _cross_wake_owner: cross_ctx,
388        sender_count: AtomicUsize::new(1),
389        rx_closed: AtomicBool::new(false),
390    });
391
392    (
393        Sender {
394            producer,
395            inner: inner.clone(),
396            wake_node: Arc::new(SenderWakerNode::new()),
397        },
398        Receiver { consumer, inner },
399    )
400}
401
402// =============================================================================
403// Sender
404// =============================================================================
405
406/// Sending half of a bounded MPSC byte channel.
407///
408/// `Clone + Send` -- multiple producers allowed.
409pub struct Sender {
410    producer: nexus_logbuf::queue::mpsc::Producer,
411    inner: Arc<Inner>,
412    /// Pre-allocated waker node for backpressure parking.
413    /// Arc so the node survives in the waiter list after Sender drops.
414    wake_node: Arc<SenderWakerNode>,
415}
416
417impl Sender {
418    /// Claim `len` bytes for zero-copy writing.
419    ///
420    /// Waits if the buffer is full. Write into the returned `WriteClaim`,
421    /// then call `.commit()` to publish. Drop without commit writes a
422    /// skip marker (abort).
423    ///
424    /// Returns `Err(ClaimError::TooLarge)` immediately if `len` exceeds
425    /// the buffer capacity (can never succeed).
426    ///
427    /// # Panics
428    ///
429    /// Polling the returned future with `len == 0` panics (see
430    /// [`nexus_logbuf::queue::mpsc::Producer::try_claim`]).
431    pub fn claim(&mut self, len: usize) -> ClaimFut<'_> {
432        ClaimFut { sender: self, len }
433    }
434
435    /// Try to claim without waiting.
436    ///
437    /// # Panics
438    ///
439    /// Panics if `len == 0` (see
440    /// [`nexus_logbuf::queue::mpsc::Producer::try_claim`]).
441    pub fn try_claim(&mut self, len: usize) -> Result<WriteClaim<'_>, nexus_logbuf::BufferFull> {
442        let inner_claim = self.producer.try_claim(len)?;
443        Ok(WriteClaim {
444            inner: inner_claim,
445            notify: &self.inner,
446        })
447    }
448}
449
450impl Clone for Sender {
451    fn clone(&self) -> Self {
452        self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
453        Self {
454            producer: self.producer.clone(),
455            inner: self.inner.clone(),
456            wake_node: Arc::new(SenderWakerNode::new()),
457        }
458    }
459}
460
461impl Drop for Sender {
462    fn drop(&mut self) {
463        // Mark our wake node as cancelled. If it's in the waiter list,
464        // wake_one/wake_all will skip it (they check cancelled with
465        // Acquire before reading the waker). The waker is NOT touched
466        // here — wake_one may be reading it concurrently on the
467        // receiver thread.
468        self.wake_node.cancelled.store(true, Ordering::Release);
469
470        if self.inner.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
471            // Last sender dropped -- wake receiver so it sees closed.
472            self.inner.wake_rx();
473        }
474    }
475}
476
477// SAFETY: Inner uses atomic operations. Producer is Send. wake_node is owned.
478unsafe impl Send for Sender {}
479
480// =============================================================================
481// ClaimFut
482// =============================================================================
483
484/// Future returned by [`Sender::claim`].
485pub struct ClaimFut<'a> {
486    sender: &'a mut Sender,
487    len: usize,
488}
489
490impl<'a> Future for ClaimFut<'a> {
491    type Output = Result<WriteClaim<'a>, ClaimError>;
492
493    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
494        let this = unsafe { &mut *std::pin::Pin::into_inner_unchecked(self) };
495        // SAFETY: Extend the reborrow lifetime to 'a. This is sound because:
496        // - ClaimFut holds &'a mut Sender, so the Sender lives for 'a
497        // - WriteClaim borrows &mut Producer from that Sender
498        // - The future won't be polled again after returning Ready
499        let sender: &'a mut Sender = unsafe { &mut *(this.sender as *mut Sender) };
500
501        // Precondition check before any state inspection — `len == 0` is a
502        // contract violation regardless of channel state, and the doc
503        // contract is honest only if it panics unconditionally.
504        assert!(this.len > 0, "payload length must be non-zero");
505
506        if sender.inner.rx_closed.load(Ordering::Acquire) {
507            return Poll::Ready(Err(ClaimError::Closed));
508        }
509
510        if this.len > sender.producer.capacity() {
511            return Poll::Ready(Err(ClaimError::TooLarge));
512        }
513
514        if let Ok(inner_claim) = sender.producer.try_claim(this.len) {
515            return Poll::Ready(Ok(WriteClaim {
516                inner: inner_claim,
517                notify: &sender.inner,
518            }));
519        }
520        // BufferFull — park in the waiter list.
521        let node = &sender.wake_node;
522        if !node.queued.load(Ordering::Acquire) {
523            // Not in list yet -- safe to write waker, then push.
524            // SAFETY: exclusive access -- node not in any shared structure.
525            unsafe { *node.waker.get() = Some(cx.waker().clone()) };
526            sender.inner.tx_waiters.push(node);
527        }
528        Poll::Pending
529    }
530}
531
532unsafe impl Send for ClaimFut<'_> {}
533
534// =============================================================================
535// Receiver
536// =============================================================================
537
538/// Receiving half of a bounded MPSC byte channel.
539///
540/// `Send` but not `Clone` -- single consumer.
541pub struct Receiver {
542    consumer: nexus_logbuf::queue::mpsc::Consumer,
543    inner: Arc<Inner>,
544}
545
546impl Receiver {
547    /// Receive the next message. Returns a `ReadClaim` that derefs to `&[u8]`.
548    ///
549    /// Dropping the claim advances the consumer head and wakes a sender
550    /// if it was blocked on a full buffer.
551    pub fn recv(&mut self) -> RecvFut<'_> {
552        RecvFut { receiver: self }
553    }
554
555    /// Try to receive without waiting.
556    pub fn try_recv(&mut self) -> Option<ReadClaim<'_>> {
557        let inner_claim = self.consumer.try_claim()?;
558        Some(ReadClaim {
559            inner: inner_claim,
560            notify: &self.inner,
561        })
562    }
563}
564
565/// Future returned by [`Receiver::recv`].
566pub struct RecvFut<'a> {
567    receiver: &'a mut Receiver,
568}
569
570impl Drop for RecvFut<'_> {
571    fn drop(&mut self) {
572        self.receiver.inner.rx_slot.clear();
573    }
574}
575
576impl<'a> Future for RecvFut<'a> {
577    type Output = Result<ReadClaim<'a>, RecvError>;
578
579    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
580        // SAFETY: RecvFut is not Unpin-sensitive. We need &mut access to
581        // receiver.consumer for try_claim, and the returned ReadClaim must
582        // have lifetime 'a (tied to the Receiver, not this poll call).
583        let this = unsafe { &mut *std::pin::Pin::into_inner_unchecked(self) };
584
585        // SAFETY: Extend the reborrow lifetime to 'a. This is sound because:
586        // - RecvFut holds &'a mut Receiver, so the Receiver lives for 'a
587        // - ReadClaim borrows &mut Consumer from that Receiver
588        // - The future won't be polled again after returning Ready
589        let receiver: &'a mut Receiver = unsafe { &mut *(this.receiver as *mut Receiver) };
590
591        // Try to claim.
592        if let Some(inner_claim) = receiver.consumer.try_claim() {
593            return Poll::Ready(Ok(ReadClaim {
594                inner: inner_claim,
595                notify: &receiver.inner,
596            }));
597        }
598
599        // Empty + all senders dropped -> closed.
600        if receiver.inner.sender_count.load(Ordering::Acquire) == 0 {
601            return Poll::Ready(Err(RecvError));
602        }
603
604        // Park.
605        if !receiver.inner.rx_slot.try_register_local(cx.waker()) {
606            receiver.inner.rx_fallback.register(cx.waker());
607        }
608
609        Poll::Pending
610    }
611}
612
613unsafe impl Send for RecvFut<'_> {}
614
615impl Drop for Receiver {
616    fn drop(&mut self) {
617        self.inner.rx_closed.store(true, Ordering::Release);
618        self.inner.tx_waiters.wake_all();
619    }
620}
621
622unsafe impl Send for Receiver {}
623
624// =============================================================================
625// Tests
626// =============================================================================
627
628#[cfg(test)]
629mod tests {
630    use super::*;
631
632    fn test_channel(capacity: usize) -> (Sender, Receiver) {
633        let poll = mio::Poll::new().unwrap();
634        let mio_waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(usize::MAX)).unwrap());
635        let cross_ctx = Arc::new(crate::cross_wake::CrossWakeContext {
636            queue: crate::cross_wake::CrossWakeQueue::new(),
637            mio_waker,
638            parked: AtomicBool::new(false),
639        });
640
641        let (producer, consumer) = nexus_logbuf::queue::mpsc::new(capacity);
642        let rx_slot = TaskWakerSlot::new(Arc::as_ptr(&cross_ctx));
643
644        let inner = Arc::new(Inner {
645            rx_slot,
646            rx_fallback: FallbackWaker::new(),
647            tx_waiters: SenderWaitList::new(),
648            _cross_wake_owner: cross_ctx,
649            sender_count: AtomicUsize::new(1),
650            rx_closed: AtomicBool::new(false),
651        });
652
653        (
654            Sender {
655                producer,
656                inner: inner.clone(),
657                wake_node: Arc::new(SenderWakerNode::new()),
658            },
659            Receiver { consumer, inner },
660        )
661    }
662
663    fn try_send(tx: &mut Sender, data: &[u8]) {
664        let mut claim = tx.try_claim(data.len()).unwrap();
665        claim.copy_from_slice(data);
666        claim.commit();
667    }
668
669    #[test]
670    fn claim_commit_recv() {
671        let (mut tx, mut rx) = test_channel(4096);
672        try_send(&mut tx, b"hello");
673        try_send(&mut tx, b"world");
674
675        let msg = rx.try_recv().unwrap();
676        assert_eq!(&*msg, b"hello");
677        drop(msg);
678
679        let msg = rx.try_recv().unwrap();
680        assert_eq!(&*msg, b"world");
681        drop(msg);
682
683        assert!(rx.try_recv().is_none());
684    }
685
686    #[test]
687    fn fifo_ordering() {
688        let (mut tx, mut rx) = test_channel(4096);
689        for i in 0u32..10 {
690            try_send(&mut tx, &i.to_le_bytes());
691        }
692        for i in 0u32..10 {
693            let msg = rx.try_recv().unwrap();
694            assert_eq!(&*msg, &i.to_le_bytes());
695        }
696    }
697
698    #[test]
699    fn sender_drop_signals_closed() {
700        let (mut tx, mut rx) = test_channel(4096);
701        try_send(&mut tx, b"last");
702        drop(tx);
703
704        let msg = rx.try_recv().unwrap();
705        assert_eq!(&*msg, b"last");
706        drop(msg);
707
708        assert!(rx.try_recv().is_none());
709    }
710
711    #[test]
712    fn receiver_drop_signals_sender() {
713        let (_tx, rx) = test_channel(4096);
714        drop(rx);
715        assert!(_tx.inner.rx_closed.load(Ordering::Acquire));
716    }
717
718    #[test]
719    fn variable_length_messages() {
720        let (mut tx, mut rx) = test_channel(8192);
721
722        try_send(&mut tx, b"hi");
723        try_send(&mut tx, &vec![0xABu8; 100]);
724        try_send(&mut tx, &vec![0xCDu8; 1000]);
725
726        let msg = rx.try_recv().unwrap();
727        assert_eq!(msg.len(), 2);
728        drop(msg);
729
730        let msg = rx.try_recv().unwrap();
731        assert_eq!(msg.len(), 100);
732        drop(msg);
733
734        let msg = rx.try_recv().unwrap();
735        assert_eq!(msg.len(), 1000);
736    }
737
738    #[test]
739    fn cross_thread_claim_send() {
740        let (mut tx, mut rx) = test_channel(64 * 1024);
741
742        let handle = std::thread::spawn(move || {
743            for i in 0u64..100 {
744                try_send(&mut tx, &i.to_le_bytes());
745            }
746        });
747
748        handle.join().unwrap();
749
750        for i in 0u64..100 {
751            let msg = rx.try_recv().unwrap();
752            assert_eq!(&*msg, &i.to_le_bytes());
753        }
754    }
755
756    #[test]
757    fn stress_sequential() {
758        let (mut tx, mut rx) = test_channel(4096);
759        let data = [0xFFu8; 32];
760
761        let n = if cfg!(miri) { 100 } else { 10_000 };
762        for _ in 0..n {
763            try_send(&mut tx, &data);
764            let msg = rx.try_recv().unwrap();
765            assert_eq!(msg.len(), 32);
766        }
767    }
768
769    #[test]
770    fn claim_without_commit_aborts() {
771        let (mut tx, mut rx) = test_channel(4096);
772
773        // Claim and drop without commit -- skip marker.
774        let claim = tx.try_claim(10).unwrap();
775        drop(claim);
776
777        // Next claim + commit should work.
778        try_send(&mut tx, b"after_abort");
779
780        let msg = rx.try_recv().unwrap();
781        assert_eq!(&*msg, b"after_abort");
782    }
783
784    #[test]
785    fn multiple_senders() {
786        let (mut tx1, mut rx) = test_channel(64 * 1024);
787        let mut tx2 = tx1.clone();
788
789        try_send(&mut tx1, b"from_tx1");
790        try_send(&mut tx2, b"from_tx2");
791        try_send(&mut tx1, b"tx1_again");
792
793        let msg = rx.try_recv().unwrap();
794        assert_eq!(&*msg, b"from_tx1");
795        drop(msg);
796
797        let msg = rx.try_recv().unwrap();
798        assert_eq!(&*msg, b"from_tx2");
799        drop(msg);
800
801        let msg = rx.try_recv().unwrap();
802        assert_eq!(&*msg, b"tx1_again");
803        drop(msg);
804
805        assert!(rx.try_recv().is_none());
806    }
807
808    /// Sender dropped while its wake_node may be in the waiter list.
809    /// Previously caused use-after-free when wake_one read freed memory.
810    /// Fixed by Arc refcount on the node.
811    #[test]
812    fn sender_drop_while_queued() {
813        let (mut tx1, mut rx) = test_channel(4096);
814        let tx2 = tx1.clone();
815
816        try_send(&mut tx1, b"data");
817
818        // Drop tx2 -- its node may or may not be in the list.
819        // Key test: this shouldn't crash even if the node IS in the list.
820        drop(tx2);
821
822        // Receiver pops -- should still work.
823        let msg = rx.try_recv().unwrap();
824        assert_eq!(&*msg, b"data");
825        drop(msg);
826
827        // tx1 can still send.
828        try_send(&mut tx1, b"more");
829        let msg = rx.try_recv().unwrap();
830        assert_eq!(&*msg, b"more");
831    }
832}
833
834// =============================================================================
835// BUG-2 (#168) — cross-thread wake-path UAF regression tests
836// =============================================================================
837//
838// Tests live in `crate::cross_wake::uaf_scenarios` (one canonical body
839// per scenario, shared across all four channels). These per-channel
840// `#[test]` wrappers exist for `cargo test mpsc_bytes::uaf_tests`
841// output visibility and to verify the consolidated `TaskWakerSlot`
842// works identically across channel modules.
843#[cfg(test)]
844mod uaf_tests {
845    use crate::cross_wake::uaf_scenarios as h;
846
847    #[test]
848    fn waker_slot_uaf_when_task_freed_mid_dispatch() {
849        h::waker_slot_uaf_when_task_freed_mid_dispatch();
850    }
851
852    #[test]
853    fn slot_drop_releases_ref_when_still_registered() {
854        h::slot_drop_releases_ref_when_still_registered();
855    }
856
857    #[test]
858    fn register_during_wake_does_not_leak_ref() {
859        h::register_during_wake_does_not_leak_ref();
860    }
861}