#![cfg_attr(unstable_nightly, feature(cfg_sanitize))]
#![warn(
missing_debug_implementations,
missing_docs,
unused_results,
variant_size_differences
)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(attr(deny(warnings))))]
use std::alloc::{alloc, handle_alloc_error, Layout};
use std::cell::UnsafeCell;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::mem::{drop as unlock, replace, take, MaybeUninit};
use std::ops::Deref;
use std::pin::Pin;
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::{self, Poll};
#[cfg(test)]
mod tests;
/// Issues a memory fence with `$ordering`.
///
/// When the crate is compiled with the `unstable_nightly` cfg the fence is
/// only emitted when the thread sanitizer is *not* enabled; under the thread
/// sanitizer a load of `$val` with the same ordering is emitted instead.
/// Without `unstable_nightly` neither attribute applies and both statements
/// are emitted.
macro_rules! fence {
    ($val: expr, $ordering: expr) => {
        // NOTE: the attribute produced by `cfg_attr` must itself be a
        // `cfg(..)`. A bare `not(..)` or `sanitize = ".."` is not a valid
        // attribute and breaks the build once `unstable_nightly` is set.
        #[cfg_attr(unstable_nightly, cfg(not(sanitize = "thread")))]
        std::sync::atomic::fence($ordering);
        #[cfg_attr(unstable_nightly, cfg(sanitize = "thread"))]
        let _ = $val.load($ordering);
    };
}
pub mod oneshot;
mod waker;
use waker::WakerRegistration;
/// Capacity used by `new_small` and `Manager::new_small_channel`.
const SMALL_CAP: usize = 8;
/// Largest capacity accepted by `new`.
pub const MAX_CAP: usize = 29;
/// Smallest capacity accepted by `new`.
pub const MIN_CAP: usize = 1;
/// Creates a channel with the small default capacity (`SMALL_CAP`).
///
/// Shorthand for calling [`new`] with that capacity.
pub fn new_small<T>() -> (Sender<T>, Receiver<T>) {
    new::<T>(SMALL_CAP)
}
/// Creates a new bounded channel with room for `capacity` values, returning
/// its sending and receiving halves.
///
/// # Panics
///
/// Panics if `capacity` is smaller than [`MIN_CAP`] or larger than
/// [`MAX_CAP`].
pub fn new<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let valid_capacities = MIN_CAP..=MAX_CAP;
    assert!(
        valid_capacities.contains(&capacity),
        "inbox channel capacity must be between {} and {}",
        MIN_CAP,
        MAX_CAP
    );
    // Both halves point to the same heap allocated channel.
    let channel = Channel::new(capacity);
    (Sender { channel }, Receiver { channel })
}
// Flag bits stored in the five most significant bits of `Inner::ref_count`.
// The remaining lower bits hold the number of senders, see `sender_count`.

/// Set while a `Receiver` exists; cleared first thing in `Receiver::drop`.
const RECEIVER_ALIVE: usize = 1 << (usize::BITS - 1);
/// Cleared at the end of `Receiver::drop`, once the receiver is done using
/// the channel.
const RECEIVER_ACCESS: usize = 1 << (usize::BITS - 2);
/// Cleared by the last `Sender` being dropped, once it is done using the
/// channel.
const SENDER_ACCESS: usize = 1 << (usize::BITS - 3);
/// Set while a `Manager` exists; cleared first thing in `Manager::drop`.
const MANAGER_ALIVE: usize = 1 << (usize::BITS - 4);
/// Cleared by `Manager::drop` once the manager is done using the channel.
const MANAGER_ACCESS: usize = 1 << (usize::BITS - 5);
#[inline(always)]
const fn has_receiver(ref_count: usize) -> bool {
ref_count & RECEIVER_ALIVE != 0
}
#[inline(always)]
const fn has_manager(ref_count: usize) -> bool {
ref_count & MANAGER_ALIVE != 0
}
/// Returns `true` if at least one of the `RECEIVER_ALIVE` and `MANAGER_ALIVE`
/// flags is set in `ref_count`.
#[inline(always)]
const fn has_receiver_or_manager(ref_count: usize) -> bool {
    const EITHER_ALIVE: usize = RECEIVER_ALIVE | MANAGER_ALIVE;
    (ref_count & EITHER_ALIVE) != 0
}
/// Returns the number of senders stored in `ref_count`, i.e. `ref_count` with
/// all flag bits masked out.
#[inline(always)]
const fn sender_count(ref_count: usize) -> usize {
    const FLAGS: usize =
        RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | MANAGER_ALIVE | MANAGER_ACCESS;
    ref_count & !FLAGS
}
// Each slot owns `STATUS_BITS` bits of `Inner::status` that describe its
// state. `STATUS_MASK` selects the bits of a single slot after they have been
// shifted down to the least significant position (see `slot_status`).
const STATUS_BITS: u64 = 2; const STATUS_MASK: u64 = (1 << STATUS_BITS) - 1;
// Mask covering the status bits of all `MAX_CAP` slots (test builds only).
#[cfg(test)]
const ALL_STATUSES_MASK: u64 = (1 << (MAX_CAP as u64 * STATUS_BITS)) - 1;
// The four states a slot can be in.
const EMPTY: u64 = 0b00; const TAKEN: u64 = 0b01; const FILLED: u64 = 0b11; const READING: u64 = 0b10;
// Bit patterns that move a slot to its next state, shifted into place with
// `mark_slot`. `MARK_TAKEN` and `MARK_FILLED` are applied with OR (see
// `try_send`), `MARK_READING` with XOR and `MARK_EMPTIED` with AND-NOT (see
// `try_recv`).
const MARK_TAKEN: u64 = 0b01; const MARK_FILLED: u64 = 0b11; const MARK_READING: u64 = 0b01; const MARK_EMPTIED: u64 = 0b11;
/// Returns `true` if `slot` is in the `EMPTY` state in `status`.
#[inline(always)]
fn is_available(status: u64, slot: usize) -> bool {
    let wanted = EMPTY;
    has_status(status, slot, wanted)
}
/// Returns `true` if `slot` is in the `FILLED` state in `status`.
#[inline(always)]
fn is_filled(status: u64, slot: usize) -> bool {
    let wanted = FILLED;
    has_status(status, slot, wanted)
}
/// Returns `true` if the state of `slot` in `status` equals `expected`.
#[inline(always)]
fn has_status(status: u64, slot: usize, expected: u64) -> bool {
    let actual = slot_status(status, slot);
    actual == expected
}
/// Extracts the state bits of `slot` from `status`, shifted down to the least
/// significant bits.
#[inline(always)]
fn slot_status(status: u64, slot: usize) -> u64 {
    debug_assert!(slot <= MAX_CAP);
    let shift = STATUS_BITS * slot as u64;
    (status >> shift) & STATUS_MASK
}
/// Shifts the `transition` bit pattern to the position of `slot` within the
/// status word.
#[inline(always)]
fn mark_slot(slot: usize, transition: u64) -> u64 {
    debug_assert!(slot <= MAX_CAP);
    let shift = STATUS_BITS * slot as u64;
    transition << shift
}
fn dbg_status(slot_status: u64) -> &'static str {
match slot_status {
EMPTY => "EMPTY",
TAKEN => "TAKEN",
FILLED => "FILLED",
READING => "READING",
_ => "INVALID",
}
}
/// Value added to `Inner::status` (by `try_recv`) to advance the receiver
/// position, which is stored in the bits above the `MAX_CAP` slot statuses.
const MARK_NEXT_POS: u64 = 1 << (STATUS_BITS * MAX_CAP as u64);
/// Returns the receiver position stored in the bits of `status` above the
/// slot statuses, reduced modulo `capacity`.
#[inline(always)]
#[allow(clippy::cast_possible_truncation)]
fn receiver_pos(status: u64, capacity: usize) -> usize {
    let position = status >> (STATUS_BITS * MAX_CAP as u64);
    position as usize % capacity
}
/// Sending half of the channel, created by `new`, `Sender::clone`,
/// `Receiver::new_sender` or `Manager::new_sender`.
pub struct Sender<T> {
// Pointer to the channel allocation shared with the other handles.
channel: NonNull<Channel<T>>,
}
/// Error returned when a value could not be sent; the value is handed back
/// inside the error.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SendError<T> {
    /// No free slot was found in the channel.
    Full(T),
    /// Neither a receiver nor a manager is connected to the channel.
    Disconnected(T),
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match *self {
            SendError::Full(_) => "channel is full",
            SendError::Disconnected(_) => "receiver is disconnected",
        };
        f.pad(description)
    }
}

impl<T: fmt::Debug> Error for SendError<T> {}
impl<T> Sender<T> {
    /// Attempts to send `value` without waiting.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Full`] if no free slot was found and
    /// [`SendError::Disconnected`] if neither a receiver nor a manager is
    /// connected. Both hand `value` back.
    pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
        let channel = self.channel();
        try_send(channel, value)
    }

    /// Returns a future that sends `value`, waiting for a free slot if the
    /// channel is full.
    pub fn send(&self, value: T) -> SendValue<T> {
        let channel = self.channel();
        SendValue {
            channel,
            value: Some(value),
            registered_waker: None,
        }
    }

    /// Returns a future that completes once neither a receiver nor a manager
    /// is connected to the channel.
    pub fn join(&self) -> Join<T> {
        let channel = self.channel();
        Join {
            channel,
            registered_waker: None,
        }
    }

    /// Returns the number of slots of the channel.
    pub fn capacity(&self) -> usize {
        let slots = &self.channel().slots;
        slots.len()
    }

    /// Returns `true` if a receiver or a manager is connected.
    pub fn is_connected(&self) -> bool {
        let ref_count = self.channel().ref_count.load(Ordering::Relaxed);
        has_receiver_or_manager(ref_count)
    }

    /// Returns `true` if a manager is connected.
    pub fn has_manager(&self) -> bool {
        let ref_count = self.channel().ref_count.load(Ordering::Relaxed);
        has_manager(ref_count)
    }

    /// Returns `true` if `other` sends into the same channel as `self`.
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
        other.channel == self.channel
    }

    /// Returns `true` if `receiver` receives from the channel `self` sends
    /// into.
    pub fn sends_to(&self, receiver: &Receiver<T>) -> bool {
        receiver.channel == self.channel
    }

    /// Returns the identifier of the channel, derived from the address of its
    /// allocation.
    pub fn id(&self) -> Id {
        let address = self.channel.as_ptr() as *const () as usize;
        Id(address)
    }

    /// Returns a reference to the shared channel.
    fn channel(&self) -> &Channel<T> {
        // SAFETY(review): relies on the allocation staying alive for as long
        // as this sender exists (see the `Drop` implementations).
        unsafe { self.channel.as_ref() }
    }
}
/// Tries to write `value` into a free slot of `channel`.
///
/// Returns `SendError::Disconnected` if neither a receiver nor a manager is
/// connected and `SendError::Full` if no slot could be claimed; both return
/// `value` to the caller.
fn try_send<T>(channel: &Channel<T>, value: T) -> Result<(), SendError<T>> {
if !has_receiver_or_manager(channel.ref_count.load(Ordering::Relaxed)) {
return Err(SendError::Disconnected(value));
}
// This snapshot is only used to pick candidate slots; a slot is actually
// claimed by the `fetch_or` below, whose return value is checked again.
let mut status: u64 = channel.status.load(Ordering::Relaxed);
let cap = channel.slots.len();
// Visit every slot exactly once, starting at the receiver's position.
let start = receiver_pos(status, cap);
for slot in (0..cap).cycle().skip(start).take(cap) {
if !is_available(status, slot) {
continue;
}
// Try to claim the slot. `fetch_or` returns the status from *before* our
// write: the slot is only ours if it was still `EMPTY` at that point.
status = channel
.status
.fetch_or(mark_slot(slot, MARK_TAKEN), Ordering::AcqRel);
if !is_available(status, slot) {
// Something else changed the slot's state first, try the next slot.
continue;
}
// SAFETY(review): we moved the slot from `EMPTY` to `TAKEN` above;
// presumably nothing else accesses a `TAKEN` slot until it's `FILLED`.
unsafe {
let _ = (&mut *channel.slots[slot].get()).write(value);
}
// `TAKEN` (01) | `MARK_FILLED` (11) = `FILLED` (11).
let old_status = channel
.status
.fetch_or(mark_slot(slot, MARK_FILLED), Ordering::AcqRel);
debug_assert!(has_status(old_status, slot, TAKEN));
// Wake the receiver if its position, as of the previous status, is this
// slot.
if receiver_pos(old_status, cap) == slot {
channel.wake_receiver();
}
return Ok(());
}
Err(SendError::Full(value))
}
impl<T> Clone for Sender<T> {
    /// Creates another sender for the same channel by incrementing the sender
    /// count stored in the reference count.
    fn clone(&self) -> Sender<T> {
        let channel = self.channel();
        let previous = channel.ref_count.fetch_add(1, Ordering::Relaxed);
        // An existing sender implies the senders still have access.
        debug_assert!(previous & SENDER_ACCESS != 0);
        Sender {
            channel: self.channel,
        }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Formats the sender as a struct with the channel's debug output as its
    /// only field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let channel = self.channel();
        let mut output = f.debug_struct("Sender");
        let _ = output.field("channel", &channel);
        output.finish()
    }
}
// A `Sender` moves values of `T` into the channel, hence the `T: Send` bound
// for sending the `Sender` itself to another thread.
unsafe impl<T: Send> Send for Sender<T> {}
// NOTE(review): unlike `Send` above this has no bound on `T`, even though
// `try_send` and `send` take `&self`; confirm this is intended.
unsafe impl<T> Sync for Sender<T> {}
impl<T> Unpin for Sender<T> {}
impl<T> Drop for Sender<T> {
// Decrements the sender count. The last sender additionally wakes the
// receiver, gives up the senders' access to the channel and frees the
// channel if nothing else references it any more.
#[rustfmt::skip]
fn drop(&mut self) {
let old_ref_count = self.channel().ref_count.fetch_sub(1, Ordering::Release);
if sender_count(old_ref_count) != 1 {
// Not the last sender: decrementing the count is all there is to do.
return;
}
// We were the last sender; wake the receiver, if anything is still
// connected on that side.
if has_receiver_or_manager(old_ref_count) {
self.channel().wake_receiver();
}
// Give up `SENDER_ACCESS`. If that was the only bit left nothing else
// references the channel and it's up to us to free it.
let old_ref_count = self.channel().ref_count.fetch_and(!SENDER_ACCESS, Ordering::Release);
if old_ref_count != SENDER_ACCESS {
return;
}
// Acquire counterpart of the `Release` operations on `ref_count`
// (presumably the same scheme as `Arc`'s `Drop`).
fence!(self.channel().ref_count, Ordering::Acquire);
// SAFETY(review): we are the last user of the allocation made in
// `Channel::new`; `Box` runs `Channel::drop` and frees the memory.
unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
}
}
/// Future returned by `Sender::send`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendValue<'s, T> {
// Channel to send the value into.
channel: &'s Channel<T>,
// Value to send; `None` once the future has completed.
value: Option<T>,
// Waker last registered in `Inner::sender_wakers` by this future, if any.
registered_waker: Option<task::Waker>,
}
impl<'s, T> Future for SendValue<'s, T> {
    /// `Ok(())` once the value is sent, or `Err(value)` if neither a receiver
    /// nor a manager is connected to the channel.
    type Output = Result<(), T>;

    /// Tries to send the value, registering the task's waker with the channel
    /// if the channel is full.
    ///
    /// # Panics
    ///
    /// Panics if polled again after returning `Poll::Ready`.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
        // SAFETY(review): no field is treated as structurally pinned (there
        // are no pin projections), so a plain `&mut Self` is presumably fine.
        let this = unsafe { self.as_mut().get_unchecked_mut() };
        let value = this
            .value
            .take()
            .expect("SendValue polled after completion");

        // First try to send without touching the list of wakers.
        match try_send(this.channel, value) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(SendError::Full(value)) => {
                let registered_waker = register_waker(
                    &mut this.registered_waker,
                    &this.channel.sender_wakers,
                    ctx.waker(),
                );
                if !registered_waker {
                    // The waker was already registered by a previous poll.
                    // The value must be kept for the next poll: dropping it
                    // here would lose the message and make the next poll
                    // panic.
                    this.value = Some(value);
                    return Poll::Pending;
                }

                // A slot may have been freed between the failed attempt above
                // and the registration of the waker, so try once more to not
                // miss that wake-up.
                match try_send(this.channel, value) {
                    Ok(()) => Poll::Ready(Ok(())),
                    Err(SendError::Full(value)) => {
                        // Still full, wait to be woken.
                        this.value = Some(value);
                        Poll::Pending
                    }
                    Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
                }
            }
            Err(SendError::Disconnected(value)) => Poll::Ready(Err(value)),
        }
    }
}
// NOTE(review): no bound on `T`, while the derived `Debug` implementation
// reads `value` through `&self`; confirm sharing `&SendValue` between threads
// is sound for every `T`.
unsafe impl<'s, T> Sync for SendValue<'s, T> {}
impl<'s, T> Drop for SendValue<'s, T> {
    /// Removes the waker this future registered, if any, from the channel's
    /// list of sender wakers.
    fn drop(&mut self) {
        let waker = match self.registered_waker.take() {
            Some(waker) => waker,
            // Never registered a waker, nothing to clean up.
            None => return,
        };
        let mut sender_wakers = self.channel.sender_wakers.lock().unwrap();
        let idx = sender_wakers.iter().position(|w| w.will_wake(&waker));
        if let Some(idx) = idx {
            drop(sender_wakers.swap_remove(idx));
        }
    }
}
/// Future returned by `Sender::join`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Join<'s, T> {
// Channel whose receiving side is being waited on.
channel: &'s Channel<T>,
// Waker last registered in `Inner::join_wakers` by this future, if any.
registered_waker: Option<task::Waker>,
}
impl<'s, T> Future for Join<'s, T> {
    type Output = ();

    /// Completes once neither a receiver nor a manager is connected to the
    /// channel, registering the task's waker while waiting.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
        let channel = self.channel;
        let is_connected =
            || has_receiver_or_manager(channel.ref_count.load(Ordering::Acquire));

        if !is_connected() {
            return Poll::Ready(());
        }

        let newly_registered = register_waker(
            &mut self.registered_waker,
            &channel.join_wakers,
            ctx.waker(),
        );
        if !newly_registered {
            // Our waker was already registered by a previous poll.
            return Poll::Pending;
        }

        // Check again now that the waker is registered, the receiving side
        // may have disconnected in the meantime.
        if is_connected() {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
// NOTE(review): no bound on `T`; `Join` holds no value of type `T` itself,
// only a reference to the channel.
unsafe impl<'s, T> Sync for Join<'s, T> {}
impl<'s, T> Drop for Join<'s, T> {
    /// Removes the waker this future registered, if any, from the channel's
    /// list of join wakers.
    fn drop(&mut self) {
        let waker = match self.registered_waker.take() {
            Some(waker) => waker,
            // Never registered a waker, nothing to clean up.
            None => return,
        };
        let mut join_wakers = self.channel.join_wakers.lock().unwrap();
        let idx = join_wakers.iter().position(|w| w.will_wake(&waker));
        if let Some(idx) = idx {
            drop(join_wakers.swap_remove(idx));
        }
    }
}
/// Makes sure `waker` is both remembered in `registered_waker` and present in
/// `channel_wakers`.
///
/// Returns `false`, without touching `channel_wakers`, if `registered_waker`
/// already holds a waker that wakes the same task as `waker`. Otherwise the
/// previously registered waker (if any) is replaced in, or `waker` is added
/// to, `channel_wakers` and `true` is returned.
///
/// # Panics
///
/// Panics if the `channel_wakers` mutex is poisoned.
fn register_waker(
    registered_waker: &mut Option<task::Waker>,
    channel_wakers: &Mutex<Vec<task::Waker>>,
    waker: &task::Waker,
) -> bool {
    if let Some(current) = registered_waker.as_mut() {
        if current.will_wake(waker) {
            // Same waker as before, nothing to update.
            return false;
        }

        // A different waker: swap it in locally and in the channel's list.
        let for_list = waker.clone();
        let previous = replace(current, for_list.clone());
        let mut list = channel_wakers.lock().unwrap();
        let idx = list.iter().position(|w| w.will_wake(&previous));
        match idx {
            Some(idx) => list[idx] = for_list,
            // The previous waker is no longer in the list, add the new one.
            None => list.push(for_list),
        }
        return true;
    }

    // First registration.
    let for_list = waker.clone();
    *registered_waker = Some(for_list.clone());
    let mut list = channel_wakers.lock().unwrap();
    list.push(for_list);
    true
}
/// Receiving half of the channel, created by `new` or `Manager::new_receiver`.
pub struct Receiver<T> {
// Pointer to the channel allocation shared with the other handles.
channel: NonNull<Channel<T>>,
}
/// Error returned when no value could be received.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// No value was found in the channel.
    Empty,
    /// No value was found and no sender is connected.
    Disconnected,
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match *self {
            RecvError::Empty => "channel is empty",
            RecvError::Disconnected => "all senders are disconnected",
        };
        f.pad(description)
    }
}

impl Error for RecvError {}
impl<T> Receiver<T> {
    /// Attempts to receive a value without waiting.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError::Empty`] if no value was found and
    /// [`RecvError::Disconnected`] if additionally no sender is connected.
    pub fn try_recv(&mut self) -> Result<T, RecvError> {
        try_recv(self.channel())
    }

    /// Returns a future that receives a value, waiting for one if the channel
    /// is empty.
    pub fn recv(&mut self) -> RecvValue<T> {
        RecvValue {
            channel: self.channel(),
        }
    }

    /// Attempts to get a reference to a value in the channel without removing
    /// it.
    ///
    /// # Errors
    ///
    /// Same as [`Receiver::try_recv`].
    pub fn try_peek(&mut self) -> Result<&T, RecvError> {
        try_peek(self.channel())
    }

    /// Returns a future that resolves to a reference to a value in the
    /// channel, waiting for one if the channel is empty.
    pub fn peek(&mut self) -> PeekValue<T> {
        PeekValue {
            channel: self.channel(),
        }
    }

    /// Creates a new [`Sender`] that sends into this channel.
    pub fn new_sender(&self) -> Sender<T> {
        let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
        // The last sender to be dropped clears `SENDER_ACCESS`. If that
        // already happened the flag has to be set again for the new sender:
        // `Sender::clone` asserts it is set and `Sender::drop` depends on it
        // to decide whether it has to free the channel. (The check used to be
        // inverted, which made the `fetch_or` a no-op and never restored the
        // flag.)
        if old_ref_count & SENDER_ACCESS == 0 {
            let _ = self
                .channel()
                .ref_count
                .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
        }
        Sender {
            channel: self.channel,
        }
    }

    /// Returns the number of slots of the channel.
    pub fn capacity(&self) -> usize {
        self.channel().slots.len()
    }

    /// Returns `true` if at least one sender is connected.
    pub fn is_connected(&self) -> bool {
        sender_count(self.channel().ref_count.load(Ordering::Relaxed)) > 0
    }

    /// Returns `true` if a manager is connected.
    pub fn has_manager(&self) -> bool {
        has_manager(self.channel().ref_count.load(Ordering::Relaxed))
    }

    /// Registers `waker` as the receiver's waker, which is woken by senders
    /// (see `Channel::wake_receiver`).
    ///
    /// Returns the result of `WakerRegistration::register`; presumably `false`
    /// means an equivalent waker was already registered.
    pub fn register_waker(&mut self, waker: &task::Waker) -> bool {
        self.channel().receiver_waker.register(waker)
    }

    /// Returns the identifier of the channel, derived from the address of its
    /// allocation.
    pub fn id(&self) -> Id {
        Id(self.channel.as_ptr() as *const () as usize)
    }

    /// Returns a reference to the shared channel.
    fn channel(&self) -> &Channel<T> {
        // SAFETY(review): relies on the allocation staying alive for as long
        // as this receiver exists (see the `Drop` implementations).
        unsafe { self.channel.as_ref() }
    }
}
/// Tries to take a value out of a filled slot of `channel`.
///
/// Returns `RecvError::Empty` if no filled slot was found while a sender was
/// connected and `RecvError::Disconnected` if no sender was connected.
fn try_recv<T>(channel: &Channel<T>) -> Result<T, RecvError> {
// Read the sender count *before* looking at the slots; it only decides which
// error is returned when no value is found.
let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;
// Advance the receiver position. `fetch_add` returns the previous status, so
// the scan below starts at the previous position.
let mut status = channel.status.fetch_add(MARK_NEXT_POS, Ordering::AcqRel);
let cap = channel.slots.len();
let start = receiver_pos(status, cap);
// Visit every slot exactly once, starting at the receiver's position.
for slot in (0..cap).cycle().skip(start).take(cap) {
if !is_filled(status, slot) {
continue;
}
// `FILLED` (11) ^ `MARK_READING` (01) = `READING` (10). The returned
// (previous) status is checked again before the slot is read.
status = channel
.status
.fetch_xor(mark_slot(slot, MARK_READING), Ordering::AcqRel);
if !is_filled(status, slot) {
continue;
}
// SAFETY(review): the slot was `FILLED`, so it presumably holds an
// initialised value, and marking it `READING` gives us access to it.
let value = unsafe { (&*channel.slots[slot].get()).assume_init_read() };
// Clear both status bits, making the slot `EMPTY` again.
let old_status = channel
.status
.fetch_and(!mark_slot(slot, MARK_EMPTIED), Ordering::AcqRel);
// `FILLED` can be observed here if a sender OR-ed `MARK_TAKEN` (01) onto
// our `READING` (10) in the meantime, see `try_send`.
debug_assert!(
has_status(old_status, slot, READING) || has_status(old_status, slot, FILLED)
);
// A slot was freed, let a waiting sender know.
channel.wake_next_sender();
return Ok(value);
}
if is_connected {
Err(RecvError::Empty)
} else {
Err(RecvError::Disconnected)
}
}
/// Tries to get a reference to the value in the first filled slot of
/// `channel`, starting at the receiver's position, without changing the
/// channel's status.
///
/// Returns `RecvError::Empty` if no filled slot was found while a sender was
/// connected and `RecvError::Disconnected` if no sender was connected.
fn try_peek<T>(channel: &Channel<T>) -> Result<&T, RecvError> {
    // Read the sender count before looking at the slots.
    let is_connected = sender_count(channel.ref_count.load(Ordering::Relaxed)) > 0;
    let status = channel.status.load(Ordering::Acquire);
    let cap = channel.slots.len();
    let start = receiver_pos(status, cap);

    let filled_slot = (0..cap)
        .cycle()
        .skip(start)
        .take(cap)
        .find(|&slot| is_filled(status, slot));

    match filled_slot {
        // SAFETY(review): the slot is `FILLED`, so it presumably holds an
        // initialised value.
        Some(slot) => Ok(unsafe { (&*channel.slots[slot].get()).assume_init_ref() }),
        None if is_connected => Err(RecvError::Empty),
        None => Err(RecvError::Disconnected),
    }
}
// NOTE: no `T: fmt::Debug` bound. The output only contains the channel's
// debug output, which never formats a `T`; this matches the `Debug`
// implementations of `Sender` and `Manager`.
impl<T> fmt::Debug for Receiver<T> {
    /// Formats the receiver as a struct with the channel's debug output as
    /// its only field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Receiver")
            .field("channel", &self.channel())
            .finish()
    }
}
// A `Receiver` takes values of `T` out of the channel, hence the `T: Send`
// bound for sending the `Receiver` itself to another thread.
unsafe impl<T: Send> Send for Receiver<T> {}
// NOTE(review): no bound on `T`; the methods that access values take
// `&mut self`.
unsafe impl<T> Sync for Receiver<T> {}
impl<T> Unpin for Receiver<T> {}
impl<T> Drop for Receiver<T> {
// Marks the receiver as dropped. Without a manager it also empties the
// channel, wakes all `Join` futures, gives up the receiver's access and frees
// the channel if nothing else references it any more.
#[rustfmt::skip]
fn drop(&mut self) {
let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ALIVE, Ordering::Release);
if has_manager(old_ref_count) {
// A manager is alive: only mark the receiver as dropped (above). The
// messages and `RECEIVER_ACCESS` are left untouched.
return;
}
// Drop all values still in the channel.
while let Ok(msg) = self.try_recv() {
drop(msg);
}
// Neither a receiver nor a manager is connected any more.
self.channel().wake_all_join();
// Give up `RECEIVER_ACCESS`. If that was the only bit left nothing else
// references the channel and it's up to us to free it.
let old_ref_count = self.channel().ref_count.fetch_and(!RECEIVER_ACCESS, Ordering::Release);
if old_ref_count != RECEIVER_ACCESS {
return;
}
// Acquire counterpart of the `Release` operations on `ref_count`
// (presumably the same scheme as `Arc`'s `Drop`).
fence!(self.channel().ref_count, Ordering::Acquire);
// SAFETY(review): we are the last user of the allocation made in
// `Channel::new`; `Box` runs `Channel::drop` and frees the memory.
unsafe { drop(Box::from_raw(self.channel.as_ptr())) }
}
}
/// Future returned by `Receiver::recv`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvValue<'r, T> {
// Channel to receive the value from.
channel: &'r Channel<T>,
}
impl<'r, T> Future for RecvValue<'r, T> {
    /// `Some(value)` once a value is received, `None` if the channel is empty
    /// and no sender is connected.
    type Output = Option<T>;

    /// Tries to receive a value, registering the task's waker as the
    /// receiver's waker if the channel is empty.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
        let channel = self.channel;

        match try_recv(channel) {
            Ok(value) => return Poll::Ready(Some(value)),
            Err(RecvError::Disconnected) => return Poll::Ready(None),
            Err(RecvError::Empty) => {}
        }

        // Empty channel: make sure we get woken when a value arrives.
        if !channel.receiver_waker.register(ctx.waker()) {
            return Poll::Pending;
        }

        // A value may have been sent between the attempt above and the
        // registration of the waker, so try once more.
        match try_recv(channel) {
            Ok(value) => Poll::Ready(Some(value)),
            Err(RecvError::Empty) => Poll::Pending,
            Err(RecvError::Disconnected) => Poll::Ready(None),
        }
    }
}
// `RecvValue` only holds a shared reference to the channel.
impl<'r, T> Unpin for RecvValue<'r, T> {}
/// Future returned by `Receiver::peek`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PeekValue<'r, T> {
// Channel to peek into.
channel: &'r Channel<T>,
}
impl<'r, T> Future for PeekValue<'r, T> {
    /// `Some(&value)` once a value is available, `None` if the channel is
    /// empty and no sender is connected.
    type Output = Option<&'r T>;

    /// Tries to peek at a value, registering the task's waker as the
    /// receiver's waker if the channel is empty.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
        let channel = self.channel;

        match try_peek(channel) {
            Ok(value) => return Poll::Ready(Some(value)),
            Err(RecvError::Disconnected) => return Poll::Ready(None),
            Err(RecvError::Empty) => {}
        }

        // Empty channel: make sure we get woken when a value arrives.
        if !channel.receiver_waker.register(ctx.waker()) {
            return Poll::Pending;
        }

        // A value may have been sent between the attempt above and the
        // registration of the waker, so try once more.
        match try_peek(channel) {
            Ok(value) => Poll::Ready(Some(value)),
            Err(RecvError::Empty) => Poll::Pending,
            Err(RecvError::Disconnected) => Poll::Ready(None),
        }
    }
}
// `PeekValue` only holds a shared reference to the channel.
impl<'r, T> Unpin for PeekValue<'r, T> {}
/// The state shared by all handles to a channel: the bookkeeping in `Inner`
/// followed by the slots holding the values. Allocated by `Channel::new`.
struct Channel<T> {
// Bookkeeping, also reachable through `Deref`.
inner: Inner,
// One (possibly uninitialised) value per slot; the slice length is the
// channel's capacity.
slots: [UnsafeCell<MaybeUninit<T>>],
}
/// The part of `Channel` that doesn't depend on the type of the values.
struct Inner {
// State of every slot (`STATUS_BITS` bits each) plus, in the bits above
// those, the receiver position.
status: AtomicU64,
// Number of senders plus the `*_ALIVE` and `*_ACCESS` flag bits.
ref_count: AtomicUsize,
// Wakers registered by `SendValue` futures; woken one at a time by
// `wake_next_sender`.
sender_wakers: Mutex<Vec<task::Waker>>,
// Wakers registered by `Join` futures; all woken by `wake_all_join`.
join_wakers: Mutex<Vec<task::Waker>>,
// Waker of the receiver; woken by `wake_receiver`.
receiver_waker: WakerRegistration,
}
// The channel owns values of `T` in its slots, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Channel<T> {}
// NOTE(review): no bound on `T`; access to the slots is coordinated through
// the atomic `status` field (see `try_send` and `try_recv`).
unsafe impl<T> Sync for Channel<T> {}
impl<T> Channel<T> {
    /// Allocates and initialises a channel with `capacity` slots.
    ///
    /// The returned channel accounts for one sender and one receiver: the
    /// reference count starts at a sender count of 1 with the
    /// `RECEIVER_ALIVE`, `RECEIVER_ACCESS` and `SENDER_ACCESS` flags set.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is smaller than `MIN_CAP` or larger than
    /// `MAX_CAP`. Calls `handle_alloc_error` if the allocation fails.
    fn new(capacity: usize) -> NonNull<Channel<T>> {
        assert!(capacity >= MIN_CAP, "capacity can't be zero");
        assert!(capacity <= MAX_CAP, "capacity too large");

        // Layout of `Inner` followed by `capacity` slots. The `unwrap` only
        // fails on arithmetic overflow of the size.
        let (layout, _) = Layout::array::<UnsafeCell<MaybeUninit<T>>>(capacity)
            .and_then(|slots_layout| Layout::new::<Inner>().extend(slots_layout))
            .unwrap();
        // `Layout::extend` leaves out the trailing padding. The channel is
        // freed through `Box::from_raw`, which uses the layout of the value
        // (size rounded up to the alignment), so the allocation must be made
        // with that same, padded, layout.
        let layout = layout.pad_to_align();

        // SAFETY: the layout has a non-zero size as it contains `Inner`.
        let ptr = unsafe { alloc(layout) };
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        // Create a fat pointer that carries the number of slots.
        let ptr = ptr::slice_from_raw_parts_mut(ptr as *mut T, capacity) as *mut Channel<T>;

        // Initialise the fields of `Inner`; the slots are left uninitialised
        // as they are `MaybeUninit`.
        // SAFETY: `ptr` points to a live allocation large enough for `Inner`;
        // the fields are written through raw pointers without reading the
        // uninitialised memory.
        unsafe {
            ptr::addr_of_mut!((*ptr).inner.status).write(AtomicU64::new(0));
            ptr::addr_of_mut!((*ptr).inner.ref_count).write(AtomicUsize::new(
                RECEIVER_ALIVE | RECEIVER_ACCESS | SENDER_ACCESS | 1,
            ));
            ptr::addr_of_mut!((*ptr).inner.sender_wakers).write(Mutex::new(Vec::new()));
            ptr::addr_of_mut!((*ptr).inner.join_wakers).write(Mutex::new(Vec::new()));
            ptr::addr_of_mut!((*ptr).inner.receiver_waker).write(WakerRegistration::new());
        }

        // SAFETY: checked for null above.
        unsafe { NonNull::new_unchecked(ptr) }
    }

    /// Wakes one of the registered sender wakers, if any, removing it from
    /// the list.
    fn wake_next_sender(&self) {
        let mut sender_wakers = self.sender_wakers.lock().unwrap();
        let waker = (!sender_wakers.is_empty()).then(|| sender_wakers.swap_remove(0));
        // Release the lock before running the waker.
        unlock(sender_wakers);
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Wakes all registered join wakers, leaving the list empty.
    fn wake_all_join(&self) {
        let mut join_wakers = self.join_wakers.lock().unwrap();
        let wakers = take(&mut *join_wakers);
        // Release the lock before running the wakers.
        unlock(join_wakers);
        for waker in wakers {
            waker.wake();
        }
    }

    /// Wakes the receiver's waker.
    fn wake_receiver(&self) {
        self.receiver_waker.wake();
    }
}
impl<T> Deref for Channel<T> {
type Target = Inner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> fmt::Debug for Channel<T> {
    /// Formats a snapshot of the channel's bookkeeping: the number of
    /// senders, whether the receiver and manager are alive, the receiver
    /// position and the state of every slot. Values in the slots are not
    /// formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let status = self.status.load(Ordering::Relaxed);
        let ref_count = self.ref_count.load(Ordering::Relaxed);
        let capacity = self.slots.len();

        // Name of the state of each slot in use.
        let mut names = [""; MAX_CAP];
        for (slot, name) in names.iter_mut().enumerate().take(capacity) {
            *name = dbg_status(slot_status(status, slot));
        }
        let slots = &names[..capacity];

        f.debug_struct("Channel")
            .field("senders_alive", &sender_count(ref_count))
            .field("receiver_alive", &has_receiver(ref_count))
            .field("manager_alive", &has_manager(ref_count))
            .field("receiver_position", &receiver_pos(status, capacity))
            .field("slots", &slots)
            .finish()
    }
}
impl<T> Drop for Channel<T> {
    /// Drops the values of all slots that are in the `FILLED` state.
    fn drop(&mut self) {
        let status: u64 = self.status.load(Ordering::Relaxed);
        for (slot, cell) in self.slots.iter_mut().enumerate() {
            if is_filled(status, slot) {
                // SAFETY(review): a `FILLED` slot presumably holds an
                // initialised value and `&mut self` gives unique access.
                unsafe { cell.get_mut().assume_init_drop() };
            }
        }
    }
}
/// Handle that can create new senders and, when none is connected, a new
/// receiver for a channel. Created by `Manager::new_channel`.
pub struct Manager<T> {
// Pointer to the channel allocation shared with the other handles.
channel: NonNull<Channel<T>>,
}
/// Error returned by `Manager::new_receiver` when a receiver is already
/// connected to the channel.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct ReceiverConnected;

impl fmt::Display for ReceiverConnected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = "receiver already connected";
        f.pad(description)
    }
}

impl Error for ReceiverConnected {}
impl<T> Manager<T> {
    /// Creates a channel with the small default capacity (`SMALL_CAP`),
    /// returning a manager together with the sending and receiving halves.
    pub fn new_small_channel() -> (Manager<T>, Sender<T>, Receiver<T>) {
        Manager::new_channel(SMALL_CAP)
    }

    /// Creates a channel with room for `capacity` values, returning a manager
    /// together with the sending and receiving halves.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not accepted by [`new`].
    pub fn new_channel(capacity: usize) -> (Manager<T>, Sender<T>, Receiver<T>) {
        let (sender, receiver) = new(capacity);
        // Mark the manager as alive and using the channel.
        let old_count = sender
            .channel()
            .ref_count
            .fetch_or(MANAGER_ALIVE | MANAGER_ACCESS, Ordering::Relaxed);
        debug_assert!(!has_manager(old_count));
        let manager = Manager {
            channel: sender.channel,
        };
        (manager, sender, receiver)
    }

    /// Creates a new [`Sender`] that sends into this channel.
    pub fn new_sender(&self) -> Sender<T> {
        let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
        // The last sender to be dropped clears `SENDER_ACCESS`. If that
        // already happened the flag has to be set again for the new sender:
        // `Sender::clone` asserts it is set and `Sender::drop` depends on it
        // to decide whether it has to free the channel. (The check used to be
        // inverted, which made the `fetch_or` a no-op and never restored the
        // flag.)
        if old_ref_count & SENDER_ACCESS == 0 {
            let _ = self
                .channel()
                .ref_count
                .fetch_or(SENDER_ACCESS, Ordering::Relaxed);
        }
        Sender {
            channel: self.channel,
        }
    }

    /// Creates a new [`Receiver`] for this channel.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiverConnected`] if a receiver is already connected.
    pub fn new_receiver(&self) -> Result<Receiver<T>, ReceiverConnected> {
        // Claim the receiver side; the previous value tells whether it was
        // free.
        let old_count = self
            .channel()
            .ref_count
            .fetch_or(RECEIVER_ALIVE, Ordering::AcqRel);
        if has_receiver(old_count) {
            Err(ReceiverConnected)
        } else {
            // With a manager alive a dropped receiver leaves
            // `RECEIVER_ACCESS` set, see `Receiver::drop`.
            debug_assert!(old_count & RECEIVER_ACCESS != 0);
            Ok(Receiver {
                channel: self.channel,
            })
        }
    }

    /// Returns a reference to the shared channel.
    fn channel(&self) -> &Channel<T> {
        // SAFETY(review): relies on the allocation staying alive for as long
        // as this manager exists (see the `Drop` implementations).
        unsafe { self.channel.as_ref() }
    }
}
impl<T> fmt::Debug for Manager<T> {
    /// Formats the manager as a struct with the channel's debug output as its
    /// only field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let channel = self.channel();
        let mut output = f.debug_struct("Manager");
        let _ = output.field("channel", &channel);
        output.finish()
    }
}
// A `Manager` can end up dropping the values left in the channel (see its
// `Drop` implementation), hence the `T: Send` bound.
unsafe impl<T: Send> Send for Manager<T> {}
// NOTE(review): no bound on `T`; the `&self` methods only create new handles.
unsafe impl<T> Sync for Manager<T> {}
impl<T> Unpin for Manager<T> {}
impl<T> Drop for Manager<T> {
// Marks the manager as dropped. If no receiver is connected the clean up a
// receiver would normally do is done by creating a temporary `Receiver` and
// dropping it.
#[rustfmt::skip]
fn drop(&mut self) {
let old_ref_count = self.channel().ref_count.fetch_and(!MANAGER_ALIVE, Ordering::Release);
if has_receiver(old_ref_count) {
// A receiver is alive and will do the clean up when it is dropped; only
// give up the manager's access.
let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
return;
}
debug_assert!(!has_receiver(old_ref_count));
// `Receiver::drop` returns before clearing `RECEIVER_ACCESS` while a
// manager is alive, so the bit is still set for the receiver created below.
debug_assert!(old_ref_count & RECEIVER_ACCESS != 0);
// Not created through `new_receiver`, so `RECEIVER_ALIVE` stays unset.
let receiver = Receiver { channel: self.channel };
// Give up the manager's access before `Receiver::drop` runs, which frees
// the channel if it finds only `RECEIVER_ACCESS` left in the count.
let _ = self.channel().ref_count.fetch_and(!MANAGER_ACCESS, Ordering::Release);
drop(receiver);
}
}
/// Identifier of a channel, created by `Sender::id` and `Receiver::id` from
/// the address of the channel's allocation.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Id(usize);