#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
#![warn(unused_extern_crates)]
#![warn(missing_docs)]
use std::{
any::Any,
fmt,
hash::Hash,
mem, panic,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
task::Poll,
};
#[doc(no_inline)]
pub use parking_lot;
use parking_lot::Mutex;
use zng_app_context::{LocalContext, app_local};
use zng_time::Deadline;
use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
#[cfg(test)]
mod tests;
#[doc(no_inline)]
pub use rayon;
pub mod channel;
pub mod fs;
pub mod io;
mod ui;
pub mod http;
pub mod process;
mod rayon_ctx;
pub use rayon_ctx::*;
pub use ui::*;
mod progress;
pub use progress::*;
pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
where
F: Future<Output = ()> + Send + 'static,
{
Arc::new(RayonTask {
ctx: LocalContext::capture(),
fut: Mutex::new(Some(Box::pin(task.into_future()))),
})
.poll()
}
pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
where
F: Future<Output = ()> + Send + 'static,
{
struct PollRayonTask {
fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
}
impl PollRayonTask {
fn poll(self: Arc<Self>) {
let mut task = self.fut.lock();
let (mut t, _) = task.take().unwrap();
let waker = self.clone().into();
match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
Poll::Ready(()) => {}
Poll::Pending => {
let ctx = LocalContext::capture();
*task = Some((t, Some(ctx)));
}
}
}
}
impl std::task::Wake for PollRayonTask {
fn wake(self: Arc<Self>) {
if let Some((task, Some(ctx))) = self.fut.lock().take() {
Arc::new(RayonTask {
ctx,
fut: Mutex::new(Some(Box::pin(task))),
})
.poll();
}
}
}
Arc::new(PollRayonTask {
fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
})
.poll()
}
// Boxed, pinned, sendable future with no output, the unit of work of `spawn`.
type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
// A future that is its own waker, every wake schedules one poll in the rayon thread-pool.
struct RayonTask {
// Context loaded for the duration of each poll.
ctx: LocalContext,
// `None` while the future is being polled and after it completed or panicked.
fut: Mutex<Option<RayonSpawnFut>>,
}
impl RayonTask {
// Schedules one poll of the future in the rayon thread-pool.
fn poll(self: Arc<Self>) {
rayon::spawn(move || {
// The future is taken out of the slot before polling and only put back when
// it is still pending, so it is never polled again after `Ready` or a panic.
let mut task = self.fut.lock();
if let Some(mut t) = task.take() {
let waker = self.clone().into();
// Load the captured context for the poll.
self.ctx.clone().with_context(move || {
let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
// Not done, store it for the next wake.
*task = Some(t);
}
}));
if let Err(p) = r {
// The panic is logged and passed to the spawn panic handler, it is not propagated.
let p = TaskPanicError::new(p);
tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
on_spawn_panic(p);
}
});
}
})
}
}
impl std::task::Wake for RayonTask {
// Waking schedules another poll.
fn wake(self: Arc<Self>) {
self.poll()
}
}
pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
where
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
self::join_context(move |_| op_a(), move |_| op_b())
}
/// Calls [`rayon::join_context`] with `op_a` and `op_b`, preserving the caller's [`LocalContext`].
///
/// The context is captured once in the calling thread. An operation that rayon reports as
/// migrated runs inside a clone of the captured context, an operation that did not
/// migrate is called directly.
pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
where
    A: FnOnce(rayon::FnContext) -> RA + Send,
    B: FnOnce(rayon::FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
    let captured = LocalContext::capture();
    let ctx = &captured;

    let run_a = move |fc: rayon::FnContext| {
        if !fc.migrated() {
            return op_a(fc);
        }
        ctx.clone().with_context(|| op_a(fc))
    };
    let run_b = move |fc: rayon::FnContext| {
        if !fc.migrated() {
            return op_b(fc);
        }
        ctx.clone().with_context(|| op_b(fc))
    };

    rayon::join_context(run_a, run_b)
}
/// Runs `op` inside a [`rayon::scope`], giving it a [`ScopeCtx`] that carries the
/// [`LocalContext`] captured in the calling thread.
///
/// Returns the value returned by `op`.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
R: Send,
{
let ctx = LocalContext::capture();
// Extend the borrow of `ctx` from the local lifetime to `'scope` so that tasks spawned
// in the scope can reference it.
//
// SAFETY: only the lifetime of the reference is changed. `ctx` is dropped explicitly
// after `rayon::scope` returns; this relies on `rayon::scope` not returning before all
// work spawned in it has finished (see the rayon documentation), so the reference is
// not used after the drop.
let ctx_ref: &'_ LocalContext = &ctx;
let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
let r = rayon::scope(move |s| {
op(ScopeCtx {
scope: s,
ctx: ctx_scope_ref,
})
});
// Keep `ctx` alive until here, see the safety note above.
drop(ctx);
r
}
/// Handle to a [`scope`], pairs the rayon scope with the [`LocalContext`] captured for it.
#[derive(Clone, Copy, Debug)]
pub struct ScopeCtx<'a, 'scope: 'a> {
    scope: &'a rayon::Scope<'scope>,
    ctx: &'scope LocalContext,
}
impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
    /// Spawns `f` in the rayon scope.
    ///
    /// The job runs inside a clone of the scope's captured context and receives a new
    /// [`ScopeCtx`] for the same scope, so it can spawn further jobs.
    pub fn spawn<F>(self, f: F)
    where
        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
    {
        let Self { scope, ctx } = self;
        scope.spawn(move |inner| {
            let mut local = ctx.clone();
            local.with_context(move || f(ScopeCtx { scope: inner, ctx }))
        });
    }
}
/// Awaits `task` through [`run_catch`] and returns its output.
///
/// # Panics
///
/// If the task panicked the panic is resumed in the awaiting thread with the original payload.
pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
where
    R: Send + 'static,
    T: Future<Output = R> + Send + 'static,
{
    run_catch(task)
        .await
        .unwrap_or_else(|p| panic::resume_unwind(p.payload))
}
/// Polls `task` in the rayon thread-pool and awaits its result.
///
/// The [`LocalContext`] of the caller is captured and loaded for each poll.
///
/// Returns `Ok` with the task output, or `Err` with a [`TaskPanicError`] if a poll panicked.
pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
where
R: Send + 'static,
T: Future<Output = R> + Send + 'static,
{
type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
// A future that is its own waker, polls in the rayon thread-pool and sends
// the result (or the panic) over a channel.
struct RayonCatchTask<R> {
ctx: LocalContext,
// `None` while polling and after the result was sent.
fut: Mutex<Option<Fut<R>>>,
sender: flume::Sender<Result<R, TaskPanicError>>,
}
impl<R: Send + 'static> RayonCatchTask<R> {
fn poll(self: Arc<Self>) {
let sender = self.sender.clone();
if sender.is_disconnected() {
// Nobody can receive the result anymore, skip the poll (cancel).
return; }
rayon::spawn(move || {
// The future is taken before polling and only restored when pending, this
// avoids a poll after `Ready` or after a panic.
let mut task = self.fut.lock();
if let Some(mut t) = task.take() {
let waker = self.clone().into();
let mut cx = std::task::Context::from_waker(&waker);
self.ctx.clone().with_context(|| {
let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
match r {
Ok(Poll::Ready(r)) => {
// Release the lock before sending.
drop(task);
let _ = sender.send(Ok(r));
}
Ok(Poll::Pending) => {
*task = Some(t);
}
Err(p) => {
drop(task);
let _ = sender.send(Err(TaskPanicError::new(p)));
}
}
});
}
})
}
}
impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
fn wake(self: Arc<Self>) {
self.poll()
}
}
let (sender, receiver) = channel::bounded(1);
Arc::new(RayonCatchTask {
ctx: LocalContext::capture(),
fut: Mutex::new(Some(Box::pin(task.into_future()))),
sender: sender.into(),
})
.poll();
// NOTE(review): the unwrap assumes the sender is not dropped before a result is sent — confirm.
receiver.recv().await.unwrap()
}
/// Polls `task` in the rayon thread-pool and returns a [`ResponseVar`] that receives the task output.
///
/// The [`LocalContext`] of the caller is captured and loaded for each poll.
///
/// If a poll panics the panic is logged and then resumed inside the `modify` closure
/// applied to the responder variable.
pub fn respond<R, F>(task: F) -> ResponseVar<R>
where
R: VarValue,
F: Future<Output = R> + Send + 'static,
{
type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
let (responder, response) = response_var();
// A future that is its own waker, polls in the rayon thread-pool and responds with the output.
struct RayonRespondTask<R: VarValue> {
ctx: LocalContext,
// `None` while polling and after the response was set.
fut: Mutex<Option<Fut<R>>>,
responder: zng_var::ResponderVar<R>,
}
impl<R: VarValue> RayonRespondTask<R> {
fn poll(self: Arc<Self>) {
let responder = self.responder.clone();
// NOTE(review): a count of 2 presumably means only the field and the local clone
// remain, that is, no `ResponseVar` is alive to observe the result — confirm.
if responder.strong_count() == 2 {
// Skip the poll (cancel).
return; }
rayon::spawn(move || {
// The future is taken before polling and only restored when pending, this
// avoids a poll after `Ready` or after a panic.
let mut task = self.fut.lock();
if let Some(mut t) = task.take() {
let waker = self.clone().into();
let mut cx = std::task::Context::from_waker(&waker);
self.ctx.clone().with_context(|| {
let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
match r {
Ok(Poll::Ready(r)) => {
// Release the lock before responding.
drop(task);
responder.respond(r);
}
Ok(Poll::Pending) => {
*task = Some(t);
}
Err(p) => {
let p = TaskPanicError::new(p);
tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
drop(task);
// The panic is resumed when the variable modification is applied.
responder.modify(move |_| panic::resume_unwind(p.payload));
}
}
});
}
})
}
}
impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
fn wake(self: Arc<Self>) {
self.poll()
}
}
Arc::new(RayonRespondTask {
ctx: LocalContext::capture(),
fut: Mutex::new(Some(Box::pin(task))),
responder,
})
.poll();
response
}
/// Runs `task` with [`poll_spawn`] and returns a [`ResponseVar`] for its output.
///
/// If the task completes in the first poll (in the calling thread) the returned variable
/// is created already done with the output, otherwise a pending response variable is
/// returned and the task responds through it when it completes.
pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
where
R: VarValue,
F: Future<Output = R> + Send + 'static,
{
// Shared slot between the spawned task and this function.
enum QuickResponse<R: VarValue> {
// No responder exists yet, holds the output if the task already finished.
Quick(Option<R>),
// This function already returned a pending variable, the task must respond through it.
Response(zng_var::ResponderVar<R>),
}
let task = task.into_future();
let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
poll_spawn(zng_clone_move::async_clmv!(q, {
let rsp = task.await;
// The lock is only taken after the task output is available.
match &mut *q.lock() {
QuickResponse::Quick(q) => *q = Some(rsp),
QuickResponse::Response(r) => r.respond(rsp),
}
}));
// The lock is held until the end of the function, so the task either stored the
// output before this point or will find the `Response` variant afterwards.
let mut q = q.lock();
match &mut *q {
QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
_ => {
let (responder, response) = response_var();
*q = QuickResponse::Response(responder);
response
}
}
}
/// Awaits the blocking `task` through [`wait_catch`] and returns its result.
///
/// # Panics
///
/// If the task panicked the panic is resumed in the awaiting thread with the original payload.
pub async fn wait<T, F>(task: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    wait_catch(task)
        .await
        .unwrap_or_else(|p| panic::resume_unwind(p.payload))
}
/// Runs the blocking `task` with `blocking::unblock` and awaits its result.
///
/// The [`LocalContext`] of the caller is captured and loaded while the task runs.
///
/// Returns `Ok` with the task result, or `Err` with a [`TaskPanicError`] if the task panicked.
pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let mut ctx = LocalContext::capture();
    let guarded = move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task)));
    let outcome = blocking::unblock(guarded).await;
    outcome.map_err(TaskPanicError::new)
}
/// Spawns the blocking `task` as a fire-and-forget job.
///
/// The task runs through [`wait_catch`] inside a [`spawn`] future. If it panics the panic
/// is logged and passed to the spawn panic handler.
pub fn spawn_wait<F>(task: F)
where
    F: FnOnce() + Send + 'static,
{
    let job = async move {
        let Err(p) = wait_catch(task).await else {
            return;
        };
        tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
        on_spawn_panic(p);
    };
    spawn(job);
}
/// Runs the blocking `task` with [`spawn_wait`] and returns a [`ResponseVar`] that receives its result.
///
/// If the task panics the panic is logged and then resumed inside the `modify` closure
/// applied to the responder variable.
pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
where
    R: VarValue,
    F: FnOnce() -> R + Send + 'static,
{
    let (responder, response) = response_var();

    let job = move || {
        let outcome = panic::catch_unwind(panic::AssertUnwindSafe(task));
        match outcome {
            Ok(value) => responder.respond(value),
            Err(payload) => {
                let p = TaskPanicError::new(payload);
                tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
                responder.modify(move |_| panic::resume_unwind(p.payload));
            }
        }
    };
    spawn_wait(job);

    response
}
/// Blocks the calling thread until `task` completes and returns its output.
///
/// Delegates to `futures_lite::future::block_on`.
pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let fut = task.into_future();
    futures_lite::future::block_on(fut)
}
/// Like [`block_on`], but the task wakes itself every time it returns pending, so it is
/// polled again immediately instead of waiting for an external wake.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let mut pinned = std::pin::pin!(task.into_future());
    let busy_poll = future_fn(|cx| {
        let polled = pinned.as_mut().poll(cx);
        if polled.is_pending() {
            // Request another poll right away.
            cx.waker().wake_by_ref();
        }
        polled
    });
    block_on(busy_poll)
}
/// Runs an async doc-test to completion, with a timeout.
///
/// If `spin` is `true` the task runs with [`spin_on`] and a 500ms deadline, otherwise it
/// runs with [`block_on`] and a 5s deadline.
///
/// # Panics
///
/// Panics with "async doc-test timeout" if the deadline is reached first.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    use zng_unit::TimeUnits;

    let outcome = if spin {
        spin_on(with_deadline(task, 500.ms()))
    } else {
        block_on(with_deadline(task, 5.secs()))
    };
    outcome.expect("async doc-test timeout")
}
/// A future that is pending once and then ready.
///
/// The first poll wakes the waker and returns pending, the second poll returns ready.
pub async fn yield_now() {
    struct YieldOnce {
        yielded: bool,
    }
    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            // Flag the yield, the previous value tells if this is the second poll.
            if std::mem::replace(&mut self.yielded, true) {
                return Poll::Ready(());
            }
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let once = YieldOnce { yielded: false };
    once.await
}
/// A future that is ready when `deadline` is reached.
///
/// If an app is running in the current context the app's deadline service is used,
/// otherwise the default timer implementation is used.
pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
    let deadline = deadline.into();

    let in_app = zng_app_context::LocalContext::current_app().is_some();
    if !in_app {
        return default_deadline(deadline);
    }
    DEADLINE_SV.read().0(deadline)
}
// Deadline service of the current app.
//
// The tuple is `(service, already_set)`, it starts as the default timer implementation
// with the flag `false`; the flag is set by `DEADLINE_APP::init_deadline_service`.
app_local! {
static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
}
// Function that creates the future awaited by `deadline`.
type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
// Default deadline service.
//
// Uses a `futures_timer::Delay` for the time left, or a future that is already ready
// when `time_left` returns `None`.
fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
    match deadline.time_left() {
        Some(timeout) => Box::pin(futures_timer::Delay::new(timeout)),
        None => Box::pin(std::future::ready(())),
    }
}
/// Deadline service integration for app implementers.
#[expect(non_camel_case_types)]
pub struct DEADLINE_APP;
impl DEADLINE_APP {
    /// Sets the `service` used by [`deadline`] in the current app.
    ///
    /// The check and the replacement happen under a single write lock, so a rejected
    /// call never installs `service`, not even momentarily.
    ///
    /// # Panics
    ///
    /// Panics if a deadline service was already set for the app, the previous service
    /// is kept in this case.
    pub fn init_deadline_service(&self, service: DeadlineService) {
        let mut current = DEADLINE_SV.write();
        if current.1 {
            // Release the lock before unwinding.
            drop(current);
            panic!("deadline service already inited for this app");
        }
        *current = (service, true);
    }
}
/// A future that is implemented by a closure.
///
/// Every poll of the future calls `fn_` with the task context and returns what it returns.
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    struct FnFuture<F> {
        poll_fn: F,
    }
    // The closure is only ever called by `&mut`, it is never pinned structurally.
    impl<F> Unpin for FnFuture<F> {}
    impl<T, F> Future for FnFuture<F>
    where
        F: FnMut(&mut std::task::Context<'_>) -> Poll<T>,
    {
        type Output = T;

        fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            (this.poll_fn)(cx)
        }
    }

    let fut = FnFuture { poll_fn: fn_ };
    fut.await
}
/// Error returned by [`with_deadline`] when the deadline is reached before the future completes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}

impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reached deadline")
    }
}

impl std::error::Error for DeadlineError {}
pub async fn with_deadline<O, F: Future<Output = O>>(
fut: impl IntoFuture<IntoFuture = F>,
deadline: impl Into<Deadline>,
) -> Result<F::Output, DeadlineError> {
let deadline = deadline.into();
any!(async { Ok(fut.await) }, async {
self::deadline(deadline).await;
Err(DeadlineError {})
})
.await
}
/// A future that awaits all the input futures at the same time and is ready when all of them are ready.
///
/// The macro input is a comma separated list of future expressions. The output is a
/// tuple with the output of each future, in input order; for a single input the output
/// is just that future's output.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! all {
($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__all! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
}
// Implementation of `all!`, the input is a list of `ident: future_expr;` pairs.
//
// Each future is stored in a `FutureOrOutput` slot, every poll polls the slots that are
// still futures and the result is ready when no slot is pending.
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
($($ident:ident: $fut:expr;)+) => {
{
$(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
$crate::future_fn(move |cx| {
use std::task::Poll;
let mut pending = false;
$(
if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
// SAFETY: the closure owns the slot and only polls it through this exclusive
// borrow; the future is replaced (dropped in place) only after it completes.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
$ident = $crate::FutureOrOutput::Output(r);
} else {
pending = true;
}
}
)+
if pending {
Poll::Pending
} else {
// All slots are `Output` here.
Poll::Ready(($($ident.take_output()),+))
}
})
}
}
}
// Slot used by the `all!` family of combinators, holds a future until it completes,
// then its output until it is taken.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    Future(F),
    Output(F::Output),
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    // Moves the output out of the slot, leaving `Taken` behind.
    //
    // Panics (unreachable) if the slot is not `Output`, the slot is `Taken` afterwards
    // in this case too.
    pub fn take_output(&mut self) -> F::Output {
        let previous = std::mem::replace(self, Self::Taken);
        let Self::Output(out) = previous else {
            unreachable!()
        };
        out
    }
}
/// A future that awaits all `futures` at the same time and is ready when all of them are ready.
///
/// This is the dynamic version of [`all!`], the output is a `Vec` with the output of
/// each future, in input order. An empty input is ready in the first poll with an empty `Vec`.
pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
    let mut slots: Vec<_> = futures
        .into_iter()
        .map(|f| FutureOrOutput::Future(f.into_future()))
        .collect();

    let joined = future_fn(move |cx| {
        let mut any_pending = false;
        for slot in slots.iter_mut() {
            let FutureOrOutput::Future(fut) = slot else {
                continue;
            };
            // SAFETY: the futures live in the heap buffer of `slots`, the closure owns the
            // `Vec` and never resizes it, and a future is only replaced after it completes.
            let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
            match pinned.poll(cx) {
                Poll::Ready(out) => *slot = FutureOrOutput::Output(out),
                Poll::Pending => any_pending = true,
            }
        }

        if any_pending {
            return Poll::Pending;
        }
        Poll::Ready(slots.iter_mut().map(|s| s.take_output()).collect())
    });
    joined.await
}
/// A future that awaits all the input futures at the same time and is ready when any of them is ready.
///
/// The macro input is a comma separated list of future expressions, all with the same
/// output type. The futures are polled in input order, the output is the output of the
/// first future found ready.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! any {
($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__any! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
}
// Implementation of `any!`, the input is a list of `ident: future_expr;` pairs.
//
// Every poll polls the futures in order and returns the first ready output.
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
($($ident:ident: $fut:expr;)+) => {
{
$(let mut $ident = std::future::IntoFuture::into_future($fut);)+
$crate::future_fn(move |cx| {
use std::task::Poll;
$(
// Shadows the captured future with a pinned exclusive borrow of it.
// SAFETY: the closure owns the future and only polls it through this borrow.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut $ident = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
if let Poll::Ready(r) = $ident.as_mut().poll(cx) {
return Poll::Ready(r)
}
)+
Poll::Pending
})
}
}
}
#[doc(hidden)]
pub use zng_task_proc_macros::task_any_all as __proc_any_all;
/// A future that awaits all `futures` at the same time and is ready when any of them is ready.
///
/// This is the dynamic version of [`any!`]. The futures are polled in input order, the
/// output is the output of the first future found ready.
///
/// Note that an empty input produces a future that is never ready.
pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
    let mut racers: Vec<_> = futures.into_iter().map(|f| f.into_future()).collect();

    let first = future_fn(move |cx| {
        let winner = racers.iter_mut().find_map(|fut| {
            // SAFETY: the futures live in the heap buffer of `racers`, the closure owns
            // the `Vec` and never resizes it.
            let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
            match pinned.poll(cx) {
                Poll::Ready(out) => Some(out),
                Poll::Pending => None,
            }
        });
        match winner {
            Some(out) => Poll::Ready(out),
            None => Poll::Pending,
        }
    });
    first.await
}
/// A future that awaits all the input futures at the same time and is ready when any of
/// them is ready with `Ok`, or when all of them are ready with `Err`.
///
/// The macro input is a comma separated list of future expressions that output `Result`.
/// The output is `Ok` with the first ok value found, or `Err` with a tuple of all the
/// errors, in input order; for a single input the error is not wrapped in a tuple.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! any_ok {
($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__any_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
}
// Implementation of `any_ok!`, the input is a list of `ident: future_expr;` pairs.
//
// Each future is stored in a `FutureOrOutput` slot. The first `Ok` is returned
// immediately, errors are stored in the slot until all futures have failed.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
($($ident:ident: $fut: expr;)+) => {
{
$(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
$crate::future_fn(move |cx| {
use std::task::Poll;
let mut pending = false;
$(
if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
// SAFETY: the closure owns the slot and only polls it through this exclusive
// borrow; the future is replaced (dropped in place) only after it completes.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut.as_mut().poll(cx) {
match r {
Ok(r) => return Poll::Ready(Ok(r)),
Err(e) => {
$ident = $crate::FutureOrOutput::Output(Err(e));
}
}
} else {
pending = true;
}
}
)+
if pending {
Poll::Pending
} else {
// All slots hold an `Err` output here.
Poll::Ready(Err((
$($ident.take_output().unwrap_err()),+
)))
}
})
}
}
}
pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
future_fn(move |cx| {
let mut pending = false;
for input in &mut futures {
if let FutureOrOutput::Future(fut) = input {
let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
match r {
Ok(r) => return Poll::Ready(Ok(r)),
Err(e) => *input = FutureOrOutput::Output(Err(e)),
}
} else {
pending = true;
}
}
}
if pending {
Poll::Pending
} else {
Poll::Ready(Err(futures
.iter_mut()
.map(|f| match f.take_output() {
Ok(_) => unreachable!(),
Err(e) => e,
})
.collect()))
}
})
.await
}
/// A future that awaits all the input futures at the same time and is ready when any of
/// them is ready with `Some`, or when all of them are ready with `None`.
///
/// The macro input is a comma separated list of future expressions that output `Option`.
/// The output is the first `Some` value found, or `None` if all futures output `None`.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! any_some {
($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__any_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
}
// Implementation of `any_some!`, the input is a list of `ident: future_expr;` pairs.
//
// Each future is stored in an `Option` slot that is set to `None` when the future
// completes with `None`. The first `Some` output is returned immediately.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
($($ident:ident: $fut: expr;)+) => {
{
$(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
$crate::future_fn(move |cx| {
use std::task::Poll;
let mut pending = false;
$(
if let Some(fut) = $ident.as_mut() {
// SAFETY: the closure owns the slot and only polls it through this exclusive
// borrow; the future is dropped in place only after it completes.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut.as_mut().poll(cx) {
if let Some(r) = r {
return Poll::Ready(Some(r));
}
// Completed with `None`, do not poll it again.
$ident = None;
} else {
pending = true;
}
}
)+
if pending {
Poll::Pending
} else {
Poll::Ready(None)
}
})
}
}
}
/// A future that awaits all `futures` at the same time and is ready when any of them is
/// ready with `Some`, or when all of them are ready with `None`.
///
/// This is the dynamic version of [`any_some!`]. An empty input is ready in the first
/// poll with `None`.
pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
    let mut slots: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();

    let raced = future_fn(move |cx| {
        let mut any_pending = false;
        for slot in slots.iter_mut() {
            let Some(fut) = slot else {
                continue;
            };
            // SAFETY: the futures live in the heap buffer of `slots`, the closure owns the
            // `Vec` and never resizes it, and a future is only dropped after it completes.
            let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
            match pinned.poll(cx) {
                Poll::Ready(Some(value)) => return Poll::Ready(Some(value)),
                // Completed with `None`, do not poll it again.
                Poll::Ready(None) => *slot = None,
                Poll::Pending => any_pending = true,
            }
        }

        match any_pending {
            true => Poll::Pending,
            false => Poll::Ready(None),
        }
    });
    raced.await
}
/// A future that awaits all the input futures at the same time and is ready when all of
/// them are ready with `Ok`, or when any of them is ready with `Err`.
///
/// The macro input is a comma separated list of future expressions that output `Result`.
/// The output is `Ok` with a tuple of all the ok values, in input order, or `Err` with
/// the first error found; for a single input the ok value is not wrapped in a tuple.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! all_ok {
($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__all_ok! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
}
// Implementation of `all_ok!`, the input is a list of `ident: future_expr;` pairs.
//
// Each future is stored in a `FutureOrOutput` slot. The first `Err` is returned
// immediately, ok values are stored in the slot until all futures have completed.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
($($ident:ident: $fut: expr;)+) => {
{
$(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
$crate::future_fn(move |cx| {
use std::task::Poll;
let mut pending = false;
$(
if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
// SAFETY: the closure owns the slot and only polls it through this exclusive
// borrow; the future is replaced (dropped in place) only after it completes.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut.as_mut().poll(cx) {
match r {
Ok(r) => {
$ident = $crate::FutureOrOutput::Output(Ok(r))
},
Err(e) => return Poll::Ready(Err(e)),
}
} else {
pending = true;
}
}
)+
if pending {
Poll::Pending
} else {
// All slots hold an `Ok` output here.
Poll::Ready(Ok((
$($ident.take_output().unwrap()),+
)))
}
})
}
}
}
/// A future that awaits all `futures` at the same time and is ready when all of them are
/// ready with `Ok`, or when any of them is ready with `Err`.
///
/// This is the dynamic version of [`all_ok!`]. The output is `Ok` with all the ok values,
/// in input order, or `Err` with the first error found. An empty input is ready in the
/// first poll with an empty ok `Vec`.
pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
    let mut slots: Vec<_> = futures
        .into_iter()
        .map(|f| FutureOrOutput::Future(f.into_future()))
        .collect();

    let joined = future_fn(move |cx| {
        let mut any_pending = false;
        for slot in slots.iter_mut() {
            let FutureOrOutput::Future(fut) = slot else {
                continue;
            };
            // SAFETY: the futures live in the heap buffer of `slots`, the closure owns the
            // `Vec` and never resizes it, and a future is only replaced after it completes.
            let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
            match pinned.poll(cx) {
                Poll::Ready(Ok(value)) => *slot = FutureOrOutput::Output(Ok(value)),
                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                Poll::Pending => any_pending = true,
            }
        }

        if any_pending {
            return Poll::Pending;
        }
        // Every slot holds an ok value at this point.
        let values = slots
            .iter_mut()
            .map(|s| match s.take_output() {
                Ok(value) => value,
                Err(_) => unreachable!(),
            })
            .collect();
        Poll::Ready(Ok(values))
    });
    joined.await
}
/// A future that awaits all the input futures at the same time and is ready when all of
/// them are ready with `Some`, or when any of them is ready with `None`.
///
/// The macro input is a comma separated list of future expressions that output `Option`.
/// The output is `Some` with a tuple of all the values, in input order, or `None` if any
/// future outputs `None`; for a single input the value is not wrapped in a tuple.
///
/// Up to 8 inputs are expanded by declarative rules, more inputs are expanded by a proc-macro.
#[macro_export]
macro_rules! all_some {
($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
($fut0:expr, $fut1:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
}
};
($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
}
};
($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
$crate::__all_some! {
fut0: $fut0;
fut1: $fut1;
fut2: $fut2;
fut3: $fut3;
fut4: $fut4;
fut5: $fut5;
fut6: $fut6;
fut7: $fut7;
}
};
($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
}
// Implementation of `all_some!`, the input is a list of `ident: future_expr;` pairs.
//
// Each future is stored in a `FutureOrOutput` slot. The first `None` output is returned
// immediately, `Some` outputs are stored in the slot until all futures have completed.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
($($ident:ident: $fut: expr;)+) => {
{
$(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
$crate::future_fn(move |cx| {
use std::task::Poll;
let mut pending = false;
$(
if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
// SAFETY: the closure owns the slot and only polls it through this exclusive
// borrow; the future is replaced (dropped in place) only after it completes.
// This assumes the closure is not moved after the first poll — TODO confirm.
let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
if let Poll::Ready(r) = fut.as_mut().poll(cx) {
if r.is_none() {
return Poll::Ready(None);
}
$ident = $crate::FutureOrOutput::Output(r);
} else {
pending = true;
}
}
)+
if pending {
Poll::Pending
} else {
// All slots hold a `Some` output here.
Poll::Ready(Some((
$($ident.take_output().unwrap()),+
)))
}
})
}
}
}
/// A future that awaits all `futures` at the same time and is ready when all of them are
/// ready with `Some`, or when any of them is ready with `None`.
///
/// This is the dynamic version of [`all_some!`]. The output is `Some` with all the
/// values, in input order, or `None` if any future outputs `None`. An empty input is
/// ready in the first poll with an empty `Vec`.
pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
    let mut slots: Vec<_> = futures
        .into_iter()
        .map(|f| FutureOrOutput::Future(f.into_future()))
        .collect();

    let joined = future_fn(move |cx| {
        let mut any_pending = false;
        for slot in slots.iter_mut() {
            let FutureOrOutput::Future(fut) = slot else {
                continue;
            };
            // SAFETY: the futures live in the heap buffer of `slots`, the closure owns the
            // `Vec` and never resizes it, and a future is only replaced after it completes.
            let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
            match pinned.poll(cx) {
                Poll::Ready(Some(value)) => *slot = FutureOrOutput::Output(Some(value)),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => any_pending = true,
            }
        }

        if any_pending {
            return Poll::Pending;
        }
        // Every slot holds a `Some` value at this point.
        let values = slots.iter_mut().map(|s| s.take_output().unwrap()).collect();
        Poll::Ready(Some(values))
    });
    joined.await
}
/// A future that is pending until [`set`] is called.
///
/// Clones share the same signal. Equality and hash are defined by the shared pointer,
/// not by the signal state.
///
/// [`set`]: SignalOnce::set
#[derive(Default, Clone)]
pub struct SignalOnce(Arc<SignalInner>);
impl fmt::Debug for SignalOnce {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SignalOnce({})", self.is_set())
}
}
impl PartialEq for SignalOnce {
// Two signals are equal if they are clones of the same signal.
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for SignalOnce {}
impl Hash for SignalOnce {
// Hashes the shared pointer, consistent with `eq`.
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Arc::as_ptr(&self.0).hash(state)
}
}
impl SignalOnce {
/// New unsignaled.
pub fn new() -> Self {
Self::default()
}
/// New already signaled.
pub fn new_set() -> Self {
let s = Self::new();
s.set();
s
}
/// If the signal was set.
pub fn is_set(&self) -> bool {
self.0.signaled.load(Ordering::Relaxed)
}
/// Sets the signal and wakes the listeners.
///
/// Only the first call has an effect, later calls do nothing.
pub fn set(&self) {
if !self.0.signaled.swap(true, Ordering::Relaxed) {
// The listeners are taken out first, so the lock is not held while waking.
let listeners = mem::take(&mut *self.0.listeners.lock());
for listener in listeners {
listener.wake();
}
}
}
}
impl Future for SignalOnce {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
if self.0.signaled.load(Ordering::Relaxed) {
return Poll::Ready(());
}
let mut listeners = self.0.listeners.lock();
// Check again with the lock held, `set` may have taken the listeners between
// the first check and the lock; without this the waker could be registered
// after the wake and never be called.
if self.0.signaled.load(Ordering::Relaxed) {
return Poll::Ready(());
}
let waker = cx.waker();
// Avoid registering the same waker more than once.
if !listeners.iter().any(|w| w.will_wake(waker)) {
listeners.push(waker.clone());
}
Poll::Pending
}
}
// Shared state of a `SignalOnce`.
#[derive(Default)]
struct SignalInner {
// Set once by `SignalOnce::set`.
signaled: AtomicBool,
// Wakers of the tasks awaiting the signal.
listeners: Mutex<Vec<std::task::Waker>>,
}
/// A waker that dispatches a wake call to multiple other wakers.
///
/// Clones share the same list of wakers.
#[derive(Clone)]
pub struct McWaker(Arc<WakeVec>);

// Shared list of wakers, is the actual waker implementation.
#[derive(Default)]
struct WakeVec(Mutex<Vec<std::task::Waker>>);
impl WakeVec {
    // Adds the waker, returns `true` if the list was empty before.
    fn push(&self, waker: std::task::Waker) -> bool {
        let mut v = self.0.lock();
        let return_waker = v.is_empty();
        v.push(waker);
        return_waker
    }

    // Removes all wakers without waking them.
    fn cancel(&self) {
        let mut v = self.0.lock();
        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
        v.clear();
    }
}
impl std::task::Wake for WakeVec {
    fn wake(self: Arc<Self>) {
        // Take the wakers in a separate statement so the lock guard is dropped before
        // any waker is called. A `for` over the `mem::take` expression would keep the
        // guard alive for the whole loop and a waker that calls back into `push` or
        // `cancel` on the same thread would deadlock.
        let wakers = mem::take(&mut *self.0.lock());
        for w in wakers {
            w.wake();
        }
    }
}
impl McWaker {
    /// New empty waker.
    pub fn empty() -> Self {
        Self(Arc::new(WakeVec::default()))
    }

    /// Registers `waker` to be called when this waker is woken.
    ///
    /// Returns `Some` with the dispatching waker if `waker` is the first in the list,
    /// returns `None` if other wakers were already registered.
    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
    }

    /// Clears the registered wakers without waking them.
    ///
    /// In debug builds asserts that at least one waker is registered.
    pub fn cancel(&self) {
        self.0.cancel()
    }
}
/// Panic payload of a task that panicked.
#[non_exhaustive]
pub struct TaskPanicError {
    /// The panic payload, as received from `catch_unwind`.
    pub payload: Box<dyn Any + Send + 'static>,
}
impl TaskPanicError {
    /// New from panic payload.
    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
        Self { payload }
    }

    /// Gets the panic message if the payload is a `&str` or a `String`.
    pub fn panic_str(&self) -> Option<&str> {
        let payload = &self.payload;
        match payload.downcast_ref::<&str>() {
            Some(literal) => Some(*literal),
            None => payload.downcast_ref::<String>().map(String::as_str),
        }
    }
}
impl fmt::Debug for TaskPanicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = self.panic_str();
        let mut dbg = f.debug_struct("TaskPanicError");
        dbg.field("panic_str()", &message);
        dbg.finish()
    }
}
impl fmt::Display for TaskPanicError {
    // Writes the panic message, or nothing if the payload is not a string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.panic_str() {
            Some(message) => f.write_str(message),
            None => Ok(()),
        }
    }
}
impl std::error::Error for TaskPanicError {}
// Boxed handler called with the panic of spawn tasks.
type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;
// Handler set by `set_spawn_panic_handler` for the current app, `None` until set.
// NOTE(review): the inner `Mutex` is presumably only there to make the value `Sync`,
// the handler is always accessed through `get_mut` — confirm.
app_local! {
static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
}
pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
assert!(h.is_none(), "a spawn panic handler is already set");
*h = Some(Mutex::new(Box::new(handler)));
}
// Calls the spawn panic handler with `p`, does nothing if no handler is set.
//
// The handler storage is write locked for the duration of the call.
fn on_spawn_panic(p: TaskPanicError) {
    let mut guard = SPAWN_PANIC_HANDLERS.write();
    let Some(handler) = &mut *guard else {
        return;
    };
    (handler.get_mut())(p)
}