zng_task/
lib.rs

1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13    fmt,
14    hash::Hash,
15    mem, panic,
16    pin::Pin,
17    sync::{
18        Arc,
19        atomic::{AtomicBool, Ordering},
20    },
21    task::Poll,
22};
23
24#[doc(no_inline)]
25pub use parking_lot;
26use parking_lot::Mutex;
27
28mod crate_util;
29
30use crate::crate_util::PanicResult;
31use zng_app_context::{LocalContext, app_local};
32use zng_time::Deadline;
33use zng_var::{AnyVar, ResponseVar, VarValue, response_done_var, response_var};
34
35#[cfg(test)]
36mod tests;
37
38#[doc(no_inline)]
39pub use rayon;
40
41/// Async filesystem primitives.
42///
43/// This module is the [async-fs](https://docs.rs/async-fs) crate re-exported for convenience.
44pub mod fs {
45    #[doc(inline)]
46    pub use async_fs::*;
47}
48
49pub mod channel;
50pub mod io;
51mod ui;
52
53pub mod http;
54
55pub mod ipc;
56
57mod rayon_ctx;
58
59pub use rayon_ctx::*;
60
61pub use ui::*;
62
63mod progress;
64pub use progress::*;
65
66/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
67///
68/// # Parallel
69///
70/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
71///
72/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
73/// otherwise it will run in a single thread at a time, still not blocking the UI.
74///
75/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
76///
77/// # Async
78///
79/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
80/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
81/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
82/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
83///
84/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
85///
86/// # Examples
87///
88/// ```
89/// # use zng_task::{self as task, *, rayon::iter::*};
90/// # use zng_var::*;
91/// # struct SomeStruct { sum_response: ResponseVar<usize> }
92/// # impl SomeStruct {
93/// fn on_event(&mut self) {
94///     let (responder, response) = response_var();
95///     self.sum_response = response;
96///
97///     task::spawn(async move {
98///         let r = (0..1000).into_par_iter().map(|i| i * i).sum();
99///
100///         responder.respond(r);
101///     });
102/// }
103///
104/// fn on_update(&mut self) {
105///     if let Some(result) = self.sum_response.rsp_new() {
106///         println!("sum of squares 0..1000: {result}");   
107///     }
108/// }
109/// # }
110/// ```
111///
112/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
113/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
114///
115/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
116/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
117///
118/// # Panic Handling
119///
120/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
121///
122/// # Unwind Safety
123///
124/// This function disables the [unwind safety validation], meaning that in case of a panic shared
125/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
126/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
127/// to propagate a panic.
128///
129/// [unwind safety validation]: std::panic::UnwindSafe
130/// [`Waker`]: std::task::Waker
131/// [`rayon`]: https://docs.rs/rayon
132/// [`LocalContext`]: zng_app_context::LocalContext
133/// [`response_var`]: zng_var::response_var
134pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136    F: Future<Output = ()> + Send + 'static,
137{
138    Arc::new(RayonTask {
139        ctx: LocalContext::capture(),
140        fut: Mutex::new(Some(Box::pin(task.into_future()))),
141    })
142    .poll()
143}
144
145/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
146pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
147where
148    F: Future<Output = ()> + Send + 'static,
149{
150    struct PollRayonTask {
151        fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
152    }
153    impl PollRayonTask {
154        // start task in calling thread
155        fn poll(self: Arc<Self>) {
156            let mut task = self.fut.lock();
157            let (mut t, _) = task.take().unwrap();
158
159            let waker = self.clone().into();
160
161            match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
162                Poll::Ready(()) => {}
163                Poll::Pending => {
164                    let ctx = LocalContext::capture();
165                    *task = Some((t, Some(ctx)));
166                }
167            }
168        }
169    }
170    impl std::task::Wake for PollRayonTask {
171        fn wake(self: Arc<Self>) {
172            // continue task in spawn threads
173            if let Some((task, Some(ctx))) = self.fut.lock().take() {
174                Arc::new(RayonTask {
175                    ctx,
176                    fut: Mutex::new(Some(Box::pin(task))),
177                })
178                .poll();
179            }
180        }
181    }
182
183    Arc::new(PollRayonTask {
184        fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
185    })
186    .poll()
187}
188
189type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
190
191// A future that is its own waker that polls inside rayon spawn tasks.
192struct RayonTask {
193    ctx: LocalContext,
194    fut: Mutex<Option<RayonSpawnFut>>,
195}
196impl RayonTask {
197    fn poll(self: Arc<Self>) {
198        rayon::spawn(move || {
199            // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
200            let mut task = self.fut.lock();
201            if let Some(mut t) = task.take() {
202                let waker = self.clone().into();
203
204                // load app context
205                self.ctx.clone().with_context(move || {
206                    let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
207                        // poll future
208                        if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
209                            // not done
210                            *task = Some(t);
211                        }
212                    }));
213                    if let Err(p) = r {
214                        tracing::error!("panic in `task::spawn`: {}", crate_util::panic_str(&p));
215                    }
216                });
217            }
218        })
219    }
220}
221impl std::task::Wake for RayonTask {
222    fn wake(self: Arc<Self>) {
223        self.poll()
224    }
225}
226
227/// Rayon join with local context.
228///
229/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
230/// operations.
231///
232/// See `rayon::join` for more details about join.
233///
234/// [`LocalContext`]: zng_app_context::LocalContext
235pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
236where
237    A: FnOnce() -> RA + Send,
238    B: FnOnce() -> RB + Send,
239    RA: Send,
240    RB: Send,
241{
242    self::join_context(move |_| op_a(), move |_| op_b())
243}
244
245/// Rayon join context with local context.
246///
247/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
248/// operations.
249///
250/// See `rayon::join_context` for more details about join.
251///
252/// [`LocalContext`]: zng_app_context::LocalContext
253pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
254where
255    A: FnOnce(rayon::FnContext) -> RA + Send,
256    B: FnOnce(rayon::FnContext) -> RB + Send,
257    RA: Send,
258    RB: Send,
259{
260    let ctx = LocalContext::capture();
261    let ctx = &ctx;
262    rayon::join_context(
263        move |a| {
264            if a.migrated() {
265                ctx.clone().with_context(|| op_a(a))
266            } else {
267                op_a(a)
268            }
269        },
270        move |b| {
271            if b.migrated() {
272                ctx.clone().with_context(|| op_b(b))
273            } else {
274                op_b(b)
275            }
276        },
277    )
278}
279
280/// Rayon scope with local context.
281///
282/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
283/// operations.
284///
285/// See `rayon::scope` for more details about scope.
286///
287/// [`LocalContext`]: zng_app_context::LocalContext
288pub fn scope<'scope, OP, R>(op: OP) -> R
289where
290    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
291    R: Send,
292{
293    let ctx = LocalContext::capture();
294
295    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
296    // Is there a better way to do this? I hope so.
297    //
298    // SAFETY:
299    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
300    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
301    let ctx_ref: &'_ LocalContext = &ctx;
302    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
303
304    let r = rayon::scope(move |s| {
305        op(ScopeCtx {
306            scope: s,
307            ctx: ctx_scope_ref,
308        })
309    });
310
311    drop(ctx);
312
313    r
314}
315
316/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
317///
318/// See [`scope`] for more details.
319#[derive(Clone, Copy, Debug)]
320pub struct ScopeCtx<'a, 'scope: 'a> {
321    scope: &'a rayon::Scope<'scope>,
322    ctx: &'scope LocalContext,
323}
324impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
325    /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
326    ///
327    /// See `rayon::Scope::spawn` for more details.
328    pub fn spawn<F>(self, f: F)
329    where
330        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
331    {
332        let ctx = self.ctx;
333        self.scope
334            .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
335    }
336}
337
338/// Spawn a parallel async task that can also be `.await` for the task result.
339///
340/// # Parallel
341///
342/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
343///
344/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
345/// otherwise it will run in a single thread at a time, still not blocking the UI.
346///
347/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
348///
349/// # Async
350///
351/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
352/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
353/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
354/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
355///
356/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
357///
358/// # Examples
359///
360/// ```
361/// # use zng_task::{self as task, rayon::iter::*};
362/// # struct SomeStruct { sum: usize }
363/// # async fn read_numbers() -> Vec<usize> { vec![] }
364/// # impl SomeStruct {
365/// async fn on_event(&mut self) {
366///     self.sum = task::run(async {
367///         read_numbers().await.par_iter().map(|i| i * i).sum()
368///     }).await;
369/// }
370/// # }
371/// ```
372///
373/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
374/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
375/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
376///
377/// # Cancellation
378///
379/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
380/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
381/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
382/// result channel is disconnected.
383///
384/// If you want to deterministically known that the `task` was cancelled use a cancellation signal.
385///
386/// # Panic Propagation
387///
388/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
389/// can use [`run_catch`] to get the panic as an error instead.
390///
391/// [`resume_unwind`]: panic::resume_unwind
392/// [`Waker`]: std::task::Waker
393/// [`rayon`]: https://docs.rs/rayon
394/// [`LocalContext`]: zng_app_context::LocalContext
395pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
396where
397    R: Send + 'static,
398    T: Future<Output = R> + Send + 'static,
399{
400    match run_catch(task).await {
401        Ok(r) => r,
402        Err(p) => panic::resume_unwind(p),
403    }
404}
405
406/// Like [`run`] but catches panics.
407///
408/// This task works the same and has the same utility as [`run`], except if returns panic messages
409/// as an error instead of propagating the panic.
410///
411/// # Unwind Safety
412///
413/// This function disables the [unwind safety validation], meaning that in case of a panic shared
414/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
415/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
416/// if this function returns an error.
417///
418/// [unwind safety validation]: std::panic::UnwindSafe
419pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> PanicResult<R>
420where
421    R: Send + 'static,
422    T: Future<Output = R> + Send + 'static,
423{
424    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
425
426    // A future that is its own waker that polls inside the rayon primary thread-pool.
427    struct RayonCatchTask<R> {
428        ctx: LocalContext,
429        fut: Mutex<Option<Fut<R>>>,
430        sender: flume::Sender<PanicResult<R>>,
431    }
432    impl<R: Send + 'static> RayonCatchTask<R> {
433        fn poll(self: Arc<Self>) {
434            let sender = self.sender.clone();
435            if sender.is_disconnected() {
436                return; // cancel.
437            }
438            rayon::spawn(move || {
439                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
440                let mut task = self.fut.lock();
441                if let Some(mut t) = task.take() {
442                    let waker = self.clone().into();
443                    let mut cx = std::task::Context::from_waker(&waker);
444
445                    self.ctx.clone().with_context(|| {
446                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
447                        match r {
448                            Ok(Poll::Ready(r)) => {
449                                drop(task);
450                                let _ = sender.send(Ok(r));
451                            }
452                            Ok(Poll::Pending) => {
453                                *task = Some(t);
454                            }
455                            Err(p) => {
456                                drop(task);
457                                let _ = sender.send(Err(p));
458                            }
459                        }
460                    });
461                }
462            })
463        }
464    }
465    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
466        fn wake(self: Arc<Self>) {
467            self.poll()
468        }
469    }
470
471    let (sender, receiver) = channel::bounded(1);
472
473    Arc::new(RayonCatchTask {
474        ctx: LocalContext::capture(),
475        fut: Mutex::new(Some(Box::pin(task.into_future()))),
476        sender: sender.into(),
477    })
478    .poll();
479
480    receiver.recv().await.unwrap()
481}
482
483/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
484///
485/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
486///
487/// # Examples
488///
489/// ```
490/// # use zng_task::{self as task, rayon::iter::*};
491/// # use zng_var::*;
492/// # struct SomeStruct { sum_response: ResponseVar<usize> }
493/// # async fn read_numbers() -> Vec<usize> { vec![] }
494/// # impl SomeStruct {
495/// fn on_event(&mut self) {
496///     self.sum_response = task::respond(async {
497///         read_numbers().await.par_iter().map(|i| i * i).sum()
498///     });
499/// }
500///
501/// fn on_update(&mut self) {
502///     if let Some(result) = self.sum_response.rsp_new() {
503///         println!("sum of squares: {result}");   
504///     }
505/// }
506/// # }
507/// ```
508///
509/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
510/// `sum_response` that is a [`ResponseVar<R>`].
511///
512/// # Cancellation
513///
514/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
515///
516/// # Panic Handling
517///
518/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
519///
520/// [`resume_unwind`]: panic::resume_unwind
521/// [`ResponseVar<R>`]: zng_var::ResponseVar
522/// [`response_var`]: zng_var::response_var
523pub fn respond<R, F>(task: F) -> ResponseVar<R>
524where
525    R: VarValue,
526    F: Future<Output = R> + Send + 'static,
527{
528    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
529
530    let (responder, response) = response_var();
531
532    // A future that is its own waker that polls inside the rayon primary thread-pool.
533    struct RayonRespondTask<R: VarValue> {
534        ctx: LocalContext,
535        fut: Mutex<Option<Fut<R>>>,
536        responder: zng_var::ResponderVar<R>,
537    }
538    impl<R: VarValue> RayonRespondTask<R> {
539        fn poll(self: Arc<Self>) {
540            let responder = self.responder.clone();
541            if responder.strong_count() == 2 {
542                return; // cancel.
543            }
544            rayon::spawn(move || {
545                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
546                let mut task = self.fut.lock();
547                if let Some(mut t) = task.take() {
548                    let waker = self.clone().into();
549                    let mut cx = std::task::Context::from_waker(&waker);
550
551                    self.ctx.clone().with_context(|| {
552                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
553                        match r {
554                            Ok(Poll::Ready(r)) => {
555                                drop(task);
556
557                                responder.respond(r);
558                            }
559                            Ok(Poll::Pending) => {
560                                *task = Some(t);
561                            }
562                            Err(p) => {
563                                tracing::error!("panic in `task::respond`: {}", crate_util::panic_str(&p));
564                                drop(task);
565                                responder.modify(move |_| panic::resume_unwind(p));
566                            }
567                        }
568                    });
569                }
570            })
571        }
572    }
573    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
574        fn wake(self: Arc<Self>) {
575            self.poll()
576        }
577    }
578
579    Arc::new(RayonRespondTask {
580        ctx: LocalContext::capture(),
581        fut: Mutex::new(Some(Box::pin(task))),
582        responder,
583    })
584    .poll();
585
586    response
587}
588
589/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
590/// if the `task` is pending continues execution like [`respond`].
591pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
592where
593    R: VarValue,
594    F: Future<Output = R> + Send + 'static,
595{
596    enum QuickResponse<R: VarValue> {
597        Quick(Option<R>),
598        Response(zng_var::ResponderVar<R>),
599    }
600    let task = task.into_future();
601    let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
602    poll_spawn(zng_clone_move::async_clmv!(q, {
603        let rsp = task.await;
604
605        match &mut *q.lock() {
606            QuickResponse::Quick(q) => *q = Some(rsp),
607            QuickResponse::Response(r) => r.respond(rsp),
608        }
609    }));
610
611    let mut q = q.lock();
612    match &mut *q {
613        QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
614        _ => {
615            let (responder, response) = response_var();
616            *q = QuickResponse::Response(responder);
617            response
618        }
619    }
620}
621
622/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
623///
624/// # Parallel
625///
626/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
627/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
628/// parts that are blocking.
629///
630/// # Examples
631///
632/// ```
633/// # fn main() { }
634/// # use zng_task as task;
635/// # async fn example() {
636/// task::wait(|| std::fs::read_to_string("file.txt")).await
637/// # ; }
638/// ```
639///
640/// The example reads a file, that is a blocking file IO operation, most of the time is spend waiting for the operating system,
641/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
642/// like in a async event handler.
643///
644/// # Async Read/Write
645///
646/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
647/// have or want the full file in memory or when you want to apply multiple operations to the file.
648///
649/// # Panic Propagation
650///
651/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
652/// can use [`wait_catch`] to get the panic as an error instead.
653///
654/// [`blocking`]: https://docs.rs/blocking
655/// [`resume_unwind`]: panic::resume_unwind
656pub async fn wait<T, F>(task: F) -> T
657where
658    F: FnOnce() -> T + Send + 'static,
659    T: Send + 'static,
660{
661    match wait_catch(task).await {
662        Ok(r) => r,
663        Err(p) => panic::resume_unwind(p),
664    }
665}
666
667/// Like [`wait`] but catches panics.
668///
669/// This task works the same and has the same utility as [`wait`], except if returns panic messages
670/// as an error instead of propagating the panic.
671///
672/// # Unwind Safety
673///
674/// This function disables the [unwind safety validation], meaning that in case of a panic shared
675/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
676/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
677/// if this function returns an error.
678///
679/// [unwind safety validation]: std::panic::UnwindSafe
680pub async fn wait_catch<T, F>(task: F) -> PanicResult<T>
681where
682    F: FnOnce() -> T + Send + 'static,
683    T: Send + 'static,
684{
685    let mut ctx = LocalContext::capture();
686    blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task)))).await
687}
688
689/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
690///
691/// # Panic Handling
692///
693/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
694///
695/// # Unwind Safety
696///
697/// This function disables the [unwind safety validation], meaning that in case of a panic shared
698/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
699/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
700/// to propagate a panic.
701///
702/// [unwind safety validation]: std::panic::UnwindSafe
703pub fn spawn_wait<F>(task: F)
704where
705    F: FnOnce() + Send + 'static,
706{
707    spawn(async move {
708        if let Err(p) = wait_catch(task).await {
709            tracing::error!("parallel `spawn_wait` task panicked: {}", crate_util::panic_str(&p))
710        }
711    });
712}
713
714/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
715///
716/// # Cancellation
717///
718/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
719///
720/// # Panic Handling
721///
722/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
723pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
724where
725    R: VarValue,
726    F: FnOnce() -> R + Send + 'static,
727{
728    let (responder, response) = response_var();
729    spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
730        Ok(r) => responder.respond(r),
731        Err(p) => {
732            tracing::error!("panic in `task::wait_respond`: {}", crate_util::panic_str(&p));
733            responder.modify(move |_| panic::resume_unwind(p))
734        }
735    });
736    response
737}
738
739/// Blocks the thread until the `task` future finishes.
740///
741/// This function is useful for implementing async tests, using it in an app will probably cause
742/// the app to stop responding.
743///
744/// The crate [`futures-lite`] is used to execute the task.
745///
746/// # Examples
747///
748/// Test a [`run`] call:
749///
750/// ```
751/// use zng_task as task;
752/// # use zng_unit::*;
753/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
754///
755/// #[test]
756/// # fn __() { }
757/// pub fn run_ok() {
758///     let r = task::block_on(task::run(async {
759///         foo(32).await
760///     }));
761///     
762/// #   let value =
763///     r.expect("foo(32) was not Ok");
764/// #   assert_eq!(32, value);
765/// }
766/// # run_ok();
767/// ```
768///
769/// [`futures-lite`]: https://docs.rs/futures-lite/
770pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
771where
772    F: Future,
773{
774    futures_lite::future::block_on(task.into_future())
775}
776
777/// Continuous poll the `task` until if finishes.
778///
779/// This function is useful for implementing some async tests only, futures don't expect to be polled
780/// continuously. This function is only available in test builds.
781#[cfg(any(test, doc, feature = "test_util"))]
782pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
783where
784    F: Future,
785{
786    use std::pin::pin;
787
788    let mut task = pin!(task.into_future());
789    block_on(future_fn(|cx| match task.as_mut().poll(cx) {
790        Poll::Ready(r) => Poll::Ready(r),
791        Poll::Pending => {
792            cx.waker().wake_by_ref();
793            Poll::Pending
794        }
795    }))
796}
797
798/// Executor used in async doc tests.
799///
800/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
801/// IF `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
802#[cfg(any(test, doc, feature = "test_util"))]
803pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
804where
805    F: Future,
806{
807    use zng_unit::TimeUnits;
808
809    if spin {
810        spin_on(with_deadline(task, 500.ms())).expect("async doc-test timeout")
811    } else {
812        block_on(with_deadline(task, 5.secs())).expect("async doc-test timeout")
813    }
814}
815
816/// A future that is [`Pending`] once and wakes the current task.
817///
818/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
819///
820/// [`Pending`]: std::task::Poll::Pending
821/// [`Ready`]: std::task::Poll::Ready
822/// [`wake`]: std::task::Waker::wake
823pub async fn yield_now() {
824    struct YieldNowFut(bool);
825    impl Future for YieldNowFut {
826        type Output = ();
827
828        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
829            if self.0 {
830                Poll::Ready(())
831            } else {
832                self.0 = true;
833                cx.waker().wake_by_ref();
834                Poll::Pending
835            }
836        }
837    }
838
839    YieldNowFut(false).await
840}
841
842/// A future that is [`Pending`] until the `deadline` is reached.
843///
844/// # Examples
845///
846/// Await 5 seconds in a [`spawn`] parallel task:
847///
848/// ```
849/// use zng_task as task;
850/// use zng_unit::*;
851///
852/// task::spawn(async {
853///     println!("waiting 5 seconds..");
854///     task::deadline(5.secs()).await;
855///     println!("5 seconds elapsed.")
856/// });
857/// ```
858///
859/// The future runs on an app provider timer executor, or on the [`futures_timer`] by default.
860///
861/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
862/// not at the moment of the first `.await` call.
863///
864/// [`Pending`]: std::task::Poll::Pending
865/// [`futures_timer`]: https://docs.rs/futures-timer
866pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
867    let deadline = deadline.into();
868    if zng_app_context::LocalContext::current_app().is_some() {
869        DEADLINE_SV.read().0(deadline)
870    } else {
871        default_deadline(deadline)
872    }
873}
874
875app_local! {
876    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
877}
878
879type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
880
881fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
882    if let Some(timeout) = deadline.time_left() {
883        Box::pin(futures_timer::Delay::new(timeout))
884    } else {
885        Box::pin(std::future::ready(()))
886    }
887}
888
889/// Deadline APP integration.
890#[expect(non_camel_case_types)]
891pub struct DEADLINE_APP;
892
893impl DEADLINE_APP {
894    /// Called by the app implementer to setup the [`deadline`] executor.
895    ///
896    /// If no app calls this the [`futures_timer`] executor is used.
897    ///
898    /// [`futures_timer`]: https://docs.rs/futures-timer
899    ///
900    /// # Panics
901    ///
902    /// Panics if called more than once for the same app.
903    pub fn init_deadline_service(&self, service: DeadlineService) {
904        let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
905        if already_set {
906            *DEADLINE_SV.write() = (prev, true);
907            panic!("deadline service already inited for this app");
908        }
909    }
910}
911
912/// Implements a [`Future`] from a closure.
913///
914/// # Examples
915///
916/// A future that is ready with a closure returns `Some(R)`.
917///
918/// ```
919/// use zng_task as task;
920/// use std::task::Poll;
921///
922/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
923///     task::future_fn(|cx| {
924///         match closure() {
925///             Some(r) => Poll::Ready(r),
926///             None => Poll::Pending
927///         }
928///     }).await
929/// }
930/// ```
931pub async fn future_fn<T, F>(fn_: F) -> T
932where
933    F: FnMut(&mut std::task::Context) -> Poll<T>,
934{
935    struct PollFn<F>(F);
936    impl<F> Unpin for PollFn<F> {}
937    impl<T, F: FnMut(&mut std::task::Context<'_>) -> Poll<T>> Future for PollFn<F> {
938        type Output = T;
939
940        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
941            (self.0)(cx)
942        }
943    }
944    PollFn(fn_).await
945}
946
947/// Error when [`with_deadline`] reach a time limit before a task finishes.
948#[derive(Debug, Clone, Copy)]
949#[non_exhaustive]
950pub struct DeadlineError {}
951impl fmt::Display for DeadlineError {
952    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
953        write!(f, "reached deadline")
954    }
955}
956impl std::error::Error for DeadlineError {}
957
958/// Add a [`deadline`] to a future.
959///
960/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
961pub async fn with_deadline<O, F: Future<Output = O>>(
962    fut: impl IntoFuture<IntoFuture = F>,
963    deadline: impl Into<Deadline>,
964) -> Result<F::Output, DeadlineError> {
965    let deadline = deadline.into();
966    any!(async { Ok(fut.await) }, async {
967        self::deadline(deadline).await;
968        Err(DeadlineError {})
969    })
970    .await
971}
972
973/// <span data-del-macro-root></span> A future that *zips* other futures.
974///
975/// The macro input is a comma separated list of future expressions. The macro output is a future
976/// that when ".awaited" produces a tuple of results in the same order as the inputs.
977///
978/// At least one input future is required and any number of futures is accepted. For more than
979/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
980/// some IDEs.
981///
982/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
983/// function to await on all futures in a dynamic list of futures.
984///
985/// # Examples
986///
987/// Await for three different futures to complete:
988///
989/// ```
990/// use zng_task as task;
991///
992/// # task::doc_test(false, async {
993/// let (a, b, c) = task::all!(
994///     task::run(async { 'a' }),
995///     task::wait(|| "b"),
996///     async { b"c" }
997/// ).await;
998/// # });
999/// ```
1000#[macro_export]
1001macro_rules! all {
1002    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
1003    ($fut0:expr, $fut1:expr $(,)?) => {
1004        $crate::__all! {
1005            fut0: $fut0;
1006            fut1: $fut1;
1007        }
1008    };
1009    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1010        $crate::__all! {
1011            fut0: $fut0;
1012            fut1: $fut1;
1013            fut2: $fut2;
1014        }
1015    };
1016    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1017        $crate::__all! {
1018            fut0: $fut0;
1019            fut1: $fut1;
1020            fut2: $fut2;
1021            fut3: $fut3;
1022        }
1023    };
1024    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1025        $crate::__all! {
1026            fut0: $fut0;
1027            fut1: $fut1;
1028            fut2: $fut2;
1029            fut3: $fut3;
1030            fut4: $fut4;
1031        }
1032    };
1033    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1034        $crate::__all! {
1035            fut0: $fut0;
1036            fut1: $fut1;
1037            fut2: $fut2;
1038            fut3: $fut3;
1039            fut4: $fut4;
1040            fut5: $fut5;
1041        }
1042    };
1043    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1044        $crate::__all! {
1045            fut0: $fut0;
1046            fut1: $fut1;
1047            fut2: $fut2;
1048            fut3: $fut3;
1049            fut4: $fut4;
1050            fut5: $fut5;
1051            fut6: $fut6;
1052        }
1053    };
1054    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1055        $crate::__all! {
1056            fut0: $fut0;
1057            fut1: $fut1;
1058            fut2: $fut2;
1059            fut3: $fut3;
1060            fut4: $fut4;
1061            fut5: $fut5;
1062            fut6: $fut6;
1063            fut7: $fut7;
1064        }
1065    };
1066    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
1067}
1068
1069#[doc(hidden)]
1070#[macro_export]
1071macro_rules! __all {
1072    ($($ident:ident: $fut:expr;)+) => {
1073        {
1074            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1075            $crate::future_fn(move |cx| {
1076                use std::task::Poll;
1077
1078                let mut pending = false;
1079
1080                $(
1081                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1082                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1083                        // Future::poll call, so it will not move.
1084                        let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1085                        if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1086                            $ident = $crate::FutureOrOutput::Output(r);
1087                        } else {
1088                            pending = true;
1089                        }
1090                    }
1091                )+
1092
1093                if pending {
1094                    Poll::Pending
1095                } else {
1096                    Poll::Ready(($($ident.take_output()),+))
1097                }
1098            })
1099        }
1100    }
1101}
1102
1103#[doc(hidden)]
1104pub enum FutureOrOutput<F: Future> {
1105    Future(F),
1106    Output(F::Output),
1107    Taken,
1108}
1109impl<F: Future> FutureOrOutput<F> {
1110    pub fn take_output(&mut self) -> F::Output {
1111        match std::mem::replace(self, Self::Taken) {
1112            FutureOrOutput::Output(o) => o,
1113            _ => unreachable!(),
1114        }
1115    }
1116}
1117
1118/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1119///
1120/// This is the dynamic version of [`all!`].
1121pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1122    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1123    future_fn(move |cx| {
1124        let mut pending = false;
1125        for input in &mut futures {
1126            if let FutureOrOutput::Future(fut) = input {
1127                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1128                // Future::poll call, so it will not move.
1129                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1130                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1131                    *input = FutureOrOutput::Output(r);
1132                } else {
1133                    pending = true;
1134                }
1135            }
1136        }
1137
1138        if pending {
1139            Poll::Pending
1140        } else {
1141            Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1142        }
1143    })
1144    .await
1145}
1146
1147/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
1148///
1149/// The macro input is comma separated list of future expressions, the futures must
1150/// all have the same output type. The macro output is a future that when ".awaited" produces
1151/// a single output type instance returned by the first input future that completes.
1152///
1153/// At least one input future is required and any number of futures is accepted. For more than
1154/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1155/// some IDEs.
1156///
1157/// If two futures are ready at the same time the result of the first future in the input list is used.
1158/// After one future is ready the other futures are not polled again and are dropped.
1159///
1160/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1161/// known at compile time, use the [`fn@any`] async function to await on all futures in a dynamic list of futures.
1162///
1163/// # Examples
1164///
1165/// Await for the first of three futures to complete:
1166///
1167/// ```
1168/// use zng_task as task;
1169/// use zng_unit::*;
1170///
1171/// # task::doc_test(false, async {
1172/// let r = task::any!(
1173///     task::run(async { task::deadline(300.ms()).await; 'a' }),
1174///     task::wait(|| 'b'),
1175///     async { task::deadline(300.ms()).await; 'c' }
1176/// ).await;
1177///
1178/// assert_eq!('b', r);
1179/// # });
1180/// ```
1181#[macro_export]
1182macro_rules! any {
1183    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
1184    ($fut0:expr, $fut1:expr $(,)?) => {
1185        $crate::__any! {
1186            fut0: $fut0;
1187            fut1: $fut1;
1188        }
1189    };
1190    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1191        $crate::__any! {
1192            fut0: $fut0;
1193            fut1: $fut1;
1194            fut2: $fut2;
1195        }
1196    };
1197    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1198        $crate::__any! {
1199            fut0: $fut0;
1200            fut1: $fut1;
1201            fut2: $fut2;
1202            fut3: $fut3;
1203        }
1204    };
1205    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1206        $crate::__any! {
1207            fut0: $fut0;
1208            fut1: $fut1;
1209            fut2: $fut2;
1210            fut3: $fut3;
1211            fut4: $fut4;
1212        }
1213    };
1214    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1215        $crate::__any! {
1216            fut0: $fut0;
1217            fut1: $fut1;
1218            fut2: $fut2;
1219            fut3: $fut3;
1220            fut4: $fut4;
1221            fut5: $fut5;
1222        }
1223    };
1224    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1225        $crate::__any! {
1226            fut0: $fut0;
1227            fut1: $fut1;
1228            fut2: $fut2;
1229            fut3: $fut3;
1230            fut4: $fut4;
1231            fut5: $fut5;
1232            fut6: $fut6;
1233        }
1234    };
1235    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1236        $crate::__any! {
1237            fut0: $fut0;
1238            fut1: $fut1;
1239            fut2: $fut2;
1240            fut3: $fut3;
1241            fut4: $fut4;
1242            fut5: $fut5;
1243            fut6: $fut6;
1244            fut7: $fut7;
1245        }
1246    };
1247    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
1248}
1249#[doc(hidden)]
1250#[macro_export]
1251macro_rules! __any {
1252    ($($ident:ident: $fut:expr;)+) => {
1253        {
1254            $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
1255            $crate::future_fn(move |cx| {
1256                use std::task::Poll;
1257                $(
1258                    // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1259                    // Future::poll call, so it will not move.
1260                    let mut $ident = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
1261                    if let Poll::Ready(r) = $ident.as_mut().poll(cx) {
1262                        return Poll::Ready(r)
1263                    }
1264                )+
1265
1266                Poll::Pending
1267            })
1268        }
1269    }
1270}
1271#[doc(hidden)]
1272pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1273
1274/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1275///
1276/// This is the dynamic version of [`any!`].
1277pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1278    let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1279    future_fn(move |cx| {
1280        for fut in &mut futures {
1281            // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1282            // Future::poll call, so it will not move.
1283            let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1284            if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1285                return Poll::Ready(r);
1286            }
1287        }
1288        Poll::Pending
1289    })
1290    .await
1291}
1292
1293/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
1294///
1295/// The macro input is comma separated list of future expressions, the futures must
1296/// all have the same output `Result<T, E>` type, but each can have a different `E`. The macro output is a future
1297/// that when ".awaited" produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
1298/// is `Ok(T)` or is `Err((E0, E1, ..))` is all futures are `Err`.
1299///
1300/// At least one input future is required and any number of futures is accepted. For more than
1301/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1302/// some IDEs.
1303///
1304/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
1305/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
1306/// is ready and `Err(E)` it is also not polled again and dropped.
1307///
1308/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1309/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
1310///
1311/// # Examples
1312///
1313/// Await for the first of three futures to complete with `Ok`:
1314///
1315/// ```
1316/// use zng_task as task;
1317/// # #[derive(Debug, PartialEq)]
1318/// # pub struct FooError;
1319/// # task::doc_test(false, async {
1320/// let r = task::any_ok!(
1321///     task::run(async { Err::<char, _>("error") }),
1322///     task::wait(|| Ok::<_, FooError>('b')),
1323///     async { Err::<char, _>(FooError) }
1324/// ).await;
1325///
1326/// assert_eq!(Ok('b'), r);
1327/// # });
1328/// ```
1329#[macro_export]
1330macro_rules! any_ok {
1331    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
1332    ($fut0:expr, $fut1:expr $(,)?) => {
1333        $crate::__any_ok! {
1334            fut0: $fut0;
1335            fut1: $fut1;
1336        }
1337    };
1338    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1339        $crate::__any_ok! {
1340            fut0: $fut0;
1341            fut1: $fut1;
1342            fut2: $fut2;
1343        }
1344    };
1345    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1346        $crate::__any_ok! {
1347            fut0: $fut0;
1348            fut1: $fut1;
1349            fut2: $fut2;
1350            fut3: $fut3;
1351        }
1352    };
1353    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1354        $crate::__any_ok! {
1355            fut0: $fut0;
1356            fut1: $fut1;
1357            fut2: $fut2;
1358            fut3: $fut3;
1359            fut4: $fut4;
1360        }
1361    };
1362    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1363        $crate::__any_ok! {
1364            fut0: $fut0;
1365            fut1: $fut1;
1366            fut2: $fut2;
1367            fut3: $fut3;
1368            fut4: $fut4;
1369            fut5: $fut5;
1370        }
1371    };
1372    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1373        $crate::__any_ok! {
1374            fut0: $fut0;
1375            fut1: $fut1;
1376            fut2: $fut2;
1377            fut3: $fut3;
1378            fut4: $fut4;
1379            fut5: $fut5;
1380            fut6: $fut6;
1381        }
1382    };
1383    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1384        $crate::__any_ok! {
1385            fut0: $fut0;
1386            fut1: $fut1;
1387            fut2: $fut2;
1388            fut3: $fut3;
1389            fut4: $fut4;
1390            fut5: $fut5;
1391            fut6: $fut6;
1392            fut7: $fut7;
1393        }
1394    };
1395    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
1396}
1397
1398#[doc(hidden)]
1399#[macro_export]
1400macro_rules! __any_ok {
1401    ($($ident:ident: $fut: expr;)+) => {
1402        {
1403            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1404            $crate::future_fn(move |cx| {
1405                use std::task::Poll;
1406
1407                let mut pending = false;
1408
1409                $(
1410                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1411                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1412                        // Future::poll call, so it will not move.
1413                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1414                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1415                            match r {
1416                                Ok(r) => return Poll::Ready(Ok(r)),
1417                                Err(e) => {
1418                                    $ident = $crate::FutureOrOutput::Output(Err(e));
1419                                }
1420                            }
1421                        } else {
1422                            pending = true;
1423                        }
1424                    }
1425                )+
1426
1427                if pending {
1428                    Poll::Pending
1429                } else {
1430                    Poll::Ready(Err((
1431                        $($ident.take_output().unwrap_err()),+
1432                    )))
1433                }
1434            })
1435        }
1436    }
1437}
1438
1439/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1440///
1441/// This is the dynamic version of [`all_some!`].
1442pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1443    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1444    future_fn(move |cx| {
1445        let mut pending = false;
1446        for input in &mut futures {
1447            if let FutureOrOutput::Future(fut) = input {
1448                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1449                // Future::poll call, so it will not move.
1450                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1451                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1452                    match r {
1453                        Ok(r) => return Poll::Ready(Ok(r)),
1454                        Err(e) => *input = FutureOrOutput::Output(Err(e)),
1455                    }
1456                } else {
1457                    pending = true;
1458                }
1459            }
1460        }
1461
1462        if pending {
1463            Poll::Pending
1464        } else {
1465            Poll::Ready(Err(futures
1466                .iter_mut()
1467                .map(|f| match f.take_output() {
1468                    Ok(_) => unreachable!(),
1469                    Err(e) => e,
1470                })
1471                .collect()))
1472        }
1473    })
1474    .await
1475}
1476
1477/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
1478///
1479/// The macro input is comma separated list of future expressions, the futures must
1480/// all have the same output `Option<T>` type. The macro output is a future that when ".awaited" produces
1481/// a single output type instance returned by the first input future that completes with a `Some`.
1482/// If all futures complete with a `None` the output is `None`.
1483///
1484/// At least one input future is required and any number of futures is accepted. For more than
1485/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1486/// some IDEs.
1487///
1488/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
1489/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
1490/// is ready and `None` it is also not polled again and dropped.
1491///
1492/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1493/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
1494///
1495/// # Examples
1496///
1497/// Await for the first of three futures to complete with `Some`:
1498///
1499/// ```
1500/// use zng_task as task;
1501/// # task::doc_test(false, async {
1502/// let r = task::any_some!(
1503///     task::run(async { None::<char> }),
1504///     task::wait(|| Some('b')),
1505///     async { None::<char> }
1506/// ).await;
1507///
1508/// assert_eq!(Some('b'), r);
1509/// # });
1510/// ```
1511#[macro_export]
1512macro_rules! any_some {
1513    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
1514    ($fut0:expr, $fut1:expr $(,)?) => {
1515        $crate::__any_some! {
1516            fut0: $fut0;
1517            fut1: $fut1;
1518        }
1519    };
1520    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1521        $crate::__any_some! {
1522            fut0: $fut0;
1523            fut1: $fut1;
1524            fut2: $fut2;
1525        }
1526    };
1527    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1528        $crate::__any_some! {
1529            fut0: $fut0;
1530            fut1: $fut1;
1531            fut2: $fut2;
1532            fut3: $fut3;
1533        }
1534    };
1535    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1536        $crate::__any_some! {
1537            fut0: $fut0;
1538            fut1: $fut1;
1539            fut2: $fut2;
1540            fut3: $fut3;
1541            fut4: $fut4;
1542        }
1543    };
1544    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1545        $crate::__any_some! {
1546            fut0: $fut0;
1547            fut1: $fut1;
1548            fut2: $fut2;
1549            fut3: $fut3;
1550            fut4: $fut4;
1551            fut5: $fut5;
1552        }
1553    };
1554    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1555        $crate::__any_some! {
1556            fut0: $fut0;
1557            fut1: $fut1;
1558            fut2: $fut2;
1559            fut3: $fut3;
1560            fut4: $fut4;
1561            fut5: $fut5;
1562            fut6: $fut6;
1563        }
1564    };
1565    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1566        $crate::__any_some! {
1567            fut0: $fut0;
1568            fut1: $fut1;
1569            fut2: $fut2;
1570            fut3: $fut3;
1571            fut4: $fut4;
1572            fut5: $fut5;
1573            fut6: $fut6;
1574            fut7: $fut7;
1575        }
1576    };
1577    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
1578}
1579
1580#[doc(hidden)]
1581#[macro_export]
1582macro_rules! __any_some {
1583    ($($ident:ident: $fut: expr;)+) => {
1584        {
1585            $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
1586            $crate::future_fn(move |cx| {
1587                use std::task::Poll;
1588
1589                let mut pending = false;
1590
1591                $(
1592                    if let Some(fut) = $ident.as_mut() {
1593                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1594                        // Future::poll call, so it will not move.
1595                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1596                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1597                            if let Some(r) = r {
1598                                return Poll::Ready(Some(r));
1599                            }
1600                            $ident = None;
1601                        } else {
1602                            pending = true;
1603                        }
1604                    }
1605                )+
1606
1607                if pending {
1608                    Poll::Pending
1609                } else {
1610                    Poll::Ready(None)
1611                }
1612            })
1613        }
1614    }
1615}
1616
1617/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1618///
1619/// This is the dynamic version of [`all_some!`].
1620pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1621    let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1622    future_fn(move |cx| {
1623        let mut pending = false;
1624        for input in &mut futures {
1625            if let Some(fut) = input {
1626                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1627                // Future::poll call, so it will not move.
1628                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1629                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1630                    match r {
1631                        Some(r) => return Poll::Ready(Some(r)),
1632                        None => *input = None,
1633                    }
1634                } else {
1635                    pending = true;
1636                }
1637            }
1638        }
1639
1640        if pending { Poll::Pending } else { Poll::Ready(None) }
1641    })
1642    .await
1643}
1644
1645/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
1646/// any future is ready with an `Err(E)` result.
1647///
1648/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
1649/// type is the first error encountered, the input futures must have the same `Err` type but can have different
1650/// `Ok` types.
1651///
1652/// At least one input future is required and any number of futures is accepted. For more than
1653/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1654/// some IDEs.
1655///
1656/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
1657/// After one future is ready and `Err(T)` the other futures are not polled again and are dropped. After a future
1658/// is ready it is also not polled again and dropped.
1659///
1660/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1661/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
1662///
1663/// # Examples
1664///
1665/// Await for the first of three futures to complete with `Ok(T)`:
1666///
1667/// ```
1668/// use zng_task as task;
1669/// # #[derive(Debug, PartialEq)]
1670/// # struct FooError;
1671/// # task::doc_test(false, async {
1672/// let r = task::all_ok!(
1673///     task::run(async { Ok::<_, FooError>('a') }),
1674///     task::wait(|| Ok::<_, FooError>('b')),
1675///     async { Ok::<_, FooError>('c') }
1676/// ).await;
1677///
1678/// assert_eq!(Ok(('a', 'b', 'c')), r);
1679/// # });
1680/// ```
1681///
1682/// And in if any completes with `Err(E)`:
1683///
1684/// ```
1685/// use zng_task as task;
1686/// # #[derive(Debug, PartialEq)]
1687/// # struct FooError;
1688/// # task::doc_test(false, async {
1689/// let r = task::all_ok!(
1690///     task::run(async { Ok('a') }),
1691///     task::wait(|| Err::<char, _>(FooError)),
1692///     async { Ok('c') }
1693/// ).await;
1694///
1695/// assert_eq!(Err(FooError), r);
1696/// # });
1697/// ```
1698#[macro_export]
1699macro_rules! all_ok {
1700    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
1701    ($fut0:expr, $fut1:expr $(,)?) => {
1702        $crate::__all_ok! {
1703            fut0: $fut0;
1704            fut1: $fut1;
1705        }
1706    };
1707    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1708        $crate::__all_ok! {
1709            fut0: $fut0;
1710            fut1: $fut1;
1711            fut2: $fut2;
1712        }
1713    };
1714    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1715        $crate::__all_ok! {
1716            fut0: $fut0;
1717            fut1: $fut1;
1718            fut2: $fut2;
1719            fut3: $fut3;
1720        }
1721    };
1722    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1723        $crate::__all_ok! {
1724            fut0: $fut0;
1725            fut1: $fut1;
1726            fut2: $fut2;
1727            fut3: $fut3;
1728            fut4: $fut4;
1729        }
1730    };
1731    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1732        $crate::__all_ok! {
1733            fut0: $fut0;
1734            fut1: $fut1;
1735            fut2: $fut2;
1736            fut3: $fut3;
1737            fut4: $fut4;
1738            fut5: $fut5;
1739        }
1740    };
1741    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1742        $crate::__all_ok! {
1743            fut0: $fut0;
1744            fut1: $fut1;
1745            fut2: $fut2;
1746            fut3: $fut3;
1747            fut4: $fut4;
1748            fut5: $fut5;
1749            fut6: $fut6;
1750        }
1751    };
1752    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1753        $crate::__all_ok! {
1754            fut0: $fut0;
1755            fut1: $fut1;
1756            fut2: $fut2;
1757            fut3: $fut3;
1758            fut4: $fut4;
1759            fut5: $fut5;
1760            fut6: $fut6;
1761            fut7: $fut7;
1762        }
1763    };
1764    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
1765}
1766
1767#[doc(hidden)]
1768#[macro_export]
1769macro_rules! __all_ok {
1770    ($($ident:ident: $fut: expr;)+) => {
1771        {
1772            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1773            $crate::future_fn(move |cx| {
1774                use std::task::Poll;
1775
1776                let mut pending = false;
1777
1778                $(
1779                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1780                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1781                        // Future::poll call, so it will not move.
1782                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1783                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1784                            match r {
1785                                Ok(r) => {
1786                                    $ident = $crate::FutureOrOutput::Output(Ok(r))
1787                                },
1788                                Err(e) => return Poll::Ready(Err(e)),
1789                            }
1790                        } else {
1791                            pending = true;
1792                        }
1793                    }
1794                )+
1795
1796                if pending {
1797                    Poll::Pending
1798                } else {
1799                    Poll::Ready(Ok((
1800                        $($ident.take_output().unwrap()),+
1801                    )))
1802                }
1803            })
1804        }
1805    }
1806}
1807
1808/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1809///
1810/// This is the dynamic version of [`all_ok!`].
1811pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1812    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1813    future_fn(move |cx| {
1814        let mut pending = false;
1815        for input in &mut futures {
1816            if let FutureOrOutput::Future(fut) = input {
1817                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1818                // Future::poll call, so it will not move.
1819                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1820                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1821                    match r {
1822                        Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1823                        Err(e) => return Poll::Ready(Err(e)),
1824                    }
1825                } else {
1826                    pending = true;
1827                }
1828            }
1829        }
1830
1831        if pending {
1832            Poll::Pending
1833        } else {
1834            Poll::Ready(Ok(futures
1835                .iter_mut()
1836                .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1837                .collect()))
1838        }
1839    })
1840    .await
1841}
1842
1843/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
1844/// is future ready with `None`.
1845///
1846/// The macro input is comma separated list of future expressions, the futures must
1847/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
1848/// produces `Some<(T0, T1, ..)>` if all futures where `Some(T)` or `None` if any of the futures where `None`.
1849///
1850/// At least one input future is required and any number of futures is accepted. For more than
1851/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1852/// some IDEs.
1853///
1854/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
1855/// is ready it is also not polled again and dropped.
1856///
1857/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1858/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
1859///
1860/// # Examples
1861///
1862/// Await for the first of three futures to complete with `Some`:
1863///
1864/// ```
1865/// use zng_task as task;
1866/// # task::doc_test(false, async {
1867/// let r = task::all_some!(
1868///     task::run(async { Some('a') }),
1869///     task::wait(|| Some('b')),
1870///     async { Some('c') }
1871/// ).await;
1872///
1873/// assert_eq!(Some(('a', 'b', 'c')), r);
1874/// # });
1875/// ```
1876///
1877/// Completes with `None` if any future completes with `None`:
1878///
1879/// ```
1880/// # use zng_task as task;
1881/// # task::doc_test(false, async {
1882/// let r = task::all_some!(
1883///     task::run(async { Some('a') }),
1884///     task::wait(|| None::<char>),
1885///     async { Some('b') }
1886/// ).await;
1887///
1888/// assert_eq!(None, r);
1889/// # });
1890/// ```
1891#[macro_export]
1892macro_rules! all_some {
1893    ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
1894    ($fut0:expr, $fut1:expr $(,)?) => {
1895        $crate::__all_some! {
1896            fut0: $fut0;
1897            fut1: $fut1;
1898        }
1899    };
1900    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1901        $crate::__all_some! {
1902            fut0: $fut0;
1903            fut1: $fut1;
1904            fut2: $fut2;
1905        }
1906    };
1907    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1908        $crate::__all_some! {
1909            fut0: $fut0;
1910            fut1: $fut1;
1911            fut2: $fut2;
1912            fut3: $fut3;
1913        }
1914    };
1915    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1916        $crate::__all_some! {
1917            fut0: $fut0;
1918            fut1: $fut1;
1919            fut2: $fut2;
1920            fut3: $fut3;
1921            fut4: $fut4;
1922        }
1923    };
1924    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1925        $crate::__all_some! {
1926            fut0: $fut0;
1927            fut1: $fut1;
1928            fut2: $fut2;
1929            fut3: $fut3;
1930            fut4: $fut4;
1931            fut5: $fut5;
1932        }
1933    };
1934    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1935        $crate::__all_some! {
1936            fut0: $fut0;
1937            fut1: $fut1;
1938            fut2: $fut2;
1939            fut3: $fut3;
1940            fut4: $fut4;
1941            fut5: $fut5;
1942            fut6: $fut6;
1943        }
1944    };
1945    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1946        $crate::__all_some! {
1947            fut0: $fut0;
1948            fut1: $fut1;
1949            fut2: $fut2;
1950            fut3: $fut3;
1951            fut4: $fut4;
1952            fut5: $fut5;
1953            fut6: $fut6;
1954            fut7: $fut7;
1955        }
1956    };
1957    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
1958}
1959
1960#[doc(hidden)]
1961#[macro_export]
1962macro_rules! __all_some {
1963    ($($ident:ident: $fut: expr;)+) => {
1964        {
1965            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1966            $crate::future_fn(move |cx| {
1967                use std::task::Poll;
1968
1969                let mut pending = false;
1970
1971                $(
1972                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1973                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1974                        // Future::poll call, so it will not move.
1975                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1976                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1977                            if r.is_none() {
1978                                return Poll::Ready(None);
1979                            }
1980
1981                            $ident = $crate::FutureOrOutput::Output(r);
1982                        } else {
1983                            pending = true;
1984                        }
1985                    }
1986                )+
1987
1988                if pending {
1989                    Poll::Pending
1990                } else {
1991                    Poll::Ready(Some((
1992                        $($ident.take_output().unwrap()),+
1993                    )))
1994                }
1995            })
1996        }
1997    }
1998}
1999
2000/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
2001///
2002/// This is the dynamic version of [`all_some!`].
2003pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
2004    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
2005    future_fn(move |cx| {
2006        let mut pending = false;
2007        for input in &mut futures {
2008            if let FutureOrOutput::Future(fut) = input {
2009                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
2010                // Future::poll call, so it will not move.
2011                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
2012                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
2013                    match r {
2014                        Some(r) => *input = FutureOrOutput::Output(Some(r)),
2015                        None => return Poll::Ready(None),
2016                    }
2017                } else {
2018                    pending = true;
2019                }
2020            }
2021        }
2022
2023        if pending {
2024            Poll::Pending
2025        } else {
2026            Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2027        }
2028    })
2029    .await
2030}
2031
2032/// A future that will await until [`set`] is called.
2033///
2034/// # Examples
2035///
2036/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2037///
2038/// ```
2039/// use zng_task::{self as task, *};
2040/// use zng_clone_move::async_clmv;
2041///
2042/// let signal = SignalOnce::default();
2043///
2044/// task::spawn(async_clmv!(signal, {
2045///     signal.await;
2046///     println!("After Signal!");
2047/// }));
2048///
2049/// signal.set();
2050/// ```
2051///
2052/// [`set`]: SignalOnce::set
2053#[derive(Default, Clone)]
2054pub struct SignalOnce(Arc<SignalInner>);
2055impl fmt::Debug for SignalOnce {
2056    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2057        write!(f, "SignalOnce({})", self.is_set())
2058    }
2059}
2060impl PartialEq for SignalOnce {
2061    fn eq(&self, other: &Self) -> bool {
2062        Arc::ptr_eq(&self.0, &other.0)
2063    }
2064}
2065impl Eq for SignalOnce {}
2066impl Hash for SignalOnce {
2067    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2068        Arc::as_ptr(&self.0).hash(state)
2069    }
2070}
2071impl SignalOnce {
2072    /// New unsigned.
2073    pub fn new() -> Self {
2074        Self::default()
2075    }
2076
2077    /// New signaled.
2078    pub fn new_set() -> Self {
2079        let s = Self::new();
2080        s.set();
2081        s
2082    }
2083
2084    /// If the signal was set.
2085    pub fn is_set(&self) -> bool {
2086        self.0.signaled.load(Ordering::Relaxed)
2087    }
2088
2089    /// Sets the signal and awakes listeners.
2090    pub fn set(&self) {
2091        if !self.0.signaled.swap(true, Ordering::Relaxed) {
2092            let listeners = mem::take(&mut *self.0.listeners.lock());
2093            for listener in listeners {
2094                listener.wake();
2095            }
2096        }
2097    }
2098}
2099impl Future for SignalOnce {
2100    type Output = ();
2101
2102    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2103        if self.as_ref().is_set() {
2104            Poll::Ready(())
2105        } else {
2106            let mut listeners = self.0.listeners.lock();
2107            let waker = cx.waker();
2108            if !listeners.iter().any(|w| w.will_wake(waker)) {
2109                listeners.push(waker.clone());
2110            }
2111            Poll::Pending
2112        }
2113    }
2114}
2115
2116#[derive(Default)]
2117struct SignalInner {
2118    signaled: AtomicBool,
2119    listeners: Mutex<Vec<std::task::Waker>>,
2120}
2121
2122/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2123///
2124/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2125/// known at the moment the first request is made.
2126///  
2127/// [`Waker`]: std::task::Waker
2128#[derive(Clone)]
2129pub struct McWaker(Arc<WakeVec>);
2130
2131#[derive(Default)]
2132struct WakeVec(Mutex<Vec<std::task::Waker>>);
2133impl WakeVec {
2134    fn push(&self, waker: std::task::Waker) -> bool {
2135        let mut v = self.0.lock();
2136
2137        let return_waker = v.is_empty();
2138
2139        v.push(waker);
2140
2141        return_waker
2142    }
2143
2144    fn cancel(&self) {
2145        let mut v = self.0.lock();
2146
2147        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2148
2149        v.clear();
2150    }
2151}
2152impl std::task::Wake for WakeVec {
2153    fn wake(self: Arc<Self>) {
2154        for w in mem::take(&mut *self.0.lock()) {
2155            w.wake();
2156        }
2157    }
2158}
2159impl McWaker {
2160    /// New empty waker.
2161    pub fn empty() -> Self {
2162        Self(Arc::new(WakeVec::default()))
2163    }
2164
2165    /// Register a `waker` to wake once when `self` awakes.
2166    ///
2167    /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2168    /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2169    /// is ready [`cancel`] must be called.
2170    ///
2171    /// [`cancel`]: Self::cancel
2172    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2173        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2174    }
2175
2176    /// Clear current registered wakers.
2177    pub fn cancel(&self) {
2178        self.0.cancel()
2179    }
2180}