Skip to main content

zng_task/
lib.rs

1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13    any::Any,
14    fmt,
15    hash::Hash,
16    mem, panic,
17    pin::Pin,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    task::Poll,
23};
24
25use zng_app_context::{LocalContext, app_local};
26use zng_time::Deadline;
27use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
28
29#[cfg(test)]
30mod tests;
31
32mod reexports;
33pub use reexports::*;
34
35use crate::parking_lot::Mutex;
36
37pub mod channel;
38pub mod fs;
39pub mod io;
40
41mod ui;
42pub use ui::*;
43
44pub mod http;
45
46pub mod process;
47
48mod rayon_ctx;
49
50mod progress;
51pub use progress::*;
52
53/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
54///
55/// # Parallel
56///
57/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
58///
59/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
60/// otherwise it will run in a single thread at a time, still not blocking the UI.
61///
62/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
63///
64/// # Async
65///
66/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
67/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
68/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
69/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
70///
71/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
72///
73/// # Examples
74///
75/// ```
76/// # use zng_task::{self as task, *, rayon::iter::*};
77/// # use zng_var::*;
78/// # struct SomeStruct { sum_response: ResponseVar<usize> }
79/// # impl SomeStruct {
80/// fn on_event(&mut self) {
81///     let (responder, response) = response_var();
82///     self.sum_response = response;
83///
84///     task::spawn(async move {
85///         let r = (0..1000).into_par_iter().map(|i| i * i).sum();
86///
87///         responder.respond(r);
88///     });
89/// }
90///
91/// fn on_update(&mut self) {
92///     if let Some(result) = self.sum_response.rsp_new() {
93///         println!("sum of squares 0..1000: {result}");
94///     }
95/// }
96/// # }
97/// ```
98///
99/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
100/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
101///
102/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
103/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
104///
105/// # Panic Handling
106///
107/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
108/// is otherwise ignored.
109///
110/// # Unwind Safety
111///
112/// This function disables the [unwind safety validation], meaning that in case of a panic shared
113/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
114/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
115/// to propagate a panic.
116///
117/// [unwind safety validation]: std::panic::UnwindSafe
118/// [`Waker`]: std::task::Waker
119/// [`rayon`]: https://docs.rs/rayon
120/// [`LocalContext`]: zng_app_context::LocalContext
121/// [`response_var`]: zng_var::response_var
122pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
123where
124    F: Future<Output = ()> + Send + 'static,
125{
126    Arc::new(RayonTask {
127        ctx: LocalContext::capture(),
128        fut: Mutex::new(Some(Box::pin(task.into_future()))),
129    })
130    .poll()
131}
132
133/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
134pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136    F: Future<Output = ()> + Send + 'static,
137{
138    struct PollRayonTask {
139        fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
140    }
141    impl PollRayonTask {
142        // start task in calling thread
143        fn poll(self: Arc<Self>) {
144            let mut task = self.fut.lock();
145            let (mut t, _) = task.take().unwrap();
146
147            let waker = self.clone().into();
148
149            match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
150                Poll::Ready(()) => {}
151                Poll::Pending => {
152                    let ctx = LocalContext::capture();
153                    *task = Some((t, Some(ctx)));
154                }
155            }
156        }
157    }
158    impl std::task::Wake for PollRayonTask {
159        fn wake(self: Arc<Self>) {
160            // continue task in spawn threads
161            if let Some((task, Some(ctx))) = self.fut.lock().take() {
162                Arc::new(RayonTask {
163                    ctx,
164                    fut: Mutex::new(Some(Box::pin(task))),
165                })
166                .poll();
167            }
168        }
169    }
170
171    Arc::new(PollRayonTask {
172        fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
173    })
174    .poll()
175}
176
177type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
178
179// A future that is its own waker that polls inside rayon spawn tasks.
180struct RayonTask {
181    ctx: LocalContext,
182    fut: Mutex<Option<RayonSpawnFut>>,
183}
184impl RayonTask {
185    fn poll(self: Arc<Self>) {
186        ::rayon::spawn(move || {
187            // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
188            let mut task = self.fut.lock();
189            if let Some(mut t) = task.take() {
190                let waker = self.clone().into();
191
192                // load app context
193                self.ctx.clone().with_context(move || {
194                    let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
195                        // poll future
196                        if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
197                            // not done
198                            *task = Some(t);
199                        }
200                    }));
201                    if let Err(p) = r {
202                        let p = TaskPanicError::new(p);
203                        tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
204                        on_spawn_panic(p);
205                    }
206                });
207            }
208        })
209    }
210}
211impl std::task::Wake for RayonTask {
212    fn wake(self: Arc<Self>) {
213        self.poll()
214    }
215}
216
217/// Rayon join with local context.
218///
219/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
220/// operations.
221///
222/// See `rayon::join` for more details about join.
223///
224/// [`LocalContext`]: zng_app_context::LocalContext
225pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
226where
227    A: FnOnce() -> RA + Send,
228    B: FnOnce() -> RB + Send,
229    RA: Send,
230    RB: Send,
231{
232    self::join_context(move |_| op_a(), move |_| op_b())
233}
234
235/// Rayon join context with local context.
236///
237/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
238/// operations.
239///
240/// See `rayon::join_context` for more details about join.
241///
242/// [`LocalContext`]: zng_app_context::LocalContext
243pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
244where
245    A: FnOnce(::rayon::FnContext) -> RA + Send,
246    B: FnOnce(::rayon::FnContext) -> RB + Send,
247    RA: Send,
248    RB: Send,
249{
250    let ctx = LocalContext::capture();
251    let ctx = &ctx;
252    ::rayon::join_context(
253        move |a| {
254            if a.migrated() {
255                ctx.clone().with_context(|| op_a(a))
256            } else {
257                op_a(a)
258            }
259        },
260        move |b| {
261            if b.migrated() {
262                ctx.clone().with_context(|| op_b(b))
263            } else {
264                op_b(b)
265            }
266        },
267    )
268}
269
270/// Rayon scope with local context.
271///
272/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
273/// operations.
274///
275/// See `rayon::scope` for more details about scope.
276///
277/// [`LocalContext`]: zng_app_context::LocalContext
278pub fn scope<'scope, OP, R>(op: OP) -> R
279where
280    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
281    R: Send,
282{
283    let ctx = LocalContext::capture();
284
285    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
286    // Is there a better way to do this? I hope so.
287    //
288    // SAFETY:
289    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
290    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
291    let ctx_ref: &'_ LocalContext = &ctx;
292    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
293
294    let r = ::rayon::scope(move |s| {
295        op(ScopeCtx {
296            scope: s,
297            ctx: ctx_scope_ref,
298        })
299    });
300
301    drop(ctx);
302
303    r
304}
305
306/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
307///
308/// See [`scope`] for more details.
309#[derive(Clone, Copy, Debug)]
310pub struct ScopeCtx<'a, 'scope: 'a> {
311    scope: &'a ::rayon::Scope<'scope>,
312    ctx: &'scope LocalContext,
313}
314impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
315    /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
316    ///
317    /// See `rayon::Scope::spawn` for more details.
318    pub fn spawn<F>(self, f: F)
319    where
320        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
321    {
322        let ctx = self.ctx;
323        self.scope
324            .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
325    }
326}
327
328/// Spawn a parallel async task that can also be `.await` for the task result.
329///
330/// # Parallel
331///
332/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
333///
334/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
335/// otherwise it will run in a single thread at a time, still not blocking the UI.
336///
337/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
338///
339/// # Async
340///
341/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
342/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
343/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
344/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
345///
346/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
347///
348/// # Examples
349///
350/// ```
351/// # use zng_task::{self as task, rayon::iter::*};
352/// # struct SomeStruct { sum: usize }
353/// # async fn read_numbers() -> Vec<usize> { vec![] }
354/// # impl SomeStruct {
355/// async fn on_event(&mut self) {
356///     self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
357/// }
358/// # }
359/// ```
360///
361/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
362/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
363/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
364///
365/// # Cancellation
366///
367/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
368/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
369/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
370/// result channel is disconnected.
371///
372/// If you want to deterministically known that the `task` was cancelled use a cancellation signal.
373///
374/// # Panic Propagation
375///
376/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
377/// can use [`run_catch`] to get the panic as an error instead.
378///
379/// [`resume_unwind`]: panic::resume_unwind
380/// [`Waker`]: std::task::Waker
381/// [`rayon`]: https://docs.rs/rayon
382/// [`LocalContext`]: zng_app_context::LocalContext
383pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
384where
385    R: Send + 'static,
386    T: Future<Output = R> + Send + 'static,
387{
388    match run_catch(task).await {
389        Ok(r) => r,
390        Err(p) => panic::resume_unwind(p.payload),
391    }
392}
393
394/// Like [`run`] but catches panics.
395///
396/// This task works the same and has the same utility as [`run`], except if returns panic messages
397/// as an error instead of propagating the panic.
398///
399/// # Unwind Safety
400///
401/// This function disables the [unwind safety validation], meaning that in case of a panic shared
402/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
403/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
404/// if this function returns an error.
405///
406/// [unwind safety validation]: std::panic::UnwindSafe
407pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
408where
409    R: Send + 'static,
410    T: Future<Output = R> + Send + 'static,
411{
412    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
413
414    // A future that is its own waker that polls inside the rayon primary thread-pool.
415    struct RayonCatchTask<R> {
416        ctx: LocalContext,
417        fut: Mutex<Option<Fut<R>>>,
418        sender: flume::Sender<Result<R, TaskPanicError>>,
419    }
420    impl<R: Send + 'static> RayonCatchTask<R> {
421        fn poll(self: Arc<Self>) {
422            let sender = self.sender.clone();
423            if sender.is_disconnected() {
424                return; // cancel.
425            }
426            ::rayon::spawn(move || {
427                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
428                let mut task = self.fut.lock();
429                if let Some(mut t) = task.take() {
430                    let waker = self.clone().into();
431                    let mut cx = std::task::Context::from_waker(&waker);
432
433                    self.ctx.clone().with_context(|| {
434                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
435                        match r {
436                            Ok(Poll::Ready(r)) => {
437                                drop(task);
438                                let _ = sender.send(Ok(r));
439                            }
440                            Ok(Poll::Pending) => {
441                                *task = Some(t);
442                            }
443                            Err(p) => {
444                                drop(task);
445                                let _ = sender.send(Err(TaskPanicError::new(p)));
446                            }
447                        }
448                    });
449                }
450            })
451        }
452    }
453    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
454        fn wake(self: Arc<Self>) {
455            self.poll()
456        }
457    }
458
459    let (sender, receiver) = channel::bounded(1);
460
461    Arc::new(RayonCatchTask {
462        ctx: LocalContext::capture(),
463        fut: Mutex::new(Some(Box::pin(task.into_future()))),
464        sender: sender.into(),
465    })
466    .poll();
467
468    receiver.recv().await.unwrap()
469}
470
471/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
472///
473/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
474///
475/// # Examples
476///
477/// ```
478/// # use zng_task::{self as task, rayon::iter::*};
479/// # use zng_var::*;
480/// # struct SomeStruct { sum_response: ResponseVar<usize> }
481/// # async fn read_numbers() -> Vec<usize> { vec![] }
482/// # impl SomeStruct {
483/// fn on_event(&mut self) {
484///     self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
485/// }
486///
487/// fn on_update(&mut self) {
488///     if let Some(result) = self.sum_response.rsp_new() {
489///         println!("sum of squares: {result}");
490///     }
491/// }
492/// # }
493/// ```
494///
495/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
496/// `sum_response` that is a [`ResponseVar<R>`].
497///
498/// # Cancellation
499///
500/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
501///
502/// # Panic Handling
503///
504/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
505///
506/// [`resume_unwind`]: panic::resume_unwind
507/// [`ResponseVar<R>`]: zng_var::ResponseVar
508/// [`response_var`]: zng_var::response_var
509pub fn respond<R, F>(task: F) -> ResponseVar<R>
510where
511    R: VarValue,
512    F: Future<Output = R> + Send + 'static,
513{
514    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
515
516    let (responder, response) = response_var();
517
518    // A future that is its own waker that polls inside the rayon primary thread-pool.
519    struct RayonRespondTask<R: VarValue> {
520        ctx: LocalContext,
521        fut: Mutex<Option<Fut<R>>>,
522        responder: zng_var::ResponderVar<R>,
523    }
524    impl<R: VarValue> RayonRespondTask<R> {
525        fn poll(self: Arc<Self>) {
526            let responder = self.responder.clone();
527            if responder.strong_count() == 2 {
528                return; // cancel.
529            }
530            ::rayon::spawn(move || {
531                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
532                let mut task = self.fut.lock();
533                if let Some(mut t) = task.take() {
534                    let waker = self.clone().into();
535                    let mut cx = std::task::Context::from_waker(&waker);
536
537                    self.ctx.clone().with_context(|| {
538                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
539                        match r {
540                            Ok(Poll::Ready(r)) => {
541                                drop(task);
542
543                                responder.respond(r);
544                            }
545                            Ok(Poll::Pending) => {
546                                *task = Some(t);
547                            }
548                            Err(p) => {
549                                let p = TaskPanicError::new(p);
550                                tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
551                                drop(task);
552                                responder.modify(move |_| panic::resume_unwind(p.payload));
553                            }
554                        }
555                    });
556                }
557            })
558        }
559    }
560    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
561        fn wake(self: Arc<Self>) {
562            self.poll()
563        }
564    }
565
566    Arc::new(RayonRespondTask {
567        ctx: LocalContext::capture(),
568        fut: Mutex::new(Some(Box::pin(task))),
569        responder,
570    })
571    .poll();
572
573    response
574}
575
576/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
577/// if the `task` is pending continues execution like [`respond`].
578pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
579where
580    R: VarValue,
581    F: Future<Output = R> + Send + 'static,
582{
583    enum QuickResponse<R: VarValue> {
584        Quick(Option<R>),
585        Response(zng_var::ResponderVar<R>),
586    }
587    let task = task.into_future();
588    let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
589    poll_spawn(zng_clone_move::async_clmv!(q, {
590        let rsp = task.await;
591
592        match &mut *q.lock() {
593            QuickResponse::Quick(q) => *q = Some(rsp),
594            QuickResponse::Response(r) => r.respond(rsp),
595        }
596    }));
597
598    let mut q = q.lock();
599    match &mut *q {
600        QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
601        _ => {
602            let (responder, response) = response_var();
603            *q = QuickResponse::Response(responder);
604            response
605        }
606    }
607}
608
609/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
610///
611/// # Parallel
612///
613/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
614/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
615/// parts that are blocking.
616///
617/// # Examples
618///
619/// ```
620/// # fn main() { }
621/// # use zng_task as task;
622/// # async fn example() {
623/// task::wait(|| std::fs::read_to_string("file.txt")).await
624/// # ; }
625/// ```
626///
627/// The example reads a file, that is a blocking file IO operation, most of the time is spend waiting for the operating system,
628/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
629/// like in a async event handler.
630///
631/// # Async Read/Write
632///
633/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
634/// have or want the full file in memory or when you want to apply multiple operations to the file.
635///
636/// # Panic Propagation
637///
638/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
639/// can use [`wait_catch`] to get the panic as an error instead.
640///
641/// [`blocking`]: https://docs.rs/blocking
642/// [`resume_unwind`]: panic::resume_unwind
643pub async fn wait<T, F>(task: F) -> T
644where
645    F: FnOnce() -> T + Send + 'static,
646    T: Send + 'static,
647{
648    match wait_catch(task).await {
649        Ok(r) => r,
650        Err(p) => panic::resume_unwind(p.payload),
651    }
652}
653
654/// Like [`wait`] but catches panics.
655///
656/// This task works the same and has the same utility as [`wait`], except if returns panic messages
657/// as an error instead of propagating the panic.
658///
659/// # Unwind Safety
660///
661/// This function disables the [unwind safety validation], meaning that in case of a panic shared
662/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
663/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
664/// if this function returns an error.
665///
666/// [unwind safety validation]: std::panic::UnwindSafe
667pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
668where
669    F: FnOnce() -> T + Send + 'static,
670    T: Send + 'static,
671{
672    let mut ctx = LocalContext::capture();
673    blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
674        .await
675        .map_err(TaskPanicError::new)
676}
677
678/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
679///
680/// # Panic Handling
681///
682/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
683/// is otherwise ignored.
684///
685/// # Unwind Safety
686///
687/// This function disables the [unwind safety validation], meaning that in case of a panic shared
688/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
689/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
690/// to propagate a panic.
691///
692/// [unwind safety validation]: std::panic::UnwindSafe
693pub fn spawn_wait<F>(task: F)
694where
695    F: FnOnce() + Send + 'static,
696{
697    spawn(async move {
698        if let Err(p) = wait_catch(task).await {
699            tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
700            on_spawn_panic(p);
701        }
702    });
703}
704
705/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
706///
707/// # Cancellation
708///
709/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
710///
711/// # Panic Handling
712///
713/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
714pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
715where
716    R: VarValue,
717    F: FnOnce() -> R + Send + 'static,
718{
719    let (responder, response) = response_var();
720    spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
721        Ok(r) => responder.respond(r),
722        Err(p) => {
723            let p = TaskPanicError::new(p);
724            tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
725            responder.modify(move |_| panic::resume_unwind(p.payload));
726        }
727    });
728    response
729}
730
731/// Blocks the thread until the `task` future finishes.
732///
733/// This function is useful for implementing async tests, using it in an app will probably cause
734/// the app to stop responding.
735///
736/// The crate [`futures-lite`] is used to execute the task.
737///
738/// # Examples
739///
740/// Test a [`run`] call:
741///
742/// ```
743/// use zng_task as task;
744/// # use zng_unit::*;
745/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
746///
747/// # #[test]
748/// # fn __() { }
749/// pub fn run_ok() {
750///     let r = task::block_on(task::run(async { foo(32).await }));
751///
752///     # let value =
753///     r.expect("foo(32) was not Ok");
754///     # assert_eq!(32, value);
755/// }
756/// # run_ok();
757/// ```
758///
759/// [`futures-lite`]: https://docs.rs/futures-lite/
760pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
761where
762    F: Future,
763{
764    futures_lite::future::block_on(task.into_future())
765}
766
767/// Continuous poll the `task` until if finishes.
768///
769/// This function is useful for implementing some async tests only, futures don't expect to be polled
770/// continuously. This function is only available in test builds.
771#[cfg(any(test, doc, feature = "test_util"))]
772pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
773where
774    F: Future,
775{
776    use std::pin::pin;
777
778    let mut task = pin!(task.into_future());
779    block_on(future_fn(|cx| match task.as_mut().poll(cx) {
780        Poll::Ready(r) => Poll::Ready(r),
781        Poll::Pending => {
782            cx.waker().wake_by_ref();
783            Poll::Pending
784        }
785    }))
786}
787
788/// Executor used in async doc tests.
789///
790/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
791/// IF `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
792#[cfg(any(test, doc, feature = "test_util"))]
793pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
794where
795    F: Future,
796{
797    use zng_unit::TimeUnits;
798
799    if spin {
800        spin_on(with_deadline(task, 500.ms())).expect("async doc-test timeout")
801    } else {
802        block_on(with_deadline(task, 5.secs())).expect("async doc-test timeout")
803    }
804}
805
806/// A future that is [`Pending`] once and wakes the current task.
807///
808/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
809///
810/// [`Pending`]: std::task::Poll::Pending
811/// [`Ready`]: std::task::Poll::Ready
812/// [`wake`]: std::task::Waker::wake
813pub async fn yield_now() {
814    struct YieldNowFut(bool);
815    impl Future for YieldNowFut {
816        type Output = ();
817
818        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
819            if self.0 {
820                Poll::Ready(())
821            } else {
822                self.0 = true;
823                cx.waker().wake_by_ref();
824                Poll::Pending
825            }
826        }
827    }
828
829    YieldNowFut(false).await
830}
831
832/// A future that is [`Pending`] until the `deadline` is reached.
833///
834/// # Examples
835///
836/// Await 5 seconds in a [`spawn`] parallel task:
837///
838/// ```
839/// use zng_task as task;
840/// use zng_unit::*;
841///
842/// task::spawn(async {
843///     println!("waiting 5 seconds..");
844///     task::deadline(5.secs()).await;
845///     println!("5 seconds elapsed.")
846/// });
847/// ```
848///
849/// The future runs on an app provider timer executor, or on the [`futures_timer`] by default.
850///
851/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
852/// not at the moment of the first `.await` call.
853///
854/// [`Pending`]: std::task::Poll::Pending
855/// [`futures_timer`]: https://docs.rs/futures-timer
856pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
857    let deadline = deadline.into();
858    if zng_app_context::LocalContext::current_app().is_some() {
859        DEADLINE_SV.read().0(deadline)
860    } else {
861        default_deadline(deadline)
862    }
863}
864
865app_local! {
866    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
867}
868
869type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
870
871fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
872    if let Some(timeout) = deadline.time_left() {
873        Box::pin(futures_timer::Delay::new(timeout))
874    } else {
875        Box::pin(std::future::ready(()))
876    }
877}
878
879/// Deadline APP integration.
880#[expect(non_camel_case_types)]
881pub struct DEADLINE_APP;
882
883impl DEADLINE_APP {
884    /// Called by the app implementer to setup the [`deadline`] executor.
885    ///
886    /// If no app calls this the [`futures_timer`] executor is used.
887    ///
888    /// [`futures_timer`]: https://docs.rs/futures-timer
889    ///
890    /// # Panics
891    ///
892    /// Panics if called more than once for the same app.
893    pub fn init_deadline_service(&self, service: DeadlineService) {
894        let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
895        if already_set {
896            *DEADLINE_SV.write() = (prev, true);
897            panic!("deadline service already inited for this app");
898        }
899    }
900}
901
902/// Implements a [`Future`] from a closure.
903///
904/// # Examples
905///
906/// A future that is ready with a closure returns `Some(R)`.
907///
908/// ```
909/// use std::task::Poll;
910/// use zng_task as task;
911///
912/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
913///     task::future_fn(|cx| match closure() {
914///         Some(r) => Poll::Ready(r),
915///         None => Poll::Pending,
916///     })
917///     .await
918/// }
919/// ```
920pub async fn future_fn<T, F>(fn_: F) -> T
921where
922    F: FnMut(&mut std::task::Context) -> Poll<T>,
923{
924    struct PollFn<F>(F);
925    impl<F> Unpin for PollFn<F> {}
926    impl<T, F: FnMut(&mut std::task::Context<'_>) -> Poll<T>> Future for PollFn<F> {
927        type Output = T;
928
929        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
930            (self.0)(cx)
931        }
932    }
933    PollFn(fn_).await
934}
935
936/// Error when [`with_deadline`] reach a time limit before a task finishes.
937#[derive(Debug, Clone, Copy)]
938#[non_exhaustive]
939pub struct DeadlineError {}
940impl fmt::Display for DeadlineError {
941    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
942        write!(f, "reached deadline")
943    }
944}
945impl std::error::Error for DeadlineError {}
946
947/// Add a [`deadline`] to a future.
948///
949/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
950pub async fn with_deadline<O, F: Future<Output = O>>(
951    fut: impl IntoFuture<IntoFuture = F>,
952    deadline: impl Into<Deadline>,
953) -> Result<F::Output, DeadlineError> {
954    let deadline = deadline.into();
955    any!(async { Ok(fut.await) }, async {
956        self::deadline(deadline).await;
957        Err(DeadlineError {})
958    })
959    .await
960}
961
962/// <span data-del-macro-root></span> A future that *zips* other futures.
963///
964/// The macro input is a comma separated list of future expressions. The macro output is a future
965/// that when ".awaited" produces a tuple of results in the same order as the inputs.
966///
967/// At least one input future is required and any number of futures is accepted. For more than
968/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
969/// some IDEs.
970///
971/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
972/// function to await on all futures in a dynamic list of futures.
973///
974/// # Examples
975///
976/// Await for three different futures to complete:
977///
978/// ```
979/// use zng_task as task;
980///
981/// # task::doc_test(false, async {
982/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
983/// # });
984/// ```
985#[macro_export]
986macro_rules! all {
987    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
988    ($fut0:expr, $fut1:expr $(,)?) => {
989        $crate::__all! {
990            fut0: $fut0;
991            fut1: $fut1;
992        }
993    };
994    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
995        $crate::__all! {
996            fut0: $fut0;
997            fut1: $fut1;
998            fut2: $fut2;
999        }
1000    };
1001    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1002        $crate::__all! {
1003            fut0: $fut0;
1004            fut1: $fut1;
1005            fut2: $fut2;
1006            fut3: $fut3;
1007        }
1008    };
1009    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1010        $crate::__all! {
1011            fut0: $fut0;
1012            fut1: $fut1;
1013            fut2: $fut2;
1014            fut3: $fut3;
1015            fut4: $fut4;
1016        }
1017    };
1018    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1019        $crate::__all! {
1020            fut0: $fut0;
1021            fut1: $fut1;
1022            fut2: $fut2;
1023            fut3: $fut3;
1024            fut4: $fut4;
1025            fut5: $fut5;
1026        }
1027    };
1028    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1029        $crate::__all! {
1030            fut0: $fut0;
1031            fut1: $fut1;
1032            fut2: $fut2;
1033            fut3: $fut3;
1034            fut4: $fut4;
1035            fut5: $fut5;
1036            fut6: $fut6;
1037        }
1038    };
1039    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1040        $crate::__all! {
1041            fut0: $fut0;
1042            fut1: $fut1;
1043            fut2: $fut2;
1044            fut3: $fut3;
1045            fut4: $fut4;
1046            fut5: $fut5;
1047            fut6: $fut6;
1048            fut7: $fut7;
1049        }
1050    };
1051    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
1052}
1053
1054#[doc(hidden)]
1055#[macro_export]
1056macro_rules! __all {
1057    ($($ident:ident: $fut:expr;)+) => {
1058        {
1059            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1060            $crate::future_fn(move |cx| {
1061                use std::task::Poll;
1062
1063                let mut pending = false;
1064
1065                $(
1066                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1067                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1068                        // Future::poll call, so it will not move.
1069                        let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1070                        if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1071                            $ident = $crate::FutureOrOutput::Output(r);
1072                        } else {
1073                            pending = true;
1074                        }
1075                    }
1076                )+
1077
1078                if pending {
1079                    Poll::Pending
1080                } else {
1081                    Poll::Ready(($($ident.take_output()),+))
1082                }
1083            })
1084        }
1085    }
1086}
1087
1088#[doc(hidden)]
1089pub enum FutureOrOutput<F: Future> {
1090    Future(F),
1091    Output(F::Output),
1092    Taken,
1093}
1094impl<F: Future> FutureOrOutput<F> {
1095    pub fn take_output(&mut self) -> F::Output {
1096        match std::mem::replace(self, Self::Taken) {
1097            FutureOrOutput::Output(o) => o,
1098            _ => unreachable!(),
1099        }
1100    }
1101}
1102
1103/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1104///
1105/// This is the dynamic version of [`all!`].
1106pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1107    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1108    future_fn(move |cx| {
1109        let mut pending = false;
1110        for input in &mut futures {
1111            if let FutureOrOutput::Future(fut) = input {
1112                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1113                // Future::poll call, so it will not move.
1114                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1115                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1116                    *input = FutureOrOutput::Output(r);
1117                } else {
1118                    pending = true;
1119                }
1120            }
1121        }
1122
1123        if pending {
1124            Poll::Pending
1125        } else {
1126            Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1127        }
1128    })
1129    .await
1130}
1131
1132/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
1133///
1134/// The macro input is comma separated list of future expressions, the futures must
1135/// all have the same output type. The macro output is a future that when ".awaited" produces
1136/// a single output type instance returned by the first input future that completes.
1137///
1138/// At least one input future is required and any number of futures is accepted. For more than
1139/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1140/// some IDEs.
1141///
1142/// If two futures are ready at the same time the result of the first future in the input list is used.
1143/// After one future is ready the other futures are not polled again and are dropped.
1144///
1145/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1146/// known at compile time, use the [`fn@any`] async function to await on all futures in a dynamic list of futures.
1147///
1148/// # Examples
1149///
1150/// Await for the first of three futures to complete:
1151///
1152/// ```
1153/// use zng_task as task;
1154/// use zng_unit::*;
1155///
1156/// # task::doc_test(false, async {
1157/// let r = task::any!(
1158///     task::run(async {
1159///         task::deadline(300.ms()).await;
1160///         'a'
1161///     }),
1162///     task::wait(|| 'b'),
1163///     async {
1164///         task::deadline(300.ms()).await;
1165///         'c'
1166///     }
1167/// )
1168/// .await;
1169///
1170/// assert_eq!('b', r);
1171/// # });
1172/// ```
1173#[macro_export]
1174macro_rules! any {
1175    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
1176    ($fut0:expr, $fut1:expr $(,)?) => {
1177        $crate::__any! {
1178            fut0: $fut0;
1179            fut1: $fut1;
1180        }
1181    };
1182    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1183        $crate::__any! {
1184            fut0: $fut0;
1185            fut1: $fut1;
1186            fut2: $fut2;
1187        }
1188    };
1189    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1190        $crate::__any! {
1191            fut0: $fut0;
1192            fut1: $fut1;
1193            fut2: $fut2;
1194            fut3: $fut3;
1195        }
1196    };
1197    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1198        $crate::__any! {
1199            fut0: $fut0;
1200            fut1: $fut1;
1201            fut2: $fut2;
1202            fut3: $fut3;
1203            fut4: $fut4;
1204        }
1205    };
1206    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1207        $crate::__any! {
1208            fut0: $fut0;
1209            fut1: $fut1;
1210            fut2: $fut2;
1211            fut3: $fut3;
1212            fut4: $fut4;
1213            fut5: $fut5;
1214        }
1215    };
1216    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1217        $crate::__any! {
1218            fut0: $fut0;
1219            fut1: $fut1;
1220            fut2: $fut2;
1221            fut3: $fut3;
1222            fut4: $fut4;
1223            fut5: $fut5;
1224            fut6: $fut6;
1225        }
1226    };
1227    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1228        $crate::__any! {
1229            fut0: $fut0;
1230            fut1: $fut1;
1231            fut2: $fut2;
1232            fut3: $fut3;
1233            fut4: $fut4;
1234            fut5: $fut5;
1235            fut6: $fut6;
1236            fut7: $fut7;
1237        }
1238    };
1239    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
1240}
1241#[doc(hidden)]
1242#[macro_export]
1243macro_rules! __any {
1244    ($($ident:ident: $fut:expr;)+) => {
1245        {
1246            $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
1247            $crate::future_fn(move |cx| {
1248                use std::task::Poll;
1249                $(
1250                    // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1251                    // Future::poll call, so it will not move.
1252                    let mut $ident = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
1253                    if let Poll::Ready(r) = $ident.as_mut().poll(cx) {
1254                        return Poll::Ready(r)
1255                    }
1256                )+
1257
1258                Poll::Pending
1259            })
1260        }
1261    }
1262}
1263#[doc(hidden)]
1264pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1265
1266/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1267///
1268/// This is the dynamic version of [`any!`].
1269pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1270    let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1271    future_fn(move |cx| {
1272        for fut in &mut futures {
1273            // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1274            // Future::poll call, so it will not move.
1275            let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1276            if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1277                return Poll::Ready(r);
1278            }
1279        }
1280        Poll::Pending
1281    })
1282    .await
1283}
1284
1285/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
1286///
1287/// The macro input is comma separated list of future expressions, the futures must
1288/// all have the same output `Result<T, E>` type, but each can have a different `E`. The macro output is a future
1289/// that when ".awaited" produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
1290/// is `Ok(T)` or is `Err((E0, E1, ..))` is all futures are `Err`.
1291///
1292/// At least one input future is required and any number of futures is accepted. For more than
1293/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1294/// some IDEs.
1295///
1296/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
1297/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
1298/// is ready and `Err(E)` it is also not polled again and dropped.
1299///
1300/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1301/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
1302///
1303/// # Examples
1304///
1305/// Await for the first of three futures to complete with `Ok`:
1306///
1307/// ```
1308/// use zng_task as task;
1309/// # #[derive(Debug, PartialEq)]
1310/// # pub struct FooError;
1311/// # task::doc_test(false, async {
1312/// let r = task::any_ok!(
1313///     task::run(async { Err::<char, _>("error") }),
1314///     task::wait(|| Ok::<_, FooError>('b')),
1315///     async { Err::<char, _>(FooError) }
1316/// )
1317/// .await;
1318///
1319/// assert_eq!(Ok('b'), r);
1320/// # });
1321/// ```
1322#[macro_export]
1323macro_rules! any_ok {
1324    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
1325    ($fut0:expr, $fut1:expr $(,)?) => {
1326        $crate::__any_ok! {
1327            fut0: $fut0;
1328            fut1: $fut1;
1329        }
1330    };
1331    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1332        $crate::__any_ok! {
1333            fut0: $fut0;
1334            fut1: $fut1;
1335            fut2: $fut2;
1336        }
1337    };
1338    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1339        $crate::__any_ok! {
1340            fut0: $fut0;
1341            fut1: $fut1;
1342            fut2: $fut2;
1343            fut3: $fut3;
1344        }
1345    };
1346    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1347        $crate::__any_ok! {
1348            fut0: $fut0;
1349            fut1: $fut1;
1350            fut2: $fut2;
1351            fut3: $fut3;
1352            fut4: $fut4;
1353        }
1354    };
1355    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1356        $crate::__any_ok! {
1357            fut0: $fut0;
1358            fut1: $fut1;
1359            fut2: $fut2;
1360            fut3: $fut3;
1361            fut4: $fut4;
1362            fut5: $fut5;
1363        }
1364    };
1365    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1366        $crate::__any_ok! {
1367            fut0: $fut0;
1368            fut1: $fut1;
1369            fut2: $fut2;
1370            fut3: $fut3;
1371            fut4: $fut4;
1372            fut5: $fut5;
1373            fut6: $fut6;
1374        }
1375    };
1376    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1377        $crate::__any_ok! {
1378            fut0: $fut0;
1379            fut1: $fut1;
1380            fut2: $fut2;
1381            fut3: $fut3;
1382            fut4: $fut4;
1383            fut5: $fut5;
1384            fut6: $fut6;
1385            fut7: $fut7;
1386        }
1387    };
1388    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
1389}
1390
1391#[doc(hidden)]
1392#[macro_export]
1393macro_rules! __any_ok {
1394    ($($ident:ident: $fut: expr;)+) => {
1395        {
1396            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1397            $crate::future_fn(move |cx| {
1398                use std::task::Poll;
1399
1400                let mut pending = false;
1401
1402                $(
1403                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1404                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1405                        // Future::poll call, so it will not move.
1406                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1407                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1408                            match r {
1409                                Ok(r) => return Poll::Ready(Ok(r)),
1410                                Err(e) => {
1411                                    $ident = $crate::FutureOrOutput::Output(Err(e));
1412                                }
1413                            }
1414                        } else {
1415                            pending = true;
1416                        }
1417                    }
1418                )+
1419
1420                if pending {
1421                    Poll::Pending
1422                } else {
1423                    Poll::Ready(Err((
1424                        $($ident.take_output().unwrap_err()),+
1425                    )))
1426                }
1427            })
1428        }
1429    }
1430}
1431
1432/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1433///
1434/// This is the dynamic version of [`all_some!`].
1435pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1436    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1437    future_fn(move |cx| {
1438        let mut pending = false;
1439        for input in &mut futures {
1440            if let FutureOrOutput::Future(fut) = input {
1441                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1442                // Future::poll call, so it will not move.
1443                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1444                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1445                    match r {
1446                        Ok(r) => return Poll::Ready(Ok(r)),
1447                        Err(e) => *input = FutureOrOutput::Output(Err(e)),
1448                    }
1449                } else {
1450                    pending = true;
1451                }
1452            }
1453        }
1454
1455        if pending {
1456            Poll::Pending
1457        } else {
1458            Poll::Ready(Err(futures
1459                .iter_mut()
1460                .map(|f| match f.take_output() {
1461                    Ok(_) => unreachable!(),
1462                    Err(e) => e,
1463                })
1464                .collect()))
1465        }
1466    })
1467    .await
1468}
1469
1470/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
1471///
1472/// The macro input is comma separated list of future expressions, the futures must
1473/// all have the same output `Option<T>` type. The macro output is a future that when ".awaited" produces
1474/// a single output type instance returned by the first input future that completes with a `Some`.
1475/// If all futures complete with a `None` the output is `None`.
1476///
1477/// At least one input future is required and any number of futures is accepted. For more than
1478/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1479/// some IDEs.
1480///
1481/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
1482/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
1483/// is ready and `None` it is also not polled again and dropped.
1484///
1485/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1486/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
1487///
1488/// # Examples
1489///
1490/// Await for the first of three futures to complete with `Some`:
1491///
1492/// ```
1493/// use zng_task as task;
1494/// # task::doc_test(false, async {
1495/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
1496///
1497/// assert_eq!(Some('b'), r);
1498/// # });
1499/// ```
1500#[macro_export]
1501macro_rules! any_some {
1502    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
1503    ($fut0:expr, $fut1:expr $(,)?) => {
1504        $crate::__any_some! {
1505            fut0: $fut0;
1506            fut1: $fut1;
1507        }
1508    };
1509    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1510        $crate::__any_some! {
1511            fut0: $fut0;
1512            fut1: $fut1;
1513            fut2: $fut2;
1514        }
1515    };
1516    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1517        $crate::__any_some! {
1518            fut0: $fut0;
1519            fut1: $fut1;
1520            fut2: $fut2;
1521            fut3: $fut3;
1522        }
1523    };
1524    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1525        $crate::__any_some! {
1526            fut0: $fut0;
1527            fut1: $fut1;
1528            fut2: $fut2;
1529            fut3: $fut3;
1530            fut4: $fut4;
1531        }
1532    };
1533    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1534        $crate::__any_some! {
1535            fut0: $fut0;
1536            fut1: $fut1;
1537            fut2: $fut2;
1538            fut3: $fut3;
1539            fut4: $fut4;
1540            fut5: $fut5;
1541        }
1542    };
1543    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1544        $crate::__any_some! {
1545            fut0: $fut0;
1546            fut1: $fut1;
1547            fut2: $fut2;
1548            fut3: $fut3;
1549            fut4: $fut4;
1550            fut5: $fut5;
1551            fut6: $fut6;
1552        }
1553    };
1554    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1555        $crate::__any_some! {
1556            fut0: $fut0;
1557            fut1: $fut1;
1558            fut2: $fut2;
1559            fut3: $fut3;
1560            fut4: $fut4;
1561            fut5: $fut5;
1562            fut6: $fut6;
1563            fut7: $fut7;
1564        }
1565    };
1566    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
1567}
1568
1569#[doc(hidden)]
1570#[macro_export]
1571macro_rules! __any_some {
1572    ($($ident:ident: $fut: expr;)+) => {
1573        {
1574            $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
1575            $crate::future_fn(move |cx| {
1576                use std::task::Poll;
1577
1578                let mut pending = false;
1579
1580                $(
1581                    if let Some(fut) = $ident.as_mut() {
1582                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1583                        // Future::poll call, so it will not move.
1584                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1585                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1586                            if let Some(r) = r {
1587                                return Poll::Ready(Some(r));
1588                            }
1589                            $ident = None;
1590                        } else {
1591                            pending = true;
1592                        }
1593                    }
1594                )+
1595
1596                if pending {
1597                    Poll::Pending
1598                } else {
1599                    Poll::Ready(None)
1600                }
1601            })
1602        }
1603    }
1604}
1605
1606/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1607///
1608/// This is the dynamic version of [`all_some!`].
1609pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1610    let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1611    future_fn(move |cx| {
1612        let mut pending = false;
1613        for input in &mut futures {
1614            if let Some(fut) = input {
1615                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1616                // Future::poll call, so it will not move.
1617                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1618                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1619                    match r {
1620                        Some(r) => return Poll::Ready(Some(r)),
1621                        None => *input = None,
1622                    }
1623                } else {
1624                    pending = true;
1625                }
1626            }
1627        }
1628
1629        if pending { Poll::Pending } else { Poll::Ready(None) }
1630    })
1631    .await
1632}
1633
1634/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
1635/// any future is ready with an `Err(E)` result.
1636///
1637/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
1638/// type is the first error encountered, the input futures must have the same `Err` type but can have different
1639/// `Ok` types.
1640///
1641/// At least one input future is required and any number of futures is accepted. For more than
1642/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1643/// some IDEs.
1644///
1645/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
1646/// After one future is ready and `Err(T)` the other futures are not polled again and are dropped. After a future
1647/// is ready it is also not polled again and dropped.
1648///
1649/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1650/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
1651///
1652/// # Examples
1653///
1654/// Await for the first of three futures to complete with `Ok(T)`:
1655///
1656/// ```
1657/// use zng_task as task;
1658/// # #[derive(Debug, PartialEq)]
1659/// # struct FooError;
1660/// # task::doc_test(false, async {
1661/// let r = task::all_ok!(
1662///     task::run(async { Ok::<_, FooError>('a') }),
1663///     task::wait(|| Ok::<_, FooError>('b')),
1664///     async { Ok::<_, FooError>('c') }
1665/// )
1666/// .await;
1667///
1668/// assert_eq!(Ok(('a', 'b', 'c')), r);
1669/// # });
1670/// ```
1671///
1672/// And in if any completes with `Err(E)`:
1673///
1674/// ```
1675/// use zng_task as task;
1676/// # #[derive(Debug, PartialEq)]
1677/// # struct FooError;
1678/// # task::doc_test(false, async {
1679/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
1680///     Ok('c')
1681/// })
1682/// .await;
1683///
1684/// assert_eq!(Err(FooError), r);
1685/// # });
1686/// ```
1687#[macro_export]
1688macro_rules! all_ok {
1689    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
1690    ($fut0:expr, $fut1:expr $(,)?) => {
1691        $crate::__all_ok! {
1692            fut0: $fut0;
1693            fut1: $fut1;
1694        }
1695    };
1696    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1697        $crate::__all_ok! {
1698            fut0: $fut0;
1699            fut1: $fut1;
1700            fut2: $fut2;
1701        }
1702    };
1703    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1704        $crate::__all_ok! {
1705            fut0: $fut0;
1706            fut1: $fut1;
1707            fut2: $fut2;
1708            fut3: $fut3;
1709        }
1710    };
1711    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1712        $crate::__all_ok! {
1713            fut0: $fut0;
1714            fut1: $fut1;
1715            fut2: $fut2;
1716            fut3: $fut3;
1717            fut4: $fut4;
1718        }
1719    };
1720    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1721        $crate::__all_ok! {
1722            fut0: $fut0;
1723            fut1: $fut1;
1724            fut2: $fut2;
1725            fut3: $fut3;
1726            fut4: $fut4;
1727            fut5: $fut5;
1728        }
1729    };
1730    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1731        $crate::__all_ok! {
1732            fut0: $fut0;
1733            fut1: $fut1;
1734            fut2: $fut2;
1735            fut3: $fut3;
1736            fut4: $fut4;
1737            fut5: $fut5;
1738            fut6: $fut6;
1739        }
1740    };
1741    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1742        $crate::__all_ok! {
1743            fut0: $fut0;
1744            fut1: $fut1;
1745            fut2: $fut2;
1746            fut3: $fut3;
1747            fut4: $fut4;
1748            fut5: $fut5;
1749            fut6: $fut6;
1750            fut7: $fut7;
1751        }
1752    };
1753    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
1754}
1755
1756#[doc(hidden)]
1757#[macro_export]
1758macro_rules! __all_ok {
1759    ($($ident:ident: $fut: expr;)+) => {
1760        {
1761            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1762            $crate::future_fn(move |cx| {
1763                use std::task::Poll;
1764
1765                let mut pending = false;
1766
1767                $(
1768                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1769                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1770                        // Future::poll call, so it will not move.
1771                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1772                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1773                            match r {
1774                                Ok(r) => {
1775                                    $ident = $crate::FutureOrOutput::Output(Ok(r))
1776                                },
1777                                Err(e) => return Poll::Ready(Err(e)),
1778                            }
1779                        } else {
1780                            pending = true;
1781                        }
1782                    }
1783                )+
1784
1785                if pending {
1786                    Poll::Pending
1787                } else {
1788                    Poll::Ready(Ok((
1789                        $($ident.take_output().unwrap()),+
1790                    )))
1791                }
1792            })
1793        }
1794    }
1795}
1796
1797/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1798///
1799/// This is the dynamic version of [`all_ok!`].
1800pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1801    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1802    future_fn(move |cx| {
1803        let mut pending = false;
1804        for input in &mut futures {
1805            if let FutureOrOutput::Future(fut) = input {
1806                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1807                // Future::poll call, so it will not move.
1808                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1809                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1810                    match r {
1811                        Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1812                        Err(e) => return Poll::Ready(Err(e)),
1813                    }
1814                } else {
1815                    pending = true;
1816                }
1817            }
1818        }
1819
1820        if pending {
1821            Poll::Pending
1822        } else {
1823            Poll::Ready(Ok(futures
1824                .iter_mut()
1825                .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1826                .collect()))
1827        }
1828    })
1829    .await
1830}
1831
1832/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
1833/// is future ready with `None`.
1834///
1835/// The macro input is comma separated list of future expressions, the futures must
1836/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
1837/// produces `Some<(T0, T1, ..)>` if all futures where `Some(T)` or `None` if any of the futures where `None`.
1838///
1839/// At least one input future is required and any number of futures is accepted. For more than
1840/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1841/// some IDEs.
1842///
1843/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
1844/// is ready it is also not polled again and dropped.
1845///
1846/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1847/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
1848///
1849/// # Examples
1850///
1851/// Await for the first of three futures to complete with `Some`:
1852///
1853/// ```
1854/// use zng_task as task;
1855/// # task::doc_test(false, async {
1856/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
1857///
1858/// assert_eq!(Some(('a', 'b', 'c')), r);
1859/// # });
1860/// ```
1861///
1862/// Completes with `None` if any future completes with `None`:
1863///
1864/// ```
1865/// # use zng_task as task;
1866/// # task::doc_test(false, async {
1867/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
1868///
1869/// assert_eq!(None, r);
1870/// # });
1871/// ```
1872#[macro_export]
1873macro_rules! all_some {
1874    ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
1875    ($fut0:expr, $fut1:expr $(,)?) => {
1876        $crate::__all_some! {
1877            fut0: $fut0;
1878            fut1: $fut1;
1879        }
1880    };
1881    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1882        $crate::__all_some! {
1883            fut0: $fut0;
1884            fut1: $fut1;
1885            fut2: $fut2;
1886        }
1887    };
1888    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1889        $crate::__all_some! {
1890            fut0: $fut0;
1891            fut1: $fut1;
1892            fut2: $fut2;
1893            fut3: $fut3;
1894        }
1895    };
1896    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1897        $crate::__all_some! {
1898            fut0: $fut0;
1899            fut1: $fut1;
1900            fut2: $fut2;
1901            fut3: $fut3;
1902            fut4: $fut4;
1903        }
1904    };
1905    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1906        $crate::__all_some! {
1907            fut0: $fut0;
1908            fut1: $fut1;
1909            fut2: $fut2;
1910            fut3: $fut3;
1911            fut4: $fut4;
1912            fut5: $fut5;
1913        }
1914    };
1915    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1916        $crate::__all_some! {
1917            fut0: $fut0;
1918            fut1: $fut1;
1919            fut2: $fut2;
1920            fut3: $fut3;
1921            fut4: $fut4;
1922            fut5: $fut5;
1923            fut6: $fut6;
1924        }
1925    };
1926    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1927        $crate::__all_some! {
1928            fut0: $fut0;
1929            fut1: $fut1;
1930            fut2: $fut2;
1931            fut3: $fut3;
1932            fut4: $fut4;
1933            fut5: $fut5;
1934            fut6: $fut6;
1935            fut7: $fut7;
1936        }
1937    };
1938    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
1939}
1940
1941#[doc(hidden)]
1942#[macro_export]
1943macro_rules! __all_some {
1944    ($($ident:ident: $fut: expr;)+) => {
1945        {
1946            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1947            $crate::future_fn(move |cx| {
1948                use std::task::Poll;
1949
1950                let mut pending = false;
1951
1952                $(
1953                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1954                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1955                        // Future::poll call, so it will not move.
1956                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1957                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1958                            if r.is_none() {
1959                                return Poll::Ready(None);
1960                            }
1961
1962                            $ident = $crate::FutureOrOutput::Output(r);
1963                        } else {
1964                            pending = true;
1965                        }
1966                    }
1967                )+
1968
1969                if pending {
1970                    Poll::Pending
1971                } else {
1972                    Poll::Ready(Some((
1973                        $($ident.take_output().unwrap()),+
1974                    )))
1975                }
1976            })
1977        }
1978    }
1979}
1980
1981/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1982///
1983/// This is the dynamic version of [`all_some!`].
1984pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1985    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1986    future_fn(move |cx| {
1987        let mut pending = false;
1988        for input in &mut futures {
1989            if let FutureOrOutput::Future(fut) = input {
1990                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1991                // Future::poll call, so it will not move.
1992                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1993                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1994                    match r {
1995                        Some(r) => *input = FutureOrOutput::Output(Some(r)),
1996                        None => return Poll::Ready(None),
1997                    }
1998                } else {
1999                    pending = true;
2000                }
2001            }
2002        }
2003
2004        if pending {
2005            Poll::Pending
2006        } else {
2007            Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2008        }
2009    })
2010    .await
2011}
2012
2013/// A future that will await until [`set`] is called.
2014///
2015/// # Examples
2016///
2017/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2018///
2019/// ```
2020/// use zng_clone_move::async_clmv;
2021/// use zng_task::{self as task, *};
2022///
2023/// let signal = SignalOnce::default();
2024///
2025/// task::spawn(async_clmv!(signal, {
2026///     signal.await;
2027///     println!("After Signal!");
2028/// }));
2029///
2030/// signal.set();
2031/// ```
2032///
2033/// [`set`]: SignalOnce::set
2034#[derive(Default, Clone)]
2035pub struct SignalOnce(Arc<SignalInner>);
2036impl fmt::Debug for SignalOnce {
2037    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2038        write!(f, "SignalOnce({})", self.is_set())
2039    }
2040}
2041impl PartialEq for SignalOnce {
2042    fn eq(&self, other: &Self) -> bool {
2043        Arc::ptr_eq(&self.0, &other.0)
2044    }
2045}
2046impl Eq for SignalOnce {}
2047impl Hash for SignalOnce {
2048    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2049        Arc::as_ptr(&self.0).hash(state)
2050    }
2051}
2052impl SignalOnce {
2053    /// New unsigned.
2054    pub fn new() -> Self {
2055        Self::default()
2056    }
2057
2058    /// New signaled.
2059    pub fn new_set() -> Self {
2060        let s = Self::new();
2061        s.set();
2062        s
2063    }
2064
2065    /// If the signal was set.
2066    pub fn is_set(&self) -> bool {
2067        self.0.signaled.load(Ordering::Relaxed)
2068    }
2069
2070    /// Sets the signal and awakes listeners.
2071    pub fn set(&self) {
2072        if !self.0.signaled.swap(true, Ordering::Relaxed) {
2073            let listeners = mem::take(&mut *self.0.listeners.lock());
2074            for listener in listeners {
2075                listener.wake();
2076            }
2077        }
2078    }
2079}
2080impl Future for SignalOnce {
2081    type Output = ();
2082
2083    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2084        if self.0.signaled.load(Ordering::Relaxed) {
2085            return Poll::Ready(());
2086        }
2087
2088        let mut listeners = self.0.listeners.lock();
2089        if self.0.signaled.load(Ordering::Relaxed) {
2090            return Poll::Ready(());
2091        }
2092
2093        let waker = cx.waker();
2094        if !listeners.iter().any(|w| w.will_wake(waker)) {
2095            listeners.push(waker.clone());
2096        }
2097
2098        Poll::Pending
2099    }
2100}
2101
2102#[derive(Default)]
2103struct SignalInner {
2104    signaled: AtomicBool,
2105    listeners: Mutex<Vec<std::task::Waker>>,
2106}
2107
2108/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2109///
2110/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2111/// known at the moment the first request is made.
2112///  
2113/// [`Waker`]: std::task::Waker
2114#[derive(Clone)]
2115pub struct McWaker(Arc<WakeVec>);
2116
2117#[derive(Default)]
2118struct WakeVec(Mutex<Vec<std::task::Waker>>);
2119impl WakeVec {
2120    fn push(&self, waker: std::task::Waker) -> bool {
2121        let mut v = self.0.lock();
2122
2123        let return_waker = v.is_empty();
2124
2125        v.push(waker);
2126
2127        return_waker
2128    }
2129
2130    fn cancel(&self) {
2131        let mut v = self.0.lock();
2132
2133        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2134
2135        v.clear();
2136    }
2137}
2138impl std::task::Wake for WakeVec {
2139    fn wake(self: Arc<Self>) {
2140        for w in mem::take(&mut *self.0.lock()) {
2141            w.wake();
2142        }
2143    }
2144}
2145impl McWaker {
2146    /// New empty waker.
2147    pub fn empty() -> Self {
2148        Self(Arc::new(WakeVec::default()))
2149    }
2150
2151    /// Register a `waker` to wake once when `self` awakes.
2152    ///
2153    /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2154    /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2155    /// is ready [`cancel`] must be called.
2156    ///
2157    /// [`cancel`]: Self::cancel
2158    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2159        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2160    }
2161
2162    /// Clear current registered wakers.
2163    pub fn cancel(&self) {
2164        self.0.cancel()
2165    }
2166}
2167
2168/// Panic payload, captured by [`std::panic::catch_unwind`].
2169#[non_exhaustive]
2170pub struct TaskPanicError {
2171    /// Panic payload.
2172    pub payload: Box<dyn Any + Send + 'static>,
2173}
2174impl TaskPanicError {
2175    /// New from panic payload.
2176    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
2177        Self { payload }
2178    }
2179
2180    /// Get the panic string if the `payload` is string like.
2181    pub fn panic_str(&self) -> Option<&str> {
2182        if let Some(s) = self.payload.downcast_ref::<&str>() {
2183            Some(s)
2184        } else if let Some(s) = self.payload.downcast_ref::<String>() {
2185            Some(s)
2186        } else {
2187            None
2188        }
2189    }
2190}
2191impl fmt::Debug for TaskPanicError {
2192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2193        f.debug_struct("TaskPanicError").field("panic_str()", &self.panic_str()).finish()
2194    }
2195}
2196impl fmt::Display for TaskPanicError {
2197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2198        if let Some(s) = self.panic_str() { f.write_str(s) } else { Ok(()) }
2199    }
2200}
2201impl std::error::Error for TaskPanicError {}
2202
2203type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;
2204
2205app_local! {
2206    // Mutex for Sync only
2207    static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
2208}
2209
2210/// Set a `handler` that is called when spawn tasks panic.
2211///
2212/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2213///
2214/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2215/// and thread the task that panicked was called in.
2216///
2217/// ```
2218/// # macro_rules! example { () => {
2219/// task::set_spawn_panic_handler(|p| {
2220///     UPDATES
2221///         .run_hn_once(hn_once!(|_| {
2222///             std::panic::resume_unwind(p.payload);
2223///         }))
2224///         .perm();
2225/// });
2226/// # }}
2227/// ```
2228///
2229/// The example above shows how to set a handler that propagates the panic to the app main thread.
2230///
2231/// # Panics
2232///
2233/// Panics if another handler is already set in the same app.
2234///
2235/// Panics if no app is running in the caller thread.
2236pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2237    let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2238    assert!(h.is_none(), "a spawn panic handler is already set");
2239    *h = Some(Mutex::new(Box::new(handler)));
2240}
2241
2242fn on_spawn_panic(p: TaskPanicError) {
2243    if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2244        f.get_mut()(p)
2245    }
2246}