zng_task/
lib.rs

1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13    any::Any,
14    fmt,
15    hash::Hash,
16    mem, panic,
17    pin::Pin,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    task::Poll,
23};
24
25#[doc(no_inline)]
26pub use parking_lot;
27use parking_lot::Mutex;
28
29use zng_app_context::{LocalContext, app_local};
30use zng_time::Deadline;
31use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
32
33#[cfg(test)]
34mod tests;
35
36#[doc(no_inline)]
37pub use rayon;
38
39pub mod channel;
40pub mod fs;
41pub mod io;
42mod ui;
43
44pub mod http;
45
46pub mod process;
47
48mod rayon_ctx;
49
50pub use rayon_ctx::*;
51
52pub use ui::*;
53
54mod progress;
55pub use progress::*;
56
57/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
58///
59/// # Parallel
60///
61/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
62///
63/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
64/// otherwise it will run in a single thread at a time, still not blocking the UI.
65///
66/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
67///
68/// # Async
69///
70/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
71/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
72/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
73/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
74///
75/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
76///
77/// # Examples
78///
79/// ```
80/// # use zng_task::{self as task, *, rayon::iter::*};
81/// # use zng_var::*;
82/// # struct SomeStruct { sum_response: ResponseVar<usize> }
83/// # impl SomeStruct {
84/// fn on_event(&mut self) {
85///     let (responder, response) = response_var();
86///     self.sum_response = response;
87///
88///     task::spawn(async move {
89///         let r = (0..1000).into_par_iter().map(|i| i * i).sum();
90///
91///         responder.respond(r);
92///     });
93/// }
94///
95/// fn on_update(&mut self) {
96///     if let Some(result) = self.sum_response.rsp_new() {
97///         println!("sum of squares 0..1000: {result}");
98///     }
99/// }
100/// # }
101/// ```
102///
103/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
104/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
105///
106/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
107/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
108///
109/// # Panic Handling
110///
/// If the `task` panics the panic message is logged as an error, and can be observed using [`set_spawn_panic_handler`]. It
112/// is otherwise ignored.
113///
114/// # Unwind Safety
115///
116/// This function disables the [unwind safety validation], meaning that in case of a panic shared
117/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
118/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
119/// to propagate a panic.
120///
121/// [unwind safety validation]: std::panic::UnwindSafe
122/// [`Waker`]: std::task::Waker
123/// [`rayon`]: https://docs.rs/rayon
124/// [`LocalContext`]: zng_app_context::LocalContext
125/// [`response_var`]: zng_var::response_var
126pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
127where
128    F: Future<Output = ()> + Send + 'static,
129{
130    Arc::new(RayonTask {
131        ctx: LocalContext::capture(),
132        fut: Mutex::new(Some(Box::pin(task.into_future()))),
133    })
134    .poll()
135}
136
137/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
138pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
139where
140    F: Future<Output = ()> + Send + 'static,
141{
142    struct PollRayonTask {
143        fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
144    }
145    impl PollRayonTask {
146        // start task in calling thread
147        fn poll(self: Arc<Self>) {
148            let mut task = self.fut.lock();
149            let (mut t, _) = task.take().unwrap();
150
151            let waker = self.clone().into();
152
153            match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
154                Poll::Ready(()) => {}
155                Poll::Pending => {
156                    let ctx = LocalContext::capture();
157                    *task = Some((t, Some(ctx)));
158                }
159            }
160        }
161    }
162    impl std::task::Wake for PollRayonTask {
163        fn wake(self: Arc<Self>) {
164            // continue task in spawn threads
165            if let Some((task, Some(ctx))) = self.fut.lock().take() {
166                Arc::new(RayonTask {
167                    ctx,
168                    fut: Mutex::new(Some(Box::pin(task))),
169                })
170                .poll();
171            }
172        }
173    }
174
175    Arc::new(PollRayonTask {
176        fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
177    })
178    .poll()
179}
180
181type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
182
183// A future that is its own waker that polls inside rayon spawn tasks.
184struct RayonTask {
185    ctx: LocalContext,
186    fut: Mutex<Option<RayonSpawnFut>>,
187}
188impl RayonTask {
189    fn poll(self: Arc<Self>) {
190        rayon::spawn(move || {
191            // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
192            let mut task = self.fut.lock();
193            if let Some(mut t) = task.take() {
194                let waker = self.clone().into();
195
196                // load app context
197                self.ctx.clone().with_context(move || {
198                    let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
199                        // poll future
200                        if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
201                            // not done
202                            *task = Some(t);
203                        }
204                    }));
205                    if let Err(p) = r {
206                        let p = TaskPanicError::new(p);
207                        tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
208                        on_spawn_panic(p);
209                    }
210                });
211            }
212        })
213    }
214}
215impl std::task::Wake for RayonTask {
216    fn wake(self: Arc<Self>) {
217        self.poll()
218    }
219}
220
221/// Rayon join with local context.
222///
223/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
224/// operations.
225///
226/// See `rayon::join` for more details about join.
227///
228/// [`LocalContext`]: zng_app_context::LocalContext
229pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
230where
231    A: FnOnce() -> RA + Send,
232    B: FnOnce() -> RB + Send,
233    RA: Send,
234    RB: Send,
235{
236    self::join_context(move |_| op_a(), move |_| op_b())
237}
238
239/// Rayon join context with local context.
240///
241/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
242/// operations.
243///
244/// See `rayon::join_context` for more details about join.
245///
246/// [`LocalContext`]: zng_app_context::LocalContext
247pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
248where
249    A: FnOnce(rayon::FnContext) -> RA + Send,
250    B: FnOnce(rayon::FnContext) -> RB + Send,
251    RA: Send,
252    RB: Send,
253{
254    let ctx = LocalContext::capture();
255    let ctx = &ctx;
256    rayon::join_context(
257        move |a| {
258            if a.migrated() {
259                ctx.clone().with_context(|| op_a(a))
260            } else {
261                op_a(a)
262            }
263        },
264        move |b| {
265            if b.migrated() {
266                ctx.clone().with_context(|| op_b(b))
267            } else {
268                op_b(b)
269            }
270        },
271    )
272}
273
274/// Rayon scope with local context.
275///
276/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
277/// operations.
278///
279/// See `rayon::scope` for more details about scope.
280///
281/// [`LocalContext`]: zng_app_context::LocalContext
282pub fn scope<'scope, OP, R>(op: OP) -> R
283where
284    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
285    R: Send,
286{
287    let ctx = LocalContext::capture();
288
289    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
290    // Is there a better way to do this? I hope so.
291    //
292    // SAFETY:
293    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
294    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
295    let ctx_ref: &'_ LocalContext = &ctx;
296    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
297
298    let r = rayon::scope(move |s| {
299        op(ScopeCtx {
300            scope: s,
301            ctx: ctx_scope_ref,
302        })
303    });
304
305    drop(ctx);
306
307    r
308}
309
310/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
311///
312/// See [`scope`] for more details.
313#[derive(Clone, Copy, Debug)]
314pub struct ScopeCtx<'a, 'scope: 'a> {
315    scope: &'a rayon::Scope<'scope>,
316    ctx: &'scope LocalContext,
317}
318impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
319    /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
320    ///
321    /// See `rayon::Scope::spawn` for more details.
322    pub fn spawn<F>(self, f: F)
323    where
324        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
325    {
326        let ctx = self.ctx;
327        self.scope
328            .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
329    }
330}
331
332/// Spawn a parallel async task that can also be `.await` for the task result.
333///
334/// # Parallel
335///
336/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
337///
338/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
339/// otherwise it will run in a single thread at a time, still not blocking the UI.
340///
341/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
342///
343/// # Async
344///
345/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
346/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
347/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
348/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
349///
350/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
351///
352/// # Examples
353///
354/// ```
355/// # use zng_task::{self as task, rayon::iter::*};
356/// # struct SomeStruct { sum: usize }
357/// # async fn read_numbers() -> Vec<usize> { vec![] }
358/// # impl SomeStruct {
359/// async fn on_event(&mut self) {
360///     self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
361/// }
362/// # }
363/// ```
364///
365/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
366/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
367/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
368///
369/// # Cancellation
370///
371/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
372/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
373/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
374/// result channel is disconnected.
375///
/// If you want to deterministically know that the `task` was cancelled use a cancellation signal.
377///
378/// # Panic Propagation
379///
380/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
381/// can use [`run_catch`] to get the panic as an error instead.
382///
383/// [`resume_unwind`]: panic::resume_unwind
384/// [`Waker`]: std::task::Waker
385/// [`rayon`]: https://docs.rs/rayon
386/// [`LocalContext`]: zng_app_context::LocalContext
387pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
388where
389    R: Send + 'static,
390    T: Future<Output = R> + Send + 'static,
391{
392    match run_catch(task).await {
393        Ok(r) => r,
394        Err(p) => panic::resume_unwind(p.payload),
395    }
396}
397
398/// Like [`run`] but catches panics.
399///
400/// This task works the same and has the same utility as [`run`], except if returns panic messages
401/// as an error instead of propagating the panic.
402///
403/// # Unwind Safety
404///
405/// This function disables the [unwind safety validation], meaning that in case of a panic shared
406/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
407/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
408/// if this function returns an error.
409///
410/// [unwind safety validation]: std::panic::UnwindSafe
411pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
412where
413    R: Send + 'static,
414    T: Future<Output = R> + Send + 'static,
415{
416    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
417
418    // A future that is its own waker that polls inside the rayon primary thread-pool.
419    struct RayonCatchTask<R> {
420        ctx: LocalContext,
421        fut: Mutex<Option<Fut<R>>>,
422        sender: flume::Sender<Result<R, TaskPanicError>>,
423    }
424    impl<R: Send + 'static> RayonCatchTask<R> {
425        fn poll(self: Arc<Self>) {
426            let sender = self.sender.clone();
427            if sender.is_disconnected() {
428                return; // cancel.
429            }
430            rayon::spawn(move || {
431                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
432                let mut task = self.fut.lock();
433                if let Some(mut t) = task.take() {
434                    let waker = self.clone().into();
435                    let mut cx = std::task::Context::from_waker(&waker);
436
437                    self.ctx.clone().with_context(|| {
438                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
439                        match r {
440                            Ok(Poll::Ready(r)) => {
441                                drop(task);
442                                let _ = sender.send(Ok(r));
443                            }
444                            Ok(Poll::Pending) => {
445                                *task = Some(t);
446                            }
447                            Err(p) => {
448                                drop(task);
449                                let _ = sender.send(Err(TaskPanicError::new(p)));
450                            }
451                        }
452                    });
453                }
454            })
455        }
456    }
457    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
458        fn wake(self: Arc<Self>) {
459            self.poll()
460        }
461    }
462
463    let (sender, receiver) = channel::bounded(1);
464
465    Arc::new(RayonCatchTask {
466        ctx: LocalContext::capture(),
467        fut: Mutex::new(Some(Box::pin(task.into_future()))),
468        sender: sender.into(),
469    })
470    .poll();
471
472    receiver.recv().await.unwrap()
473}
474
475/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
476///
477/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
478///
479/// # Examples
480///
481/// ```
482/// # use zng_task::{self as task, rayon::iter::*};
483/// # use zng_var::*;
484/// # struct SomeStruct { sum_response: ResponseVar<usize> }
485/// # async fn read_numbers() -> Vec<usize> { vec![] }
486/// # impl SomeStruct {
487/// fn on_event(&mut self) {
488///     self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
489/// }
490///
491/// fn on_update(&mut self) {
492///     if let Some(result) = self.sum_response.rsp_new() {
493///         println!("sum of squares: {result}");
494///     }
495/// }
496/// # }
497/// ```
498///
499/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
500/// `sum_response` that is a [`ResponseVar<R>`].
501///
502/// # Cancellation
503///
504/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
505///
506/// # Panic Handling
507///
508/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
509///
510/// [`resume_unwind`]: panic::resume_unwind
511/// [`ResponseVar<R>`]: zng_var::ResponseVar
512/// [`response_var`]: zng_var::response_var
513pub fn respond<R, F>(task: F) -> ResponseVar<R>
514where
515    R: VarValue,
516    F: Future<Output = R> + Send + 'static,
517{
518    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
519
520    let (responder, response) = response_var();
521
522    // A future that is its own waker that polls inside the rayon primary thread-pool.
523    struct RayonRespondTask<R: VarValue> {
524        ctx: LocalContext,
525        fut: Mutex<Option<Fut<R>>>,
526        responder: zng_var::ResponderVar<R>,
527    }
528    impl<R: VarValue> RayonRespondTask<R> {
529        fn poll(self: Arc<Self>) {
530            let responder = self.responder.clone();
531            if responder.strong_count() == 2 {
532                return; // cancel.
533            }
534            rayon::spawn(move || {
535                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
536                let mut task = self.fut.lock();
537                if let Some(mut t) = task.take() {
538                    let waker = self.clone().into();
539                    let mut cx = std::task::Context::from_waker(&waker);
540
541                    self.ctx.clone().with_context(|| {
542                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
543                        match r {
544                            Ok(Poll::Ready(r)) => {
545                                drop(task);
546
547                                responder.respond(r);
548                            }
549                            Ok(Poll::Pending) => {
550                                *task = Some(t);
551                            }
552                            Err(p) => {
553                                let p = TaskPanicError::new(p);
554                                tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
555                                drop(task);
556                                responder.modify(move |_| panic::resume_unwind(p.payload));
557                            }
558                        }
559                    });
560                }
561            })
562        }
563    }
564    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
565        fn wake(self: Arc<Self>) {
566            self.poll()
567        }
568    }
569
570    Arc::new(RayonRespondTask {
571        ctx: LocalContext::capture(),
572        fut: Mutex::new(Some(Box::pin(task))),
573        responder,
574    })
575    .poll();
576
577    response
578}
579
580/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
581/// if the `task` is pending continues execution like [`respond`].
582pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
583where
584    R: VarValue,
585    F: Future<Output = R> + Send + 'static,
586{
587    enum QuickResponse<R: VarValue> {
588        Quick(Option<R>),
589        Response(zng_var::ResponderVar<R>),
590    }
591    let task = task.into_future();
592    let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
593    poll_spawn(zng_clone_move::async_clmv!(q, {
594        let rsp = task.await;
595
596        match &mut *q.lock() {
597            QuickResponse::Quick(q) => *q = Some(rsp),
598            QuickResponse::Response(r) => r.respond(rsp),
599        }
600    }));
601
602    let mut q = q.lock();
603    match &mut *q {
604        QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
605        _ => {
606            let (responder, response) = response_var();
607            *q = QuickResponse::Response(responder);
608            response
609        }
610    }
611}
612
613/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
614///
615/// # Parallel
616///
617/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
618/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
619/// parts that are blocking.
620///
621/// # Examples
622///
623/// ```
624/// # fn main() { }
625/// # use zng_task as task;
626/// # async fn example() {
627/// task::wait(|| std::fs::read_to_string("file.txt")).await
628/// # ; }
629/// ```
630///
/// The example reads a file, that is a blocking file IO operation, most of the time is spent waiting for the operating system,
632/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
633/// like in a async event handler.
634///
635/// # Async Read/Write
636///
637/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
638/// have or want the full file in memory or when you want to apply multiple operations to the file.
639///
640/// # Panic Propagation
641///
642/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
643/// can use [`wait_catch`] to get the panic as an error instead.
644///
645/// [`blocking`]: https://docs.rs/blocking
646/// [`resume_unwind`]: panic::resume_unwind
647pub async fn wait<T, F>(task: F) -> T
648where
649    F: FnOnce() -> T + Send + 'static,
650    T: Send + 'static,
651{
652    match wait_catch(task).await {
653        Ok(r) => r,
654        Err(p) => panic::resume_unwind(p.payload),
655    }
656}
657
658/// Like [`wait`] but catches panics.
659///
660/// This task works the same and has the same utility as [`wait`], except if returns panic messages
661/// as an error instead of propagating the panic.
662///
663/// # Unwind Safety
664///
665/// This function disables the [unwind safety validation], meaning that in case of a panic shared
666/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
667/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
668/// if this function returns an error.
669///
670/// [unwind safety validation]: std::panic::UnwindSafe
671pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
672where
673    F: FnOnce() -> T + Send + 'static,
674    T: Send + 'static,
675{
676    let mut ctx = LocalContext::capture();
677    blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
678        .await
679        .map_err(TaskPanicError::new)
680}
681
682/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
683///
684/// # Panic Handling
685///
686/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
687/// is otherwise ignored.
688///
689/// # Unwind Safety
690///
691/// This function disables the [unwind safety validation], meaning that in case of a panic shared
692/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
693/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
694/// to propagate a panic.
695///
696/// [unwind safety validation]: std::panic::UnwindSafe
697pub fn spawn_wait<F>(task: F)
698where
699    F: FnOnce() + Send + 'static,
700{
701    spawn(async move {
702        if let Err(p) = wait_catch(task).await {
703            tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
704            on_spawn_panic(p);
705        }
706    });
707}
708
709/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
710///
711/// # Cancellation
712///
713/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
714///
715/// # Panic Handling
716///
717/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
718pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
719where
720    R: VarValue,
721    F: FnOnce() -> R + Send + 'static,
722{
723    let (responder, response) = response_var();
724    spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
725        Ok(r) => responder.respond(r),
726        Err(p) => {
727            let p = TaskPanicError::new(p);
728            tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
729            responder.modify(move |_| panic::resume_unwind(p.payload));
730        }
731    });
732    response
733}
734
735/// Blocks the thread until the `task` future finishes.
736///
737/// This function is useful for implementing async tests, using it in an app will probably cause
738/// the app to stop responding.
739///
740/// The crate [`futures-lite`] is used to execute the task.
741///
742/// # Examples
743///
744/// Test a [`run`] call:
745///
746/// ```
747/// use zng_task as task;
748/// # use zng_unit::*;
749/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
750///
751/// #[test]
752/// # fn __() { }
753/// pub fn run_ok() {
754///     let r = task::block_on(task::run(async { foo(32).await }));
755///
756///     # let value =
757///     r.expect("foo(32) was not Ok");
758///     # assert_eq!(32, value);
759/// }
760/// # run_ok();
761/// ```
762///
763/// [`futures-lite`]: https://docs.rs/futures-lite/
764pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
765where
766    F: Future,
767{
768    futures_lite::future::block_on(task.into_future())
769}
770
/// Continuously polls the `task` until it finishes.
///
/// This function is useful for implementing some async tests only, futures don't expect to be polled
/// continuously. This function is only available in test builds, doc builds or with the `test_util` feature.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let mut task = std::pin::pin!(task.into_future());

    let spinning = future_fn(|cx| {
        let state = task.as_mut().poll(cx);
        if state.is_pending() {
            // request another poll right away, this is what makes the executor spin.
            cx.waker().wake_by_ref();
        }
        state
    });

    block_on(spinning)
}
791
/// Executor used in async doc tests.
///
/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
/// If `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
///
/// # Panics
///
/// Panics if the `task` does not finish before the timeout.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    use zng_unit::TimeUnits;

    let result = if spin {
        spin_on(with_deadline(task, 500.ms()))
    } else {
        block_on(with_deadline(task, 5.secs()))
    };

    result.expect("async doc-test timeout")
}
809
/// A future that is [`Pending`] exactly once and wakes the current task.
///
/// The first poll calls [`wake`] and returns [`Pending`], every poll after that returns [`Ready`].
///
/// [`Pending`]: std::task::Poll::Pending
/// [`Ready`]: std::task::Poll::Ready
/// [`wake`]: std::task::Waker::wake
pub async fn yield_now() {
    let mut yielded = false;

    std::future::poll_fn(move |cx| {
        if yielded {
            return Poll::Ready(());
        }

        // mark before waking, the waker can cause another poll right away.
        yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await
}
835
836/// A future that is [`Pending`] until the `deadline` is reached.
837///
838/// # Examples
839///
840/// Await 5 seconds in a [`spawn`] parallel task:
841///
842/// ```
843/// use zng_task as task;
844/// use zng_unit::*;
845///
846/// task::spawn(async {
847///     println!("waiting 5 seconds..");
848///     task::deadline(5.secs()).await;
849///     println!("5 seconds elapsed.")
850/// });
851/// ```
852///
/// The future runs on an app provided timer executor, or on the [`futures_timer`] by default.
854///
855/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
856/// not at the moment of the first `.await` call.
857///
858/// [`Pending`]: std::task::Poll::Pending
859/// [`futures_timer`]: https://docs.rs/futures-timer
860pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
861    let deadline = deadline.into();
862    if zng_app_context::LocalContext::current_app().is_some() {
863        DEADLINE_SV.read().0(deadline)
864    } else {
865        default_deadline(deadline)
866    }
867}
868
869app_local! {
870    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
871}
872
873type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
874
875fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
876    if let Some(timeout) = deadline.time_left() {
877        Box::pin(futures_timer::Delay::new(timeout))
878    } else {
879        Box::pin(std::future::ready(()))
880    }
881}
882
883/// Deadline APP integration.
884#[expect(non_camel_case_types)]
885pub struct DEADLINE_APP;
886
887impl DEADLINE_APP {
888    /// Called by the app implementer to setup the [`deadline`] executor.
889    ///
890    /// If no app calls this the [`futures_timer`] executor is used.
891    ///
892    /// [`futures_timer`]: https://docs.rs/futures-timer
893    ///
894    /// # Panics
895    ///
896    /// Panics if called more than once for the same app.
897    pub fn init_deadline_service(&self, service: DeadlineService) {
898        let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
899        if already_set {
900            *DEADLINE_SV.write() = (prev, true);
901            panic!("deadline service already inited for this app");
902        }
903    }
904}
905
/// Implements a [`Future`] from a closure.
///
/// The closure is called for every poll of the returned future and its return value is the poll result.
///
/// # Examples
///
/// A future that is ready when a closure returns `Some(R)`.
///
/// ```
/// use std::task::Poll;
/// use zng_task as task;
///
/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
///     task::future_fn(|cx| match closure() {
///         Some(r) => Poll::Ready(r),
///         None => Poll::Pending,
///     })
///     .await
/// }
/// ```
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    // the standard library future is exactly a wrapper that forwards `poll` to the closure.
    std::future::poll_fn(fn_).await
}
939
/// Error returned by [`with_deadline`] when the time limit is reached before the task finishes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}
impl std::error::Error for DeadlineError {}
impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reached deadline")
    }
}
950
951/// Add a [`deadline`] to a future.
952///
953/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
954pub async fn with_deadline<O, F: Future<Output = O>>(
955    fut: impl IntoFuture<IntoFuture = F>,
956    deadline: impl Into<Deadline>,
957) -> Result<F::Output, DeadlineError> {
958    let deadline = deadline.into();
959    any!(async { Ok(fut.await) }, async {
960        self::deadline(deadline).await;
961        Err(DeadlineError {})
962    })
963    .await
964}
965
/// <span data-del-macro-root></span> A future that *zips* other futures.
///
/// The macro input is a comma separated list of future expressions. The macro output is a future
/// that when awaited produces a tuple with the output of each input, in the same order as the inputs.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three different futures to complete:
///
/// ```
/// use zng_task as task;
///
/// # task::doc_test(false, async {
/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
/// # });
/// ```
#[macro_export]
macro_rules! all {
    // one to eight inputs, each input is paired with a fixed local name.
    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // more than eight inputs, delegate to the proc-macro, it receives the target macro and the inputs.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
}
1057
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
    ($($ident:ident: $fut:expr;)+) => {
        {
            // every input starts as a future and is replaced in place by its output when ready.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut all_ready = true;

                $(
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and the closure is only called
                        // from inside a `Future::poll`, the future is never moved after this point,
                        // it is only overwritten in place by its output.
                        let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match pinned.poll(cx) {
                            Poll::Ready(output) => $ident = $crate::FutureOrOutput::Output(output),
                            Poll::Pending => all_ready = false,
                        }
                    }
                )+

                if all_ready {
                    Poll::Ready(($($ident.take_output()),+))
                } else {
                    Poll::Pending
                }
            })
        }
    }
}
1091
/// State of one input of the `all!` like macros and functions.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    /// Input is still pending.
    Future(F),
    /// Input completed, the output was not collected yet.
    Output(F::Output),
    /// Output was already collected by [`take_output`](Self::take_output).
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    /// Takes the output, `self` becomes [`Taken`](Self::Taken).
    ///
    /// # Panics
    ///
    /// Panics if `self` is not [`Output`](Self::Output).
    pub fn take_output(&mut self) -> F::Output {
        let Self::Output(output) = std::mem::replace(self, Self::Taken) else {
            unreachable!()
        };
        output
    }
}
1106
1107/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1108///
1109/// This is the dynamic version of [`all!`].
1110pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1111    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1112    future_fn(move |cx| {
1113        let mut pending = false;
1114        for input in &mut futures {
1115            if let FutureOrOutput::Future(fut) = input {
1116                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1117                // Future::poll call, so it will not move.
1118                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1119                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1120                    *input = FutureOrOutput::Output(r);
1121                } else {
1122                    pending = true;
1123                }
1124            }
1125        }
1126
1127        if pending {
1128            Poll::Pending
1129        } else {
1130            Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1131        }
1132    })
1133    .await
1134}
1135
/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output type. The macro output is a future that when awaited produces
/// the output of the first input future that completes.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready at the same time the result of the first future in the input list is used.
/// After one future is ready the other futures are not polled again and are dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete:
///
/// ```
/// use zng_task as task;
/// use zng_unit::*;
///
/// # task::doc_test(false, async {
/// let r = task::any!(
///     task::run(async {
///         task::deadline(300.ms()).await;
///         'a'
///     }),
///     task::wait(|| 'b'),
///     async {
///         task::deadline(300.ms()).await;
///         'c'
///     }
/// )
/// .await;
///
/// assert_eq!('b', r);
/// # });
/// ```
#[macro_export]
macro_rules! any {
    // one to eight inputs, each input is paired with a fixed local name.
    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // more than eight inputs, delegate to the proc-macro, it receives the target macro and the inputs.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
}
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
    ($($ident:ident: $fut:expr;)+) => {
        {
            $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;
                $(
                    // SAFETY: `$ident` is owned by this closure and the closure is only called
                    // from inside a `Future::poll`, the future is never moved after this point.
                    let pinned = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
                    let polled = pinned.poll(cx);
                    if polled.is_ready() {
                        // inputs are polled in declaration order, the first ready one wins.
                        return polled;
                    }
                )+

                Poll::Pending
            })
        }
    }
}
1267#[doc(hidden)]
1268pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1269
1270/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1271///
1272/// This is the dynamic version of [`any!`].
1273pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1274    let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1275    future_fn(move |cx| {
1276        for fut in &mut futures {
1277            // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1278            // Future::poll call, so it will not move.
1279            let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1280            if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1281                return Poll::Ready(r);
1282            }
1283        }
1284        Poll::Pending
1285    })
1286    .await
1287}
1288
/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all output a `Result<T, E>` with the same `T`, but each can have a different `E`. The macro output is a future
/// that when awaited produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
/// is `Ok(T)` or is `Err((E0, E1, ..))` if all futures are `Err`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `Err(E)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result<T, E>` output of the same `T`. Note that each input must be
/// known at compile time, use the [`fn@any_ok`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Ok`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # pub struct FooError;
/// # task::doc_test(false, async {
/// let r = task::any_ok!(
///     task::run(async { Err::<char, _>("error") }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Err::<char, _>(FooError) }
/// )
/// .await;
///
/// assert_eq!(Ok('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_ok {
    // one to eight inputs, each input is paired with a fixed local name.
    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // more than eight inputs, delegate to the proc-macro, it receives the target macro and the inputs.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
}
1394
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // every input starts as a future and is replaced in place by its `Err` when it fails.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and the closure is only called
                        // from inside a `Future::poll`, the future is never moved after this point,
                        // it is only overwritten in place by its error.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                // first `Ok` wins, the other inputs are dropped with the closure.
                                Ok(r) => return Poll::Ready(Ok(r)),
                                Err(e) => {
                                    $ident = $crate::FutureOrOutput::Output(Err(e));
                                }
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // all inputs failed. Uses a `match` and not `unwrap_err` so that the
                    // `Ok` type is not required to implement `Debug`.
                    Poll::Ready(Err((
                        $(
                            match $ident.take_output() {
                                Err(e) => e,
                                // an `Ok` returns from the closure immediately, it is never stored.
                                Ok(_) => unreachable!(),
                            }
                        ),+
                    )))
                }
            })
        }
    }
}
1435
1436/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1437///
1438/// This is the dynamic version of [`all_some!`].
1439pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1440    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1441    future_fn(move |cx| {
1442        let mut pending = false;
1443        for input in &mut futures {
1444            if let FutureOrOutput::Future(fut) = input {
1445                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1446                // Future::poll call, so it will not move.
1447                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1448                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1449                    match r {
1450                        Ok(r) => return Poll::Ready(Ok(r)),
1451                        Err(e) => *input = FutureOrOutput::Output(Err(e)),
1452                    }
1453                } else {
1454                    pending = true;
1455                }
1456            }
1457        }
1458
1459        if pending {
1460            Poll::Pending
1461        } else {
1462            Poll::Ready(Err(futures
1463                .iter_mut()
1464                .map(|f| match f.take_output() {
1465                    Ok(_) => unreachable!(),
1466                    Err(e) => e,
1467                })
1468                .collect()))
1469        }
1470    })
1471    .await
1472}
1473
/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output `Option<T>` type. The macro output is a future that when awaited produces
/// the output of the first input future that completes with a `Some`.
/// If all futures complete with a `None` the output is `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `None` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any_some`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
///
/// assert_eq!(Some('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_some {
    // one to eight inputs, each input is paired with a fixed local name.
    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // more than eight inputs, delegate to the proc-macro, it receives the target macro and the inputs.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
}
1572
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // every input starts as `Some(future)` and is set to `None` when it completes with `None`.
            $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut any_pending = false;

                $(
                    if let Some(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and the closure is only called
                        // from inside a `Future::poll`, the future is never moved after this point,
                        // it is only dropped in place.
                        let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match pinned.poll(cx) {
                            Poll::Ready(Some(value)) => return Poll::Ready(Some(value)),
                            Poll::Ready(None) => $ident = None,
                            Poll::Pending => any_pending = true,
                        }
                    }
                )+

                if any_pending {
                    Poll::Pending
                } else {
                    Poll::Ready(None)
                }
            })
        }
    }
}
1609
1610/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1611///
1612/// This is the dynamic version of [`all_some!`].
1613pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1614    let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1615    future_fn(move |cx| {
1616        let mut pending = false;
1617        for input in &mut futures {
1618            if let Some(fut) = input {
1619                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1620                // Future::poll call, so it will not move.
1621                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1622                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1623                    match r {
1624                        Some(r) => return Poll::Ready(Some(r)),
1625                        None => *input = None,
1626                    }
1627                } else {
1628                    pending = true;
1629                }
1630            }
1631        }
1632
1633        if pending { Poll::Pending } else { Poll::Ready(None) }
1634    })
1635    .await
1636}
1637
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
/// any future is ready with an `Err(E)` result.
///
/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
/// type is the first error encountered, the input futures must have the same `Err` type but can have different
/// `Ok` types.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Err(E)` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result<T, E>` output of the same `E`. Note that each input must be
/// known at compile time, use the [`fn@all_ok`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Ok(T)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(
///     task::run(async { Ok::<_, FooError>('a') }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Ok::<_, FooError>('c') }
/// )
/// .await;
///
/// assert_eq!(Ok(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// And if any completes with `Err(E)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
///     Ok('c')
/// })
/// .await;
///
/// assert_eq!(Err(FooError), r);
/// # });
/// ```
#[macro_export]
macro_rules! all_ok {
    // one to eight inputs, each input is paired with a fixed local name.
    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // more than eight inputs, delegate to the proc-macro, it receives the target macro and the inputs.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
}
1759
// Implementation of `all_ok!`.
//
// Input is a list of `ident: future_expr;` pairs, each identifier names the local slot that holds
// the future and, once it completes with `Ok`, its output. The expansion is a `future_fn` future
// that polls every unfinished input on each poll and resolves to:
//
// * `Err(e)` as soon as any input is ready with `Err(e)`, the other inputs are not polled again.
// * `Ok((o0, o1, ..))` once every input is ready with `Ok`.
//
// The error types are not required to implement `Debug`.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
                        // Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                Ok(r) => {
                                    $ident = $crate::FutureOrOutput::Output(Ok(r))
                                },
                                Err(e) => return Poll::Ready(Err(e)),
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // Only `Ok` outputs are ever stored, errors return early above, so the
                    // error branch cannot be reached. `unwrap` is not used here because it
                    // would require every error type to implement `Debug`.
                    Poll::Ready(Ok((
                        $($ident.take_output().unwrap_or_else(|_| unreachable!())),+
                    )))
                }
            })
        }
    }
}
1800
1801/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1802///
1803/// This is the dynamic version of [`all_ok!`].
1804pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1805    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1806    future_fn(move |cx| {
1807        let mut pending = false;
1808        for input in &mut futures {
1809            if let FutureOrOutput::Future(fut) = input {
1810                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1811                // Future::poll call, so it will not move.
1812                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1813                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1814                    match r {
1815                        Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1816                        Err(e) => return Poll::Ready(Err(e)),
1817                    }
1818                } else {
1819                    pending = true;
1820                }
1821            }
1822        }
1823
1824        if pending {
1825            Poll::Pending
1826        } else {
1827            Poll::Ready(Ok(futures
1828                .iter_mut()
1829                .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1830                .collect()))
1831        }
1832    })
1833    .await
1834}
1835
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
/// future is ready with `None`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
/// produces `Some((T0, T1, ..))` if all futures were `Some(T)` or `None` if any of the futures was `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// After one future is ready with `None` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with an `Option<T>` output type. Note that each input must be
/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
///
/// assert_eq!(Some(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// Completes with `None` if any future completes with `None`:
///
/// ```
/// # use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
///
/// assert_eq!(None, r);
/// # });
/// ```
#[macro_export]
macro_rules! all_some {
    // One to eight inputs are expanded directly by the declarative `__all_some!` helper, each input
    // expression is bound to a fixed identifier (`fut0` to `fut7`).
    ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Fallback for more than eight inputs: the path of the helper macro and the input expressions
    // are forwarded to `__proc_any_all!`.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
}
1944
// Implementation of `all_some!`.
//
// Input is a list of `ident: future_expr;` pairs, each identifier names the local slot that holds
// the future and, once it completes with `Some`, its output. The expansion is a `future_fn` future
// that resolves to `None` as soon as any input is ready with `None`, or to `Some((o0, o1, ..))`
// once every input is ready with `Some`.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut all_ready = true;

                $(
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
                        // Future::poll call, so it will not move.
                        let fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match fut.poll(cx) {
                            Poll::Ready(Some(value)) => {
                                $ident = $crate::FutureOrOutput::Output(Some(value));
                            }
                            Poll::Ready(None) => return Poll::Ready(None),
                            Poll::Pending => {
                                all_ready = false;
                            }
                        }
                    }
                )+

                if all_ready {
                    // only `Some` outputs are stored, `None` returns early above.
                    Poll::Ready(Some((
                        $($ident.take_output().unwrap()),+
                    )))
                } else {
                    Poll::Pending
                }
            })
        }
    }
}
1984
1985/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1986///
1987/// This is the dynamic version of [`all_some!`].
1988pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1989    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1990    future_fn(move |cx| {
1991        let mut pending = false;
1992        for input in &mut futures {
1993            if let FutureOrOutput::Future(fut) = input {
1994                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1995                // Future::poll call, so it will not move.
1996                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1997                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1998                    match r {
1999                        Some(r) => *input = FutureOrOutput::Output(Some(r)),
2000                        None => return Poll::Ready(None),
2001                    }
2002                } else {
2003                    pending = true;
2004                }
2005            }
2006        }
2007
2008        if pending {
2009            Poll::Pending
2010        } else {
2011            Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2012        }
2013    })
2014    .await
2015}
2016
2017/// A future that will await until [`set`] is called.
2018///
2019/// # Examples
2020///
2021/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2022///
2023/// ```
2024/// use zng_clone_move::async_clmv;
2025/// use zng_task::{self as task, *};
2026///
2027/// let signal = SignalOnce::default();
2028///
2029/// task::spawn(async_clmv!(signal, {
2030///     signal.await;
2031///     println!("After Signal!");
2032/// }));
2033///
2034/// signal.set();
2035/// ```
2036///
2037/// [`set`]: SignalOnce::set
2038#[derive(Default, Clone)]
2039pub struct SignalOnce(Arc<SignalInner>);
2040impl fmt::Debug for SignalOnce {
2041    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2042        write!(f, "SignalOnce({})", self.is_set())
2043    }
2044}
2045impl PartialEq for SignalOnce {
2046    fn eq(&self, other: &Self) -> bool {
2047        Arc::ptr_eq(&self.0, &other.0)
2048    }
2049}
2050impl Eq for SignalOnce {}
2051impl Hash for SignalOnce {
2052    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2053        Arc::as_ptr(&self.0).hash(state)
2054    }
2055}
2056impl SignalOnce {
2057    /// New unsigned.
2058    pub fn new() -> Self {
2059        Self::default()
2060    }
2061
2062    /// New signaled.
2063    pub fn new_set() -> Self {
2064        let s = Self::new();
2065        s.set();
2066        s
2067    }
2068
2069    /// If the signal was set.
2070    pub fn is_set(&self) -> bool {
2071        self.0.signaled.load(Ordering::Relaxed)
2072    }
2073
2074    /// Sets the signal and awakes listeners.
2075    pub fn set(&self) {
2076        if !self.0.signaled.swap(true, Ordering::Relaxed) {
2077            let listeners = mem::take(&mut *self.0.listeners.lock());
2078            for listener in listeners {
2079                listener.wake();
2080            }
2081        }
2082    }
2083}
2084impl Future for SignalOnce {
2085    type Output = ();
2086
2087    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2088        if self.0.signaled.load(Ordering::Relaxed) {
2089            return Poll::Ready(());
2090        }
2091
2092        let mut listeners = self.0.listeners.lock();
2093        if self.0.signaled.load(Ordering::Relaxed) {
2094            return Poll::Ready(());
2095        }
2096
2097        let waker = cx.waker();
2098        if !listeners.iter().any(|w| w.will_wake(waker)) {
2099            listeners.push(waker.clone());
2100        }
2101
2102        Poll::Pending
2103    }
2104}
2105
2106#[derive(Default)]
2107struct SignalInner {
2108    signaled: AtomicBool,
2109    listeners: Mutex<Vec<std::task::Waker>>,
2110}
2111
2112/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2113///
2114/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2115/// known at the moment the first request is made.
2116///  
2117/// [`Waker`]: std::task::Waker
2118#[derive(Clone)]
2119pub struct McWaker(Arc<WakeVec>);
2120
2121#[derive(Default)]
2122struct WakeVec(Mutex<Vec<std::task::Waker>>);
2123impl WakeVec {
2124    fn push(&self, waker: std::task::Waker) -> bool {
2125        let mut v = self.0.lock();
2126
2127        let return_waker = v.is_empty();
2128
2129        v.push(waker);
2130
2131        return_waker
2132    }
2133
2134    fn cancel(&self) {
2135        let mut v = self.0.lock();
2136
2137        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2138
2139        v.clear();
2140    }
2141}
2142impl std::task::Wake for WakeVec {
2143    fn wake(self: Arc<Self>) {
2144        for w in mem::take(&mut *self.0.lock()) {
2145            w.wake();
2146        }
2147    }
2148}
2149impl McWaker {
2150    /// New empty waker.
2151    pub fn empty() -> Self {
2152        Self(Arc::new(WakeVec::default()))
2153    }
2154
2155    /// Register a `waker` to wake once when `self` awakes.
2156    ///
2157    /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2158    /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2159    /// is ready [`cancel`] must be called.
2160    ///
2161    /// [`cancel`]: Self::cancel
2162    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2163        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2164    }
2165
2166    /// Clear current registered wakers.
2167    pub fn cancel(&self) {
2168        self.0.cancel()
2169    }
2170}
2171
/// Panic payload, captured by [`std::panic::catch_unwind`].
#[non_exhaustive]
pub struct TaskPanicError {
    /// Panic payload.
    pub payload: Box<dyn Any + Send + 'static>,
}
impl TaskPanicError {
    /// New from panic payload.
    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
        TaskPanicError { payload }
    }

    /// Get the panic string if the `payload` is string like.
    ///
    /// Returns `Some` if the payload is a `&'static str` or a `String`, returns `None` for any other payload type.
    pub fn panic_str(&self) -> Option<&str> {
        match self.payload.downcast_ref::<&str>() {
            Some(literal) => Some(*literal),
            None => self.payload.downcast_ref::<String>().map(String::as_str),
        }
    }
}
impl fmt::Debug for TaskPanicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("TaskPanicError");
        out.field("panic_str()", &self.panic_str());
        out.finish()
    }
}
impl fmt::Display for TaskPanicError {
    // Writes the panic string, writes nothing if the payload is not string like.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.panic_str() {
            Some(msg) => f.write_str(msg),
            None => Ok(()),
        }
    }
}
impl std::error::Error for TaskPanicError {}
2206
// Boxed callback that receives the captured panic of a task, the callback is only required to be `Send`.
type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;

app_local! {
    // Spawn panic handler slot, starts as `None`.
    //
    // Mutex for Sync only, the boxed handler is `Send` but not `Sync`, wrapping it in a `Mutex`
    // makes the stored value `Sync`.
    static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
}
2213
2214/// Set a `handler` that is called when spawn tasks panic.
2215///
2216/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2217///
2218/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2219/// and thread the task that panicked was called in.
2220///
2221/// ```
2222/// # macro_rules! example { () => {
2223/// task::set_spawn_panic_handler(|p| {
2224///     UPDATES
2225///         .run_hn_once(hn_once!(|_| {
2226///             std::panic::resume_unwind(p.payload);
2227///         }))
2228///         .perm();
2229/// });
2230/// # }}
2231/// ```
2232///
2233/// The example above shows how to set a handler that propagates the panic to the app main thread.
2234///
2235/// # Panics
2236///
2237/// Panics if another handler is already set in the same app.
2238///
2239/// Panics if no app is running in the caller thread.
2240pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2241    let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2242    assert!(h.is_none(), "a spawn panic handler is already set");
2243    *h = Some(Mutex::new(Box::new(handler)));
2244}
2245
2246fn on_spawn_panic(p: TaskPanicError) {
2247    if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2248        f.get_mut()(p)
2249    }
2250}