#![cfg_attr(not(feature = "std"), no_std)]
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#[cfg(not(feature = "portable-atomic"))]
extern crate alloc;
use core::fmt;
use core::future::Future;
use core::marker::PhantomPinned;
use core::pin::Pin;
use core::task::{Context, Poll};
#[cfg(not(feature = "portable-atomic"))]
use alloc::sync::Arc;
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "portable-atomic")]
use portable_atomic::{AtomicUsize, Ordering};
#[cfg(feature = "portable-atomic")]
use portable_atomic_util::Arc;
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
use event_listener_strategy::{
easy_wrapper,
event_listener::{Event, EventListener},
EventListenerFuture, Strategy,
};
use futures_core::ready;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
struct Channel<T> {
queue: ConcurrentQueue<T>,
send_ops: Event,
recv_ops: Event,
stream_ops: Event,
closed_ops: Event,
sender_count: AtomicUsize,
receiver_count: AtomicUsize,
}
impl<T> Channel<T> {
fn close(&self) -> bool {
if self.queue.close() {
self.send_ops.notify(usize::MAX);
self.recv_ops.notify(usize::MAX);
self.stream_ops.notify(usize::MAX);
self.closed_ops.notify(usize::MAX);
true
} else {
false
}
}
}
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
assert!(cap > 0, "capacity cannot be zero");
let channel = Arc::new(Channel {
queue: ConcurrentQueue::bounded(cap),
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
closed_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
let s = Sender {
channel: channel.clone(),
};
let r = Receiver {
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel {
queue: ConcurrentQueue::unbounded(),
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
closed_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
let s = Sender {
channel: channel.clone(),
};
let r = Receiver {
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
pub struct Sender<T> {
channel: Arc<Channel<T>>,
}
impl<T> Sender<T> {
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match self.channel.queue.push(msg) {
Ok(()) => {
self.channel.recv_ops.notify_additional(1);
self.channel.stream_ops.notify(usize::MAX);
Ok(())
}
Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
}
}
pub fn send(&self, msg: T) -> Send<'_, T> {
Send::_new(SendInner {
sender: self,
msg: Some(msg),
listener: None,
_pin: PhantomPinned,
})
}
pub fn closed(&self) -> Closed<'_, T> {
Closed::_new(ClosedInner {
sender: self,
listener: None,
_pin: PhantomPinned,
})
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
self.send(msg).wait()
}
pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
match self.channel.queue.force_push(msg) {
Ok(backlog) => {
self.channel.recv_ops.notify_additional(1);
self.channel.stream_ops.notify(usize::MAX);
Ok(backlog)
}
Err(ForcePushError(reject)) => Err(SendError(reject)),
}
}
pub fn close(&self) -> bool {
self.channel.close()
}
pub fn is_closed(&self) -> bool {
self.channel.queue.is_closed()
}
pub fn is_empty(&self) -> bool {
self.channel.queue.is_empty()
}
pub fn is_full(&self) -> bool {
self.channel.queue.is_full()
}
pub fn len(&self) -> usize {
self.channel.queue.len()
}
pub fn capacity(&self) -> Option<usize> {
self.channel.queue.capacity()
}
pub fn receiver_count(&self) -> usize {
self.channel.receiver_count.load(Ordering::SeqCst)
}
pub fn sender_count(&self) -> usize {
self.channel.sender_count.load(Ordering::SeqCst)
}
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
channel: self.channel.clone(),
}
}
pub fn same_channel(&self, other: &Sender<T>) -> bool {
Arc::ptr_eq(&self.channel, &other.channel)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Sender {{ .. }}")
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
if count > usize::MAX / 2 {
abort();
}
Sender {
channel: self.channel.clone(),
}
}
}
pin_project! {
pub struct Receiver<T> {
channel: Arc<Channel<T>>,
listener: Option<EventListener>,
#[pin]
_pin: PhantomPinned
}
impl<T> PinnedDrop for Receiver<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
this.channel.close();
}
}
}
}
impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match self.channel.queue.pop() {
Ok(msg) => {
self.channel.send_ops.notify_additional(1);
Ok(msg)
}
Err(PopError::Empty) => Err(TryRecvError::Empty),
Err(PopError::Closed) => Err(TryRecvError::Closed),
}
}
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: None,
_pin: PhantomPinned,
})
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn recv_blocking(&self) -> Result<T, RecvError> {
self.recv().wait()
}
pub fn close(&self) -> bool {
self.channel.close()
}
pub fn is_closed(&self) -> bool {
self.channel.queue.is_closed()
}
pub fn is_empty(&self) -> bool {
self.channel.queue.is_empty()
}
pub fn is_full(&self) -> bool {
self.channel.queue.is_full()
}
pub fn len(&self) -> usize {
self.channel.queue.len()
}
pub fn capacity(&self) -> Option<usize> {
self.channel.queue.capacity()
}
pub fn receiver_count(&self) -> usize {
self.channel.receiver_count.load(Ordering::SeqCst)
}
pub fn sender_count(&self) -> usize {
self.channel.sender_count.load(Ordering::SeqCst)
}
pub fn downgrade(&self) -> WeakReceiver<T> {
WeakReceiver {
channel: self.channel.clone(),
}
}
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
Arc::ptr_eq(&self.channel, &other.channel)
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Receiver {{ .. }}")
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
if count > usize::MAX / 2 {
abort();
}
Receiver {
channel: self.channel.clone(),
listener: None,
_pin: PhantomPinned,
}
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
{
let this = self.as_mut().project();
if let Some(listener) = this.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
*this.listener = None;
}
}
loop {
match self.try_recv() {
Ok(msg) => {
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}
let this = self.as_mut().project();
if this.listener.is_some() {
break;
} else {
*this.listener = Some(this.channel.stream_ops.listen());
}
}
}
}
}
impl<T> futures_core::stream::FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
self.channel.queue.is_closed() && self.channel.queue.is_empty()
}
}
pub struct WeakSender<T> {
channel: Arc<Channel<T>>,
}
impl<T> WeakSender<T> {
pub fn upgrade(&self) -> Option<Sender<T>> {
if self.channel.queue.is_closed() {
None
} else {
match self.channel.sender_count.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|count| if count == 0 { None } else { Some(count + 1) },
) {
Err(_) => None,
Ok(new_value) if new_value > usize::MAX / 2 => {
abort();
}
Ok(_) => Some(Sender {
channel: self.channel.clone(),
}),
}
}
}
}
impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
WeakSender {
channel: self.channel.clone(),
}
}
}
impl<T> fmt::Debug for WeakSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WeakSender {{ .. }}")
}
}
pub struct WeakReceiver<T> {
channel: Arc<Channel<T>>,
}
impl<T> WeakReceiver<T> {
pub fn upgrade(&self) -> Option<Receiver<T>> {
if self.channel.queue.is_closed() {
None
} else {
match self.channel.receiver_count.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|count| if count == 0 { None } else { Some(count + 1) },
) {
Err(_) => None,
Ok(new_value) if new_value > usize::MAX / 2 => {
abort();
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: None,
_pin: PhantomPinned,
}),
}
}
}
}
impl<T> Clone for WeakReceiver<T> {
fn clone(&self) -> Self {
WeakReceiver {
channel: self.channel.clone(),
}
}
}
impl<T> fmt::Debug for WeakReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WeakReceiver {{ .. }}")
}
}
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
self.0
}
}
#[cfg(feature = "std")]
impl<T> std::error::Error for SendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SendError(..)")
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending into a closed channel")
}
}
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
Full(T),
Closed(T),
}
impl<T> TrySendError<T> {
pub fn into_inner(self) -> T {
match self {
TrySendError::Full(t) => t,
TrySendError::Closed(t) => t,
}
}
pub fn is_full(&self) -> bool {
match self {
TrySendError::Full(_) => true,
TrySendError::Closed(_) => false,
}
}
pub fn is_closed(&self) -> bool {
match self {
TrySendError::Full(_) => false,
TrySendError::Closed(_) => true,
}
}
}
#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "Full(..)"),
TrySendError::Closed(..) => write!(f, "Closed(..)"),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "sending into a full channel"),
TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
}
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
#[cfg(feature = "std")]
impl std::error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "receiving from an empty and closed channel")
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
Empty,
Closed,
}
impl TryRecvError {
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
TryRecvError::Closed => false,
}
}
pub fn is_closed(&self) -> bool {
match self {
TryRecvError::Empty => false,
TryRecvError::Closed => true,
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TryRecvError::Empty => write!(f, "receiving from an empty channel"),
TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
}
}
}
easy_wrapper! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
msg: Option<T>,
listener: Option<EventListener>,
#[pin]
_pin: PhantomPinned
}
}
impl<T> EventListenerFuture for SendInner<'_, T> {
type Output = Result<(), SendError<T>>;
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let this = self.project();
loop {
let msg = this.msg.take().unwrap();
match this.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
}
if this.listener.is_some() {
ready!(S::poll(strategy, &mut *this.listener, context));
} else {
*this.listener = Some(this.sender.channel.send_ops.listen());
}
}
}
}
easy_wrapper! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
#[pin]
_pin: PhantomPinned
}
}
impl<T> EventListenerFuture for RecvInner<'_, T> {
type Output = Result<T, RecvError>;
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let this = self.project();
loop {
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}
if this.listener.is_some() {
ready!(S::poll(strategy, &mut *this.listener, cx));
} else {
*this.listener = Some(this.receiver.channel.recv_ops.listen());
}
}
}
}
easy_wrapper! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Closed<'a, T>(ClosedInner<'a, T> => ());
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct ClosedInner<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
#[pin]
_pin: PhantomPinned
}
}
impl<'a, T> EventListenerFuture for ClosedInner<'a, T> {
type Output = ();
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<()> {
let this = self.project();
loop {
if this.sender.is_closed() {
return Poll::Ready(());
}
if this.listener.is_some() {
ready!(S::poll(strategy, &mut *this.listener, cx));
} else {
*this.listener = Some(this.sender.channel.closed_ops.listen());
}
}
}
}
#[cfg(feature = "std")]
use std::process::abort;
#[cfg(not(feature = "std"))]
fn abort() -> ! {
struct PanicOnDrop;
impl Drop for PanicOnDrop {
fn drop(&mut self) {
panic!("Panic while panicking to abort");
}
}
let _bomb = PanicOnDrop;
panic!("Panic while panicking to abort")
}