Skip to main content

zng_task/
lib.rs

1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13    any::Any,
14    fmt,
15    hash::Hash,
16    mem, panic,
17    pin::Pin,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    task::Poll,
23};
24
25use zng_app_context::{LocalContext, app_local};
26use zng_time::Deadline;
27use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
28
29#[cfg(test)]
30mod tests;
31
32mod reexports;
33pub use reexports::*;
34
35use crate::parking_lot::Mutex;
36
37pub mod channel;
38pub mod fs;
39pub mod io;
40
41mod ui;
42pub use ui::*;
43
44pub mod http;
45
46pub mod process;
47
48mod rayon_ctx;
49
50mod progress;
51pub use progress::*;
52
53/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
54///
55/// # Parallel
56///
57/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
58///
59/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
60/// otherwise it will run in a single thread at a time, still not blocking the UI.
61///
62/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
63///
64/// # Async
65///
66/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
67/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
68/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
69/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
70///
71/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
72///
73/// # Examples
74///
75/// ```
76/// # use zng_task::{self as task, *, rayon::iter::*};
77/// # use zng_var::*;
78/// # struct SomeStruct { sum_response: ResponseVar<usize> }
79/// # impl SomeStruct {
80/// fn on_event(&mut self) {
81///     let (responder, response) = response_var();
82///     self.sum_response = response;
83///
84///     task::spawn(async move {
85///         let r = (0..1000).into_par_iter().map(|i| i * i).sum();
86///
87///         responder.respond(r);
88///     });
89/// }
90///
91/// fn on_update(&mut self) {
92///     if let Some(result) = self.sum_response.rsp_new() {
93///         println!("sum of squares 0..1000: {result}");
94///     }
95/// }
96/// # }
97/// ```
98///
99/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
100/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
101///
102/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
103/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
104///
105/// # Panic Handling
106///
107/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
108/// is otherwise ignored.
109///
110/// # Unwind Safety
111///
112/// This function disables the [unwind safety validation], meaning that in case of a panic shared
113/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
114/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
115/// to propagate a panic.
116///
117/// [unwind safety validation]: std::panic::UnwindSafe
118/// [`Waker`]: std::task::Waker
119/// [`rayon`]: https://docs.rs/rayon
120/// [`LocalContext`]: zng_app_context::LocalContext
121/// [`response_var`]: zng_var::response_var
122pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
123where
124    F: Future<Output = ()> + Send + 'static,
125{
126    Arc::new(RayonTask {
127        ctx: LocalContext::capture(),
128        fut: Mutex::new(Some(Box::pin(task.into_future()))),
129    })
130    .poll()
131}
132
133/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
134pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136    F: Future<Output = ()> + Send + 'static,
137{
138    struct PollRayonTask {
139        fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
140    }
141    impl PollRayonTask {
142        // start task in calling thread
143        fn poll(self: Arc<Self>) {
144            let mut task = self.fut.lock();
145            let (mut t, _) = task.take().unwrap();
146
147            let waker = self.clone().into();
148
149            match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
150                Poll::Ready(()) => {}
151                Poll::Pending => {
152                    let ctx = LocalContext::capture();
153                    *task = Some((t, Some(ctx)));
154                }
155            }
156        }
157    }
158    impl std::task::Wake for PollRayonTask {
159        fn wake(self: Arc<Self>) {
160            // continue task in spawn threads
161            if let Some((task, Some(ctx))) = self.fut.lock().take() {
162                Arc::new(RayonTask {
163                    ctx,
164                    fut: Mutex::new(Some(Box::pin(task))),
165                })
166                .poll();
167            }
168        }
169    }
170
171    Arc::new(PollRayonTask {
172        fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
173    })
174    .poll()
175}
176
177type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
178
179// A future that is its own waker that polls inside rayon spawn tasks.
180struct RayonTask {
181    ctx: LocalContext,
182    fut: Mutex<Option<RayonSpawnFut>>,
183}
184impl RayonTask {
185    fn poll(self: Arc<Self>) {
186        ::rayon::spawn(move || {
187            // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
188            let mut task = self.fut.lock();
189            if let Some(mut t) = task.take() {
190                let waker = self.clone().into();
191
192                // load app context
193                self.ctx.clone().with_context(move || {
194                    let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
195                        // poll future
196                        if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
197                            // not done
198                            *task = Some(t);
199                        }
200                    }));
201                    if let Err(p) = r {
202                        let p = TaskPanicError::new(p);
203                        tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
204                        on_spawn_panic(p);
205                    }
206                });
207            }
208        })
209    }
210}
211impl std::task::Wake for RayonTask {
212    fn wake(self: Arc<Self>) {
213        self.poll()
214    }
215}
216
217/// Rayon join with local context.
218///
219/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
220/// operations.
221///
222/// See `rayon::join` for more details about join.
223///
224/// [`LocalContext`]: zng_app_context::LocalContext
225pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
226where
227    A: FnOnce() -> RA + Send,
228    B: FnOnce() -> RB + Send,
229    RA: Send,
230    RB: Send,
231{
232    self::join_context(move |_| op_a(), move |_| op_b())
233}
234
235/// Rayon join context with local context.
236///
237/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
238/// operations.
239///
240/// See `rayon::join_context` for more details about join.
241///
242/// [`LocalContext`]: zng_app_context::LocalContext
243pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
244where
245    A: FnOnce(::rayon::FnContext) -> RA + Send,
246    B: FnOnce(::rayon::FnContext) -> RB + Send,
247    RA: Send,
248    RB: Send,
249{
250    let ctx = LocalContext::capture();
251    let ctx = &ctx;
252    ::rayon::join_context(
253        move |a| {
254            if a.migrated() {
255                ctx.clone().with_context(|| op_a(a))
256            } else {
257                op_a(a)
258            }
259        },
260        move |b| {
261            if b.migrated() {
262                ctx.clone().with_context(|| op_b(b))
263            } else {
264                op_b(b)
265            }
266        },
267    )
268}
269
270/// Rayon scope with local context.
271///
272/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
273/// operations.
274///
275/// See `rayon::scope` for more details about scope.
276///
277/// [`LocalContext`]: zng_app_context::LocalContext
278pub fn scope<'scope, OP, R>(op: OP) -> R
279where
280    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
281    R: Send,
282{
283    let ctx = LocalContext::capture();
284
285    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
286    // Is there a better way to do this? I hope so.
287    //
288    // SAFETY:
289    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
290    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
291    let ctx_ref: &'_ LocalContext = &ctx;
292    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
293
294    let r = ::rayon::scope(move |s| {
295        op(ScopeCtx {
296            scope: s,
297            ctx: ctx_scope_ref,
298        })
299    });
300
301    drop(ctx);
302
303    r
304}
305
306/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
307///
308/// See [`scope`] for more details.
309#[derive(Clone, Copy, Debug)]
310pub struct ScopeCtx<'a, 'scope: 'a> {
311    scope: &'a ::rayon::Scope<'scope>,
312    ctx: &'scope LocalContext,
313}
314impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
315    /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
316    ///
317    /// See `rayon::Scope::spawn` for more details.
318    pub fn spawn<F>(self, f: F)
319    where
320        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
321    {
322        let ctx = self.ctx;
323        self.scope
324            .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
325    }
326}
327
328/// Spawn a parallel async task that can also be `.await` for the task result.
329///
330/// # Parallel
331///
332/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
333///
334/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
335/// otherwise it will run in a single thread at a time, still not blocking the UI.
336///
337/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
338///
339/// # Async
340///
341/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
342/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
343/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
344/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
345///
346/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
347///
348/// # Examples
349///
350/// ```
351/// # use zng_task::{self as task, rayon::iter::*};
352/// # struct SomeStruct { sum: usize }
353/// # async fn read_numbers() -> Vec<usize> { vec![] }
354/// # impl SomeStruct {
355/// async fn on_event(&mut self) {
356///     self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
357/// }
358/// # }
359/// ```
360///
361/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
362/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
363/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
364///
365/// # Cancellation
366///
367/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
368/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
369/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
370/// result channel is disconnected.
371///
372/// If you want to deterministically known that the `task` was cancelled use a cancellation signal.
373///
374/// # Panic Propagation
375///
376/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
377/// can use [`run_catch`] to get the panic as an error instead.
378///
379/// [`resume_unwind`]: panic::resume_unwind
380/// [`Waker`]: std::task::Waker
381/// [`rayon`]: https://docs.rs/rayon
382/// [`LocalContext`]: zng_app_context::LocalContext
383pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
384where
385    R: Send + 'static,
386    T: Future<Output = R> + Send + 'static,
387{
388    match run_catch(task).await {
389        Ok(r) => r,
390        Err(p) => panic::resume_unwind(p.payload),
391    }
392}
393
394/// Like [`run`] but catches panics.
395///
396/// This task works the same and has the same utility as [`run`], except if returns panic messages
397/// as an error instead of propagating the panic.
398///
399/// # Unwind Safety
400///
401/// This function disables the [unwind safety validation], meaning that in case of a panic shared
402/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
403/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
404/// if this function returns an error.
405///
406/// [unwind safety validation]: std::panic::UnwindSafe
407pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
408where
409    R: Send + 'static,
410    T: Future<Output = R> + Send + 'static,
411{
412    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
413
414    // A future that is its own waker that polls inside the rayon primary thread-pool.
415    struct RayonCatchTask<R> {
416        ctx: LocalContext,
417        fut: Mutex<Option<Fut<R>>>,
418        sender: flume::Sender<Result<R, TaskPanicError>>,
419    }
420    impl<R: Send + 'static> RayonCatchTask<R> {
421        fn poll(self: Arc<Self>) {
422            let sender = self.sender.clone();
423            if sender.is_disconnected() {
424                return; // cancel.
425            }
426            ::rayon::spawn(move || {
427                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
428                let mut task = self.fut.lock();
429                if let Some(mut t) = task.take() {
430                    let waker = self.clone().into();
431                    let mut cx = std::task::Context::from_waker(&waker);
432
433                    self.ctx.clone().with_context(|| {
434                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
435                        match r {
436                            Ok(Poll::Ready(r)) => {
437                                drop(task);
438                                let _ = sender.send(Ok(r));
439                            }
440                            Ok(Poll::Pending) => {
441                                *task = Some(t);
442                            }
443                            Err(p) => {
444                                drop(task);
445                                let _ = sender.send(Err(TaskPanicError::new(p)));
446                            }
447                        }
448                    });
449                }
450            })
451        }
452    }
453    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
454        fn wake(self: Arc<Self>) {
455            self.poll()
456        }
457    }
458
459    let (sender, receiver) = channel::bounded(1);
460
461    Arc::new(RayonCatchTask {
462        ctx: LocalContext::capture(),
463        fut: Mutex::new(Some(Box::pin(task.into_future()))),
464        sender: sender.into(),
465    })
466    .poll();
467
468    receiver.recv().await.unwrap()
469}
470
471/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
472///
473/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
474///
475/// # Examples
476///
477/// ```
478/// # use zng_task::{self as task, rayon::iter::*};
479/// # use zng_var::*;
480/// # struct SomeStruct { sum_response: ResponseVar<usize> }
481/// # async fn read_numbers() -> Vec<usize> { vec![] }
482/// # impl SomeStruct {
483/// fn on_event(&mut self) {
484///     self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
485/// }
486///
487/// fn on_update(&mut self) {
488///     if let Some(result) = self.sum_response.rsp_new() {
489///         println!("sum of squares: {result}");
490///     }
491/// }
492/// # }
493/// ```
494///
495/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
496/// `sum_response` that is a [`ResponseVar<R>`].
497///
498/// # Cancellation
499///
500/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
501///
502/// # Panic Handling
503///
504/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
505///
506/// [`resume_unwind`]: panic::resume_unwind
507/// [`ResponseVar<R>`]: zng_var::ResponseVar
508/// [`response_var`]: zng_var::response_var
509pub fn respond<R, F>(task: F) -> ResponseVar<R>
510where
511    R: VarValue,
512    F: Future<Output = R> + Send + 'static,
513{
514    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
515
516    let (responder, response) = response_var();
517
518    // A future that is its own waker that polls inside the rayon primary thread-pool.
519    struct RayonRespondTask<R: VarValue> {
520        ctx: LocalContext,
521        fut: Mutex<Option<Fut<R>>>,
522        responder: zng_var::ResponderVar<R>,
523    }
524    impl<R: VarValue> RayonRespondTask<R> {
525        fn poll(self: Arc<Self>) {
526            let responder = self.responder.clone();
527            if responder.strong_count() == 2 {
528                return; // cancel.
529            }
530            ::rayon::spawn(move || {
531                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
532                let mut task = self.fut.lock();
533                if let Some(mut t) = task.take() {
534                    let waker = self.clone().into();
535                    let mut cx = std::task::Context::from_waker(&waker);
536
537                    self.ctx.clone().with_context(|| {
538                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
539                        match r {
540                            Ok(Poll::Ready(r)) => {
541                                drop(task);
542
543                                responder.respond(r);
544                            }
545                            Ok(Poll::Pending) => {
546                                *task = Some(t);
547                            }
548                            Err(p) => {
549                                let p = TaskPanicError::new(p);
550                                tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
551                                drop(task);
552                                responder.modify(move |_| panic::resume_unwind(p.payload));
553                            }
554                        }
555                    });
556                }
557            })
558        }
559    }
560    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
561        fn wake(self: Arc<Self>) {
562            self.poll()
563        }
564    }
565
566    Arc::new(RayonRespondTask {
567        ctx: LocalContext::capture(),
568        fut: Mutex::new(Some(Box::pin(task))),
569        responder,
570    })
571    .poll();
572
573    response
574}
575
576/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
577/// if the `task` is pending continues execution like [`respond`].
578pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
579where
580    R: VarValue,
581    F: Future<Output = R> + Send + 'static,
582{
583    enum QuickResponse<R: VarValue> {
584        Quick(Option<R>),
585        Response(zng_var::ResponderVar<R>),
586    }
587    let task = task.into_future();
588    let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
589    poll_spawn(zng_clone_move::async_clmv!(q, {
590        let rsp = task.await;
591
592        match &mut *q.lock() {
593            QuickResponse::Quick(q) => *q = Some(rsp),
594            QuickResponse::Response(r) => r.respond(rsp),
595        }
596    }));
597
598    let mut q = q.lock();
599    match &mut *q {
600        QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
601        _ => {
602            let (responder, response) = response_var();
603            *q = QuickResponse::Response(responder);
604            response
605        }
606    }
607}
608
609/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
610///
611/// # Parallel
612///
613/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
614/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
615/// parts that are blocking.
616///
617/// # Examples
618///
619/// ```
620/// # fn main() { }
621/// # use zng_task as task;
622/// # async fn example() {
623/// task::wait(|| std::fs::read_to_string("file.txt")).await
624/// # ; }
625/// ```
626///
627/// The example reads a file, that is a blocking file IO operation, most of the time is spend waiting for the operating system,
628/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
629/// like in a async event handler.
630///
631/// # Async Read/Write
632///
633/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
634/// have or want the full file in memory or when you want to apply multiple operations to the file.
635///
636/// # Panic Propagation
637///
638/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
639/// can use [`wait_catch`] to get the panic as an error instead.
640///
641/// [`blocking`]: https://docs.rs/blocking
642/// [`resume_unwind`]: panic::resume_unwind
643pub async fn wait<T, F>(task: F) -> T
644where
645    F: FnOnce() -> T + Send + 'static,
646    T: Send + 'static,
647{
648    match wait_catch(task).await {
649        Ok(r) => r,
650        Err(p) => panic::resume_unwind(p.payload),
651    }
652}
653
654/// Like [`wait`] but catches panics.
655///
656/// This task works the same and has the same utility as [`wait`], except if returns panic messages
657/// as an error instead of propagating the panic.
658///
659/// # Unwind Safety
660///
661/// This function disables the [unwind safety validation], meaning that in case of a panic shared
662/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
663/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
664/// if this function returns an error.
665///
666/// [unwind safety validation]: std::panic::UnwindSafe
667pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
668where
669    F: FnOnce() -> T + Send + 'static,
670    T: Send + 'static,
671{
672    let mut ctx = LocalContext::capture();
673    blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
674        .await
675        .map_err(TaskPanicError::new)
676}
677
678/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
679///
680/// # Panic Handling
681///
682/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
683/// is otherwise ignored.
684///
685/// # Unwind Safety
686///
687/// This function disables the [unwind safety validation], meaning that in case of a panic shared
688/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
689/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
690/// to propagate a panic.
691///
692/// [unwind safety validation]: std::panic::UnwindSafe
693pub fn spawn_wait<F>(task: F)
694where
695    F: FnOnce() + Send + 'static,
696{
697    spawn(async move {
698        if let Err(p) = wait_catch(task).await {
699            tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
700            on_spawn_panic(p);
701        }
702    });
703}
704
705/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
706///
707/// # Cancellation
708///
709/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
710///
711/// # Panic Handling
712///
713/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
714pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
715where
716    R: VarValue,
717    F: FnOnce() -> R + Send + 'static,
718{
719    let (responder, response) = response_var();
720    spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
721        Ok(r) => responder.respond(r),
722        Err(p) => {
723            let p = TaskPanicError::new(p);
724            tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
725            responder.modify(move |_| panic::resume_unwind(p.payload));
726        }
727    });
728    response
729}
730
731/// Blocks the thread until the `task` future finishes.
732///
733/// The crate [`futures-lite`] is used to execute the task.
734///
735/// # Examples
736///
737/// Test a [`run`] call:
738///
739/// ```
740/// use zng_task as task;
741/// # use zng_unit::*;
742/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
743///
744/// # #[test]
745/// # fn __() { }
746/// pub fn run_ok() {
747///     let r = task::block_on(task::run(async { foo(32).await }));
748///
749///     # let value =
750///     r.expect("foo(32) was not Ok");
751///     # assert_eq!(32, value);
752/// }
753/// # run_ok();
754/// ```
755///
756/// [`futures-lite`]: https://docs.rs/futures-lite/
757pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
758where
759    F: Future,
760{
761    futures_lite::future::block_on(task.into_future())
762}
763
/// Continuously polls the `task` until it finishes.
///
/// This function is useful for implementing some async tests only, futures don't expect to be polled
/// continuously. This function is only available in test and doc builds or with the `"test_util"` feature.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let mut task = std::pin::pin!(task.into_future());
    block_on(future_fn(|cx| {
        let polled = task.as_mut().poll(cx);
        if polled.is_pending() {
            // request another poll right away, that is the "spin"
            cx.waker().wake_by_ref();
        }
        polled
    }))
}
784
/// Executor used in async doc tests.
///
/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
/// If `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
///
/// # Panics
///
/// Panics if the timeout is reached before the `task` finishes.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    use zng_unit::TimeUnits;

    let outcome = if spin {
        spin_on(with_deadline(task, 500.ms()))
    } else {
        block_on(with_deadline(task, 5.secs()))
    };
    outcome.expect("async doc-test timeout")
}
802
/// A future that is [`Pending`] once and wakes the current task.
///
/// On the first poll the future calls [`wake`] and is pending, on every poll after that it is [`Ready`].
///
/// [`Pending`]: std::task::Poll::Pending
/// [`Ready`]: std::task::Poll::Ready
/// [`wake`]: std::task::Waker::wake
pub async fn yield_now() {
    struct YieldOnce {
        yielded: bool,
    }
    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            if self.yielded {
                return Poll::Ready(());
            }

            // first poll, request to be polled again and yield
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldOnce { yielded: false }.await
}
828
829/// A future that is [`Pending`] until the `deadline` is reached.
830///
831/// # Examples
832///
833/// Await 5 seconds in a [`spawn`] parallel task:
834///
835/// ```
836/// use zng_task as task;
837/// use zng_unit::*;
838///
839/// task::spawn(async {
840///     println!("waiting 5 seconds..");
841///     task::deadline(5.secs()).await;
842///     println!("5 seconds elapsed.")
843/// });
844/// ```
845///
846/// The future runs on an app provider timer executor, or on the [`futures_timer`] by default.
847///
848/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
849/// not at the moment of the first `.await` call.
850///
851/// [`Pending`]: std::task::Poll::Pending
852/// [`futures_timer`]: https://docs.rs/futures-timer
853pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
854    let deadline = deadline.into();
855    if zng_app_context::LocalContext::current_app().is_some() {
856        DEADLINE_SV.read().0(deadline)
857    } else {
858        default_deadline(deadline)
859    }
860}
861
862app_local! {
863    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
864}
865
866type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
867
868fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
869    if let Some(timeout) = deadline.time_left() {
870        Box::pin(futures_timer::Delay::new(timeout))
871    } else {
872        Box::pin(std::future::ready(()))
873    }
874}
875
876/// Deadline APP integration.
877#[expect(non_camel_case_types)]
878pub struct DEADLINE_APP;
879
880impl DEADLINE_APP {
881    /// Called by the app implementer to setup the [`deadline`] executor.
882    ///
883    /// If no app calls this the [`futures_timer`] executor is used.
884    ///
885    /// [`futures_timer`]: https://docs.rs/futures-timer
886    ///
887    /// # Panics
888    ///
889    /// Panics if called more than once for the same app.
890    pub fn init_deadline_service(&self, service: DeadlineService) {
891        let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
892        if already_set {
893            *DEADLINE_SV.write() = (prev, true);
894            panic!("deadline service already inited for this app");
895        }
896    }
897}
898
/// Implements a [`Future`] from a closure.
///
/// The closure is called for every poll of the returned future and its return value is the poll result.
///
/// # Examples
///
/// A future that is ready when a closure returns `Some(R)`.
///
/// ```
/// use std::task::Poll;
/// use zng_task as task;
///
/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
///     task::future_fn(|cx| match closure() {
///         Some(r) => Poll::Ready(r),
///         None => Poll::Pending,
///     })
///     .await
/// }
/// ```
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    /// Adapter that forwards each poll to the wrapped closure.
    struct ClosureFuture<P>(P);

    // The adapter never creates a pinned reference to the closure.
    impl<P> Unpin for ClosureFuture<P> {}

    impl<R, P> Future for ClosureFuture<P>
    where
        P: FnMut(&mut std::task::Context<'_>) -> Poll<R>,
    {
        type Output = R;

        fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<R> {
            let poll_fn = &mut self.get_mut().0;
            poll_fn(cx)
        }
    }

    ClosureFuture(fn_).await
}
932
/// Error returned by [`with_deadline`] when the time limit is reached before the task finishes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}

impl std::error::Error for DeadlineError {}

impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reached deadline")
    }
}
943
944/// Add a [`deadline`] to a future.
945///
946/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
947pub async fn with_deadline<O, F: Future<Output = O>>(
948    fut: impl IntoFuture<IntoFuture = F>,
949    deadline: impl Into<Deadline>,
950) -> Result<F::Output, DeadlineError> {
951    let deadline = deadline.into();
952    any!(async { Ok(fut.await) }, async {
953        self::deadline(deadline).await;
954        Err(DeadlineError {})
955    })
956    .await
957}
958
/// <span data-del-macro-root></span> A future that *zips* other futures.
///
/// The macro input is a comma separated list of future expressions. The macro output is a future
/// that, when awaited, produces a tuple with the output of each input, in the same order as the inputs.
///
/// At least one input future is required, there is no upper limit. Up to eight futures are expanded
/// declaratively, for more a proc-macro is used, this may cause code auto-complete to stop working in
/// some IDEs.
///
/// Each input must implement [`IntoFuture`] and must be known at compile time, use the [`fn@all`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three different futures to complete:
///
/// ```
/// use zng_task as task;
///
/// # task::doc_test(false, async {
/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
/// # });
/// ```
#[macro_export]
macro_rules! all {
    ($a:expr $(,)?) => {
        $crate::__all! { fut0: $a; }
    };
    ($a:expr, $b:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; }
    };
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::__all! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; fut7: $h; }
    };
    // more than eight inputs, the proc-macro generates the identifiers.
    ($($more:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__all; $($more),+ }
    };
}
1050
// Implementation of `all!`, the input is a list of `ident: future_expr;`, each identifier names
// the local variable that holds the future, and after it completes, the output.
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
    ($($slot:ident: $fut:expr;)+) => {{
        $(
            let mut $slot = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));
        )+

        $crate::future_fn(move |cx| {
            use std::task::Poll;

            let mut all_ready = true;

            $(
                // slots that already hold an output are not polled again.
                if let $crate::FutureOrOutput::Future(fut) = &mut $slot {
                    // SAFETY: the closure owns `$slot` and only accesses it through the exclusive
                    // borrow of a `Future::poll` call, so the future is not moved while pinned.
                    let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                    match pinned.poll(cx) {
                        Poll::Ready(out) => {
                            $slot = $crate::FutureOrOutput::Output(out);
                        }
                        Poll::Pending => {
                            all_ready = false;
                        }
                    }
                }
            )+

            if all_ready {
                Poll::Ready(($($slot.take_output()),+))
            } else {
                Poll::Pending
            }
        })
    }};
}
1084
/// State of a future awaited by the `all!` family of macros and functions.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    /// Future is still pending.
    Future(F),
    /// Future completed, the output was not taken yet.
    Output(F::Output),
    /// Output was taken.
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    /// Takes the output, leaving [`Taken`](Self::Taken) in place.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not [`Output`](Self::Output).
    pub fn take_output(&mut self) -> F::Output {
        let FutureOrOutput::Output(output) = std::mem::replace(self, Self::Taken) else {
            unreachable!()
        };
        output
    }
}
1099
1100/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1101///
1102/// This is the dynamic version of [`all!`].
1103pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1104    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1105    future_fn(move |cx| {
1106        let mut pending = false;
1107        for input in &mut futures {
1108            if let FutureOrOutput::Future(fut) = input {
1109                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1110                // Future::poll call, so it will not move.
1111                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1112                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1113                    *input = FutureOrOutput::Output(r);
1114                } else {
1115                    pending = true;
1116                }
1117            }
1118        }
1119
1120        if pending {
1121            Poll::Pending
1122        } else {
1123            Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1124        }
1125    })
1126    .await
1127}
1128
/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
///
/// The macro input is a comma separated list of future expressions, all futures must have the same
/// output type. The macro output is a future that, when awaited, produces the output of the first
/// input future that completes.
///
/// At least one input future is required, there is no upper limit. Up to eight futures are expanded
/// declaratively, for more a proc-macro is used, this may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready at the same time the result of the first future in the input list is used.
/// After one future is ready the other futures are not polled again and are dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type and must be known at compile time,
/// use the [`fn@any`] async function to await on the first of a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete:
///
/// ```
/// use zng_task as task;
/// use zng_unit::*;
///
/// # task::doc_test(false, async {
/// let r = task::any!(
///     task::run(async {
///         task::deadline(300.ms()).await;
///         'a'
///     }),
///     task::wait(|| 'b'),
///     async {
///         task::deadline(300.ms()).await;
///         'c'
///     }
/// )
/// .await;
///
/// assert_eq!('b', r);
/// # });
/// ```
#[macro_export]
macro_rules! any {
    ($a:expr $(,)?) => {
        $crate::__any! { fut0: $a; }
    };
    ($a:expr, $b:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; }
    };
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::__any! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; fut7: $h; }
    };
    // more than eight inputs, the proc-macro generates the identifiers.
    ($($more:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__any; $($more),+ }
    };
}
// Implementation of `any!`, the input is a list of `ident: future_expr;`, each identifier names
// the local variable that holds the future.
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
    ($($racer:ident: $fut:expr;)+) => {{
        $(
            let mut $racer = std::future::IntoFuture::into_future($fut);
        )+

        $crate::future_fn(move |cx| {
            use std::task::Poll;

            // polled in input order, the first ready future defines the output.
            $(
                // SAFETY: the closure owns `$racer` and only accesses it through the exclusive
                // borrow of a `Future::poll` call, so the future is not moved while pinned.
                let pinned = unsafe { std::pin::Pin::new_unchecked(&mut $racer) };
                match pinned.poll(cx) {
                    Poll::Ready(out) => return Poll::Ready(out),
                    Poll::Pending => {}
                }
            )+

            Poll::Pending
        })
    }};
}
1260#[doc(hidden)]
1261pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1262
1263/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1264///
1265/// This is the dynamic version of [`any!`].
1266pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1267    let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1268    future_fn(move |cx| {
1269        for fut in &mut futures {
1270            // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1271            // Future::poll call, so it will not move.
1272            let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1273            if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1274                return Poll::Ready(r);
1275            }
1276        }
1277        Poll::Pending
1278    })
1279    .await
1280}
1281
/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
///
/// The macro input is a comma separated list of future expressions, all futures must output a
/// `Result<T, E>` with the same `T`, but each can have a different `E`. The macro output is a future
/// that, when awaited, produces a `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
/// is `Ok(T)` or is `Err((E0, E1, ..))` if all futures are `Err`.
///
/// At least one input future is required, there is no upper limit. Up to eight futures are expanded
/// declaratively, for more a proc-macro is used, this may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `Err(E)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] and must be known at compile time, use the [`fn@any_ok`] async
/// function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Ok`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # pub struct FooError;
/// # task::doc_test(false, async {
/// let r = task::any_ok!(
///     task::run(async { Err::<char, _>("error") }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Err::<char, _>(FooError) }
/// )
/// .await;
///
/// assert_eq!(Ok('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_ok {
    ($a:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; }
    };
    ($a:expr, $b:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; }
    };
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::__any_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; fut7: $h; }
    };
    // more than eight inputs, the proc-macro generates the identifiers.
    ($($more:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__any_ok; $($more),+ }
    };
}
1387
// Implementation of `any_ok!`, the input is a list of `ident: future_expr;`, each identifier names
// the local variable that holds the future, and after it fails, the error.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
    ($($slot:ident: $fut:expr;)+) => {{
        $(
            let mut $slot = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));
        )+

        $crate::future_fn(move |cx| {
            use std::task::Poll;

            let mut any_pending = false;

            $(
                // slots that already hold an error are not polled again.
                if let $crate::FutureOrOutput::Future(fut) = &mut $slot {
                    // SAFETY: the closure owns `$slot` and only accesses it through the exclusive
                    // borrow of a `Future::poll` call, so the future is not moved while pinned.
                    let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                    match pinned.poll(cx) {
                        // first `Ok` wins, the other futures are dropped with the closure.
                        Poll::Ready(Ok(value)) => return Poll::Ready(Ok(value)),
                        Poll::Ready(Err(error)) => {
                            $slot = $crate::FutureOrOutput::Output(Err(error));
                        }
                        Poll::Pending => {
                            any_pending = true;
                        }
                    }
                }
            )+

            if any_pending {
                Poll::Pending
            } else {
                // all futures completed with an error.
                Poll::Ready(Err((
                    $($slot.take_output().unwrap_err()),+
                )))
            }
        })
    }};
}
1428
1429/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1430///
1431/// This is the dynamic version of [`all_some!`].
1432pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1433    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1434    future_fn(move |cx| {
1435        let mut pending = false;
1436        for input in &mut futures {
1437            if let FutureOrOutput::Future(fut) = input {
1438                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1439                // Future::poll call, so it will not move.
1440                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1441                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1442                    match r {
1443                        Ok(r) => return Poll::Ready(Ok(r)),
1444                        Err(e) => *input = FutureOrOutput::Output(Err(e)),
1445                    }
1446                } else {
1447                    pending = true;
1448                }
1449            }
1450        }
1451
1452        if pending {
1453            Poll::Pending
1454        } else {
1455            Poll::Ready(Err(futures
1456                .iter_mut()
1457                .map(|f| match f.take_output() {
1458                    Ok(_) => unreachable!(),
1459                    Err(e) => e,
1460                })
1461                .collect()))
1462        }
1463    })
1464    .await
1465}
1466
/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
///
/// The macro input is a comma separated list of future expressions, all futures must have the same
/// output `Option<T>` type. The macro output is a future that, when awaited, produces the output of
/// the first input future that completes with a `Some`. If all futures complete with a `None` the
/// output is `None`.
///
/// At least one input future is required, there is no upper limit. Up to eight futures are expanded
/// declaratively, for more a proc-macro is used, this may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `None` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type and must be known at compile time,
/// use the [`fn@any_some`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
///
/// assert_eq!(Some('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_some {
    ($a:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; }
    };
    ($a:expr, $b:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; }
    };
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::__any_some! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; fut7: $h; }
    };
    // more than eight inputs, the proc-macro generates the identifiers.
    ($($more:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__any_some; $($more),+ }
    };
}
1565
// Implementation of `any_some!`, the input is a list of `ident: future_expr;`, each identifier names
// the local variable that holds the future, the variable is set to `None` after the future is ready.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
    ($($slot:ident: $fut:expr;)+) => {{
        $(
            let mut $slot = Some(std::future::IntoFuture::into_future($fut));
        )+

        $crate::future_fn(move |cx| {
            use std::task::Poll;

            let mut any_pending = false;

            $(
                // futures that completed with `None` were dropped and are skipped.
                if let Some(fut) = &mut $slot {
                    // SAFETY: the closure owns `$slot` and only accesses it through the exclusive
                    // borrow of a `Future::poll` call, so the future is not moved while pinned.
                    let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                    match pinned.poll(cx) {
                        // first `Some` wins, the other futures are dropped with the closure.
                        Poll::Ready(Some(value)) => return Poll::Ready(Some(value)),
                        Poll::Ready(None) => {
                            $slot = None;
                        }
                        Poll::Pending => {
                            any_pending = true;
                        }
                    }
                }
            )+

            if any_pending {
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        })
    }};
}
1602
1603/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1604///
1605/// This is the dynamic version of [`all_some!`].
1606pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1607    let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1608    future_fn(move |cx| {
1609        let mut pending = false;
1610        for input in &mut futures {
1611            if let Some(fut) = input {
1612                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1613                // Future::poll call, so it will not move.
1614                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1615                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1616                    match r {
1617                        Some(r) => return Poll::Ready(Some(r)),
1618                        None => *input = None,
1619                    }
1620                } else {
1621                    pending = true;
1622                }
1623            }
1624        }
1625
1626        if pending { Poll::Pending } else { Poll::Ready(None) }
1627    })
1628    .await
1629}
1630
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
/// any future is ready with an `Err(E)` result.
///
/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
/// type is the first error encountered. The input futures must have the same `Err` type but can have different
/// `Ok` types.
///
/// At least one input future is required, there is no upper limit. Up to eight futures are expanded
/// declaratively, for more a proc-macro is used, this may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Err(E)` the other futures are not polled again and are dropped. After a future
/// is ready and `Ok(T)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] and must be known at compile time, use the [`fn@all_ok`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Ok(T)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(
///     task::run(async { Ok::<_, FooError>('a') }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Ok::<_, FooError>('c') }
/// )
/// .await;
///
/// assert_eq!(Ok(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// And if any future completes with `Err(E)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
///     Ok('c')
/// })
/// .await;
///
/// assert_eq!(Err(FooError), r);
/// # });
/// ```
#[macro_export]
macro_rules! all_ok {
    ($a:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; }
    };
    ($a:expr, $b:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; }
    };
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; }
    };
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::__all_ok! { fut0: $a; fut1: $b; fut2: $c; fut3: $d; fut4: $e; fut5: $f; fut6: $g; fut7: $h; }
    };
    // more than eight inputs, the proc-macro generates the identifiers.
    ($($more:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__all_ok; $($more),+ }
    };
}
1752
// Implementation of `all_ok!`.
//
// Receives one `ident: future_expr;` pair per input. Each `ident` becomes a local that holds the
// future until it is ready with `Ok`, from then on it holds the output. The expansion is a
// `future_fn` future that is ready with `Err(e)` as soon as any input is ready with `Err(e)`, or
// with `Ok((out0, out1, ..))` once every input is ready with `Ok`.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    // inputs that already produced `Ok` are skipped, they are never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
                        // Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match fut.as_mut().poll(cx) {
                            // the assignment drops the finished future in place.
                            Poll::Ready(Ok(r)) => $ident = $crate::FutureOrOutput::Output(Ok(r)),
                            // first error wins, the other inputs are dropped with the closure.
                            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // Only `Ok` outputs are ever stored, so the `Err` branch cannot happen. Not using
                    // `unwrap` here avoids requiring `Debug` for the error type, same as the `all_ok` function.
                    Poll::Ready(Ok((
                        $($ident.take_output().unwrap_or_else(|_| std::unreachable!())),+
                    )))
                }
            })
        }
    }
}
1793
1794/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1795///
1796/// This is the dynamic version of [`all_ok!`].
1797pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1798    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1799    future_fn(move |cx| {
1800        let mut pending = false;
1801        for input in &mut futures {
1802            if let FutureOrOutput::Future(fut) = input {
1803                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1804                // Future::poll call, so it will not move.
1805                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1806                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1807                    match r {
1808                        Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1809                        Err(e) => return Poll::Ready(Err(e)),
1810                    }
1811                } else {
1812                    pending = true;
1813                }
1814            }
1815        }
1816
1817        if pending {
1818            Poll::Pending
1819        } else {
1820            Poll::Ready(Ok(futures
1821                .iter_mut()
1822                .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1823                .collect()))
1824        }
1825    })
1826    .await
1827}
1828
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
/// future is ready with `None`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have an `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
/// produces `Some((T0, T1, ..))` if all futures were `Some(T)` or `None` if any of the futures was `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// After one future is ready with `None` the other futures are not polled again and are dropped. A future
/// that is ready with `Some(T)` is also dropped and not polled again.
///
/// Each input must implement [`IntoFuture`] with an `Option<T>` output. Note that each input must be
/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
///
/// assert_eq!(Some(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// Completes with `None` if any future completes with `None`:
///
/// ```
/// # use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
///
/// assert_eq!(None, r);
/// # });
/// ```
#[macro_export]
macro_rules! all_some {
    // One to eight inputs are expanded directly, each input expression is paired with a
    // positional identifier that `__all_some!` declares as a local variable.
    ($f0:expr $(,)?) => { $crate::__all_some! { fut0: $f0; } };
    ($f0:expr, $f1:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
            fut3: $f3;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
            fut3: $f3;
            fut4: $f4;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
            fut3: $f3;
            fut4: $f4;
            fut5: $f5;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr, $f6:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
            fut3: $f3;
            fut4: $f4;
            fut5: $f5;
            fut6: $f6;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr, $f6:expr, $f7:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0;
            fut1: $f1;
            fut2: $f2;
            fut3: $f3;
            fut4: $f4;
            fut5: $f5;
            fut6: $f6;
            fut7: $f7;
        }
    };
    // Any other number of inputs is forwarded to `__proc_any_all!` together with the path of the
    // macro that must receive the `ident: expr;` pairs.
    ($($input:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($input),+ } }
}
1937
// Implementation of `all_some!`.
//
// Receives one `ident: future_expr;` pair per input. Each `ident` becomes a local that holds the
// future until it is ready with `Some`, from then on it holds the output. The expansion is a
// `future_fn` future that is ready with `None` as soon as any input is ready with `None`, or with
// `Some((out0, out1, ..))` once every input is ready with `Some`.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    // inputs that already produced `Some` are skipped, they are never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
                        // Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match fut.as_mut().poll(cx) {
                            // first `None` wins, the other inputs are dropped with the closure.
                            Poll::Ready(None) => return Poll::Ready(None),
                            // the assignment drops the finished future in place.
                            Poll::Ready(some) => $ident = $crate::FutureOrOutput::Output(some),
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // only `Some` outputs are ever stored.
                    Poll::Ready(Some((
                        $($ident.take_output().unwrap()),+
                    )))
                }
            })
        }
    }
}
1977
1978/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1979///
1980/// This is the dynamic version of [`all_some!`].
1981pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1982    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1983    future_fn(move |cx| {
1984        let mut pending = false;
1985        for input in &mut futures {
1986            if let FutureOrOutput::Future(fut) = input {
1987                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1988                // Future::poll call, so it will not move.
1989                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1990                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1991                    match r {
1992                        Some(r) => *input = FutureOrOutput::Output(Some(r)),
1993                        None => return Poll::Ready(None),
1994                    }
1995                } else {
1996                    pending = true;
1997                }
1998            }
1999        }
2000
2001        if pending {
2002            Poll::Pending
2003        } else {
2004            Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2005        }
2006    })
2007    .await
2008}
2009
2010/// A future that will await until [`set`] is called.
2011///
2012/// # Examples
2013///
2014/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2015///
2016/// ```
2017/// use zng_clone_move::async_clmv;
2018/// use zng_task::{self as task, *};
2019///
2020/// let signal = SignalOnce::default();
2021///
2022/// task::spawn(async_clmv!(signal, {
2023///     signal.await;
2024///     println!("After Signal!");
2025/// }));
2026///
2027/// signal.set();
2028/// ```
2029///
2030/// [`set`]: SignalOnce::set
2031#[derive(Default, Clone)]
2032pub struct SignalOnce(Arc<SignalInner>);
2033impl fmt::Debug for SignalOnce {
2034    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2035        write!(f, "SignalOnce({})", self.is_set())
2036    }
2037}
2038impl PartialEq for SignalOnce {
2039    fn eq(&self, other: &Self) -> bool {
2040        Arc::ptr_eq(&self.0, &other.0)
2041    }
2042}
2043impl Eq for SignalOnce {}
2044impl Hash for SignalOnce {
2045    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2046        Arc::as_ptr(&self.0).hash(state)
2047    }
2048}
2049impl SignalOnce {
2050    /// New unsigned.
2051    pub fn new() -> Self {
2052        Self::default()
2053    }
2054
2055    /// New signaled.
2056    pub fn new_set() -> Self {
2057        let s = Self::new();
2058        s.set();
2059        s
2060    }
2061
2062    /// If the signal was set.
2063    pub fn is_set(&self) -> bool {
2064        self.0.signaled.load(Ordering::Relaxed)
2065    }
2066
2067    /// Sets the signal and awakes listeners.
2068    pub fn set(&self) {
2069        if !self.0.signaled.swap(true, Ordering::Relaxed) {
2070            let listeners = mem::take(&mut *self.0.listeners.lock());
2071            for listener in listeners {
2072                listener.wake();
2073            }
2074        }
2075    }
2076}
2077impl Future for SignalOnce {
2078    type Output = ();
2079
2080    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2081        if self.0.signaled.load(Ordering::Relaxed) {
2082            return Poll::Ready(());
2083        }
2084
2085        let mut listeners = self.0.listeners.lock();
2086        if self.0.signaled.load(Ordering::Relaxed) {
2087            return Poll::Ready(());
2088        }
2089
2090        let waker = cx.waker();
2091        if !listeners.iter().any(|w| w.will_wake(waker)) {
2092            listeners.push(waker.clone());
2093        }
2094
2095        Poll::Pending
2096    }
2097}
2098
2099#[derive(Default)]
2100struct SignalInner {
2101    signaled: AtomicBool,
2102    listeners: Mutex<Vec<std::task::Waker>>,
2103}
2104
2105/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2106///
2107/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2108/// known at the moment the first request is made.
2109///  
2110/// [`Waker`]: std::task::Waker
2111#[derive(Clone)]
2112pub struct McWaker(Arc<WakeVec>);
2113
2114#[derive(Default)]
2115struct WakeVec(Mutex<Vec<std::task::Waker>>);
2116impl WakeVec {
2117    fn push(&self, waker: std::task::Waker) -> bool {
2118        let mut v = self.0.lock();
2119
2120        let return_waker = v.is_empty();
2121
2122        v.push(waker);
2123
2124        return_waker
2125    }
2126
2127    fn cancel(&self) {
2128        let mut v = self.0.lock();
2129
2130        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2131
2132        v.clear();
2133    }
2134}
2135impl std::task::Wake for WakeVec {
2136    fn wake(self: Arc<Self>) {
2137        for w in mem::take(&mut *self.0.lock()) {
2138            w.wake();
2139        }
2140    }
2141}
2142impl McWaker {
2143    /// New empty waker.
2144    pub fn empty() -> Self {
2145        Self(Arc::new(WakeVec::default()))
2146    }
2147
2148    /// Register a `waker` to wake once when `self` awakes.
2149    ///
2150    /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2151    /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2152    /// is ready [`cancel`] must be called.
2153    ///
2154    /// [`cancel`]: Self::cancel
2155    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2156        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2157    }
2158
2159    /// Clear current registered wakers.
2160    pub fn cancel(&self) {
2161        self.0.cancel()
2162    }
2163}
2164
/// Panic payload, captured by [`std::panic::catch_unwind`].
#[non_exhaustive]
pub struct TaskPanicError {
    /// Panic payload.
    pub payload: Box<dyn Any + Send + 'static>,
}
impl TaskPanicError {
    /// New from panic payload.
    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
        Self { payload }
    }

    /// Get the panic string if the `payload` is string like.
    ///
    /// Returns `Some(_)` if the payload is a `&'static str` or a `String`, returns `None` for any other type.
    pub fn panic_str(&self) -> Option<&str> {
        // Deref the box before the coercion to `&dyn Any`. A `&Box<dyn Any + Send>` coerces with
        // the `Box` itself as the concrete type, no string downcast can match that.
        extract_panic_message(&*self.payload)
    }
}
impl fmt::Debug for TaskPanicError {
    /// Writes the struct name and the [`panic_str`](TaskPanicError::panic_str) value, the raw payload is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TaskPanicError").field("panic_str()", &self.panic_str()).finish()
    }
}
impl fmt::Display for TaskPanicError {
    /// Writes the panic string, writes nothing if the payload is not string like.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.panic_str() {
            Some(s) => f.write_str(s),
            None => Ok(()),
        }
    }
}
impl std::error::Error for TaskPanicError {}

/// Boxed handler type stored by `set_spawn_panic_handler`.
type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;

/// Get the panic message if the payload `p` is a `&'static str` or a `String`.
///
/// The `p` reference must point to the payload value, not to the `Box` that owns it, pass `&*payload`
/// when the payload is boxed.
pub(crate) fn extract_panic_message(p: &dyn Any) -> Option<&str> {
    match p.downcast_ref::<&'static str>() {
        Some(s) => Some(*s),
        None => p.downcast_ref::<String>().map(String::as_str),
    }
}
2205
2206app_local! {
2207    // Mutex for Sync only
2208    static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
2209}
2210
2211/// Set a `handler` that is called when spawn tasks panic.
2212///
2213/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2214///
2215/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2216/// and thread the task that panicked was called in.
2217///
2218/// ```
2219/// # macro_rules! example { () => {
2220/// task::set_spawn_panic_handler(|p| {
2221///     UPDATES
2222///         .run_hn_once(hn_once!(|_| {
2223///             std::panic::resume_unwind(p.payload);
2224///         }))
2225///         .perm();
2226/// });
2227/// # }}
2228/// ```
2229///
2230/// The example above shows how to set a handler that propagates the panic to the app main thread.
2231///
2232/// # Panics
2233///
2234/// Panics if another handler is already set in the same app.
2235///
2236/// Panics if no app is running in the caller thread.
2237pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2238    let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2239    assert!(h.is_none(), "a spawn panic handler is already set");
2240    *h = Some(Mutex::new(Box::new(handler)));
2241}
2242
2243fn on_spawn_panic(p: TaskPanicError) {
2244    if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2245        f.get_mut()(p)
2246    }
2247}