heph_inbox/
lib.rs

1//! Bounded capacity channel.
2//!
3//! The channel is a multi-producer, single-consumer (MPSC) bounded queue. It is
4//! designed to be used as inbox for actors, following the [actor model].
5//!
6//! [actor model]: https://en.wikipedia.org/wiki/Actor_model
7//!
8//! # Notes
9//!
10//! The implementation assumes the access to the channel is mostly uncontested
11//! and optimises for this use case. Furthermore it optimises for small memory
12//! footprint, sometimes over faster access.
13//!
14//! The implementation doesn't provide a lot of guarantees. For example this
15//! channel is **not** guaranteed to be First In First Out (FIFO), it does this
16//! on a best effort basis. In return it means that a slow `Sender` does not
17//! block the receiving of other messages.
18//!
19//! # Examples
20//!
21//! Simple creation of a channel and sending a message over it.
22//!
//! ```
24//! use std::thread;
25//!
26//! use heph_inbox::RecvError;
27//!
28//! // Create a new small channel.
29//! let (sender, mut receiver) = heph_inbox::new_small();
30//!
31//! let sender_handle = thread::spawn(move || {
32//!     if let Err(err) = sender.try_send("Hello world!".to_owned()) {
33//!         panic!("Failed to send value: {}", err);
34//!     }
35//! });
36//!
37//! let receiver_handle = thread::spawn(move || {
38//! #   #[cfg(not(miri))] // `sleep` not supported.
39//! #   thread::sleep(std::time::Duration::from_millis(1)); // Don't waste cycles.
40//!     // NOTE: this is just an example don't actually use a loop like this, it
41//!     // will waste CPU cycles when the channel is empty!
42//!     loop {
43//!         match receiver.try_recv() {
44//!             Ok(value) => println!("Got a value: {}", value),
45//!             Err(RecvError::Empty) => continue,
46//!             Err(RecvError::Disconnected) => break,
47//!         }
48//!     }
49//! });
50//!
51//! sender_handle.join().unwrap();
52//! receiver_handle.join().unwrap();
53//! ```
54
55#![cfg_attr(unstable_nightly, feature(cfg_sanitize))]
56#![warn(
57    missing_debug_implementations,
58    missing_docs,
59    unused_results,
60    variant_size_differences
61)]
62// Disallow warnings when running tests.
63#![cfg_attr(test, deny(warnings))]
64// Disallow warnings in examples, we want to set a good example after all.
65#![doc(test(attr(deny(warnings))))]
66
67use std::alloc::{alloc, handle_alloc_error, Layout};
68use std::cell::UnsafeCell;
69use std::error::Error;
70use std::fmt;
71use std::future::Future;
72use std::mem::{drop as unlock, replace, take, MaybeUninit};
73use std::ops::Deref;
74use std::pin::Pin;
75use std::ptr::{self, NonNull};
76use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
77use std::sync::Mutex;
78use std::task::{self, Poll};
79
80#[cfg(test)]
81mod tests;
82
/// `ThreadSanitizer` does not support memory fences. To avoid false positive
/// reports use atomic loads for synchronization instead of a fence. Macro
/// inspired by the one found in Rust's standard library for the `Arc`
/// implementation.
///
/// `$val` is the atomic to load from and `$ordering` the memory ordering used
/// for the fence or the load. When the `unstable_nightly` cfg is not set both
/// statements are emitted.
macro_rules! fence {
    ($val: expr, $ordering: expr) => {
        // NOTE: `cfg_attr(flag, attr)` expands to `#[attr]`, so the inner
        // attribute has to be a `cfg(..)` predicate; a bare `not(..)` or
        // `sanitize = ".."` is not a valid attribute.
        #[cfg_attr(unstable_nightly, cfg(not(sanitize = "thread")))]
        std::sync::atomic::fence($ordering);
        #[cfg_attr(unstable_nightly, cfg(sanitize = "thread"))]
        let _ = $val.load($ordering);
    };
}
95
96pub mod oneshot;
97
98mod waker;
99use waker::WakerRegistration;
100
/// The capacity of a small channel, used by [`new_small`].
const SMALL_CAP: usize = 8;
/// Maximum capacity of a channel.
// NOTE: see [`Channel::new`] why.
// Every slot uses `STATUS_BITS` bits of the 64 bit status; the bits above
// `STATUS_BITS * MAX_CAP` hold the receiver's position (see `MARK_NEXT_POS`).
pub const MAX_CAP: usize = 29;
/// Minimum capacity of a channel.
pub const MIN_CAP: usize = 1;
108
/// Create a small bounded channel.
///
/// This calls [`new`] with a capacity of `SMALL_CAP`.
pub fn new_small<T>() -> (Sender<T>, Receiver<T>) {
    new(SMALL_CAP)
}
113
114/// Create a new bounded channel.
115///
116/// The `capacity` must be in the range [`MIN_CAP`]`..=`[`MAX_CAP`].
117pub fn new<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
118    assert!(
119        (MIN_CAP..=MAX_CAP).contains(&capacity),
120        "inbox channel capacity must be between {} and {}",
121        MIN_CAP,
122        MAX_CAP
123    );
124    let channel = Channel::new(capacity);
125    let sender = Sender { channel };
126    let receiver = Receiver { channel };
127    (sender, receiver)
128}
129
// Flags stored in the most significant bits of the channel's `ref_count`. All
// bits not covered by these masks count the number of `Sender`s, see
// `sender_count`.

/// Bit mask to mark the receiver as alive.
const RECEIVER_ALIVE: usize = 1 << (usize::BITS - 1);
/// Bit mask to mark the receiver still has access to the channel. See the
/// `Drop` impl for [`Receiver`].
const RECEIVER_ACCESS: usize = 1 << (usize::BITS - 2);
/// Bit mask to mark a sender still has access to the channel. See the `Drop`
/// impl for [`Sender`].
const SENDER_ACCESS: usize = 1 << (usize::BITS - 3);
/// Bit mask to mark the manager as alive.
const MANAGER_ALIVE: usize = 1 << (usize::BITS - 4);
/// Bit mask to mark the manager has access to the channel. See the `Drop` impl
/// for [`Manager`].
const MANAGER_ACCESS: usize = 1 << (usize::BITS - 5);
143
144/// Return `true` if the receiver or manager is alive in `ref_count`.
145#[inline(always)]
146const fn has_receiver(ref_count: usize) -> bool {
147    ref_count & RECEIVER_ALIVE != 0
148}
149
150/// Returns `true` if the manager is alive in `ref_count`.
151#[inline(always)]
152const fn has_manager(ref_count: usize) -> bool {
153    ref_count & MANAGER_ALIVE != 0
154}
155
156/// Return `true` if the receiver or manager is alive in `ref_count`.
157#[inline(always)]
158const fn has_receiver_or_manager(ref_count: usize) -> bool {
159    ref_count & (RECEIVER_ALIVE | MANAGER_ALIVE) != 0
160}
161
162/// Returns the number of senders connected in `ref_count`.
163#[inline(always)]
164const fn sender_count(ref_count: usize) -> usize {
165    ref_count & !(RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | MANAGER_ALIVE | MANAGER_ACCESS)
166}
167
// Bits to mark the status of a slot. The status of slot `n` is stored in bits
// `STATUS_BITS * n .. STATUS_BITS * (n + 1)` of the channel's status, see
// `slot_status` and `mark_slot`.
const STATUS_BITS: u64 = 2; // Number of bits used per slot.
const STATUS_MASK: u64 = (1 << STATUS_BITS) - 1; // Mask for a single (shifted) slot.
// Mask covering the status bits of all `MAX_CAP` slots (test only).
#[cfg(test)]
const ALL_STATUSES_MASK: u64 = (1 << (MAX_CAP as u64 * STATUS_BITS)) - 1;
// The possible statuses of a slot.
const EMPTY: u64 = 0b00; // Slot is empty (initial state).
const TAKEN: u64 = 0b01; // `Sender` acquired write access, currently writing.
const FILLED: u64 = 0b11; // `Sender` wrote a value into the slot.
const READING: u64 = 0b10; // A `Receiver` is reading from the slot.

// Status transitions. These are applied to the bits of a single slot using the
// mask created by `mark_slot`.
const MARK_TAKEN: u64 = 0b01; // OR to go from EMPTY -> TAKEN.
const MARK_FILLED: u64 = 0b11; // OR to go from TAKEN -> FILLED.
const MARK_READING: u64 = 0b01; // XOR to go from FILLED -> READING.
const MARK_EMPTIED: u64 = 0b11; // ! AND to go from FILLED or READING -> EMPTY.
184
185/// Returns `true` if `slot` in `status` is empty.
186#[inline(always)]
187fn is_available(status: u64, slot: usize) -> bool {
188    has_status(status, slot, EMPTY)
189}
190
191/// Returns `true` if `slot` in `status` is filled.
192#[inline(always)]
193fn is_filled(status: u64, slot: usize) -> bool {
194    has_status(status, slot, FILLED)
195}
196
197/// Returns `true` if `slot` (in `status`) equals the `expected` status.
198#[inline(always)]
199fn has_status(status: u64, slot: usize, expected: u64) -> bool {
200    slot_status(status, slot) == expected
201}
202
203/// Returns the `STATUS_BITS` for `slot` in `status`.
204#[inline(always)]
205fn slot_status(status: u64, slot: usize) -> u64 {
206    debug_assert!(slot <= MAX_CAP);
207    (status >> (STATUS_BITS * slot as u64)) & STATUS_MASK
208}
209
210/// Creates a mask to transition `slot` using `transition`. `transition` must be
211/// one of the `MARK_*` constants.
212#[inline(always)]
213fn mark_slot(slot: usize, transition: u64) -> u64 {
214    debug_assert!(slot <= MAX_CAP);
215    transition << (STATUS_BITS * slot as u64)
216}
217
218/// Returns a string name for the `slot_status`.
219fn dbg_status(slot_status: u64) -> &'static str {
220    match slot_status {
221        EMPTY => "EMPTY",
222        TAKEN => "TAKEN",
223        FILLED => "FILLED",
224        READING => "READING",
225        _ => "INVALID",
226    }
227}
228
229// Bits to mark the position of the receiver.
230const MARK_NEXT_POS: u64 = 1 << (STATUS_BITS * MAX_CAP as u64); // Add to increase position by 1.
231
232/// Returns the position of the receiver. Will be in 0..[`MAX_CAP`] range.
233#[inline(always)]
234#[allow(clippy::cast_possible_truncation)]
235fn receiver_pos(status: u64, capacity: usize) -> usize {
236    (status >> (STATUS_BITS * MAX_CAP as u64)) as usize % capacity
237}
238
/// Sending side of the channel.
///
/// Can be cloned to create multiple senders for the same channel.
pub struct Sender<T> {
    /// Pointer to the channel shared with the other senders and the
    /// [`Receiver`].
    channel: NonNull<Channel<T>>,
}
243
/// Error returned in case sending a value across the channel fails. See
/// [`Sender::try_send`].
///
/// Both variants hand back the value that could not be sent.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SendError<T> {
    /// Channel is full.
    Full(T),
    /// [`Receiver`] and [`Manager`] are disconnected.
    Disconnected(T),
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            SendError::Full(..) => "channel is full",
            SendError::Disconnected(..) => "receiver is disconnected",
        };
        // `pad` honours the width, fill and precision formatting options.
        f.pad(description)
    }
}

impl<T: fmt::Debug> Error for SendError<T> {}
264
265impl<T> Sender<T> {
266    /// Attempts to send the `value` into the channel.
267    pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
268        try_send(self.channel(), value)
269    }
270
271    /// Returns a future that sends a value into the channel, waiting if the
272    /// channel is full.
273    ///
274    /// If the returned [`Future`] returns an error it means the [`Receiver`]
275    /// and [`Manager`] are [disconnected] and no more values will be read from
276    /// the channel. This is the same error as [`SendError::Disconnected`].
277    /// [`SendError::Full`] will never be returned, the `Future` will return
278    /// [`Poll::Pending`] instead.
279    ///
280    /// [disconnected]: Sender::is_connected
281    pub fn send(&self, value: T) -> SendValue<T> {
282        SendValue {
283            channel: self.channel(),
284            value: Some(value),
285            registered_waker: None,
286        }
287    }
288
289    /// Returns a [`Future`] that waits until the other side of the channel is
290    /// [disconnected].
291    ///
292    /// [disconnected]: Sender::is_connected
293    pub fn join(&self) -> Join<T> {
294        Join {
295            channel: self.channel(),
296            registered_waker: None,
297        }
298    }
299
300    /// Returns the capacity of the channel.
301    pub fn capacity(&self) -> usize {
302        self.channel().slots.len()
303    }
304
305    /// Returns `true` if the [`Receiver`] and or the [`Manager`] are connected.
306    ///
307    /// # Notes
308    ///
309    /// Unlike [`Receiver::is_connected`] this method takes the [`Manager`] into
310    /// account. This is done to support the use case in which an actor is
311    /// restarted and a new receiver is created for it.
312    pub fn is_connected(&self) -> bool {
313        // Relaxed is fine here since there is always a bit of a race condition
314        // when using this method (and then doing something based on it).
315        has_receiver_or_manager(self.channel().ref_count.load(Ordering::Relaxed))
316    }
317
318    /// Returns `true` if the [`Manager`] is connected.
319    pub fn has_manager(&self) -> bool {
320        // Relaxed is fine here since there is always a bit of a race condition
321        // when using this method (and then doing something based on it).
322        has_manager(self.channel().ref_count.load(Ordering::Relaxed))
323    }
324
325    /// Returns `true` if senders send into the same channel.
326    pub fn same_channel(&self, other: &Sender<T>) -> bool {
327        self.channel == other.channel
328    }
329
330    /// Returns `true` if this sender sends to the `receiver`.
331    pub fn sends_to(&self, receiver: &Receiver<T>) -> bool {
332        self.channel == receiver.channel
333    }
334
335    /// Returns the id of this sender.
336    pub fn id(&self) -> Id {
337        Id(self.channel.as_ptr() as *const () as usize)
338    }
339
340    fn channel(&self) -> &Channel<T> {
341        unsafe { self.channel.as_ref() }
342    }
343}
344
/// See [`Sender::try_send`].
///
/// Scans all slots of `channel`, starting at the receiver's position, for an
/// `EMPTY` slot. The first slot that can be marked as `TAKEN` is written to
/// and then marked as `FILLED`.
///
/// Returns [`SendError::Disconnected`] if neither the receiver nor the manager
/// is alive and [`SendError::Full`] if no slot could be acquired, both hand
/// back the `value`.
fn try_send<T>(channel: &Channel<T>, value: T) -> Result<(), SendError<T>> {
    if !has_receiver_or_manager(channel.ref_count.load(Ordering::Relaxed)) {
        return Err(SendError::Disconnected(value));
    }

    // NOTE: relaxed ordering here is ok because we acquire unique
    // permission to write to the slot later before writing to it. Something
    // we have to do no matter the ordering.
    let mut status: u64 = channel.status.load(Ordering::Relaxed);
    let cap = channel.slots.len();
    let start = receiver_pos(status, cap);
    // Visit every slot exactly once, starting at `start` and wrapping around.
    for slot in (0..cap).cycle().skip(start).take(cap) {
        if !is_available(status, slot) {
            continue;
        }

        // In our local status the slot is available, however another sender
        // could have taken it between the time we read the status and the
        // time we got here. So we write our `TAKEN` status and check if in
        // the *previous* (up-to-date) status (returned by `fetch_or`) the
        // slot was still available. If it was it means we have acquired the
        // slot, otherwise another sender beat us to it.
        //
        // NOTE: The OR operation here is safe: if another sender already
        // wrote TAKEN (01) or FILLED (11) we're not overwriting anything.
        // If a reader wrote READING (10) we won't use the slot and the
        // reader will overwrite it with EMPTY later. If we overwrite EMPTY
        // (00) we can reuse the slot safely, but the message will be in a
        // different order.
        status = channel
            .status
            .fetch_or(mark_slot(slot, MARK_TAKEN), Ordering::AcqRel);
        if !is_available(status, slot) {
            // Another thread beat us to taking the slot.
            continue;
        }

        // Safety: we've acquired the slot above so we're ensured unique
        // access to the slot.
        unsafe {
            let _ = (&mut *channel.slots[slot].get()).write(value);
        }

        // Now that we've written to the slot we can mark it as filled.
        let old_status = channel
            .status
            .fetch_or(mark_slot(slot, MARK_FILLED), Ordering::AcqRel);
        // Debug assertion to check the slot was in the TAKEN status.
        debug_assert!(has_status(old_status, slot, TAKEN));

        // If the receiver is waiting for this slot we wake it.
        if receiver_pos(old_status, cap) == slot {
            channel.wake_receiver();
        }

        return Ok(());
    }

    Err(SendError::Full(value))
}
406
407/// # Safety
408///
409/// Only `2 ^ 30` (a billion) `Sender`s may be alive concurrently, more then
410/// enough for all practical use cases.
411impl<T> Clone for Sender<T> {
412    fn clone(&self) -> Sender<T> {
413        // For the reasoning behind this relaxed ordering see `Arc::clone`.
414        let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
415        debug_assert!(old_ref_count & SENDER_ACCESS != 0);
416        Sender {
417            channel: self.channel,
418        }
419    }
420}
421
422impl<T> fmt::Debug for Sender<T> {
423    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424        f.debug_struct("Sender")
425            .field("channel", &self.channel())
426            .finish()
427    }
428}
429
// Safety: if the value can be sent across threads then so can the sending side
// of the channel.
unsafe impl<T: Send> Send for Sender<T> {}

// NOTE(review): unlike the `Send` impl this has no `T: Send` bound, while
// `Sender::try_send` moves a `T` into the channel through `&self`. Confirm
// that sharing a `&Sender<T>` between threads is sound for `T: !Send`.
unsafe impl<T> Sync for Sender<T> {}

impl<T> Unpin for Sender<T> {}
436
impl<T> Drop for Sender<T> {
    /// Decrements the sender count in the channel's `ref_count`. The last
    /// sender to be dropped also wakes the receiver, gives up `SENDER_ACCESS`
    /// and, if nothing else has access to the channel any more, deallocates
    /// the channel.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_sub(1, Ordering::Release);
        if sender_count(old_ref_count) != 1 {
            // If we're not the last sender all we have to do is decrement the
            // ref count (above).
            return;
        }

        // If we're the last sender being dropped wake the receiver.
        if has_receiver_or_manager(old_ref_count) {
            self.channel().wake_receiver();
        }

        // If the previous value was `SENDER_ACCESS` it means that the receiver,
        // all other senders and the manager were all dropped, so we need to do
        // the deallocating.
        let old_ref_count = self.channel().ref_count.fetch_and(!SENDER_ACCESS, Ordering::Release);
        if old_ref_count != SENDER_ACCESS {
            // Another sender, the receiver or the manager is still alive.
            return;
        }

        // For the reasoning behind this ordering see `Arc::drop`.
        fence!(self.channel().ref_count, Ordering::Acquire);

        // Drop the memory.
        // Safety: we were the last with access to the channel (checked above),
        // so nothing can use the pointer after this.
        unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
    }
}
469
470/// [`Future`] implementation behind [`Sender::send`].
471#[derive(Debug)]
472#[must_use = "futures do nothing unless you `.await` or poll them"]
473pub struct SendValue<'s, T> {
474    channel: &'s Channel<T>,
475    value: Option<T>,
476    registered_waker: Option<task::Waker>,
477}
478
479impl<'s, T> Future for SendValue<'s, T> {
480    type Output = Result<(), T>;
481
482    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
483        // Safety: only `waker_node` is pinned, which is only used by
484        // `register_waker`.
485        let this = unsafe { self.as_mut().get_unchecked_mut() };
486        let value = this
487            .value
488            .take()
489            .expect("SendValue polled after completion");
490
491        // First we try to send the value, if this succeeds we don't have to
492        // allocate in the waker list.
493        match try_send(this.channel, value) {
494            Ok(()) => Poll::Ready(Ok(())),
495            Err(SendError::Full(value)) => {
496                let registered_waker = register_waker(
497                    &mut this.registered_waker,
498                    &this.channel.sender_wakers,
499                    ctx.waker(),
500                );
501                if !registered_waker {
502                    return Poll::Pending;
503                }
504
505                // It could be the case that the received received a value in
506                // the time after we tried to send the value and before we added
507                // the our waker to list. So we try to send a value again to
508                // ensure we don't awoken and the channel has a slot available.
509                match try_send(this.channel, value) {
510                    Ok(()) => Poll::Ready(Ok(())),
511                    Err(SendError::Full(value)) => {
512                        // Channel is still full, we'll have to wait.
513                        this.value = Some(value);
514                        Poll::Pending
515                    }
516                    Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
517                }
518            }
519            Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
520        }
521    }
522}
523
524unsafe impl<'s, T> Sync for SendValue<'s, T> {}
525
526impl<'s, T> Drop for SendValue<'s, T> {
527    fn drop(&mut self) {
528        if let Some(waker) = self.registered_waker.take() {
529            let mut sender_wakers = self.channel.sender_wakers.lock().unwrap();
530            let idx = sender_wakers.iter().position(|w| w.will_wake(&waker));
531            if let Some(idx) = idx {
532                drop(sender_wakers.swap_remove(idx));
533            }
534        }
535    }
536}
537
538/// [`Future`] implementation behind [`Sender::join`].
539#[derive(Debug)]
540#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct Join<'s, T> {
542    channel: &'s Channel<T>,
543    registered_waker: Option<task::Waker>,
544}
545
546impl<'s, T> Future for Join<'s, T> {
547    type Output = ();
548
549    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
550        if !has_receiver_or_manager(self.channel.ref_count.load(Ordering::Acquire)) {
551            // Other side is disconnected.
552            return Poll::Ready(());
553        }
554
555        let this = &mut *self;
556        let registered_waker = &mut this.registered_waker;
557        let join_wakers = &this.channel.join_wakers;
558        let registered_waker = register_waker(registered_waker, join_wakers, ctx.waker());
559        if !registered_waker {
560            return Poll::Pending;
561        }
562
563        if has_receiver_or_manager(this.channel.ref_count.load(Ordering::Acquire)) {
564            Poll::Pending
565        } else {
566            // Other side is disconnected.
567            Poll::Ready(())
568        }
569    }
570}
571
572unsafe impl<'s, T> Sync for Join<'s, T> {}
573
574impl<'s, T> Drop for Join<'s, T> {
575    fn drop(&mut self) {
576        if let Some(waker) = self.registered_waker.take() {
577            let mut join_wakers = self.channel.join_wakers.lock().unwrap();
578            let idx = join_wakers.iter().position(|w| w.will_wake(&waker));
579            if let Some(idx) = idx {
580                drop(join_wakers.swap_remove(idx));
581            }
582        }
583    }
584}
585
/// Registers `waker` in `channel_wakers` if `registered_waker` is `None` or is
/// different from `waker`, remembering it in `registered_waker`.
///
/// Returns `true` if `waker` was registered, `false` if the same waker was
/// already registered.
fn register_waker(
    registered_waker: &mut Option<task::Waker>,
    channel_wakers: &Mutex<Vec<task::Waker>>,
    waker: &task::Waker,
) -> bool {
    if let Some(current) = registered_waker.as_mut() {
        if current.will_wake(waker) {
            // Already registered this waker, don't have to do anything.
            return false;
        }

        // Different waker, replace the old one.
        let new_waker = waker.clone();
        let stale_waker = replace(current, new_waker.clone());

        let mut wakers = channel_wakers.lock().unwrap();
        match wakers.iter().position(|w| w.will_wake(&stale_waker)) {
            // Replace the old waker with the new one.
            Some(idx) => wakers[idx] = new_waker,
            // The old waker is no longer in the list, so add the new one.
            None => wakers.push(new_waker),
        }
        return true;
    }

    // Haven't registered a waker yet.
    let new_waker = waker.clone();
    *registered_waker = Some(new_waker.clone());
    channel_wakers.lock().unwrap().push(new_waker);
    true
}
626
/// Receiving side of the channel.
pub struct Receiver<T> {
    /// Pointer to the channel shared with the [`Sender`]s.
    channel: NonNull<Channel<T>>,
}
631
/// Error returned in case receiving a value from the channel fails. See
/// [`Receiver::try_recv`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// Channel is empty.
    Empty,
    /// All [`Sender`]s (but not necessarily the [`Manager`]) are disconnected
    /// and the channel is empty, see [`Receiver::is_connected`].
    Disconnected,
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            RecvError::Empty => "channel is empty",
            RecvError::Disconnected => "all senders are disconnected",
        };
        // `pad` honours the width, fill and precision formatting options.
        f.pad(description)
    }
}

impl Error for RecvError {}
653
654impl<T> Receiver<T> {
655    /// Attempts to receive a value from this channel.
656    pub fn try_recv(&mut self) -> Result<T, RecvError> {
657        try_recv(self.channel())
658    }
659
660    /// Returns a future that receives a value from the channel, waiting if the
661    /// channel is empty.
662    ///
663    /// If the returned [`Future`] returns `None` it means all [`Sender`]s are
664    /// [disconnected]. This is the same error as [`RecvError::Disconnected`].
665    /// [`RecvError::Empty`] will never be returned, the `Future` will return
666    /// [`Poll::Pending`] instead.
667    ///
668    /// [disconnected]: Receiver::is_connected
669    pub fn recv(&mut self) -> RecvValue<T> {
670        RecvValue {
671            channel: self.channel(),
672        }
673    }
674
675    /// Attempts to peek a value from this channel.
676    pub fn try_peek(&mut self) -> Result<&T, RecvError> {
677        try_peek(self.channel())
678    }
679
680    /// Returns a future that peeks at a value from the channel, waiting if the
681    /// channel is empty.
682    ///
683    /// If the returned [`Future`] returns `None` it means all [`Sender`]s are
684    /// [disconnected]. This is the same error as [`RecvError::Disconnected`].
685    /// [`RecvError::Empty`] will never be returned, the `Future` will return
686    /// [`Poll::Pending`] instead.
687    ///
688    /// [disconnected]: Receiver::is_connected
689    pub fn peek(&mut self) -> PeekValue<T> {
690        PeekValue {
691            channel: self.channel(),
692        }
693    }
694
695    /// Create a new [`Sender`] that sends to this channel.
696    ///
697    /// # Safety
698    ///
699    /// The same restrictions apply to this function as they do to
700    /// [`Sender::clone`].
701    ///
702    /// [`Sender::clone`]: struct.Sender.html#impl-Clone
703    pub fn new_sender(&self) -> Sender<T> {
704        // For the reasoning behind this relaxed ordering see `Arc::clone`.
705        let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
706        if old_ref_count & SENDER_ACCESS != 0 {
707            let _ = self
708                .channel()
709                .ref_count
710                .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
711        }
712
713        Sender {
714            channel: self.channel,
715        }
716    }
717
718    /// Returns the capacity of the channel.
719    pub fn capacity(&self) -> usize {
720        self.channel().slots.len()
721    }
722
723    /// Returns `false` if all [`Sender`]s are disconnected.
724    ///
725    /// # Notes
726    ///
727    /// Unlike [`Sender::is_connected`] this method doesn't take the [`Manager`]
728    /// into account. This means that this method can return `false` and later
729    /// `true` (if the `Manager` created another `Sender`), which might be
730    /// unexpected.
731    pub fn is_connected(&self) -> bool {
732        // Relaxed is fine here since there is always a bit of a race condition
733        // when using this method (and then doing something based on it).
734        sender_count(self.channel().ref_count.load(Ordering::Relaxed)) > 0
735    }
736
737    /// Returns `true` if the [`Manager`] is connected.
738    pub fn has_manager(&self) -> bool {
739        // Relaxed is fine here since there is always a bit of a race condition
740        // when using this method (and then doing something based on it).
741        has_manager(self.channel().ref_count.load(Ordering::Relaxed))
742    }
743
744    /// Set the receiver's waker to `waker`, if they are different. Returns
745    /// `true` if the waker is changed, `false` otherwise.
746    ///
747    /// This is useful if you can't call [`Receiver::recv`] but still want a
748    /// wake-up notification once messages are added to the inbox.
749    pub fn register_waker(&mut self, waker: &task::Waker) -> bool {
750        self.channel().receiver_waker.register(waker)
751    }
752
753    /// Returns the id of this receiver.
754    pub fn id(&self) -> Id {
755        Id(self.channel.as_ptr() as *const () as usize)
756    }
757
758    fn channel(&self) -> &Channel<T> {
759        unsafe { self.channel.as_ref() }
760    }
761}
762
/// See [`Receiver::try_recv`].
///
/// Advances the receiver position stored in the channel's status, then scans
/// all slots, starting at the previous position, for a `FILLED` slot and moves
/// the value out of the first one it manages to mark as `READING`.
///
/// Returns [`RecvError::Empty`] if no value could be received while at least
/// one sender was connected at the start of the call and
/// [`RecvError::Disconnected`] if no sender was connected.
fn try_recv<T>(channel: &Channel<T>) -> Result<T, RecvError> {
    // We check if we are connected **before** checking for messages. This
    // is important because there is a time between 1) the checking of the
    // messages in the channel and 2) checking if we're connected (if we
    // would do it in the last `if` statement of this method) in which the
    // sender could send a message and be dropped.
    // In this case, if we would check if we're connected after checking for
    // messages, we would incorrectly return `RecvError::Disconnected` (all
    // senders are dropped after all), however we would miss the last
    // message send.
    // Checking before hand causes us to return `RecvError::Empty`, which
    // technically isn't correct either but it will cause the user to check
    // again later. In `RecvValue` this is solved by calling `try_recv`
    // after registering the task waker, ensuring no wake-up events are
    // missed.
    let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;

    // Since we keep adding to the `status` this will overflow at some point.
    // But `fetch_add` wraps-around on overflow, so the position will "reset"
    // itself to 0. This is one of the reasons we don't support FIFO order. The
    // status bits will not be touched (even on wrap-around).
    let mut status = channel.status.fetch_add(MARK_NEXT_POS, Ordering::AcqRel);
    let cap = channel.slots.len();
    let start = receiver_pos(status, cap);
    // Visit every slot exactly once, starting at `start` and wrapping around.
    for slot in (0..cap).cycle().skip(start).take(cap) {
        if !is_filled(status, slot) {
            continue;
        }

        // Mark the slot as being read.
        status = channel
            .status
            .fetch_xor(mark_slot(slot, MARK_READING), Ordering::AcqRel);
        if !is_filled(status, slot) {
            // Slot isn't available after all.
            continue;
        }

        // Safety: we've acquired unique access to the slot above and we're
        // ensured the slot is filled.
        let value = unsafe { (&*channel.slots[slot].get()).assume_init_read() };

        // Mark the slot as empty.
        let old_status = channel
            .status
            .fetch_and(!mark_slot(slot, MARK_EMPTIED), Ordering::AcqRel);

        // Debug assertion to check the slot was in the READING or FILLED
        // status. The slot can be in the FILLED status if the sender tried
        // to mark this slot as TAKEN (01) after we marked it as READING
        // (10) (01 | 10 = 11 (FILLED)).
        debug_assert!(
            has_status(old_status, slot, READING) || has_status(old_status, slot, FILLED)
        );

        // A slot became available, let a waiting sender know.
        channel.wake_next_sender();

        return Ok(value);
    }

    if is_connected {
        Err(RecvError::Empty)
    } else {
        Err(RecvError::Disconnected)
    }
}
830
831/// See [`Receiver::try_peek`].
832fn try_peek<T>(channel: &Channel<T>) -> Result<&T, RecvError> {
833    // See `try_recv` why we do this first.
834    let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;
835
836    let status = channel.status.load(Ordering::Acquire);
837    let cap = channel.slots.len();
838    let start = receiver_pos(status, cap);
839    for slot in (0..cap).cycle().skip(start).take(cap) {
840        if !is_filled(status, slot) {
841            continue;
842        }
843
844        // Safety: we've acquired unique access to the slot above and we're
845        // ensured the slot is filled.
846        return Ok(unsafe { (&*channel.slots[slot].get()).assume_init_ref() });
847    }
848
849    if is_connected {
850        Err(RecvError::Empty)
851    } else {
852        Err(RecvError::Disconnected)
853    }
854}
855
856impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
857    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858        f.debug_struct("Receiver")
859            .field("channel", &self.channel())
860            .finish()
861    }
862}
863
// Safety: if the value can be sent across threads then so can the receiver
// (and the channel it points to).
unsafe impl<T: Send> Send for Receiver<T> {}

// NOTE(review): unlike the `Send` implementation above this has no bound on
// `T`; confirm that nothing reachable through a `&Receiver<T>` can move or
// share a `T` across threads.
unsafe impl<T> Sync for Receiver<T> {}

// The `Receiver` refers to the `Channel` through a pointer (see the `channel`
// field), so moving the `Receiver` doesn't move the channel itself.
impl<T> Unpin for Receiver<T> {}
870
impl<T> Drop for Receiver<T> {
    /// Marks the receiver as dropped.
    ///
    /// If a `Manager` is alive that is all that happens, as the manager can
    /// create a new `Receiver` later. Otherwise the channel is emptied, all
    /// join wakers are woken and the receiver's access to the channel is
    /// released, deallocating the channel if nothing else has access to it.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // First mark the receiver as dropped.
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ALIVE, Ordering::Release);
        if has_manager(old_ref_count) {
            // If the channel has a manager we only mark the receiver as dropped
            // (above). The `RECEIVER_ACCESS` bit stays set so that the manager
            // can hand out a new `Receiver`, or clean up once it is dropped.
            return;
        }

        // If the channel doesn't have a manager we empty the channel. We do
        // this to support the use case where the channel holds a
        // `oneshot::Sender` and the receiver of the oneshot channel is holding
        // a `Sender` to this channel. Effectively this creates a cyclic drop
        // dependency: `Sender` -> `Channel` -> `oneshot::Sender` which blocks
        // `oneshot::Receiver::recv`. If the actor holding a `Sender` calls
        // `oneshot::Receiver::recv` it will wait for a response or until the
        // `oneshot::Sender` is dropped, while the actor is holding a `Sender`
        // to this channel. However if this `Receiver` is dropped it won't drop
        // the `oneshot::Sender` without the emptying below. This causes
        // `oneshot::Receiver::recv` to wait forever, while holding a `Sender`.
        while let Ok(msg) = self.try_recv() {
            drop(msg);
        }

        // Let everyone waiting on the join wakers know the receiver is
        // disconnected.
        self.channel().wake_all_join();

        // If the previous value was `RECEIVER_ACCESS` it means that all senders
        // and the manager were all dropped, so we need to do the deallocating.
        let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ACCESS, Ordering::Release);
        if old_ref_count != RECEIVER_ACCESS {
            // Something else (a sender or the manager) still has access to the
            // channel, can't deallocate yet.
            return;
        }

        // For the reasoning behind this ordering see `Arc::drop`.
        fence!(self.channel().ref_count, Ordering::Acquire);

        // Drop the memory.
        // Safety: `RECEIVER_ACCESS` was the only bit left in the reference
        // count (checked above), so nothing else refers to the channel
        // anymore and we're the one responsible for freeing it.
        unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
    }
}
916
917/// [`Future`] implementation behind [`Receiver::recv`].
918#[derive(Debug)]
919#[must_use = "futures do nothing unless you `.await` or poll them"]
920pub struct RecvValue<'r, T> {
921    channel: &'r Channel<T>,
922}
923
924impl<'r, T> Future for RecvValue<'r, T> {
925    type Output = Option<T>;
926
927    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
928        match try_recv(self.channel) {
929            Ok(value) => Poll::Ready(Some(value)),
930            Err(RecvError::Empty) => {
931                // The channel is empty, we'll set the waker.
932                if !self.channel.receiver_waker.register(ctx.waker()) {
933                    // Waker already set.
934                    return Poll::Pending;
935                }
936
937                // But it could be the case that a sender send a value in the
938                // time between we last checked and we actually marked ourselves
939                // as needing a wake up, so we need to check again.
940                match try_recv(self.channel) {
941                    Ok(value) => Poll::Ready(Some(value)),
942                    // The `Sender` will wake us when a new message is send.
943                    Err(RecvError::Empty) => Poll::Pending,
944                    Err(RecvError::Disconnected) => Poll::Ready(None),
945                }
946            }
947            Err(RecvError::Disconnected) => Poll::Ready(None),
948        }
949    }
950}
951
952impl<'r, T> Unpin for RecvValue<'r, T> {}
953
954/// [`Future`] implementation behind [`Receiver::peek`].
955#[derive(Debug)]
956#[must_use = "futures do nothing unless you `.await` or poll them"]
957pub struct PeekValue<'r, T> {
958    channel: &'r Channel<T>,
959}
960
961impl<'r, T> Future for PeekValue<'r, T> {
962    type Output = Option<&'r T>;
963
964    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
965        match try_peek(self.channel) {
966            Ok(value) => Poll::Ready(Some(value)),
967            Err(RecvError::Empty) => {
968                // The channel is empty, we'll set the waker.
969                if !self.channel.receiver_waker.register(ctx.waker()) {
970                    // Waker already set.
971                    return Poll::Pending;
972                }
973
974                // But it could be the case that a sender send a value in the
975                // time between we last checked and we actually marked ourselves
976                // as needing a wake up, so we need to check again.
977                match try_peek(self.channel) {
978                    Ok(value) => Poll::Ready(Some(value)),
979                    // The `Sender` will wake us when a new message is send.
980                    Err(RecvError::Empty) => Poll::Pending,
981                    Err(RecvError::Disconnected) => Poll::Ready(None),
982                }
983            }
984            Err(RecvError::Disconnected) => Poll::Ready(None),
985        }
986    }
987}
988
989impl<'r, T> Unpin for PeekValue<'r, T> {}
990
/// Channel internals shared between zero or more [`Sender`]s, zero or one
/// [`Receiver`] and zero or one [`Manager`].
///
/// This is a dynamically sized type: the number of `slots` is chosen when the
/// channel is allocated, see [`Channel::new`].
struct Channel<T> {
    /// Bookkeeping shared by all handles, also reachable through `Deref`.
    inner: Inner,
    /// The slots in the channel, see `status` for what slots are used/unused.
    /// A slot only holds an initialised value while its status marks it as
    /// filled.
    slots: [UnsafeCell<MaybeUninit<T>>],
}
998
/// Inner data of [`Channel`].
///
/// This is only in a different struct to calculate the `Layout` of `Channel`,
/// see [`Channel::new`].
struct Inner {
    /// Status of the slots.
    ///
    /// This contains the status of the slots. Each status consists of
    /// [`STATUS_BITS`] bits to describe if the slot is taken or not.
    ///
    /// The first `STATUS_BITS * MAX_CAP` bits are the statuses for the `slots`
    /// field. The remaining bits are used by the `Receiver` to indicate its
    /// current reading position (modulo [`MAX_CAP`]).
    status: AtomicU64,
    /// The number of senders alive. If the [`RECEIVER_ALIVE`] bit is set the
    /// [`Receiver`] is alive. If the [`MANAGER_ALIVE`] bit is set the
    /// [`Manager`] is alive.
    ref_count: AtomicUsize,
    /// Wakers that are woken, one at a time, after the receiver emptied a
    /// slot, see `Channel::wake_next_sender`.
    sender_wakers: Mutex<Vec<task::Waker>>,
    /// Wakers that are all woken once the `Receiver` is dropped without a
    /// `Manager` being alive, see `Channel::wake_all_join`.
    join_wakers: Mutex<Vec<task::Waker>>,
    /// Waker of the task polling [`RecvValue`] or [`PeekValue`], woken by
    /// `Channel::wake_receiver`.
    receiver_waker: WakerRegistration,
}
1021
// Safety: if the value can be sent across threads then so can the channel.
unsafe impl<T: Send> Send for Channel<T> {}

// NOTE(review): this has no bound on `T`, even though the slots hold values
// of type `T` behind `UnsafeCell`s; confirm that the status protocol is
// enough to make sharing a `&Channel<T>` between threads sound for any `T`.
unsafe impl<T> Sync for Channel<T> {}
1026
1027impl<T> Channel<T> {
1028    /// Allocates a new `Channel` on the heap.
1029    ///
1030    /// `capacity` must small enough to ensure each slot has 2 bits for the
1031    /// status, while ensuring that the remaining bits can store `capacity` (in
1032    /// binary) to keep track of the reading position. This means following must
1033    /// hold true where $N is capacity: `2 ^ (64 - ($N * 2)) >= $N`. The maximum
1034    /// is 29.
1035    ///
1036    /// Marks a single [`Receiver`] and [`Sender`] as alive.
1037    fn new(capacity: usize) -> NonNull<Channel<T>> {
1038        assert!(capacity >= MIN_CAP, "capacity can't be zero");
1039        assert!(capacity <= MAX_CAP, "capacity too large");
1040
1041        // Allocate some raw bytes.
1042        // Safety: returns an error on arithmetic overflow, but it should be OK
1043        // with a capacity <= MAX_CAP.
1044        let (layout, _) = Layout::array::<UnsafeCell<MaybeUninit<T>>>(capacity)
1045            .and_then(|slots_layout| Layout::new::<Inner>().extend(slots_layout))
1046            .unwrap();
1047        // Safety: we check if the allocation is successful.
1048        let ptr = unsafe { alloc(layout) };
1049        if ptr.is_null() {
1050            handle_alloc_error(layout);
1051        }
1052        let ptr = ptr::slice_from_raw_parts_mut(ptr as *mut T, capacity) as *mut Channel<T>;
1053
1054        // Initialise all fields (that need it).
1055        unsafe {
1056            ptr::addr_of_mut!((*ptr).inner.status).write(AtomicU64::new(0));
1057            ptr::addr_of_mut!((*ptr).inner.ref_count).write(AtomicUsize::new(
1058                RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | 1,
1059            ));
1060            ptr::addr_of_mut!((*ptr).inner.sender_wakers).write(Mutex::new(Vec::new()));
1061            ptr::addr_of_mut!((*ptr).inner.join_wakers).write(Mutex::new(Vec::new()));
1062            ptr::addr_of_mut!((*ptr).inner.receiver_waker).write(WakerRegistration::new());
1063        }
1064
1065        // Safety: checked if the pointer is null above.
1066        unsafe { NonNull::new_unchecked(ptr) }
1067    }
1068
1069    /// Returns the next `task::Waker` to wake, if any.
1070    fn wake_next_sender(&self) {
1071        let mut sender_wakers = self.sender_wakers.lock().unwrap();
1072        let waker = (!sender_wakers.is_empty()).then(|| sender_wakers.swap_remove(0));
1073        unlock(sender_wakers);
1074        if let Some(waker) = waker {
1075            waker.wake();
1076        }
1077    }
1078
1079    /// Wakes all wakers waiting on the sender to disconnect.
1080    fn wake_all_join(&self) {
1081        let mut join_wakers = self.join_wakers.lock().unwrap();
1082        let wakers = take(&mut *join_wakers);
1083        unlock(join_wakers);
1084        for waker in wakers {
1085            waker.wake();
1086        }
1087    }
1088
1089    /// Wake the `Receiver`.
1090    fn wake_receiver(&self) {
1091        self.receiver_waker.wake();
1092    }
1093}
1094
// NOTE: this is here so we don't have to type `self.channel().inner`
// everywhere. It allows the fields of `Inner` (`status`, `ref_count` and the
// wakers) to be accessed directly on a `Channel`.
impl<T> Deref for Channel<T> {
    type Target = Inner;

    /// Returns the `inner` field.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
1104
1105impl<T> fmt::Debug for Channel<T> {
1106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1107        let status = self.status.load(Ordering::Relaxed);
1108        let ref_count = self.ref_count.load(Ordering::Relaxed);
1109        let sender_count = sender_count(ref_count);
1110        let recv_pos = receiver_pos(status, self.slots.len());
1111        let mut slots = [""; MAX_CAP];
1112        for n in 0..self.slots.len() {
1113            slots[n] = dbg_status(slot_status(status, n));
1114        }
1115        let slots = &slots[..self.slots.len()];
1116        f.debug_struct("Channel")
1117            .field("senders_alive", &sender_count)
1118            .field("receiver_alive", &has_receiver(ref_count))
1119            .field("manager_alive", &has_manager(ref_count))
1120            .field("receiver_position", &recv_pos)
1121            .field("slots", &slots)
1122            .finish()
1123    }
1124}
1125
1126impl<T> Drop for Channel<T> {
1127    fn drop(&mut self) {
1128        // Safety: we have unique access, per the mutable reference, so relaxed
1129        // is fine.
1130        let status: u64 = self.status.load(Ordering::Relaxed);
1131        for slot in 0..self.slots.len() {
1132            if is_filled(status, slot) {
1133                // Safety: we have unique access to the slot and we've checked
1134                // above whether or not the slot is filled.
1135                unsafe { self.slots[slot].get_mut().assume_init_drop() };
1136            }
1137        }
1138    }
1139}
1140
/// Manager of a channel.
///
/// A channel manager can be used to create [`Sender`]s and [`Receiver`]s for a
/// channel, without having access to either. It's made for the following use
/// case: restarting an actor which takes ownership of the `Receiver` and
/// crashes, and to restart the actor we need another `Receiver`. Using the
/// manager a new `Receiver` can be created, ensuring only a single `Receiver`
/// is alive at any given time.
pub struct Manager<T> {
    /// Pointer to the channel shared with the `Sender`s and the `Receiver`.
    channel: NonNull<Channel<T>>,
}
1152
/// Error returned by [`Manager::new_receiver`] if a receiver is already
/// connected.
///
/// Only a single receiver can be connected to a channel at any given time.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct ReceiverConnected;

impl fmt::Display for ReceiverConnected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message: &str = "receiver already connected";
        // `pad` (rather than `write_str`) so that formatting options such as
        // width and alignment are respected.
        f.pad(message)
    }
}

impl Error for ReceiverConnected {}
1165
1166impl<T> Manager<T> {
1167    /// Create a small bounded channel with a `Manager`.
1168    ///
1169    /// Same as [`new_small`] but with a `Manager`.
1170    pub fn new_small_channel() -> (Manager<T>, Sender<T>, Receiver<T>) {
1171        Manager::new_channel(SMALL_CAP)
1172    }
1173
1174    /// Create a bounded channel with a `Manager`.
1175    ///
1176    /// Same as [`new`] but with a `Manager`.
1177    pub fn new_channel(capacity: usize) -> (Manager<T>, Sender<T>, Receiver<T>) {
1178        let (sender, receiver) = new(capacity);
1179        let old_count = sender
1180            .channel()
1181            .ref_count
1182            .fetch_or(MANAGER_ALIVE | MANAGER_ACCESS, Ordering::Relaxed);
1183        debug_assert!(!has_manager(old_count));
1184        let manager = Manager {
1185            channel: sender.channel,
1186        };
1187        (manager, sender, receiver)
1188    }
1189
1190    /// Create a new [`Sender`].
1191    ///
1192    /// # Safety
1193    ///
1194    /// See the [safety nodes] on `Sender`'s [`Clone`] implemenation, the same
1195    /// conditions apply here.
1196    ///
1197    /// [safety nodes]: struct.Sender.html#impl-Clone
1198    pub fn new_sender(&self) -> Sender<T> {
1199        // For the reasoning behind this relaxed ordering see `Arc::clone`.
1200        let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
1201        if old_ref_count & SENDER_ACCESS != 0 {
1202            let _ = self
1203                .channel()
1204                .ref_count
1205                .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
1206        }
1207        Sender {
1208            channel: self.channel,
1209        }
1210    }
1211
1212    /// Attempt to create a new [`Receiver`].
1213    ///
1214    /// This will fail if there already is a receiver.
1215    pub fn new_receiver(&self) -> Result<Receiver<T>, ReceiverConnected> {
1216        let old_count = self
1217            .channel()
1218            .ref_count
1219            .fetch_or(RECEIVER_ALIVE, Ordering::AcqRel);
1220        if has_receiver(old_count) {
1221            Err(ReceiverConnected)
1222        } else {
1223            // No receiver was connected so its safe to create one.
1224            debug_assert!(old_count & RECEIVER_ACCESS != 0);
1225            Ok(Receiver {
1226                channel: self.channel,
1227            })
1228        }
1229    }
1230
1231    fn channel(&self) -> &Channel<T> {
1232        unsafe { self.channel.as_ref() }
1233    }
1234}
1235
1236impl<T> fmt::Debug for Manager<T> {
1237    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1238        f.debug_struct("Manager")
1239            .field("channel", &self.channel())
1240            .finish()
1241    }
1242}
1243
// Safety: if the value can be sent across threads then so can the manager
// (and the channel it points to).
unsafe impl<T: Send> Send for Manager<T> {}

// NOTE(review): unlike the `Send` implementation above this has no bound on
// `T`, while a `&Manager<T>` can be used to create new `Sender`s and
// `Receiver`s; confirm this is sound for a `T` that is not `Send`.
unsafe impl<T> Sync for Manager<T> {}

// The `Manager` only holds a pointer to the `Channel`, so moving the `Manager`
// doesn't move the channel itself.
impl<T> Unpin for Manager<T> {}
1250
impl<T> Drop for Manager<T> {
    /// Marks the manager as dropped and releases its access to the channel.
    ///
    /// If no `Receiver` is alive the manager also performs the receiver's
    /// cleanup, by creating a temporary `Receiver` and dropping it.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // First mark the manager as dropped.
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_and(!MANAGER_ALIVE, Ordering::Release);
        if has_receiver(old_ref_count) {
            // If the channel has a receiver we only mark the manager as dropped
            // (above) and release our access. The receiver will do the cleanup
            // once it's dropped, as it will no longer see a manager.
            let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
            return;
        }

        // No receiver is alive. It left its `RECEIVER_ACCESS` bit set when it
        // was dropped (because we were alive), so we have to do its cleanup.
        debug_assert!(!has_receiver(old_ref_count));
        debug_assert!(old_ref_count & RECEIVER_ACCESS != 0);
        // NOTE: because `RECEIVER_ACCESS` bit is still set we don't have to set
        // the `RECEIVER_ALIVE` bit (as the receiver will be dropped at the end
        // of the function).
        let receiver = Receiver { channel: self.channel };

        // Release our own access before dropping the receiver, so that the
        // receiver is able to deallocate the channel if nothing else has access
        // to it.
        let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
        // Let the receiver do the cleanup.
        drop(receiver);
    }
}
1276
/// Identifier of a channel.
///
/// This type can be created by calling [`Sender::id`] or [`Receiver::id`] and
/// be used to identify channels. Its only use case is to compare two ids with
/// one another: if two ids are the same the sender(s) and receiver(s) point to
/// the same channel.
///
/// # Notes
///
/// The id is only valid for the lifetime of the channel. Once the channel is
/// dropped all ids of the channel are invalidated and might return incorrect
/// results after.
///
/// The methods [`Sender::same_channel`] and [`Sender::sends_to`] should be
/// preferred over using this type as they are less error-prone.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Id(usize);