heph_inbox/lib.rs
1//! Bounded capacity channel.
2//!
3//! The channel is a multi-producer, single-consumer (MPSC) bounded queue. It is
4//! designed to be used as inbox for actors, following the [actor model].
5//!
6//! [actor model]: https://en.wikipedia.org/wiki/Actor_model
7//!
8//! # Notes
9//!
10//! The implementation assumes the access to the channel is mostly uncontested
11//! and optimises for this use case. Furthermore it optimises for small memory
12//! footprint, sometimes over faster access.
13//!
14//! The implementation doesn't provide a lot of guarantees. For example this
15//! channel is **not** guaranteed to be First In First Out (FIFO), it does this
16//! on a best effort basis. In return it means that a slow `Sender` does not
17//! block the receiving of other messages.
18//!
19//! # Examples
20//!
21//! Simple creation of a channel and sending a message over it.
22//!
23//!```
24//! use std::thread;
25//!
26//! use heph_inbox::RecvError;
27//!
28//! // Create a new small channel.
29//! let (sender, mut receiver) = heph_inbox::new_small();
30//!
31//! let sender_handle = thread::spawn(move || {
32//! if let Err(err) = sender.try_send("Hello world!".to_owned()) {
33//! panic!("Failed to send value: {}", err);
34//! }
35//! });
36//!
37//! let receiver_handle = thread::spawn(move || {
38//! # #[cfg(not(miri))] // `sleep` not supported.
39//! # thread::sleep(std::time::Duration::from_millis(1)); // Don't waste cycles.
40//! // NOTE: this is just an example don't actually use a loop like this, it
41//! // will waste CPU cycles when the channel is empty!
42//! loop {
43//! match receiver.try_recv() {
44//! Ok(value) => println!("Got a value: {}", value),
45//! Err(RecvError::Empty) => continue,
46//! Err(RecvError::Disconnected) => break,
47//! }
48//! }
49//! });
50//!
51//! sender_handle.join().unwrap();
52//! receiver_handle.join().unwrap();
53//! ```
54
55#![cfg_attr(unstable_nightly, feature(cfg_sanitize))]
56#![warn(
57 missing_debug_implementations,
58 missing_docs,
59 unused_results,
60 variant_size_differences
61)]
62// Disallow warnings when running tests.
63#![cfg_attr(test, deny(warnings))]
64// Disallow warnings in examples, we want to set a good example after all.
65#![doc(test(attr(deny(warnings))))]
66
67use std::alloc::{alloc, handle_alloc_error, Layout};
68use std::cell::UnsafeCell;
69use std::error::Error;
70use std::fmt;
71use std::future::Future;
72use std::mem::{drop as unlock, replace, take, MaybeUninit};
73use std::ops::Deref;
74use std::pin::Pin;
75use std::ptr::{self, NonNull};
76use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
77use std::sync::Mutex;
78use std::task::{self, Poll};
79
80#[cfg(test)]
81mod tests;
82
/// `ThreadSanitizer` does not support memory fences. To avoid false positive
/// reports use atomic loads for synchronization instead of a fence. Macro
/// inspired by the one found in Rust's standard library for the `Arc`
/// implementation.
///
/// `$val` is the atomic value that is loaded when running under
/// `ThreadSanitizer`, `$ordering` is the memory ordering used for the fence
/// (or the load).
///
/// Without the `unstable_nightly` cfg neither statement is conditionally
/// compiled, so both the fence and the load are executed.
macro_rules! fence {
    ($val: expr, $ordering: expr) => {
        // NOTE: the attribute applied by `cfg_attr` must itself be a `cfg`
        // attribute. A bare `not(..)` or `sanitize = ".."` is not a valid
        // attribute and would fail to compile once `unstable_nightly` is set.
        #[cfg_attr(unstable_nightly, cfg(not(sanitize = "thread")))]
        std::sync::atomic::fence($ordering);
        #[cfg_attr(unstable_nightly, cfg(sanitize = "thread"))]
        let _ = $val.load($ordering);
    };
}
95
96pub mod oneshot;
97
98mod waker;
99use waker::WakerRegistration;
100
/// The capacity of a small channel, used by [`new_small`].
const SMALL_CAP: usize = 8;
/// Maximum capacity of a channel.
// NOTE: see [`Channel::new`] why.
// Each slot uses `STATUS_BITS` bits of the 64 bit status word and
// `MARK_NEXT_POS` (the lowest bit of the receiver position) is placed
// directly after the status bits of slot `MAX_CAP - 1`.
pub const MAX_CAP: usize = 29;
/// Minimum capacity of a channel.
pub const MIN_CAP: usize = 1;
108
109/// Create a small bounded channel.
110pub fn new_small<T>() -> (Sender<T>, Receiver<T>) {
111 new(SMALL_CAP)
112}
113
114/// Create a new bounded channel.
115///
116/// The `capacity` must be in the range [`MIN_CAP`]`..=`[`MAX_CAP`].
117pub fn new<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
118 assert!(
119 (MIN_CAP..=MAX_CAP).contains(&capacity),
120 "inbox channel capacity must be between {} and {}",
121 MIN_CAP,
122 MAX_CAP
123 );
124 let channel = Channel::new(capacity);
125 let sender = Sender { channel };
126 let receiver = Receiver { channel };
127 (sender, receiver)
128}
129
// Layout of `ref_count`: the five most significant bits are the flags below,
// all remaining (lower) bits hold the number of senders, see `sender_count`.

/// Bit mask to mark the receiver as alive.
const RECEIVER_ALIVE: usize = 1 << (usize::BITS - 1);
/// Bit mask to mark the receiver still has access to the channel. See the
/// `Drop` impl for [`Receiver`].
const RECEIVER_ACCESS: usize = 1 << (usize::BITS - 2);
/// Bit mask to mark a sender still has access to the channel. See the `Drop`
/// impl for [`Sender`].
const SENDER_ACCESS: usize = 1 << (usize::BITS - 3);
/// Bit mask to mark the manager as alive.
const MANAGER_ALIVE: usize = 1 << (usize::BITS - 4);
/// Bit mask to mark the manager has access to the channel. See the `Drop` impl
/// for [`Manager`].
const MANAGER_ACCESS: usize = 1 << (usize::BITS - 5);
143
144/// Return `true` if the receiver or manager is alive in `ref_count`.
145#[inline(always)]
146const fn has_receiver(ref_count: usize) -> bool {
147 ref_count & RECEIVER_ALIVE != 0
148}
149
150/// Returns `true` if the manager is alive in `ref_count`.
151#[inline(always)]
152const fn has_manager(ref_count: usize) -> bool {
153 ref_count & MANAGER_ALIVE != 0
154}
155
156/// Return `true` if the receiver or manager is alive in `ref_count`.
157#[inline(always)]
158const fn has_receiver_or_manager(ref_count: usize) -> bool {
159 ref_count & (RECEIVER_ALIVE | MANAGER_ALIVE) != 0
160}
161
162/// Returns the number of senders connected in `ref_count`.
163#[inline(always)]
164const fn sender_count(ref_count: usize) -> usize {
165 ref_count & !(RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | MANAGER_ALIVE | MANAGER_ACCESS)
166}
167
// Bits to mark the status of a slot.
// The status of slot `n` is stored in bits `n * STATUS_BITS` up to (not
// including) `(n + 1) * STATUS_BITS` of the channel's `status`.
const STATUS_BITS: u64 = 2; // Number of bits used per slot.
const STATUS_MASK: u64 = (1 << STATUS_BITS) - 1; // Mask for the status of slot 0.
// Mask covering the status bits of all `MAX_CAP` slots (tests only).
#[cfg(test)]
const ALL_STATUSES_MASK: u64 = (1 << (MAX_CAP as u64 * STATUS_BITS)) - 1;
// The possible statuses of a slot.
const EMPTY: u64 = 0b00; // Slot is empty (initial state).
const TAKEN: u64 = 0b01; // `Sender` acquired write access, currently writing.
const FILLED: u64 = 0b11; // `Sender` wrote a value into the slot.
const READING: u64 = 0b10; // A `Receiver` is reading from the slot.

// Status transitions.
// These are the values for slot 0, use `mark_slot` to shift them to the
// correct slot.
const MARK_TAKEN: u64 = 0b01; // OR to go from EMPTY -> TAKEN.
const MARK_FILLED: u64 = 0b11; // OR to go from TAKEN -> FILLED.
const MARK_READING: u64 = 0b01; // XOR to go from FILLED -> READING.
const MARK_EMPTIED: u64 = 0b11; // ! AND to go from FILLED or READING -> EMPTY.
184
185/// Returns `true` if `slot` in `status` is empty.
186#[inline(always)]
187fn is_available(status: u64, slot: usize) -> bool {
188 has_status(status, slot, EMPTY)
189}
190
191/// Returns `true` if `slot` in `status` is filled.
192#[inline(always)]
193fn is_filled(status: u64, slot: usize) -> bool {
194 has_status(status, slot, FILLED)
195}
196
197/// Returns `true` if `slot` (in `status`) equals the `expected` status.
198#[inline(always)]
199fn has_status(status: u64, slot: usize, expected: u64) -> bool {
200 slot_status(status, slot) == expected
201}
202
203/// Returns the `STATUS_BITS` for `slot` in `status`.
204#[inline(always)]
205fn slot_status(status: u64, slot: usize) -> u64 {
206 debug_assert!(slot <= MAX_CAP);
207 (status >> (STATUS_BITS * slot as u64)) & STATUS_MASK
208}
209
210/// Creates a mask to transition `slot` using `transition`. `transition` must be
211/// one of the `MARK_*` constants.
212#[inline(always)]
213fn mark_slot(slot: usize, transition: u64) -> u64 {
214 debug_assert!(slot <= MAX_CAP);
215 transition << (STATUS_BITS * slot as u64)
216}
217
/// Returns a string name for the `slot_status`, `"INVALID"` if it's not one
/// of the known statuses.
fn dbg_status(slot_status: u64) -> &'static str {
    let name = match slot_status {
        EMPTY => "EMPTY",
        TAKEN => "TAKEN",
        READING => "READING",
        FILLED => "FILLED",
        _ => "INVALID",
    };
    name
}
228
// Bits to mark the position of the receiver.
// The position is stored in the bits above the status bits of all `MAX_CAP`
// slots, see `receiver_pos`.
const MARK_NEXT_POS: u64 = 1 << (STATUS_BITS * MAX_CAP as u64); // Add to increase position by 1.
231
232/// Returns the position of the receiver. Will be in 0..[`MAX_CAP`] range.
233#[inline(always)]
234#[allow(clippy::cast_possible_truncation)]
235fn receiver_pos(status: u64, capacity: usize) -> usize {
236 (status >> (STATUS_BITS * MAX_CAP as u64)) as usize % capacity
237}
238
/// Sending side of the channel.
///
/// Can be cloned to create more senders for the same channel.
pub struct Sender<T> {
    /// Pointer to the channel this sender sends into.
    channel: NonNull<Channel<T>>,
}
243
/// Error returned in case sending a value across the channel fails. See
/// [`Sender::try_send`].
///
/// Both variants hand back the value that could not be sent.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SendError<T> {
    /// Channel is full.
    Full(T),
    /// [`Receiver`] and [`Manager`] are disconnected.
    Disconnected(T),
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a short description of the error, honouring the padding and
    /// precision options of `f`. The value itself is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            SendError::Full(_) => "channel is full",
            SendError::Disconnected(_) => "receiver is disconnected",
        };
        f.pad(description)
    }
}

impl<T: fmt::Debug> Error for SendError<T> {}
264
265impl<T> Sender<T> {
266 /// Attempts to send the `value` into the channel.
267 pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
268 try_send(self.channel(), value)
269 }
270
271 /// Returns a future that sends a value into the channel, waiting if the
272 /// channel is full.
273 ///
274 /// If the returned [`Future`] returns an error it means the [`Receiver`]
275 /// and [`Manager`] are [disconnected] and no more values will be read from
276 /// the channel. This is the same error as [`SendError::Disconnected`].
277 /// [`SendError::Full`] will never be returned, the `Future` will return
278 /// [`Poll::Pending`] instead.
279 ///
280 /// [disconnected]: Sender::is_connected
281 pub fn send(&self, value: T) -> SendValue<T> {
282 SendValue {
283 channel: self.channel(),
284 value: Some(value),
285 registered_waker: None,
286 }
287 }
288
289 /// Returns a [`Future`] that waits until the other side of the channel is
290 /// [disconnected].
291 ///
292 /// [disconnected]: Sender::is_connected
293 pub fn join(&self) -> Join<T> {
294 Join {
295 channel: self.channel(),
296 registered_waker: None,
297 }
298 }
299
300 /// Returns the capacity of the channel.
301 pub fn capacity(&self) -> usize {
302 self.channel().slots.len()
303 }
304
305 /// Returns `true` if the [`Receiver`] and or the [`Manager`] are connected.
306 ///
307 /// # Notes
308 ///
309 /// Unlike [`Receiver::is_connected`] this method takes the [`Manager`] into
310 /// account. This is done to support the use case in which an actor is
311 /// restarted and a new receiver is created for it.
312 pub fn is_connected(&self) -> bool {
313 // Relaxed is fine here since there is always a bit of a race condition
314 // when using this method (and then doing something based on it).
315 has_receiver_or_manager(self.channel().ref_count.load(Ordering::Relaxed))
316 }
317
318 /// Returns `true` if the [`Manager`] is connected.
319 pub fn has_manager(&self) -> bool {
320 // Relaxed is fine here since there is always a bit of a race condition
321 // when using this method (and then doing something based on it).
322 has_manager(self.channel().ref_count.load(Ordering::Relaxed))
323 }
324
325 /// Returns `true` if senders send into the same channel.
326 pub fn same_channel(&self, other: &Sender<T>) -> bool {
327 self.channel == other.channel
328 }
329
330 /// Returns `true` if this sender sends to the `receiver`.
331 pub fn sends_to(&self, receiver: &Receiver<T>) -> bool {
332 self.channel == receiver.channel
333 }
334
335 /// Returns the id of this sender.
336 pub fn id(&self) -> Id {
337 Id(self.channel.as_ptr() as *const () as usize)
338 }
339
340 fn channel(&self) -> &Channel<T> {
341 unsafe { self.channel.as_ref() }
342 }
343}
344
345/// See [`Sender::try_send`].
346fn try_send<T>(channel: &Channel<T>, value: T) -> Result<(), SendError<T>> {
347 if !has_receiver_or_manager(channel.ref_count.load(Ordering::Relaxed)) {
348 return Err(SendError::Disconnected(value));
349 }
350
351 // NOTE: relaxed ordering here is ok because we acquire unique
352 // permission to write to the slot later before writing to it. Something
353 // we have to do no matter the ordering.
354 let mut status: u64 = channel.status.load(Ordering::Relaxed);
355 let cap = channel.slots.len();
356 let start = receiver_pos(status, cap);
357 for slot in (0..cap).cycle().skip(start).take(cap) {
358 if !is_available(status, slot) {
359 continue;
360 }
361
362 // In our local status the slot is available, however another sender
363 // could have taken it between the time we read the status and the
364 // time we got here. So we write our `TAKEN` status and check if in
365 // the *previous* (up-to-date) status (returned by `fetch_or`) the
366 // slot was still available. If it was it means we have acquired the
367 // slot, otherwise another sender beat us to it.
368 //
369 // NOTE: The OR operation here is safe: if another sender already
370 // wrote TAKEN (01) or FILLED (11) we're not overwriting anything.
371 // If a reader wrote READING (10) we won't use the slot and the
372 // reader will overwrite it with EMPTY later. If we overwrite EMPTY
373 // (00) we can reuse the slot safely, but the message will be in a
374 // different order.
375 status = channel
376 .status
377 .fetch_or(mark_slot(slot, MARK_TAKEN), Ordering::AcqRel);
378 if !is_available(status, slot) {
379 // Another thread beat us to taking the slot.
380 continue;
381 }
382
383 // Safety: we've acquired the slot above so we're ensured unique
384 // access to the slot.
385 unsafe {
386 let _ = (&mut *channel.slots[slot].get()).write(value);
387 }
388
389 // Now we've writing to the slot we can mark it slot as filled.
390 let old_status = channel
391 .status
392 .fetch_or(mark_slot(slot, MARK_FILLED), Ordering::AcqRel);
393 // Debug assertion to check the slot was in the TAKEN status.
394 debug_assert!(has_status(old_status, slot, TAKEN));
395
396 // If the receiver is waiting for this lot we wake it.
397 if receiver_pos(old_status, cap) == slot {
398 channel.wake_receiver();
399 }
400
401 return Ok(());
402 }
403
404 Err(SendError::Full(value))
405}
406
407/// # Safety
408///
409/// Only `2 ^ 30` (a billion) `Sender`s may be alive concurrently, more then
410/// enough for all practical use cases.
411impl<T> Clone for Sender<T> {
412 fn clone(&self) -> Sender<T> {
413 // For the reasoning behind this relaxed ordering see `Arc::clone`.
414 let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
415 debug_assert!(old_ref_count & SENDER_ACCESS != 0);
416 Sender {
417 channel: self.channel,
418 }
419 }
420}
421
422impl<T> fmt::Debug for Sender<T> {
423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424 f.debug_struct("Sender")
425 .field("channel", &self.channel())
426 .finish()
427 }
428}
429
// Safety: if the value can be send across thread than so can the channel.
unsafe impl<T: Send> Send for Sender<T> {}

// NOTE(review): unlike the `Send` implementation above this has no `T: Send`
// bound, while `Sender::try_send` only needs `&self` to move a value into the
// channel — TODO confirm this is intended.
unsafe impl<T> Sync for Sender<T> {}

impl<T> Unpin for Sender<T> {}
436
impl<T> Drop for Sender<T> {
    /// Decrements the sender count. The last sender also wakes the receiver,
    /// gives up the senders' access to the channel and deallocates the
    /// channel if nothing else has access to it anymore.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_sub(1, Ordering::Release);
        if sender_count(old_ref_count) != 1 {
            // If we're not the last sender all we have to do is decrement the
            // ref count (above).
            return;
        }

        // If we're the last sender being dropped wake the receiver.
        if has_receiver_or_manager(old_ref_count) {
            self.channel().wake_receiver();
        }

        // If the previous value was `SENDER_ACCESS` it means that the receiver,
        // all other senders and the manager were all dropped, so we need to do
        // the deallocating.
        let old_ref_count = self.channel().ref_count.fetch_and(!SENDER_ACCESS, Ordering::Release);
        if old_ref_count != SENDER_ACCESS {
            // Another sender, the receiver or the manager is still alive.
            return;
        }

        // For the reasoning behind this ordering see `Arc::drop`.
        fence!(self.channel().ref_count, Ordering::Acquire);

        // Drop the memory.
        // The channel is turned back into a `Box`, which is then dropped.
        unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
    }
}
469
470/// [`Future`] implementation behind [`Sender::send`].
471#[derive(Debug)]
472#[must_use = "futures do nothing unless you `.await` or poll them"]
473pub struct SendValue<'s, T> {
474 channel: &'s Channel<T>,
475 value: Option<T>,
476 registered_waker: Option<task::Waker>,
477}
478
479impl<'s, T> Future for SendValue<'s, T> {
480 type Output = Result<(), T>;
481
482 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
483 // Safety: only `waker_node` is pinned, which is only used by
484 // `register_waker`.
485 let this = unsafe { self.as_mut().get_unchecked_mut() };
486 let value = this
487 .value
488 .take()
489 .expect("SendValue polled after completion");
490
491 // First we try to send the value, if this succeeds we don't have to
492 // allocate in the waker list.
493 match try_send(this.channel, value) {
494 Ok(()) => Poll::Ready(Ok(())),
495 Err(SendError::Full(value)) => {
496 let registered_waker = register_waker(
497 &mut this.registered_waker,
498 &this.channel.sender_wakers,
499 ctx.waker(),
500 );
501 if !registered_waker {
502 return Poll::Pending;
503 }
504
505 // It could be the case that the received received a value in
506 // the time after we tried to send the value and before we added
507 // the our waker to list. So we try to send a value again to
508 // ensure we don't awoken and the channel has a slot available.
509 match try_send(this.channel, value) {
510 Ok(()) => Poll::Ready(Ok(())),
511 Err(SendError::Full(value)) => {
512 // Channel is still full, we'll have to wait.
513 this.value = Some(value);
514 Poll::Pending
515 }
516 Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
517 }
518 }
519 Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
520 }
521 }
522}
523
524unsafe impl<'s, T> Sync for SendValue<'s, T> {}
525
526impl<'s, T> Drop for SendValue<'s, T> {
527 fn drop(&mut self) {
528 if let Some(waker) = self.registered_waker.take() {
529 let mut sender_wakers = self.channel.sender_wakers.lock().unwrap();
530 let idx = sender_wakers.iter().position(|w| w.will_wake(&waker));
531 if let Some(idx) = idx {
532 drop(sender_wakers.swap_remove(idx));
533 }
534 }
535 }
536}
537
538/// [`Future`] implementation behind [`Sender::join`].
539#[derive(Debug)]
540#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct Join<'s, T> {
542 channel: &'s Channel<T>,
543 registered_waker: Option<task::Waker>,
544}
545
546impl<'s, T> Future for Join<'s, T> {
547 type Output = ();
548
549 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
550 if !has_receiver_or_manager(self.channel.ref_count.load(Ordering::Acquire)) {
551 // Other side is disconnected.
552 return Poll::Ready(());
553 }
554
555 let this = &mut *self;
556 let registered_waker = &mut this.registered_waker;
557 let join_wakers = &this.channel.join_wakers;
558 let registered_waker = register_waker(registered_waker, join_wakers, ctx.waker());
559 if !registered_waker {
560 return Poll::Pending;
561 }
562
563 if has_receiver_or_manager(this.channel.ref_count.load(Ordering::Acquire)) {
564 Poll::Pending
565 } else {
566 // Other side is disconnected.
567 Poll::Ready(())
568 }
569 }
570}
571
572unsafe impl<'s, T> Sync for Join<'s, T> {}
573
574impl<'s, T> Drop for Join<'s, T> {
575 fn drop(&mut self) {
576 if let Some(waker) = self.registered_waker.take() {
577 let mut join_wakers = self.channel.join_wakers.lock().unwrap();
578 let idx = join_wakers.iter().position(|w| w.will_wake(&waker));
579 if let Some(idx) = idx {
580 drop(join_wakers.swap_remove(idx));
581 }
582 }
583 }
584}
585
/// Registers `waker` in `channel_wakers` if `registered_waker` is `None` or is
/// different from `waker`. Return `true` if `waker` was registered, `false`
/// otherwise.
///
/// After registering, `registered_waker` holds a clone of `waker`. If a
/// different waker was registered before its entry in `channel_wakers` is
/// replaced, if that entry can't be found the new waker is added instead.
fn register_waker(
    registered_waker: &mut Option<task::Waker>,
    channel_wakers: &Mutex<Vec<task::Waker>>,
    waker: &task::Waker,
) -> bool {
    if let Some(current) = registered_waker.as_ref() {
        if current.will_wake(waker) {
            // Already registered this waker, don't have to do anything.
            return false;
        }
    }

    // One clone is kept by the caller, the other goes into the channel's
    // list.
    let new_waker = waker.clone();
    let previous = registered_waker.replace(new_waker.clone());

    let mut wakers = channel_wakers.lock().unwrap();
    let old_idx = previous
        .as_ref()
        .and_then(|old| wakers.iter().position(|w| w.will_wake(old)));
    match old_idx {
        // Replace the old waker with the new one.
        Some(idx) => wakers[idx] = new_waker,
        // Either no waker was registered before, or the old one is no longer
        // in the list (e.g. it was already removed to be woken).
        None => wakers.push(new_waker),
    }
    true
}
626
/// Receiving side of the channel.
///
/// New [`Sender`]s can be created using [`Receiver::new_sender`].
pub struct Receiver<T> {
    /// Pointer to the channel this receiver receives from.
    channel: NonNull<Channel<T>>,
}
631
/// Error returned in case receiving a value from the channel fails. See
/// [`Receiver::try_recv`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// Channel is empty.
    Empty,
    /// All [`Sender`]s (but not necessarily the [`Manager`]) are disconnected
    /// and the channel is empty, see [`Receiver::is_connected`].
    Disconnected,
}

impl fmt::Display for RecvError {
    /// Writes a short description of the error, honouring the padding and
    /// precision options of `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match *self {
            RecvError::Empty => "channel is empty",
            RecvError::Disconnected => "all senders are disconnected",
        };
        f.pad(description)
    }
}

impl Error for RecvError {}
653
654impl<T> Receiver<T> {
655 /// Attempts to receive a value from this channel.
656 pub fn try_recv(&mut self) -> Result<T, RecvError> {
657 try_recv(self.channel())
658 }
659
660 /// Returns a future that receives a value from the channel, waiting if the
661 /// channel is empty.
662 ///
663 /// If the returned [`Future`] returns `None` it means all [`Sender`]s are
664 /// [disconnected]. This is the same error as [`RecvError::Disconnected`].
665 /// [`RecvError::Empty`] will never be returned, the `Future` will return
666 /// [`Poll::Pending`] instead.
667 ///
668 /// [disconnected]: Receiver::is_connected
669 pub fn recv(&mut self) -> RecvValue<T> {
670 RecvValue {
671 channel: self.channel(),
672 }
673 }
674
675 /// Attempts to peek a value from this channel.
676 pub fn try_peek(&mut self) -> Result<&T, RecvError> {
677 try_peek(self.channel())
678 }
679
680 /// Returns a future that peeks at a value from the channel, waiting if the
681 /// channel is empty.
682 ///
683 /// If the returned [`Future`] returns `None` it means all [`Sender`]s are
684 /// [disconnected]. This is the same error as [`RecvError::Disconnected`].
685 /// [`RecvError::Empty`] will never be returned, the `Future` will return
686 /// [`Poll::Pending`] instead.
687 ///
688 /// [disconnected]: Receiver::is_connected
689 pub fn peek(&mut self) -> PeekValue<T> {
690 PeekValue {
691 channel: self.channel(),
692 }
693 }
694
695 /// Create a new [`Sender`] that sends to this channel.
696 ///
697 /// # Safety
698 ///
699 /// The same restrictions apply to this function as they do to
700 /// [`Sender::clone`].
701 ///
702 /// [`Sender::clone`]: struct.Sender.html#impl-Clone
703 pub fn new_sender(&self) -> Sender<T> {
704 // For the reasoning behind this relaxed ordering see `Arc::clone`.
705 let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
706 if old_ref_count & SENDER_ACCESS != 0 {
707 let _ = self
708 .channel()
709 .ref_count
710 .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
711 }
712
713 Sender {
714 channel: self.channel,
715 }
716 }
717
718 /// Returns the capacity of the channel.
719 pub fn capacity(&self) -> usize {
720 self.channel().slots.len()
721 }
722
723 /// Returns `false` if all [`Sender`]s are disconnected.
724 ///
725 /// # Notes
726 ///
727 /// Unlike [`Sender::is_connected`] this method doesn't take the [`Manager`]
728 /// into account. This means that this method can return `false` and later
729 /// `true` (if the `Manager` created another `Sender`), which might be
730 /// unexpected.
731 pub fn is_connected(&self) -> bool {
732 // Relaxed is fine here since there is always a bit of a race condition
733 // when using this method (and then doing something based on it).
734 sender_count(self.channel().ref_count.load(Ordering::Relaxed)) > 0
735 }
736
737 /// Returns `true` if the [`Manager`] is connected.
738 pub fn has_manager(&self) -> bool {
739 // Relaxed is fine here since there is always a bit of a race condition
740 // when using this method (and then doing something based on it).
741 has_manager(self.channel().ref_count.load(Ordering::Relaxed))
742 }
743
744 /// Set the receiver's waker to `waker`, if they are different. Returns
745 /// `true` if the waker is changed, `false` otherwise.
746 ///
747 /// This is useful if you can't call [`Receiver::recv`] but still want a
748 /// wake-up notification once messages are added to the inbox.
749 pub fn register_waker(&mut self, waker: &task::Waker) -> bool {
750 self.channel().receiver_waker.register(waker)
751 }
752
753 /// Returns the id of this receiver.
754 pub fn id(&self) -> Id {
755 Id(self.channel.as_ptr() as *const () as usize)
756 }
757
758 fn channel(&self) -> &Channel<T> {
759 unsafe { self.channel.as_ref() }
760 }
761}
762
763/// See [`Receiver::try_recv`].
764fn try_recv<T>(channel: &Channel<T>) -> Result<T, RecvError> {
765 // We check if we are connected **before** checking for messages. This
766 // is important because there is a time between 1) the checking of the
767 // messages in the channel and 2) checking if we're connected (if we
768 // would do it in the last `if` statement of this method) in which the
769 // sender could send a message and be dropped.
770 // In this case, if we would check if we're connected after checking for
771 // messages, we would incorrectly return `RecvError::Disconnected` (all
772 // senders are dropped after all), however we would miss the last
773 // message send.
774 // Checking before hand causes us to return `RecvError::Empty`, which
775 // technically isn't correct either but it will cause the user to check
776 // again later. In `RecvValue` this is solved by calling `try_recv`
777 // after registering the task waker, ensuring no wake-up events are
778 // missed.
779 let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;
780
781 // Since we subtract from the `status` this will overflow at some point. But
782 // `fetch_add` wraps-around on overflow, so the position will "reset" itself
783 // to 0. This is one of the reasons we don't support FIFO order. The status
784 // bits will not be touched (even on wrap-around).
785 let mut status = channel.status.fetch_add(MARK_NEXT_POS, Ordering::AcqRel);
786 let cap = channel.slots.len();
787 let start = receiver_pos(status, cap);
788 for slot in (0..cap).cycle().skip(start).take(cap) {
789 if !is_filled(status, slot) {
790 continue;
791 }
792
793 // Mark the slot as being read.
794 status = channel
795 .status
796 .fetch_xor(mark_slot(slot, MARK_READING), Ordering::AcqRel);
797 if !is_filled(status, slot) {
798 // Slot isn't available after all.
799 continue;
800 }
801
802 // Safety: we've acquired unique access to the slot above and we're
803 // ensured the slot is filled.
804 let value = unsafe { (&*channel.slots[slot].get()).assume_init_read() };
805
806 // Mark the slot as empty.
807 let old_status = channel
808 .status
809 .fetch_and(!mark_slot(slot, MARK_EMPTIED), Ordering::AcqRel);
810
811 // Debug assertion to check the slot was in the READING or FILLED
812 // status. The slot can be in the FILLED status if the sender tried
813 // to mark this slot as TAKEN (01) after we marked it as READING
814 // (10) (01 | 10 = 11 (FILLED)).
815 debug_assert!(
816 has_status(old_status, slot, READING) || has_status(old_status, slot, FILLED)
817 );
818
819 channel.wake_next_sender();
820
821 return Ok(value);
822 }
823
824 if is_connected {
825 Err(RecvError::Empty)
826 } else {
827 Err(RecvError::Disconnected)
828 }
829}
830
831/// See [`Receiver::try_peek`].
832fn try_peek<T>(channel: &Channel<T>) -> Result<&T, RecvError> {
833 // See `try_recv` why we do this first.
834 let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;
835
836 let status = channel.status.load(Ordering::Acquire);
837 let cap = channel.slots.len();
838 let start = receiver_pos(status, cap);
839 for slot in (0..cap).cycle().skip(start).take(cap) {
840 if !is_filled(status, slot) {
841 continue;
842 }
843
844 // Safety: we've acquired unique access to the slot above and we're
845 // ensured the slot is filled.
846 return Ok(unsafe { (&*channel.slots[slot].get()).assume_init_ref() });
847 }
848
849 if is_connected {
850 Err(RecvError::Empty)
851 } else {
852 Err(RecvError::Disconnected)
853 }
854}
855
856impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
857 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858 f.debug_struct("Receiver")
859 .field("channel", &self.channel())
860 .finish()
861 }
862}
863
// Safety: if the value can be send across thread than so can the channel.
unsafe impl<T: Send> Send for Receiver<T> {}

// NOTE(review): unlike the `Send` implementation above this has no bound on
// `T` — TODO confirm this is intended.
unsafe impl<T> Sync for Receiver<T> {}

impl<T> Unpin for Receiver<T> {}
870
impl<T> Drop for Receiver<T> {
    /// Marks the receiver as dropped. If the channel has no manager it also
    /// empties the channel, wakes the futures waiting in `Sender::join`,
    /// gives up the receiver's access to the channel and deallocates the
    /// channel if nothing else has access to it anymore.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // First mark the receiver as dropped.
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ALIVE, Ordering::Release);
        if has_manager(old_ref_count) {
            // If the channel has a manager we only mark the receiver as dropped
            // (above).
            return;
        }

        // If the channel doesn't have a manager we empty the channel. We do
        // this to support the use case were the channel holds a
        // `oneshot::Sender` and the receiver of the oneshot channel is holding
        // a `Sender` to this channel. Effectively this creates a cyclic drop
        // dependency: `Sender` -> `Channel` -> `oneshot::Sender` which blocks
        // `oneshot::Receiver::recv`. If the actor holding a `Sender` calls
        // `oneshot::Receiver::recv` it will wait for a response or until the
        // `oneshot::Sender` is dropped, while the actor is holding a `Sender`
        // to this channel. However if this `Receiver` is dropped it won't drop
        // the `oneshot::Sender` without the emptying below. This causes
        // `oneshot::Receiver::recv` to wait forever, while holding a `Sender`.
        while let Ok(msg) = self.try_recv() {
            drop(msg);
        }

        // Let all senders know the receiver is disconnected.
        self.channel().wake_all_join();

        // If the previous value was `RECEIVER_ACCESS` it means that all senders
        // and the manager were all dropped, so we need to do the deallocating.
        let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ACCESS, Ordering::Release);
        if old_ref_count != RECEIVER_ACCESS {
            // Another sender is alive, can't deallocate yet.
            return;
        }

        // For the reasoning behind this ordering see `Arc::drop`.
        fence!(self.channel().ref_count, Ordering::Acquire);

        // Drop the memory.
        // The channel is turned back into a `Box`, which is then dropped.
        unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
    }
}
916
/// [`Future`] implementation behind [`Receiver::recv`].
///
/// Resolves to `Some(value)` once a value could be received from the channel,
/// or to `None` once receiving fails with [`RecvError::Disconnected`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvValue<'r, T> {
    /// Channel the value is received from.
    channel: &'r Channel<T>,
}
923
924impl<'r, T> Future for RecvValue<'r, T> {
925 type Output = Option<T>;
926
927 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
928 match try_recv(self.channel) {
929 Ok(value) => Poll::Ready(Some(value)),
930 Err(RecvError::Empty) => {
931 // The channel is empty, we'll set the waker.
932 if !self.channel.receiver_waker.register(ctx.waker()) {
933 // Waker already set.
934 return Poll::Pending;
935 }
936
937 // But it could be the case that a sender send a value in the
938 // time between we last checked and we actually marked ourselves
939 // as needing a wake up, so we need to check again.
940 match try_recv(self.channel) {
941 Ok(value) => Poll::Ready(Some(value)),
942 // The `Sender` will wake us when a new message is send.
943 Err(RecvError::Empty) => Poll::Pending,
944 Err(RecvError::Disconnected) => Poll::Ready(None),
945 }
946 }
947 Err(RecvError::Disconnected) => Poll::Ready(None),
948 }
949 }
950}
951
952impl<'r, T> Unpin for RecvValue<'r, T> {}
953
/// [`Future`] implementation behind [`Receiver::peek`].
///
/// Resolves to `Some(&value)` once a value could be peeked at, or to `None`
/// once peeking fails with [`RecvError::Disconnected`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PeekValue<'r, T> {
    /// Channel the value is peeked from.
    channel: &'r Channel<T>,
}
960
961impl<'r, T> Future for PeekValue<'r, T> {
962 type Output = Option<&'r T>;
963
964 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
965 match try_peek(self.channel) {
966 Ok(value) => Poll::Ready(Some(value)),
967 Err(RecvError::Empty) => {
968 // The channel is empty, we'll set the waker.
969 if !self.channel.receiver_waker.register(ctx.waker()) {
970 // Waker already set.
971 return Poll::Pending;
972 }
973
974 // But it could be the case that a sender send a value in the
975 // time between we last checked and we actually marked ourselves
976 // as needing a wake up, so we need to check again.
977 match try_peek(self.channel) {
978 Ok(value) => Poll::Ready(Some(value)),
979 // The `Sender` will wake us when a new message is send.
980 Err(RecvError::Empty) => Poll::Pending,
981 Err(RecvError::Disconnected) => Poll::Ready(None),
982 }
983 }
984 Err(RecvError::Disconnected) => Poll::Ready(None),
985 }
986 }
987}
988
989impl<'r, T> Unpin for PeekValue<'r, T> {}
990
/// Channel internals shared between zero or more [`Sender`]s, zero or one
/// [`Receiver`] and zero or one [`Manager`].
///
/// This is a dynamically sized type: the number of `slots` is decided when the
/// channel is allocated, see `Channel::new`.
struct Channel<T> {
    /// Bookkeeping of the channel: slot statuses, reference count and wakers.
    inner: Inner,
    /// The slots in the channel, see `status` for what slots are used/unused.
    slots: [UnsafeCell<MaybeUninit<T>>],
}
998
/// Inner data of [`Channel`].
///
/// This is only in a different struct to calculate the `Layout` of `Channel`,
/// see [`Channel::new`].
struct Inner {
    /// Status of the slots.
    ///
    /// This contains the status of the slots. Each status consists of
    /// [`STATUS_BITS`] bits to describe if the slot is taken or not.
    ///
    /// The first `STATUS_BITS * MAX_CAP` bits are the statuses for the `slots`
    /// field. The remaining bits are used by the `Receiver` to indicate its
    /// current reading position (modulo [`MAX_CAP`]).
    status: AtomicU64,
    /// The number of senders alive. If the [`RECEIVER_ALIVE`] bit is set the
    /// [`Receiver`] is alive. If the [`MANAGER_ALIVE`] bit is set the
    /// [`Manager`] is alive.
    ///
    /// The `RECEIVER_ACCESS`, `SENDER_ACCESS` and `MANAGER_ACCESS` bits are
    /// stored in this value as well.
    ref_count: AtomicUsize,
    /// Wakers of which one at a time is removed and woken by
    /// `Channel::wake_next_sender`.
    // NOTE(review): presumably registered by senders waiting for a free slot;
    // confirm against the sending side.
    sender_wakers: Mutex<Vec<task::Waker>>,
    /// Wakers that are all removed and woken by `Channel::wake_all_join`.
    join_wakers: Mutex<Vec<task::Waker>>,
    /// Waker registered by the receiving futures when the channel is empty,
    /// woken by `Channel::wake_receiver`.
    receiver_waker: WakerRegistration,
}
1021
// Safety: if the value can be sent across threads then so can the channel.
unsafe impl<T: Send> Send for Channel<T> {}

// NOTE(review): unlike the `Send` implementation above this has no bound on
// `T`. Confirm that sharing a `&Channel<T>` between threads is sound for a `T`
// that is not `Send`.
unsafe impl<T> Sync for Channel<T> {}
1026
1027impl<T> Channel<T> {
1028 /// Allocates a new `Channel` on the heap.
1029 ///
1030 /// `capacity` must small enough to ensure each slot has 2 bits for the
1031 /// status, while ensuring that the remaining bits can store `capacity` (in
1032 /// binary) to keep track of the reading position. This means following must
1033 /// hold true where $N is capacity: `2 ^ (64 - ($N * 2)) >= $N`. The maximum
1034 /// is 29.
1035 ///
1036 /// Marks a single [`Receiver`] and [`Sender`] as alive.
1037 fn new(capacity: usize) -> NonNull<Channel<T>> {
1038 assert!(capacity >= MIN_CAP, "capacity can't be zero");
1039 assert!(capacity <= MAX_CAP, "capacity too large");
1040
1041 // Allocate some raw bytes.
1042 // Safety: returns an error on arithmetic overflow, but it should be OK
1043 // with a capacity <= MAX_CAP.
1044 let (layout, _) = Layout::array::<UnsafeCell<MaybeUninit<T>>>(capacity)
1045 .and_then(|slots_layout| Layout::new::<Inner>().extend(slots_layout))
1046 .unwrap();
1047 // Safety: we check if the allocation is successful.
1048 let ptr = unsafe { alloc(layout) };
1049 if ptr.is_null() {
1050 handle_alloc_error(layout);
1051 }
1052 let ptr = ptr::slice_from_raw_parts_mut(ptr as *mut T, capacity) as *mut Channel<T>;
1053
1054 // Initialise all fields (that need it).
1055 unsafe {
1056 ptr::addr_of_mut!((*ptr).inner.status).write(AtomicU64::new(0));
1057 ptr::addr_of_mut!((*ptr).inner.ref_count).write(AtomicUsize::new(
1058 RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | 1,
1059 ));
1060 ptr::addr_of_mut!((*ptr).inner.sender_wakers).write(Mutex::new(Vec::new()));
1061 ptr::addr_of_mut!((*ptr).inner.join_wakers).write(Mutex::new(Vec::new()));
1062 ptr::addr_of_mut!((*ptr).inner.receiver_waker).write(WakerRegistration::new());
1063 }
1064
1065 // Safety: checked if the pointer is null above.
1066 unsafe { NonNull::new_unchecked(ptr) }
1067 }
1068
1069 /// Returns the next `task::Waker` to wake, if any.
1070 fn wake_next_sender(&self) {
1071 let mut sender_wakers = self.sender_wakers.lock().unwrap();
1072 let waker = (!sender_wakers.is_empty()).then(|| sender_wakers.swap_remove(0));
1073 unlock(sender_wakers);
1074 if let Some(waker) = waker {
1075 waker.wake();
1076 }
1077 }
1078
1079 /// Wakes all wakers waiting on the sender to disconnect.
1080 fn wake_all_join(&self) {
1081 let mut join_wakers = self.join_wakers.lock().unwrap();
1082 let wakers = take(&mut *join_wakers);
1083 unlock(join_wakers);
1084 for waker in wakers {
1085 waker.wake();
1086 }
1087 }
1088
1089 /// Wake the `Receiver`.
1090 fn wake_receiver(&self) {
1091 self.receiver_waker.wake();
1092 }
1093}
1094
1095// NOTE: this is here so we don't have to type `self.channel().inner`
1096// everywhere.
1097impl<T> Deref for Channel<T> {
1098 type Target = Inner;
1099
1100 fn deref(&self) -> &Self::Target {
1101 &self.inner
1102 }
1103}
1104
1105impl<T> fmt::Debug for Channel<T> {
1106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1107 let status = self.status.load(Ordering::Relaxed);
1108 let ref_count = self.ref_count.load(Ordering::Relaxed);
1109 let sender_count = sender_count(ref_count);
1110 let recv_pos = receiver_pos(status, self.slots.len());
1111 let mut slots = [""; MAX_CAP];
1112 for n in 0..self.slots.len() {
1113 slots[n] = dbg_status(slot_status(status, n));
1114 }
1115 let slots = &slots[..self.slots.len()];
1116 f.debug_struct("Channel")
1117 .field("senders_alive", &sender_count)
1118 .field("receiver_alive", &has_receiver(ref_count))
1119 .field("manager_alive", &has_manager(ref_count))
1120 .field("receiver_position", &recv_pos)
1121 .field("slots", &slots)
1122 .finish()
1123 }
1124}
1125
1126impl<T> Drop for Channel<T> {
1127 fn drop(&mut self) {
1128 // Safety: we have unique access, per the mutable reference, so relaxed
1129 // is fine.
1130 let status: u64 = self.status.load(Ordering::Relaxed);
1131 for slot in 0..self.slots.len() {
1132 if is_filled(status, slot) {
1133 // Safety: we have unique access to the slot and we've checked
1134 // above whether or not the slot is filled.
1135 unsafe { self.slots[slot].get_mut().assume_init_drop() };
1136 }
1137 }
1138 }
1139}
1140
/// Manager of a channel.
///
/// A channel manager can be used to create [`Sender`]s and [`Receiver`]s for a
/// channel, without having access to either. It's made for the following use
/// case: restarting an actor which takes ownership of the `Receiver` and
/// crashes, and to restart the actor we need another `Receiver`. Using the
/// manager a new `Receiver` can be created, ensuring only a single `Receiver`
/// is alive at any given time.
pub struct Manager<T> {
    /// Pointer to the channel shared with the senders and the receiver.
    channel: NonNull<Channel<T>>,
}
1152
/// Error returned by [`Manager::new_receiver`] if a receiver is already
/// connected.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct ReceiverConnected;

impl fmt::Display for ReceiverConnected {
    /// Writes the error message, honouring the width, alignment and precision
    /// of the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "receiver already connected";
        // `str`'s `Display` implementation pads the text according to the
        // formatter's options.
        fmt::Display::fmt(MESSAGE, f)
    }
}

impl Error for ReceiverConnected {}
1165
1166impl<T> Manager<T> {
1167 /// Create a small bounded channel with a `Manager`.
1168 ///
1169 /// Same as [`new_small`] but with a `Manager`.
1170 pub fn new_small_channel() -> (Manager<T>, Sender<T>, Receiver<T>) {
1171 Manager::new_channel(SMALL_CAP)
1172 }
1173
1174 /// Create a bounded channel with a `Manager`.
1175 ///
1176 /// Same as [`new`] but with a `Manager`.
1177 pub fn new_channel(capacity: usize) -> (Manager<T>, Sender<T>, Receiver<T>) {
1178 let (sender, receiver) = new(capacity);
1179 let old_count = sender
1180 .channel()
1181 .ref_count
1182 .fetch_or(MANAGER_ALIVE | MANAGER_ACCESS, Ordering::Relaxed);
1183 debug_assert!(!has_manager(old_count));
1184 let manager = Manager {
1185 channel: sender.channel,
1186 };
1187 (manager, sender, receiver)
1188 }
1189
1190 /// Create a new [`Sender`].
1191 ///
1192 /// # Safety
1193 ///
1194 /// See the [safety nodes] on `Sender`'s [`Clone`] implemenation, the same
1195 /// conditions apply here.
1196 ///
1197 /// [safety nodes]: struct.Sender.html#impl-Clone
1198 pub fn new_sender(&self) -> Sender<T> {
1199 // For the reasoning behind this relaxed ordering see `Arc::clone`.
1200 let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
1201 if old_ref_count & SENDER_ACCESS != 0 {
1202 let _ = self
1203 .channel()
1204 .ref_count
1205 .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
1206 }
1207 Sender {
1208 channel: self.channel,
1209 }
1210 }
1211
1212 /// Attempt to create a new [`Receiver`].
1213 ///
1214 /// This will fail if there already is a receiver.
1215 pub fn new_receiver(&self) -> Result<Receiver<T>, ReceiverConnected> {
1216 let old_count = self
1217 .channel()
1218 .ref_count
1219 .fetch_or(RECEIVER_ALIVE, Ordering::AcqRel);
1220 if has_receiver(old_count) {
1221 Err(ReceiverConnected)
1222 } else {
1223 // No receiver was connected so its safe to create one.
1224 debug_assert!(old_count & RECEIVER_ACCESS != 0);
1225 Ok(Receiver {
1226 channel: self.channel,
1227 })
1228 }
1229 }
1230
1231 fn channel(&self) -> &Channel<T> {
1232 unsafe { self.channel.as_ref() }
1233 }
1234}
1235
1236impl<T> fmt::Debug for Manager<T> {
1237 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1238 f.debug_struct("Manager")
1239 .field("channel", &self.channel())
1240 .finish()
1241 }
1242}
1243
// Safety: if the value can be sent across threads then so can the channel.
unsafe impl<T: Send> Send for Manager<T> {}

// NOTE(review): unlike the `Send` implementation above this has no bound on
// `T`, while `Manager::new_sender` and `Manager::new_receiver` only need
// `&self`. Confirm this is sound for a `T` that is not `Send`.
unsafe impl<T> Sync for Manager<T> {}

// The manager only holds a pointer to the channel.
impl<T> Unpin for Manager<T> {}
1250
impl<T> Drop for Manager<T> {
    /// Marks the manager as dropped. If no receiver is alive a temporary
    /// `Receiver` is created and dropped to clean up the channel.
    #[rustfmt::skip]
    fn drop(&mut self) {
        // First mark the manager as dropped.
        // Safety: for the reasoning behind this ordering see `Arc::drop`.
        let old_ref_count = self.channel().ref_count.fetch_and(!MANAGER_ALIVE, Ordering::Release);
        if has_receiver(old_ref_count) {
            // If the channel has a receiver we only mark the manager as dropped
            // (above) and give up our access to the channel.
            // NOTE(review): the previous value is ignored here, so this path
            // never deallocates the channel. Confirm the channel can't be
            // leaked if the receiver is dropped between the two atomic
            // operations in this function.
            let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
            return;
        }

        debug_assert!(!has_receiver(old_ref_count));
        debug_assert!(old_ref_count & RECEIVER_ACCESS != 0);
        // NOTE: because `RECEIVER_ACCESS` bit is still set we don't have to set
        // the `RECEIVER_ALIVE` bit (as the receiver will be dropped at the end
        // of the function).
        let receiver = Receiver { channel: self.channel };

        // Give up the manager's access before the receiver's `Drop`
        // implementation runs.
        let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
        // Let the receiver do the cleanup.
        drop(receiver);
    }
}
1276
/// Identifier of a channel.
///
/// This type can be created by calling [`Sender::id`] or [`Receiver::id`] and
/// be used to identify channels. Its only use case is to compare two ids with
/// one another: if two ids are the same the sender(s) and receiver(s) point to
/// the same channel.
///
/// # Notes
///
/// The id is only valid for the lifetime of the channel. Once the channel is
/// dropped all ids of the channel are invalidated and might return incorrect
/// results after.
///
/// The methods [`Sender::same_channel`] and [`Sender::sends_to`] should be
/// preferred over using this type as they are less error-prone.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Id(usize);