zng_task/lib.rs
1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13 any::Any,
14 fmt,
15 hash::Hash,
16 mem, panic,
17 pin::Pin,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 task::Poll,
23};
24
25use zng_app_context::{LocalContext, app_local};
26use zng_time::Deadline;
27use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
28
29#[cfg(test)]
30mod tests;
31
32mod reexports;
33pub use reexports::*;
34
35use crate::parking_lot::Mutex;
36
37pub mod channel;
38pub mod fs;
39pub mod io;
40
41mod ui;
42pub use ui::*;
43
44pub mod http;
45
46pub mod process;
47
48mod rayon_ctx;
49
50mod progress;
51pub use progress::*;
52
53/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
54///
55/// # Parallel
56///
57/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
58///
59/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
60/// otherwise it will run in a single thread at a time, still not blocking the UI.
61///
62/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
63///
64/// # Async
65///
66/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
67/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
68/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
69/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
70///
71/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
72///
73/// # Examples
74///
75/// ```
76/// # use zng_task::{self as task, *, rayon::iter::*};
77/// # use zng_var::*;
78/// # struct SomeStruct { sum_response: ResponseVar<usize> }
79/// # impl SomeStruct {
80/// fn on_event(&mut self) {
81/// let (responder, response) = response_var();
82/// self.sum_response = response;
83///
84/// task::spawn(async move {
85/// let r = (0..1000).into_par_iter().map(|i| i * i).sum();
86///
87/// responder.respond(r);
88/// });
89/// }
90///
91/// fn on_update(&mut self) {
92/// if let Some(result) = self.sum_response.rsp_new() {
93/// println!("sum of squares 0..1000: {result}");
94/// }
95/// }
96/// # }
97/// ```
98///
99/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
100/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
101///
102/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
103/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
104///
105/// # Panic Handling
106///
107/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
108/// is otherwise ignored.
109///
110/// # Unwind Safety
111///
112/// This function disables the [unwind safety validation], meaning that in case of a panic shared
113/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
114/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
115/// to propagate a panic.
116///
117/// [unwind safety validation]: std::panic::UnwindSafe
118/// [`Waker`]: std::task::Waker
119/// [`rayon`]: https://docs.rs/rayon
120/// [`LocalContext`]: zng_app_context::LocalContext
121/// [`response_var`]: zng_var::response_var
122pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
123where
124 F: Future<Output = ()> + Send + 'static,
125{
126 Arc::new(RayonTask {
127 ctx: LocalContext::capture(),
128 fut: Mutex::new(Some(Box::pin(task.into_future()))),
129 })
130 .poll()
131}
132
133/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
134pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136 F: Future<Output = ()> + Send + 'static,
137{
138 struct PollRayonTask {
139 fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
140 }
141 impl PollRayonTask {
142 // start task in calling thread
143 fn poll(self: Arc<Self>) {
144 let mut task = self.fut.lock();
145 let (mut t, _) = task.take().unwrap();
146
147 let waker = self.clone().into();
148
149 match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
150 Poll::Ready(()) => {}
151 Poll::Pending => {
152 let ctx = LocalContext::capture();
153 *task = Some((t, Some(ctx)));
154 }
155 }
156 }
157 }
158 impl std::task::Wake for PollRayonTask {
159 fn wake(self: Arc<Self>) {
160 // continue task in spawn threads
161 if let Some((task, Some(ctx))) = self.fut.lock().take() {
162 Arc::new(RayonTask {
163 ctx,
164 fut: Mutex::new(Some(Box::pin(task))),
165 })
166 .poll();
167 }
168 }
169 }
170
171 Arc::new(PollRayonTask {
172 fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
173 })
174 .poll()
175}
176
177type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
178
179// A future that is its own waker that polls inside rayon spawn tasks.
180struct RayonTask {
181 ctx: LocalContext,
182 fut: Mutex<Option<RayonSpawnFut>>,
183}
184impl RayonTask {
185 fn poll(self: Arc<Self>) {
186 ::rayon::spawn(move || {
187 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
188 let mut task = self.fut.lock();
189 if let Some(mut t) = task.take() {
190 let waker = self.clone().into();
191
192 // load app context
193 self.ctx.clone().with_context(move || {
194 let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
195 // poll future
196 if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
197 // not done
198 *task = Some(t);
199 }
200 }));
201 if let Err(p) = r {
202 let p = TaskPanicError::new(p);
203 tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
204 on_spawn_panic(p);
205 }
206 });
207 }
208 })
209 }
210}
211impl std::task::Wake for RayonTask {
212 fn wake(self: Arc<Self>) {
213 self.poll()
214 }
215}
216
217/// Rayon join with local context.
218///
219/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
220/// operations.
221///
222/// See `rayon::join` for more details about join.
223///
224/// [`LocalContext`]: zng_app_context::LocalContext
225pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
226where
227 A: FnOnce() -> RA + Send,
228 B: FnOnce() -> RB + Send,
229 RA: Send,
230 RB: Send,
231{
232 self::join_context(move |_| op_a(), move |_| op_b())
233}
234
235/// Rayon join context with local context.
236///
237/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
238/// operations.
239///
240/// See `rayon::join_context` for more details about join.
241///
242/// [`LocalContext`]: zng_app_context::LocalContext
243pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
244where
245 A: FnOnce(::rayon::FnContext) -> RA + Send,
246 B: FnOnce(::rayon::FnContext) -> RB + Send,
247 RA: Send,
248 RB: Send,
249{
250 let ctx = LocalContext::capture();
251 let ctx = &ctx;
252 ::rayon::join_context(
253 move |a| {
254 if a.migrated() {
255 ctx.clone().with_context(|| op_a(a))
256 } else {
257 op_a(a)
258 }
259 },
260 move |b| {
261 if b.migrated() {
262 ctx.clone().with_context(|| op_b(b))
263 } else {
264 op_b(b)
265 }
266 },
267 )
268}
269
270/// Rayon scope with local context.
271///
272/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
273/// operations.
274///
275/// See `rayon::scope` for more details about scope.
276///
277/// [`LocalContext`]: zng_app_context::LocalContext
278pub fn scope<'scope, OP, R>(op: OP) -> R
279where
280 OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
281 R: Send,
282{
283 let ctx = LocalContext::capture();
284
285 // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
286 // Is there a better way to do this? I hope so.
287 //
288 // SAFETY:
289 // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
290 // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
291 let ctx_ref: &'_ LocalContext = &ctx;
292 let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
293
294 let r = ::rayon::scope(move |s| {
295 op(ScopeCtx {
296 scope: s,
297 ctx: ctx_scope_ref,
298 })
299 });
300
301 drop(ctx);
302
303 r
304}
305
306/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
307///
308/// See [`scope`] for more details.
309#[derive(Clone, Copy, Debug)]
310pub struct ScopeCtx<'a, 'scope: 'a> {
311 scope: &'a ::rayon::Scope<'scope>,
312 ctx: &'scope LocalContext,
313}
314impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
315 /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
316 ///
317 /// See `rayon::Scope::spawn` for more details.
318 pub fn spawn<F>(self, f: F)
319 where
320 F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
321 {
322 let ctx = self.ctx;
323 self.scope
324 .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
325 }
326}
327
328/// Spawn a parallel async task that can also be `.await` for the task result.
329///
330/// # Parallel
331///
332/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
333///
334/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
335/// otherwise it will run in a single thread at a time, still not blocking the UI.
336///
337/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
338///
339/// # Async
340///
341/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
342/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
343/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
344/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
345///
346/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
347///
348/// # Examples
349///
350/// ```
351/// # use zng_task::{self as task, rayon::iter::*};
352/// # struct SomeStruct { sum: usize }
353/// # async fn read_numbers() -> Vec<usize> { vec![] }
354/// # impl SomeStruct {
355/// async fn on_event(&mut self) {
356/// self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
357/// }
358/// # }
359/// ```
360///
361/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
362/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
363/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
364///
365/// # Cancellation
366///
367/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
368/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
369/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
370/// result channel is disconnected.
371///
372/// If you want to deterministically known that the `task` was cancelled use a cancellation signal.
373///
374/// # Panic Propagation
375///
376/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
377/// can use [`run_catch`] to get the panic as an error instead.
378///
379/// [`resume_unwind`]: panic::resume_unwind
380/// [`Waker`]: std::task::Waker
381/// [`rayon`]: https://docs.rs/rayon
382/// [`LocalContext`]: zng_app_context::LocalContext
383pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
384where
385 R: Send + 'static,
386 T: Future<Output = R> + Send + 'static,
387{
388 match run_catch(task).await {
389 Ok(r) => r,
390 Err(p) => panic::resume_unwind(p.payload),
391 }
392}
393
394/// Like [`run`] but catches panics.
395///
396/// This task works the same and has the same utility as [`run`], except if returns panic messages
397/// as an error instead of propagating the panic.
398///
399/// # Unwind Safety
400///
401/// This function disables the [unwind safety validation], meaning that in case of a panic shared
402/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
403/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
404/// if this function returns an error.
405///
406/// [unwind safety validation]: std::panic::UnwindSafe
407pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
408where
409 R: Send + 'static,
410 T: Future<Output = R> + Send + 'static,
411{
412 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
413
414 // A future that is its own waker that polls inside the rayon primary thread-pool.
415 struct RayonCatchTask<R> {
416 ctx: LocalContext,
417 fut: Mutex<Option<Fut<R>>>,
418 sender: flume::Sender<Result<R, TaskPanicError>>,
419 }
420 impl<R: Send + 'static> RayonCatchTask<R> {
421 fn poll(self: Arc<Self>) {
422 let sender = self.sender.clone();
423 if sender.is_disconnected() {
424 return; // cancel.
425 }
426 ::rayon::spawn(move || {
427 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
428 let mut task = self.fut.lock();
429 if let Some(mut t) = task.take() {
430 let waker = self.clone().into();
431 let mut cx = std::task::Context::from_waker(&waker);
432
433 self.ctx.clone().with_context(|| {
434 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
435 match r {
436 Ok(Poll::Ready(r)) => {
437 drop(task);
438 let _ = sender.send(Ok(r));
439 }
440 Ok(Poll::Pending) => {
441 *task = Some(t);
442 }
443 Err(p) => {
444 drop(task);
445 let _ = sender.send(Err(TaskPanicError::new(p)));
446 }
447 }
448 });
449 }
450 })
451 }
452 }
453 impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
454 fn wake(self: Arc<Self>) {
455 self.poll()
456 }
457 }
458
459 let (sender, receiver) = channel::bounded(1);
460
461 Arc::new(RayonCatchTask {
462 ctx: LocalContext::capture(),
463 fut: Mutex::new(Some(Box::pin(task.into_future()))),
464 sender: sender.into(),
465 })
466 .poll();
467
468 receiver.recv().await.unwrap()
469}
470
471/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
472///
473/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
474///
475/// # Examples
476///
477/// ```
478/// # use zng_task::{self as task, rayon::iter::*};
479/// # use zng_var::*;
480/// # struct SomeStruct { sum_response: ResponseVar<usize> }
481/// # async fn read_numbers() -> Vec<usize> { vec![] }
482/// # impl SomeStruct {
483/// fn on_event(&mut self) {
484/// self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
485/// }
486///
487/// fn on_update(&mut self) {
488/// if let Some(result) = self.sum_response.rsp_new() {
489/// println!("sum of squares: {result}");
490/// }
491/// }
492/// # }
493/// ```
494///
495/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
496/// `sum_response` that is a [`ResponseVar<R>`].
497///
498/// # Cancellation
499///
500/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
501///
502/// # Panic Handling
503///
504/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
505///
506/// [`resume_unwind`]: panic::resume_unwind
507/// [`ResponseVar<R>`]: zng_var::ResponseVar
508/// [`response_var`]: zng_var::response_var
509pub fn respond<R, F>(task: F) -> ResponseVar<R>
510where
511 R: VarValue,
512 F: Future<Output = R> + Send + 'static,
513{
514 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
515
516 let (responder, response) = response_var();
517
518 // A future that is its own waker that polls inside the rayon primary thread-pool.
519 struct RayonRespondTask<R: VarValue> {
520 ctx: LocalContext,
521 fut: Mutex<Option<Fut<R>>>,
522 responder: zng_var::ResponderVar<R>,
523 }
524 impl<R: VarValue> RayonRespondTask<R> {
525 fn poll(self: Arc<Self>) {
526 let responder = self.responder.clone();
527 if responder.strong_count() == 2 {
528 return; // cancel.
529 }
530 ::rayon::spawn(move || {
531 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
532 let mut task = self.fut.lock();
533 if let Some(mut t) = task.take() {
534 let waker = self.clone().into();
535 let mut cx = std::task::Context::from_waker(&waker);
536
537 self.ctx.clone().with_context(|| {
538 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
539 match r {
540 Ok(Poll::Ready(r)) => {
541 drop(task);
542
543 responder.respond(r);
544 }
545 Ok(Poll::Pending) => {
546 *task = Some(t);
547 }
548 Err(p) => {
549 let p = TaskPanicError::new(p);
550 tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
551 drop(task);
552 responder.modify(move |_| panic::resume_unwind(p.payload));
553 }
554 }
555 });
556 }
557 })
558 }
559 }
560 impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
561 fn wake(self: Arc<Self>) {
562 self.poll()
563 }
564 }
565
566 Arc::new(RayonRespondTask {
567 ctx: LocalContext::capture(),
568 fut: Mutex::new(Some(Box::pin(task))),
569 responder,
570 })
571 .poll();
572
573 response
574}
575
576/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
577/// if the `task` is pending continues execution like [`respond`].
578pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
579where
580 R: VarValue,
581 F: Future<Output = R> + Send + 'static,
582{
583 enum QuickResponse<R: VarValue> {
584 Quick(Option<R>),
585 Response(zng_var::ResponderVar<R>),
586 }
587 let task = task.into_future();
588 let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
589 poll_spawn(zng_clone_move::async_clmv!(q, {
590 let rsp = task.await;
591
592 match &mut *q.lock() {
593 QuickResponse::Quick(q) => *q = Some(rsp),
594 QuickResponse::Response(r) => r.respond(rsp),
595 }
596 }));
597
598 let mut q = q.lock();
599 match &mut *q {
600 QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
601 _ => {
602 let (responder, response) = response_var();
603 *q = QuickResponse::Response(responder);
604 response
605 }
606 }
607}
608
609/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
610///
611/// # Parallel
612///
613/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
614/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
615/// parts that are blocking.
616///
617/// # Examples
618///
619/// ```
620/// # fn main() { }
621/// # use zng_task as task;
622/// # async fn example() {
623/// task::wait(|| std::fs::read_to_string("file.txt")).await
624/// # ; }
625/// ```
626///
627/// The example reads a file, that is a blocking file IO operation, most of the time is spend waiting for the operating system,
628/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
629/// like in a async event handler.
630///
631/// # Async Read/Write
632///
633/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
634/// have or want the full file in memory or when you want to apply multiple operations to the file.
635///
636/// # Panic Propagation
637///
638/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
639/// can use [`wait_catch`] to get the panic as an error instead.
640///
641/// [`blocking`]: https://docs.rs/blocking
642/// [`resume_unwind`]: panic::resume_unwind
643pub async fn wait<T, F>(task: F) -> T
644where
645 F: FnOnce() -> T + Send + 'static,
646 T: Send + 'static,
647{
648 match wait_catch(task).await {
649 Ok(r) => r,
650 Err(p) => panic::resume_unwind(p.payload),
651 }
652}
653
654/// Like [`wait`] but catches panics.
655///
656/// This task works the same and has the same utility as [`wait`], except if returns panic messages
657/// as an error instead of propagating the panic.
658///
659/// # Unwind Safety
660///
661/// This function disables the [unwind safety validation], meaning that in case of a panic shared
662/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
663/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
664/// if this function returns an error.
665///
666/// [unwind safety validation]: std::panic::UnwindSafe
667pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
668where
669 F: FnOnce() -> T + Send + 'static,
670 T: Send + 'static,
671{
672 let mut ctx = LocalContext::capture();
673 blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
674 .await
675 .map_err(TaskPanicError::new)
676}
677
678/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
679///
680/// # Panic Handling
681///
682/// If the `task` panics the panic message is logged as an error, and can observed using [`set_spawn_panic_handler`]. It
683/// is otherwise ignored.
684///
685/// # Unwind Safety
686///
687/// This function disables the [unwind safety validation], meaning that in case of a panic shared
688/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
689/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
690/// to propagate a panic.
691///
692/// [unwind safety validation]: std::panic::UnwindSafe
693pub fn spawn_wait<F>(task: F)
694where
695 F: FnOnce() + Send + 'static,
696{
697 spawn(async move {
698 if let Err(p) = wait_catch(task).await {
699 tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
700 on_spawn_panic(p);
701 }
702 });
703}
704
705/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
706///
707/// # Cancellation
708///
709/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
710///
711/// # Panic Handling
712///
713/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
714pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
715where
716 R: VarValue,
717 F: FnOnce() -> R + Send + 'static,
718{
719 let (responder, response) = response_var();
720 spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
721 Ok(r) => responder.respond(r),
722 Err(p) => {
723 let p = TaskPanicError::new(p);
724 tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
725 responder.modify(move |_| panic::resume_unwind(p.payload));
726 }
727 });
728 response
729}
730
731/// Blocks the thread until the `task` future finishes.
732///
733/// The crate [`futures-lite`] is used to execute the task.
734///
735/// # Examples
736///
737/// Test a [`run`] call:
738///
739/// ```
740/// use zng_task as task;
741/// # use zng_unit::*;
742/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
743///
744/// # #[test]
745/// # fn __() { }
746/// pub fn run_ok() {
747/// let r = task::block_on(task::run(async { foo(32).await }));
748///
749/// # let value =
750/// r.expect("foo(32) was not Ok");
751/// # assert_eq!(32, value);
752/// }
753/// # run_ok();
754/// ```
755///
756/// [`futures-lite`]: https://docs.rs/futures-lite/
757pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
758where
759 F: Future,
760{
761 futures_lite::future::block_on(task.into_future())
762}
763
764/// Continuous poll the `task` until if finishes.
765///
766/// This function is useful for implementing some async tests only, futures don't expect to be polled
767/// continuously. This function is only available in test builds.
768#[cfg(any(test, doc, feature = "test_util"))]
769pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
770where
771 F: Future,
772{
773 use std::pin::pin;
774
775 let mut task = pin!(task.into_future());
776 block_on(future_fn(|cx| match task.as_mut().poll(cx) {
777 Poll::Ready(r) => Poll::Ready(r),
778 Poll::Pending => {
779 cx.waker().wake_by_ref();
780 Poll::Pending
781 }
782 }))
783}
784
785/// Executor used in async doc tests.
786///
787/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
788/// IF `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
789#[cfg(any(test, doc, feature = "test_util"))]
790pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
791where
792 F: Future,
793{
794 use zng_unit::TimeUnits;
795
796 if spin {
797 spin_on(with_deadline(task, 500.ms())).expect("async doc-test timeout")
798 } else {
799 block_on(with_deadline(task, 5.secs())).expect("async doc-test timeout")
800 }
801}
802
803/// A future that is [`Pending`] once and wakes the current task.
804///
805/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
806///
807/// [`Pending`]: std::task::Poll::Pending
808/// [`Ready`]: std::task::Poll::Ready
809/// [`wake`]: std::task::Waker::wake
810pub async fn yield_now() {
811 struct YieldNowFut(bool);
812 impl Future for YieldNowFut {
813 type Output = ();
814
815 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
816 if self.0 {
817 Poll::Ready(())
818 } else {
819 self.0 = true;
820 cx.waker().wake_by_ref();
821 Poll::Pending
822 }
823 }
824 }
825
826 YieldNowFut(false).await
827}
828
829/// A future that is [`Pending`] until the `deadline` is reached.
830///
831/// # Examples
832///
833/// Await 5 seconds in a [`spawn`] parallel task:
834///
835/// ```
836/// use zng_task as task;
837/// use zng_unit::*;
838///
839/// task::spawn(async {
840/// println!("waiting 5 seconds..");
841/// task::deadline(5.secs()).await;
842/// println!("5 seconds elapsed.")
843/// });
844/// ```
845///
846/// The future runs on an app provider timer executor, or on the [`futures_timer`] by default.
847///
848/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
849/// not at the moment of the first `.await` call.
850///
851/// [`Pending`]: std::task::Poll::Pending
852/// [`futures_timer`]: https://docs.rs/futures-timer
853pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
854 let deadline = deadline.into();
855 if zng_app_context::LocalContext::current_app().is_some() {
856 DEADLINE_SV.read().0(deadline)
857 } else {
858 default_deadline(deadline)
859 }
860}
861
862app_local! {
863 static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
864}
865
866type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
867
868fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
869 if let Some(timeout) = deadline.time_left() {
870 Box::pin(futures_timer::Delay::new(timeout))
871 } else {
872 Box::pin(std::future::ready(()))
873 }
874}
875
876/// Deadline APP integration.
877#[expect(non_camel_case_types)]
878pub struct DEADLINE_APP;
879
880impl DEADLINE_APP {
881 /// Called by the app implementer to setup the [`deadline`] executor.
882 ///
883 /// If no app calls this the [`futures_timer`] executor is used.
884 ///
885 /// [`futures_timer`]: https://docs.rs/futures-timer
886 ///
887 /// # Panics
888 ///
889 /// Panics if called more than once for the same app.
890 pub fn init_deadline_service(&self, service: DeadlineService) {
891 let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
892 if already_set {
893 *DEADLINE_SV.write() = (prev, true);
894 panic!("deadline service already inited for this app");
895 }
896 }
897}
898
899/// Implements a [`Future`] from a closure.
900///
901/// # Examples
902///
903/// A future that is ready with a closure returns `Some(R)`.
904///
905/// ```
906/// use std::task::Poll;
907/// use zng_task as task;
908///
909/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
910/// task::future_fn(|cx| match closure() {
911/// Some(r) => Poll::Ready(r),
912/// None => Poll::Pending,
913/// })
914/// .await
915/// }
916/// ```
917pub async fn future_fn<T, F>(fn_: F) -> T
918where
919 F: FnMut(&mut std::task::Context) -> Poll<T>,
920{
921 struct PollFn<F>(F);
922 impl<F> Unpin for PollFn<F> {}
923 impl<T, F: FnMut(&mut std::task::Context<'_>) -> Poll<T>> Future for PollFn<F> {
924 type Output = T;
925
926 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
927 (self.0)(cx)
928 }
929 }
930 PollFn(fn_).await
931}
932
933/// Error when [`with_deadline`] reach a time limit before a task finishes.
934#[derive(Debug, Clone, Copy)]
935#[non_exhaustive]
936pub struct DeadlineError {}
937impl fmt::Display for DeadlineError {
938 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939 write!(f, "reached deadline")
940 }
941}
942impl std::error::Error for DeadlineError {}
943
944/// Add a [`deadline`] to a future.
945///
946/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
947pub async fn with_deadline<O, F: Future<Output = O>>(
948 fut: impl IntoFuture<IntoFuture = F>,
949 deadline: impl Into<Deadline>,
950) -> Result<F::Output, DeadlineError> {
951 let deadline = deadline.into();
952 any!(async { Ok(fut.await) }, async {
953 self::deadline(deadline).await;
954 Err(DeadlineError {})
955 })
956 .await
957}
958
959/// <span data-del-macro-root></span> A future that *zips* other futures.
960///
961/// The macro input is a comma separated list of future expressions. The macro output is a future
962/// that when ".awaited" produces a tuple of results in the same order as the inputs.
963///
964/// At least one input future is required and any number of futures is accepted. For more than
965/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
966/// some IDEs.
967///
968/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
969/// function to await on all futures in a dynamic list of futures.
970///
971/// # Examples
972///
973/// Await for three different futures to complete:
974///
975/// ```
976/// use zng_task as task;
977///
978/// # task::doc_test(false, async {
979/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
980/// # });
981/// ```
982#[macro_export]
983macro_rules! all {
984 ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
985 ($fut0:expr, $fut1:expr $(,)?) => {
986 $crate::__all! {
987 fut0: $fut0;
988 fut1: $fut1;
989 }
990 };
991 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
992 $crate::__all! {
993 fut0: $fut0;
994 fut1: $fut1;
995 fut2: $fut2;
996 }
997 };
998 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
999 $crate::__all! {
1000 fut0: $fut0;
1001 fut1: $fut1;
1002 fut2: $fut2;
1003 fut3: $fut3;
1004 }
1005 };
1006 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1007 $crate::__all! {
1008 fut0: $fut0;
1009 fut1: $fut1;
1010 fut2: $fut2;
1011 fut3: $fut3;
1012 fut4: $fut4;
1013 }
1014 };
1015 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1016 $crate::__all! {
1017 fut0: $fut0;
1018 fut1: $fut1;
1019 fut2: $fut2;
1020 fut3: $fut3;
1021 fut4: $fut4;
1022 fut5: $fut5;
1023 }
1024 };
1025 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1026 $crate::__all! {
1027 fut0: $fut0;
1028 fut1: $fut1;
1029 fut2: $fut2;
1030 fut3: $fut3;
1031 fut4: $fut4;
1032 fut5: $fut5;
1033 fut6: $fut6;
1034 }
1035 };
1036 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1037 $crate::__all! {
1038 fut0: $fut0;
1039 fut1: $fut1;
1040 fut2: $fut2;
1041 fut3: $fut3;
1042 fut4: $fut4;
1043 fut5: $fut5;
1044 fut6: $fut6;
1045 fut7: $fut7;
1046 }
1047 };
1048 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
1049}
1050
1051#[doc(hidden)]
1052#[macro_export]
1053macro_rules! __all {
1054 ($($ident:ident: $fut:expr;)+) => {
1055 {
1056 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1057 $crate::future_fn(move |cx| {
1058 use std::task::Poll;
1059
1060 let mut pending = false;
1061
1062 $(
1063 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1064 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1065 // Future::poll call, so it will not move.
1066 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1067 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1068 $ident = $crate::FutureOrOutput::Output(r);
1069 } else {
1070 pending = true;
1071 }
1072 }
1073 )+
1074
1075 if pending {
1076 Poll::Pending
1077 } else {
1078 Poll::Ready(($($ident.take_output()),+))
1079 }
1080 })
1081 }
1082 }
1083}
1084
1085#[doc(hidden)]
1086pub enum FutureOrOutput<F: Future> {
1087 Future(F),
1088 Output(F::Output),
1089 Taken,
1090}
1091impl<F: Future> FutureOrOutput<F> {
1092 pub fn take_output(&mut self) -> F::Output {
1093 match std::mem::replace(self, Self::Taken) {
1094 FutureOrOutput::Output(o) => o,
1095 _ => unreachable!(),
1096 }
1097 }
1098}
1099
1100/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1101///
1102/// This is the dynamic version of [`all!`].
1103pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1104 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1105 future_fn(move |cx| {
1106 let mut pending = false;
1107 for input in &mut futures {
1108 if let FutureOrOutput::Future(fut) = input {
1109 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1110 // Future::poll call, so it will not move.
1111 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1112 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1113 *input = FutureOrOutput::Output(r);
1114 } else {
1115 pending = true;
1116 }
1117 }
1118 }
1119
1120 if pending {
1121 Poll::Pending
1122 } else {
1123 Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1124 }
1125 })
1126 .await
1127}
1128
1129/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
1130///
1131/// The macro input is comma separated list of future expressions, the futures must
1132/// all have the same output type. The macro output is a future that when ".awaited" produces
1133/// a single output type instance returned by the first input future that completes.
1134///
1135/// At least one input future is required and any number of futures is accepted. For more than
1136/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1137/// some IDEs.
1138///
1139/// If two futures are ready at the same time the result of the first future in the input list is used.
1140/// After one future is ready the other futures are not polled again and are dropped.
1141///
1142/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1143/// known at compile time, use the [`fn@any`] async function to await on all futures in a dynamic list of futures.
1144///
1145/// # Examples
1146///
1147/// Await for the first of three futures to complete:
1148///
1149/// ```
1150/// use zng_task as task;
1151/// use zng_unit::*;
1152///
1153/// # task::doc_test(false, async {
1154/// let r = task::any!(
1155/// task::run(async {
1156/// task::deadline(300.ms()).await;
1157/// 'a'
1158/// }),
1159/// task::wait(|| 'b'),
1160/// async {
1161/// task::deadline(300.ms()).await;
1162/// 'c'
1163/// }
1164/// )
1165/// .await;
1166///
1167/// assert_eq!('b', r);
1168/// # });
1169/// ```
1170#[macro_export]
1171macro_rules! any {
1172 ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
1173 ($fut0:expr, $fut1:expr $(,)?) => {
1174 $crate::__any! {
1175 fut0: $fut0;
1176 fut1: $fut1;
1177 }
1178 };
1179 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1180 $crate::__any! {
1181 fut0: $fut0;
1182 fut1: $fut1;
1183 fut2: $fut2;
1184 }
1185 };
1186 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1187 $crate::__any! {
1188 fut0: $fut0;
1189 fut1: $fut1;
1190 fut2: $fut2;
1191 fut3: $fut3;
1192 }
1193 };
1194 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1195 $crate::__any! {
1196 fut0: $fut0;
1197 fut1: $fut1;
1198 fut2: $fut2;
1199 fut3: $fut3;
1200 fut4: $fut4;
1201 }
1202 };
1203 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1204 $crate::__any! {
1205 fut0: $fut0;
1206 fut1: $fut1;
1207 fut2: $fut2;
1208 fut3: $fut3;
1209 fut4: $fut4;
1210 fut5: $fut5;
1211 }
1212 };
1213 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1214 $crate::__any! {
1215 fut0: $fut0;
1216 fut1: $fut1;
1217 fut2: $fut2;
1218 fut3: $fut3;
1219 fut4: $fut4;
1220 fut5: $fut5;
1221 fut6: $fut6;
1222 }
1223 };
1224 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1225 $crate::__any! {
1226 fut0: $fut0;
1227 fut1: $fut1;
1228 fut2: $fut2;
1229 fut3: $fut3;
1230 fut4: $fut4;
1231 fut5: $fut5;
1232 fut6: $fut6;
1233 fut7: $fut7;
1234 }
1235 };
1236 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
1237}
1238#[doc(hidden)]
1239#[macro_export]
1240macro_rules! __any {
1241 ($($ident:ident: $fut:expr;)+) => {
1242 {
1243 $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
1244 $crate::future_fn(move |cx| {
1245 use std::task::Poll;
1246 $(
1247 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1248 // Future::poll call, so it will not move.
1249 let mut $ident = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
1250 if let Poll::Ready(r) = $ident.as_mut().poll(cx) {
1251 return Poll::Ready(r)
1252 }
1253 )+
1254
1255 Poll::Pending
1256 })
1257 }
1258 }
1259}
1260#[doc(hidden)]
1261pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1262
1263/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1264///
1265/// This is the dynamic version of [`any!`].
1266pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1267 let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1268 future_fn(move |cx| {
1269 for fut in &mut futures {
1270 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1271 // Future::poll call, so it will not move.
1272 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1273 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1274 return Poll::Ready(r);
1275 }
1276 }
1277 Poll::Pending
1278 })
1279 .await
1280}
1281
1282/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
1283///
1284/// The macro input is comma separated list of future expressions, the futures must
1285/// all have the same output `Result<T, E>` type, but each can have a different `E`. The macro output is a future
1286/// that when ".awaited" produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
1287/// is `Ok(T)` or is `Err((E0, E1, ..))` is all futures are `Err`.
1288///
1289/// At least one input future is required and any number of futures is accepted. For more than
1290/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1291/// some IDEs.
1292///
1293/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
1294/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
1295/// is ready and `Err(E)` it is also not polled again and dropped.
1296///
1297/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1298/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
1299///
1300/// # Examples
1301///
1302/// Await for the first of three futures to complete with `Ok`:
1303///
1304/// ```
1305/// use zng_task as task;
1306/// # #[derive(Debug, PartialEq)]
1307/// # pub struct FooError;
1308/// # task::doc_test(false, async {
1309/// let r = task::any_ok!(
1310/// task::run(async { Err::<char, _>("error") }),
1311/// task::wait(|| Ok::<_, FooError>('b')),
1312/// async { Err::<char, _>(FooError) }
1313/// )
1314/// .await;
1315///
1316/// assert_eq!(Ok('b'), r);
1317/// # });
1318/// ```
1319#[macro_export]
1320macro_rules! any_ok {
1321 ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
1322 ($fut0:expr, $fut1:expr $(,)?) => {
1323 $crate::__any_ok! {
1324 fut0: $fut0;
1325 fut1: $fut1;
1326 }
1327 };
1328 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1329 $crate::__any_ok! {
1330 fut0: $fut0;
1331 fut1: $fut1;
1332 fut2: $fut2;
1333 }
1334 };
1335 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1336 $crate::__any_ok! {
1337 fut0: $fut0;
1338 fut1: $fut1;
1339 fut2: $fut2;
1340 fut3: $fut3;
1341 }
1342 };
1343 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1344 $crate::__any_ok! {
1345 fut0: $fut0;
1346 fut1: $fut1;
1347 fut2: $fut2;
1348 fut3: $fut3;
1349 fut4: $fut4;
1350 }
1351 };
1352 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1353 $crate::__any_ok! {
1354 fut0: $fut0;
1355 fut1: $fut1;
1356 fut2: $fut2;
1357 fut3: $fut3;
1358 fut4: $fut4;
1359 fut5: $fut5;
1360 }
1361 };
1362 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1363 $crate::__any_ok! {
1364 fut0: $fut0;
1365 fut1: $fut1;
1366 fut2: $fut2;
1367 fut3: $fut3;
1368 fut4: $fut4;
1369 fut5: $fut5;
1370 fut6: $fut6;
1371 }
1372 };
1373 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1374 $crate::__any_ok! {
1375 fut0: $fut0;
1376 fut1: $fut1;
1377 fut2: $fut2;
1378 fut3: $fut3;
1379 fut4: $fut4;
1380 fut5: $fut5;
1381 fut6: $fut6;
1382 fut7: $fut7;
1383 }
1384 };
1385 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
1386}
1387
1388#[doc(hidden)]
1389#[macro_export]
1390macro_rules! __any_ok {
1391 ($($ident:ident: $fut: expr;)+) => {
1392 {
1393 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1394 $crate::future_fn(move |cx| {
1395 use std::task::Poll;
1396
1397 let mut pending = false;
1398
1399 $(
1400 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1401 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1402 // Future::poll call, so it will not move.
1403 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1404 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1405 match r {
1406 Ok(r) => return Poll::Ready(Ok(r)),
1407 Err(e) => {
1408 $ident = $crate::FutureOrOutput::Output(Err(e));
1409 }
1410 }
1411 } else {
1412 pending = true;
1413 }
1414 }
1415 )+
1416
1417 if pending {
1418 Poll::Pending
1419 } else {
1420 Poll::Ready(Err((
1421 $($ident.take_output().unwrap_err()),+
1422 )))
1423 }
1424 })
1425 }
1426 }
1427}
1428
1429/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1430///
1431/// This is the dynamic version of [`all_some!`].
1432pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1433 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1434 future_fn(move |cx| {
1435 let mut pending = false;
1436 for input in &mut futures {
1437 if let FutureOrOutput::Future(fut) = input {
1438 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1439 // Future::poll call, so it will not move.
1440 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1441 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1442 match r {
1443 Ok(r) => return Poll::Ready(Ok(r)),
1444 Err(e) => *input = FutureOrOutput::Output(Err(e)),
1445 }
1446 } else {
1447 pending = true;
1448 }
1449 }
1450 }
1451
1452 if pending {
1453 Poll::Pending
1454 } else {
1455 Poll::Ready(Err(futures
1456 .iter_mut()
1457 .map(|f| match f.take_output() {
1458 Ok(_) => unreachable!(),
1459 Err(e) => e,
1460 })
1461 .collect()))
1462 }
1463 })
1464 .await
1465}
1466
1467/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
1468///
1469/// The macro input is comma separated list of future expressions, the futures must
1470/// all have the same output `Option<T>` type. The macro output is a future that when ".awaited" produces
1471/// a single output type instance returned by the first input future that completes with a `Some`.
1472/// If all futures complete with a `None` the output is `None`.
1473///
1474/// At least one input future is required and any number of futures is accepted. For more than
1475/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1476/// some IDEs.
1477///
1478/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
1479/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
1480/// is ready and `None` it is also not polled again and dropped.
1481///
1482/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1483/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
1484///
1485/// # Examples
1486///
1487/// Await for the first of three futures to complete with `Some`:
1488///
1489/// ```
1490/// use zng_task as task;
1491/// # task::doc_test(false, async {
1492/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
1493///
1494/// assert_eq!(Some('b'), r);
1495/// # });
1496/// ```
1497#[macro_export]
1498macro_rules! any_some {
1499 ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
1500 ($fut0:expr, $fut1:expr $(,)?) => {
1501 $crate::__any_some! {
1502 fut0: $fut0;
1503 fut1: $fut1;
1504 }
1505 };
1506 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1507 $crate::__any_some! {
1508 fut0: $fut0;
1509 fut1: $fut1;
1510 fut2: $fut2;
1511 }
1512 };
1513 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1514 $crate::__any_some! {
1515 fut0: $fut0;
1516 fut1: $fut1;
1517 fut2: $fut2;
1518 fut3: $fut3;
1519 }
1520 };
1521 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1522 $crate::__any_some! {
1523 fut0: $fut0;
1524 fut1: $fut1;
1525 fut2: $fut2;
1526 fut3: $fut3;
1527 fut4: $fut4;
1528 }
1529 };
1530 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1531 $crate::__any_some! {
1532 fut0: $fut0;
1533 fut1: $fut1;
1534 fut2: $fut2;
1535 fut3: $fut3;
1536 fut4: $fut4;
1537 fut5: $fut5;
1538 }
1539 };
1540 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1541 $crate::__any_some! {
1542 fut0: $fut0;
1543 fut1: $fut1;
1544 fut2: $fut2;
1545 fut3: $fut3;
1546 fut4: $fut4;
1547 fut5: $fut5;
1548 fut6: $fut6;
1549 }
1550 };
1551 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1552 $crate::__any_some! {
1553 fut0: $fut0;
1554 fut1: $fut1;
1555 fut2: $fut2;
1556 fut3: $fut3;
1557 fut4: $fut4;
1558 fut5: $fut5;
1559 fut6: $fut6;
1560 fut7: $fut7;
1561 }
1562 };
1563 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
1564}
1565
1566#[doc(hidden)]
1567#[macro_export]
1568macro_rules! __any_some {
1569 ($($ident:ident: $fut: expr;)+) => {
1570 {
1571 $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
1572 $crate::future_fn(move |cx| {
1573 use std::task::Poll;
1574
1575 let mut pending = false;
1576
1577 $(
1578 if let Some(fut) = $ident.as_mut() {
1579 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1580 // Future::poll call, so it will not move.
1581 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1582 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1583 if let Some(r) = r {
1584 return Poll::Ready(Some(r));
1585 }
1586 $ident = None;
1587 } else {
1588 pending = true;
1589 }
1590 }
1591 )+
1592
1593 if pending {
1594 Poll::Pending
1595 } else {
1596 Poll::Ready(None)
1597 }
1598 })
1599 }
1600 }
1601}
1602
1603/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1604///
1605/// This is the dynamic version of [`all_some!`].
1606pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1607 let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1608 future_fn(move |cx| {
1609 let mut pending = false;
1610 for input in &mut futures {
1611 if let Some(fut) = input {
1612 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1613 // Future::poll call, so it will not move.
1614 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1615 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1616 match r {
1617 Some(r) => return Poll::Ready(Some(r)),
1618 None => *input = None,
1619 }
1620 } else {
1621 pending = true;
1622 }
1623 }
1624 }
1625
1626 if pending { Poll::Pending } else { Poll::Ready(None) }
1627 })
1628 .await
1629}
1630
1631/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
1632/// any future is ready with an `Err(E)` result.
1633///
1634/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
1635/// type is the first error encountered, the input futures must have the same `Err` type but can have different
1636/// `Ok` types.
1637///
1638/// At least one input future is required and any number of futures is accepted. For more than
1639/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1640/// some IDEs.
1641///
1642/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
1643/// After one future is ready and `Err(T)` the other futures are not polled again and are dropped. After a future
1644/// is ready it is also not polled again and dropped.
1645///
1646/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1647/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
1648///
1649/// # Examples
1650///
1651/// Await for the first of three futures to complete with `Ok(T)`:
1652///
1653/// ```
1654/// use zng_task as task;
1655/// # #[derive(Debug, PartialEq)]
1656/// # struct FooError;
1657/// # task::doc_test(false, async {
1658/// let r = task::all_ok!(
1659/// task::run(async { Ok::<_, FooError>('a') }),
1660/// task::wait(|| Ok::<_, FooError>('b')),
1661/// async { Ok::<_, FooError>('c') }
1662/// )
1663/// .await;
1664///
1665/// assert_eq!(Ok(('a', 'b', 'c')), r);
1666/// # });
1667/// ```
1668///
1669/// And in if any completes with `Err(E)`:
1670///
1671/// ```
1672/// use zng_task as task;
1673/// # #[derive(Debug, PartialEq)]
1674/// # struct FooError;
1675/// # task::doc_test(false, async {
1676/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
1677/// Ok('c')
1678/// })
1679/// .await;
1680///
1681/// assert_eq!(Err(FooError), r);
1682/// # });
1683/// ```
1684#[macro_export]
1685macro_rules! all_ok {
1686 ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
1687 ($fut0:expr, $fut1:expr $(,)?) => {
1688 $crate::__all_ok! {
1689 fut0: $fut0;
1690 fut1: $fut1;
1691 }
1692 };
1693 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1694 $crate::__all_ok! {
1695 fut0: $fut0;
1696 fut1: $fut1;
1697 fut2: $fut2;
1698 }
1699 };
1700 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1701 $crate::__all_ok! {
1702 fut0: $fut0;
1703 fut1: $fut1;
1704 fut2: $fut2;
1705 fut3: $fut3;
1706 }
1707 };
1708 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1709 $crate::__all_ok! {
1710 fut0: $fut0;
1711 fut1: $fut1;
1712 fut2: $fut2;
1713 fut3: $fut3;
1714 fut4: $fut4;
1715 }
1716 };
1717 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1718 $crate::__all_ok! {
1719 fut0: $fut0;
1720 fut1: $fut1;
1721 fut2: $fut2;
1722 fut3: $fut3;
1723 fut4: $fut4;
1724 fut5: $fut5;
1725 }
1726 };
1727 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1728 $crate::__all_ok! {
1729 fut0: $fut0;
1730 fut1: $fut1;
1731 fut2: $fut2;
1732 fut3: $fut3;
1733 fut4: $fut4;
1734 fut5: $fut5;
1735 fut6: $fut6;
1736 }
1737 };
1738 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1739 $crate::__all_ok! {
1740 fut0: $fut0;
1741 fut1: $fut1;
1742 fut2: $fut2;
1743 fut3: $fut3;
1744 fut4: $fut4;
1745 fut5: $fut5;
1746 fut6: $fut6;
1747 fut7: $fut7;
1748 }
1749 };
1750 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
1751}
1752
1753#[doc(hidden)]
1754#[macro_export]
1755macro_rules! __all_ok {
1756 ($($ident:ident: $fut: expr;)+) => {
1757 {
1758 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1759 $crate::future_fn(move |cx| {
1760 use std::task::Poll;
1761
1762 let mut pending = false;
1763
1764 $(
1765 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1766 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1767 // Future::poll call, so it will not move.
1768 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1769 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1770 match r {
1771 Ok(r) => {
1772 $ident = $crate::FutureOrOutput::Output(Ok(r))
1773 },
1774 Err(e) => return Poll::Ready(Err(e)),
1775 }
1776 } else {
1777 pending = true;
1778 }
1779 }
1780 )+
1781
1782 if pending {
1783 Poll::Pending
1784 } else {
1785 Poll::Ready(Ok((
1786 $($ident.take_output().unwrap()),+
1787 )))
1788 }
1789 })
1790 }
1791 }
1792}
1793
1794/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1795///
1796/// This is the dynamic version of [`all_ok!`].
1797pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1798 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1799 future_fn(move |cx| {
1800 let mut pending = false;
1801 for input in &mut futures {
1802 if let FutureOrOutput::Future(fut) = input {
1803 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1804 // Future::poll call, so it will not move.
1805 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1806 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1807 match r {
1808 Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1809 Err(e) => return Poll::Ready(Err(e)),
1810 }
1811 } else {
1812 pending = true;
1813 }
1814 }
1815 }
1816
1817 if pending {
1818 Poll::Pending
1819 } else {
1820 Poll::Ready(Ok(futures
1821 .iter_mut()
1822 .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1823 .collect()))
1824 }
1825 })
1826 .await
1827}
1828
1829/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
1830/// is future ready with `None`.
1831///
1832/// The macro input is comma separated list of future expressions, the futures must
1833/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
1834/// produces `Some<(T0, T1, ..)>` if all futures where `Some(T)` or `None` if any of the futures where `None`.
1835///
1836/// At least one input future is required and any number of futures is accepted. For more than
1837/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1838/// some IDEs.
1839///
1840/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
1841/// is ready it is also not polled again and dropped.
1842///
1843/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1844/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
1845///
1846/// # Examples
1847///
1848/// Await for the first of three futures to complete with `Some`:
1849///
1850/// ```
1851/// use zng_task as task;
1852/// # task::doc_test(false, async {
1853/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
1854///
1855/// assert_eq!(Some(('a', 'b', 'c')), r);
1856/// # });
1857/// ```
1858///
1859/// Completes with `None` if any future completes with `None`:
1860///
1861/// ```
1862/// # use zng_task as task;
1863/// # task::doc_test(false, async {
1864/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
1865///
1866/// assert_eq!(None, r);
1867/// # });
1868/// ```
1869#[macro_export]
1870macro_rules! all_some {
1871 ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
1872 ($fut0:expr, $fut1:expr $(,)?) => {
1873 $crate::__all_some! {
1874 fut0: $fut0;
1875 fut1: $fut1;
1876 }
1877 };
1878 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1879 $crate::__all_some! {
1880 fut0: $fut0;
1881 fut1: $fut1;
1882 fut2: $fut2;
1883 }
1884 };
1885 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1886 $crate::__all_some! {
1887 fut0: $fut0;
1888 fut1: $fut1;
1889 fut2: $fut2;
1890 fut3: $fut3;
1891 }
1892 };
1893 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1894 $crate::__all_some! {
1895 fut0: $fut0;
1896 fut1: $fut1;
1897 fut2: $fut2;
1898 fut3: $fut3;
1899 fut4: $fut4;
1900 }
1901 };
1902 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1903 $crate::__all_some! {
1904 fut0: $fut0;
1905 fut1: $fut1;
1906 fut2: $fut2;
1907 fut3: $fut3;
1908 fut4: $fut4;
1909 fut5: $fut5;
1910 }
1911 };
1912 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1913 $crate::__all_some! {
1914 fut0: $fut0;
1915 fut1: $fut1;
1916 fut2: $fut2;
1917 fut3: $fut3;
1918 fut4: $fut4;
1919 fut5: $fut5;
1920 fut6: $fut6;
1921 }
1922 };
1923 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1924 $crate::__all_some! {
1925 fut0: $fut0;
1926 fut1: $fut1;
1927 fut2: $fut2;
1928 fut3: $fut3;
1929 fut4: $fut4;
1930 fut5: $fut5;
1931 fut6: $fut6;
1932 fut7: $fut7;
1933 }
1934 };
1935 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
1936}
1937
1938#[doc(hidden)]
1939#[macro_export]
1940macro_rules! __all_some {
1941 ($($ident:ident: $fut: expr;)+) => {
1942 {
1943 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1944 $crate::future_fn(move |cx| {
1945 use std::task::Poll;
1946
1947 let mut pending = false;
1948
1949 $(
1950 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1951 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1952 // Future::poll call, so it will not move.
1953 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1954 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1955 if r.is_none() {
1956 return Poll::Ready(None);
1957 }
1958
1959 $ident = $crate::FutureOrOutput::Output(r);
1960 } else {
1961 pending = true;
1962 }
1963 }
1964 )+
1965
1966 if pending {
1967 Poll::Pending
1968 } else {
1969 Poll::Ready(Some((
1970 $($ident.take_output().unwrap()),+
1971 )))
1972 }
1973 })
1974 }
1975 }
1976}
1977
1978/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1979///
1980/// This is the dynamic version of [`all_some!`].
1981pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1982 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1983 future_fn(move |cx| {
1984 let mut pending = false;
1985 for input in &mut futures {
1986 if let FutureOrOutput::Future(fut) = input {
1987 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1988 // Future::poll call, so it will not move.
1989 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1990 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1991 match r {
1992 Some(r) => *input = FutureOrOutput::Output(Some(r)),
1993 None => return Poll::Ready(None),
1994 }
1995 } else {
1996 pending = true;
1997 }
1998 }
1999 }
2000
2001 if pending {
2002 Poll::Pending
2003 } else {
2004 Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2005 }
2006 })
2007 .await
2008}
2009
2010/// A future that will await until [`set`] is called.
2011///
2012/// # Examples
2013///
2014/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2015///
2016/// ```
2017/// use zng_clone_move::async_clmv;
2018/// use zng_task::{self as task, *};
2019///
2020/// let signal = SignalOnce::default();
2021///
2022/// task::spawn(async_clmv!(signal, {
2023/// signal.await;
2024/// println!("After Signal!");
2025/// }));
2026///
2027/// signal.set();
2028/// ```
2029///
2030/// [`set`]: SignalOnce::set
2031#[derive(Default, Clone)]
2032pub struct SignalOnce(Arc<SignalInner>);
2033impl fmt::Debug for SignalOnce {
2034 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2035 write!(f, "SignalOnce({})", self.is_set())
2036 }
2037}
2038impl PartialEq for SignalOnce {
2039 fn eq(&self, other: &Self) -> bool {
2040 Arc::ptr_eq(&self.0, &other.0)
2041 }
2042}
2043impl Eq for SignalOnce {}
2044impl Hash for SignalOnce {
2045 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2046 Arc::as_ptr(&self.0).hash(state)
2047 }
2048}
2049impl SignalOnce {
2050 /// New unsigned.
2051 pub fn new() -> Self {
2052 Self::default()
2053 }
2054
2055 /// New signaled.
2056 pub fn new_set() -> Self {
2057 let s = Self::new();
2058 s.set();
2059 s
2060 }
2061
2062 /// If the signal was set.
2063 pub fn is_set(&self) -> bool {
2064 self.0.signaled.load(Ordering::Relaxed)
2065 }
2066
2067 /// Sets the signal and awakes listeners.
2068 pub fn set(&self) {
2069 if !self.0.signaled.swap(true, Ordering::Relaxed) {
2070 let listeners = mem::take(&mut *self.0.listeners.lock());
2071 for listener in listeners {
2072 listener.wake();
2073 }
2074 }
2075 }
2076}
2077impl Future for SignalOnce {
2078 type Output = ();
2079
2080 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2081 if self.0.signaled.load(Ordering::Relaxed) {
2082 return Poll::Ready(());
2083 }
2084
2085 let mut listeners = self.0.listeners.lock();
2086 if self.0.signaled.load(Ordering::Relaxed) {
2087 return Poll::Ready(());
2088 }
2089
2090 let waker = cx.waker();
2091 if !listeners.iter().any(|w| w.will_wake(waker)) {
2092 listeners.push(waker.clone());
2093 }
2094
2095 Poll::Pending
2096 }
2097}
2098
2099#[derive(Default)]
2100struct SignalInner {
2101 signaled: AtomicBool,
2102 listeners: Mutex<Vec<std::task::Waker>>,
2103}
2104
2105/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2106///
2107/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2108/// known at the moment the first request is made.
2109///
2110/// [`Waker`]: std::task::Waker
2111#[derive(Clone)]
2112pub struct McWaker(Arc<WakeVec>);
2113
2114#[derive(Default)]
2115struct WakeVec(Mutex<Vec<std::task::Waker>>);
2116impl WakeVec {
2117 fn push(&self, waker: std::task::Waker) -> bool {
2118 let mut v = self.0.lock();
2119
2120 let return_waker = v.is_empty();
2121
2122 v.push(waker);
2123
2124 return_waker
2125 }
2126
2127 fn cancel(&self) {
2128 let mut v = self.0.lock();
2129
2130 debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2131
2132 v.clear();
2133 }
2134}
2135impl std::task::Wake for WakeVec {
2136 fn wake(self: Arc<Self>) {
2137 for w in mem::take(&mut *self.0.lock()) {
2138 w.wake();
2139 }
2140 }
2141}
2142impl McWaker {
2143 /// New empty waker.
2144 pub fn empty() -> Self {
2145 Self(Arc::new(WakeVec::default()))
2146 }
2147
2148 /// Register a `waker` to wake once when `self` awakes.
2149 ///
2150 /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2151 /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2152 /// is ready [`cancel`] must be called.
2153 ///
2154 /// [`cancel`]: Self::cancel
2155 pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2156 if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2157 }
2158
2159 /// Clear current registered wakers.
2160 pub fn cancel(&self) {
2161 self.0.cancel()
2162 }
2163}
2164
2165/// Panic payload, captured by [`std::panic::catch_unwind`].
2166#[non_exhaustive]
2167pub struct TaskPanicError {
2168 /// Panic payload.
2169 pub payload: Box<dyn Any + Send + 'static>,
2170}
2171impl TaskPanicError {
2172 /// New from panic payload.
2173 pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
2174 Self { payload }
2175 }
2176
2177 /// Get the panic string if the `payload` is string like.
2178 pub fn panic_str(&self) -> Option<&str> {
2179 extract_panic_message(&self.payload)
2180 }
2181}
2182impl fmt::Debug for TaskPanicError {
2183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2184 f.debug_struct("TaskPanicError").field("panic_str()", &self.panic_str()).finish()
2185 }
2186}
2187impl fmt::Display for TaskPanicError {
2188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2189 if let Some(s) = self.panic_str() { f.write_str(s) } else { Ok(()) }
2190 }
2191}
2192impl std::error::Error for TaskPanicError {}
2193
2194type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;
2195
2196pub(crate) fn extract_panic_message(p: &dyn Any) -> Option<&str> {
2197 if let Some(s) = p.downcast_ref::<&'static str>() {
2198 Some(s)
2199 } else if let Some(s) = p.downcast_ref::<String>() {
2200 Some(s)
2201 } else {
2202 None
2203 }
2204}
2205
2206app_local! {
2207 // Mutex for Sync only
2208 static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
2209}
2210
2211/// Set a `handler` that is called when spawn tasks panic.
2212///
2213/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2214///
2215/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2216/// and thread the task that panicked was called in.
2217///
2218/// ```
2219/// # macro_rules! example { () => {
2220/// task::set_spawn_panic_handler(|p| {
2221/// UPDATES
2222/// .run_hn_once(hn_once!(|_| {
2223/// std::panic::resume_unwind(p.payload);
2224/// }))
2225/// .perm();
2226/// });
2227/// # }}
2228/// ```
2229///
2230/// The example above shows how to set a handler that propagates the panic to the app main thread.
2231///
2232/// # Panics
2233///
2234/// Panics if another handler is already set in the same app.
2235///
2236/// Panics if no app is running in the caller thread.
2237pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2238 let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2239 assert!(h.is_none(), "a spawn panic handler is already set");
2240 *h = Some(Mutex::new(Box::new(handler)));
2241}
2242
2243fn on_spawn_panic(p: TaskPanicError) {
2244 if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2245 f.get_mut()(p)
2246 }
2247}