#![cfg_attr(not(feature = "std"), no_std)]
#![allow(clippy::multiple_bound_locations)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#[cfg(not(feature = "std"))]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std as alloc;
#[cfg_attr(
any(feature = "std", feature = "critical-section"),
path = "intrusive.rs"
)]
#[cfg_attr(
not(any(feature = "std", feature = "critical-section")),
path = "slab.rs"
)]
mod sys;
mod notify;
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
use core::borrow::Borrow;
use core::fmt;
use core::future::Future;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::ptr;
use core::task::{Context, Poll, Waker};
#[cfg(all(feature = "std", not(target_family = "wasm")))]
use {
parking::{Parker, Unparker},
std::time::{Duration, Instant},
};
use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::Arc;
#[cfg(not(loom))]
use sync::WithMut;
use notify::NotificationPrivate;
pub use notify::{IntoNotification, Notification};
struct Inner<T> {
notified: AtomicUsize,
list: sys::List<T>,
}
impl<T> Inner<T> {
fn new() -> Self {
Self {
notified: AtomicUsize::new(usize::MAX),
list: sys::List::new(),
}
}
}
pub struct Event<T = ()> {
inner: AtomicPtr<Inner<T>>,
}
unsafe impl<T: Send> Send for Event<T> {}
unsafe impl<T: Send> Sync for Event<T> {}
impl<T> core::panic::UnwindSafe for Event<T> {}
impl<T> core::panic::RefUnwindSafe for Event<T> {}
impl<T> fmt::Debug for Event<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.try_inner() {
Some(inner) => {
let notified_count = inner.notified.load(Ordering::Relaxed);
let total_count = match inner.list.try_total_listeners() {
Some(total_count) => total_count,
None => {
return f
.debug_tuple("Event")
.field(&format_args!("<locked>"))
.finish()
}
};
f.debug_struct("Event")
.field("listeners_notified", ¬ified_count)
.field("listeners_total", &total_count)
.finish()
}
None => f
.debug_tuple("Event")
.field(&format_args!("<uninitialized>"))
.finish(),
}
}
}
impl Default for Event {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl<T> Event<T> {
#[cfg(all(feature = "std", not(loom)))]
#[inline]
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[inline]
pub fn is_notified(&self) -> bool {
self.try_inner()
.map_or(false, |inner| inner.notified.load(Ordering::Acquire) > 0)
}
#[cold]
pub fn listen(&self) -> EventListener<T> {
let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) });
let mut listener = Box::pin(InnerListener {
event: Arc::clone(&inner),
listener: None,
});
listener.as_mut().listen();
EventListener { listener }
}
#[inline]
pub fn notify(&self, notify: impl IntoNotification<Tag = T>) -> usize {
let notify = notify.into_notification();
notify.fence(notify::Internal::new());
let inner = unsafe { &*self.inner() };
inner.notify(notify)
}
#[inline]
fn try_inner(&self) -> Option<&Inner<T>> {
let inner = self.inner.load(Ordering::Acquire);
unsafe { inner.as_ref() }
}
fn inner(&self) -> *const Inner<T> {
let mut inner = self.inner.load(Ordering::Acquire);
if inner.is_null() {
let new = Arc::new(Inner::<T>::new());
let new = Arc::into_raw(new) as *mut Inner<T>;
inner = self
.inner
.compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
.unwrap_or_else(|x| x);
if inner.is_null() {
inner = new;
} else {
unsafe {
drop(Arc::from_raw(new));
}
}
}
inner
}
#[cfg(feature = "std")]
#[inline]
pub fn total_listeners(&self) -> usize {
if let Some(inner) = self.try_inner() {
inner.list.total_listeners()
} else {
0
}
}
}
impl Event<()> {
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[inline]
pub fn notify_relaxed(&self, n: usize) -> usize {
self.notify(n.relaxed())
}
#[inline]
pub fn notify_additional(&self, n: usize) -> usize {
self.notify(n.additional())
}
#[inline]
pub fn notify_additional_relaxed(&self, n: usize) -> usize {
self.notify(n.additional().relaxed())
}
}
impl<T> Drop for Event<T> {
#[inline]
fn drop(&mut self) {
self.inner.with_mut(|&mut inner| {
if !inner.is_null() {
unsafe {
drop(Arc::from_raw(inner));
}
}
})
}
}
pub trait Listener<T = ()>: Future<Output = T> + __sealed::Sealed {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait(self) -> T;
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_timeout(self, timeout: Duration) -> Option<T>;
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_deadline(self, deadline: Instant) -> Option<T>;
fn discard(self) -> bool;
fn listens_to(&self, event: &Event<T>) -> bool;
fn same_event(&self, other: &Self) -> bool;
}
macro_rules! forward_impl_to_listener {
($gen:ident => $ty:ty) => {
impl<$gen> crate::Listener<$gen> for $ty {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait(mut self) -> $gen {
self.listener_mut().wait_internal(None).unwrap()
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_timeout(mut self, timeout: std::time::Duration) -> Option<$gen> {
self.listener_mut()
.wait_internal(std::time::Instant::now().checked_add(timeout))
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_deadline(mut self, deadline: std::time::Instant) -> Option<$gen> {
self.listener_mut().wait_internal(Some(deadline))
}
fn discard(mut self) -> bool {
self.listener_mut().discard()
}
#[inline]
fn listens_to(&self, event: &Event<$gen>) -> bool {
core::ptr::eq::<Inner<$gen>>(
&*self.listener().event,
event.inner.load(core::sync::atomic::Ordering::Acquire),
)
}
#[inline]
fn same_event(&self, other: &$ty) -> bool {
core::ptr::eq::<Inner<$gen>>(&*self.listener().event, &*other.listener().event)
}
}
impl<$gen> Future for $ty {
type Output = $gen;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<$gen> {
self.listener_mut().poll_internal(cx)
}
}
};
}
pub struct EventListener<T = ()> {
listener: Pin<Box<InnerListener<T, Arc<Inner<T>>>>>,
}
unsafe impl<T: Send> Send for EventListener<T> {}
unsafe impl<T: Send> Sync for EventListener<T> {}
impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
impl<T> Unpin for EventListener<T> {}
impl<T> fmt::Debug for EventListener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventListener").finish_non_exhaustive()
}
}
impl<T> EventListener<T> {
#[inline]
fn listener(&self) -> &InnerListener<T, Arc<Inner<T>>> {
&self.listener
}
#[inline]
fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, Arc<Inner<T>>>> {
self.listener.as_mut()
}
}
forward_impl_to_listener! { T => EventListener<T> }
#[macro_export]
macro_rules! listener {
($event:expr => $listener:ident) => {
let mut $listener = $crate::__private::StackSlot::new(&$event);
let mut $listener = unsafe { $crate::__private::Pin::new_unchecked(&mut $listener) };
#[allow(unused_mut)]
let mut $listener = $listener.listen();
};
}
pin_project_lite::pin_project! {
#[project(!Unpin)]
#[project = ListenerProject]
struct InnerListener<T, B: Borrow<Inner<T>>>
where
B: Unpin,
{
event: B,
#[pin]
listener: Option<sys::Listener<T>>,
}
impl<T, B: Borrow<Inner<T>>> PinnedDrop for InnerListener<T, B>
where
B: Unpin,
{
fn drop(mut this: Pin<&mut Self>) {
let this = this.project();
(*this.event).borrow().remove(this.listener, true);
}
}
}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for InnerListener<T, B> {}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for InnerListener<T, B> {}
impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
#[inline]
fn listen(self: Pin<&mut Self>) {
let this = self.project();
(*this.event).borrow().insert(this.listener);
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
fn parker_and_task() -> (Parker, Task) {
let parker = Parker::new();
let unparker = parker.unparker();
(parker, Task::Unparker(unparker))
}
crate::sync::thread_local! {
static PARKER: (Parker, Task) = parker_and_task();
}
PARKER
.try_with({
let this = self.as_mut();
|(parker, unparker)| this.wait_with_parker(deadline, parker, unparker.as_task_ref())
})
.unwrap_or_else(|_| {
let (parker, unparker) = parking::pair();
self.as_mut()
.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
})
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_with_parker(
self: Pin<&mut Self>,
deadline: Option<Instant>,
parker: &Parker,
unparker: TaskRef<'_>,
) -> Option<T> {
let mut this = self.project();
let inner = (*this.event).borrow();
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
return Some(tag);
}
loop {
match deadline {
None => parker.park(),
#[cfg(loom)]
Some(_deadline) => {
panic!("parking does not support timeouts under loom");
}
#[cfg(not(loom))]
Some(deadline) => {
let now = Instant::now();
if now >= deadline {
return inner
.remove(this.listener.as_mut(), false)
.expect("We never removed ourself from the list")
.notified();
}
parker.park_deadline(deadline);
}
}
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
return Some(tag);
}
}
}
fn discard(self: Pin<&mut Self>) -> bool {
let this = self.project();
(*this.event)
.borrow()
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
}
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let this = self.project();
let inner = (*this.event).borrow();
match inner
.register(this.listener, TaskRef::Waker(cx.waker()))
.notified()
{
Some(tag) => {
Poll::Ready(tag)
}
None => {
Poll::Pending
}
}
}
}
#[derive(PartialEq)]
enum State<T> {
Created,
Notified {
additional: bool,
tag: T,
},
Task(Task),
NotifiedTaken,
}
impl<T> fmt::Debug for State<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Created => f.write_str("Created"),
Self::Notified { additional, .. } => f
.debug_struct("Notified")
.field("additional", additional)
.finish(),
Self::Task(_) => f.write_str("Task(_)"),
Self::NotifiedTaken => f.write_str("NotifiedTaken"),
}
}
}
impl<T> State<T> {
fn is_notified(&self) -> bool {
matches!(self, Self::Notified { .. } | Self::NotifiedTaken)
}
#[allow(unused)]
fn notified(self) -> Option<T> {
match self {
Self::Notified { tag, .. } => Some(tag),
Self::NotifiedTaken => panic!("listener was already notified but taken"),
_ => None,
}
}
}
#[derive(Debug, PartialEq)]
enum RegisterResult<T> {
Notified(T),
Registered,
NeverInserted,
}
impl<T> RegisterResult<T> {
fn notified(self) -> Option<T> {
match self {
Self::Notified(tag) => Some(tag),
Self::Registered => None,
Self::NeverInserted => panic!("{}", NEVER_INSERTED_PANIC),
}
}
}
#[derive(Debug, Clone)]
enum Task {
Waker(Waker),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
Unparker(Unparker),
}
impl Task {
fn as_task_ref(&self) -> TaskRef<'_> {
match self {
Self::Waker(waker) => TaskRef::Waker(waker),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
Self::Unparker(unparker) => TaskRef::Unparker(unparker),
}
}
fn wake(self) {
match self {
Self::Waker(waker) => waker.wake(),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
Self::Unparker(unparker) => {
unparker.unpark();
}
}
}
}
impl PartialEq for Task {
fn eq(&self, other: &Self) -> bool {
self.as_task_ref().will_wake(other.as_task_ref())
}
}
#[derive(Clone, Copy)]
enum TaskRef<'a> {
Waker(&'a Waker),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
Unparker(&'a Unparker),
}
impl TaskRef<'_> {
#[allow(unreachable_patterns)]
fn will_wake(self, other: Self) -> bool {
match (self, other) {
(Self::Waker(a), Self::Waker(b)) => a.will_wake(b),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
(Self::Unparker(_), Self::Unparker(_)) => {
false
}
_ => false,
}
}
fn into_task(self) -> Task {
match self {
Self::Waker(waker) => Task::Waker(waker.clone()),
#[cfg(all(feature = "std", not(target_family = "wasm")))]
Self::Unparker(unparker) => Task::Unparker(unparker.clone()),
}
}
}
const NEVER_INSERTED_PANIC: &str = "\
EventListener was not inserted into the linked list, make sure you're not polling \
EventListener/listener! after it has finished";
#[cfg(not(loom))]
mod sync {
#[cfg(not(feature = "portable-atomic"))]
pub(super) use alloc::sync::Arc;
#[cfg(not(feature = "portable-atomic"))]
pub(super) use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
pub(super) use portable_atomic_crate as atomic;
#[cfg(feature = "portable-atomic")]
pub(super) use portable_atomic_util::Arc;
#[allow(unused)]
#[cfg(all(feature = "std", not(feature = "critical-section"), not(loom)))]
pub(super) use std::sync::{Mutex, MutexGuard};
#[cfg(all(feature = "std", not(target_family = "wasm"), not(loom)))]
pub(super) use std::thread_local;
pub(super) trait WithMut {
type Output;
fn with_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Output) -> R;
}
impl<T> WithMut for atomic::AtomicPtr<T> {
type Output = *mut T;
#[inline]
fn with_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Output) -> R,
{
f(self.get_mut())
}
}
pub(crate) mod cell {
pub(crate) use core::cell::Cell;
pub(crate) struct ConstPtr<T>(*mut T);
impl<T> ConstPtr<T> {
pub(crate) unsafe fn deref(&self) -> &T {
&*self.0
}
#[allow(unused)] pub(crate) unsafe fn deref_mut(&mut self) -> &mut T {
&mut *self.0
}
}
#[derive(Debug, Default)]
pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);
impl<T> UnsafeCell<T> {
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(core::cell::UnsafeCell::new(data))
}
pub(crate) fn get(&self) -> ConstPtr<T> {
ConstPtr(self.0.get())
}
#[allow(dead_code)] pub(crate) fn into_inner(self) -> T {
self.0.into_inner()
}
}
}
}
#[cfg(loom)]
mod sync {
pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard};
pub(super) use loom::{cell, thread_local};
}
fn __test_send_and_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
_assert_send::<crate::__private::StackSlot<'_, ()>>();
_assert_sync::<crate::__private::StackSlot<'_, ()>>();
_assert_send::<crate::__private::StackListener<'_, '_, ()>>();
_assert_sync::<crate::__private::StackListener<'_, '_, ()>>();
_assert_send::<Event<()>>();
_assert_sync::<Event<()>>();
_assert_send::<EventListener<()>>();
_assert_sync::<EventListener<()>>();
}
#[doc(hidden)]
mod __sealed {
use super::{EventListener, __private::StackListener};
pub trait Sealed {}
impl<T> Sealed for EventListener<T> {}
impl<T> Sealed for StackListener<'_, '_, T> {}
}
#[doc(hidden)]
pub mod __private {
pub use core::pin::Pin;
use super::{Event, Inner, InnerListener};
use core::fmt;
use core::future::Future;
use core::task::{Context, Poll};
pin_project_lite::pin_project! {
#[doc(hidden)]
#[project(!Unpin)]
pub struct StackSlot<'ev, T> {
#[pin]
listener: InnerListener<T, &'ev Inner<T>>
}
}
impl<T> fmt::Debug for StackSlot<'_, T> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StackSlot").finish_non_exhaustive()
}
}
impl<T> core::panic::UnwindSafe for StackSlot<'_, T> {}
impl<T> core::panic::RefUnwindSafe for StackSlot<'_, T> {}
unsafe impl<T> Send for StackSlot<'_, T> {}
unsafe impl<T> Sync for StackSlot<'_, T> {}
impl<'ev, T> StackSlot<'ev, T> {
#[inline]
#[doc(hidden)]
pub fn new(event: &'ev Event<T>) -> Self {
let inner = unsafe { &*event.inner() };
Self {
listener: InnerListener {
event: inner,
listener: None,
},
}
}
#[inline]
#[doc(hidden)]
pub fn listen(mut self: Pin<&mut Self>) -> StackListener<'ev, '_, T> {
self.as_mut().project().listener.listen();
StackListener { slot: self }
}
}
#[doc(hidden)]
pub struct StackListener<'ev, 'stack, T> {
slot: Pin<&'stack mut StackSlot<'ev, T>>,
}
impl<T> core::panic::UnwindSafe for StackListener<'_, '_, T> {}
impl<T> core::panic::RefUnwindSafe for StackListener<'_, '_, T> {}
impl<T> Unpin for StackListener<'_, '_, T> {}
impl<T> fmt::Debug for StackListener<'_, '_, T> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StackListener").finish_non_exhaustive()
}
}
impl<'ev, T> StackListener<'ev, '_, T> {
#[inline]
fn listener(&self) -> &InnerListener<T, &'ev Inner<T>> {
&self.slot.listener
}
#[inline]
fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, &'ev Inner<T>>> {
self.slot.as_mut().project().listener
}
}
forward_impl_to_listener! { T => StackListener<'_, '_, T> }
}