zng_task/lib.rs
1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13 any::Any,
14 fmt,
15 hash::Hash,
16 mem, panic,
17 pin::Pin,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 task::Poll,
23};
24
25use zng_app_context::{LocalContext, app_local};
26use zng_time::Deadline;
27use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
28
29#[cfg(test)]
30mod tests;
31
32mod reexports;
33pub use reexports::*;
34
35use crate::parking_lot::Mutex;
36
37pub mod channel;
38pub mod fs;
39pub mod io;
40
41mod ui;
42pub use ui::*;
43
44pub mod http;
45
46pub mod process;
47
48mod rayon_ctx;
49
50mod progress;
51pub use progress::*;
52
53/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
54///
55/// # Parallel
56///
57/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
58///
59/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
60/// otherwise it will run in a single thread at a time, still not blocking the UI.
61///
62/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
63///
64/// # Async
65///
66/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
67/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
68/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
69/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
70///
71/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
72///
73/// # Examples
74///
75/// ```
76/// # use zng_task::{self as task, *, rayon::iter::*};
77/// # use zng_var::*;
78/// # struct SomeStruct { sum_response: ResponseVar<usize> }
79/// # impl SomeStruct {
80/// fn on_event(&mut self) {
81/// let (responder, response) = response_var();
82/// self.sum_response = response;
83///
84/// task::spawn(async move {
85/// let r = (0..1000).into_par_iter().map(|i| i * i).sum();
86///
87/// responder.respond(r);
88/// });
89/// }
90///
91/// fn on_update(&mut self) {
92/// if let Some(result) = self.sum_response.rsp_new() {
93/// println!("sum of squares 0..1000: {result}");
94/// }
95/// }
96/// # }
97/// ```
98///
99/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
/// The task captures the caller [`LocalContext`] so the response variable will be set correctly.
101///
102/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
103/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
104///
105/// # Panic Handling
106///
/// If the `task` panics the panic message is logged as an error, and can be observed using [`set_spawn_panic_handler`]. It
108/// is otherwise ignored.
109///
110/// # Unwind Safety
111///
112/// This function disables the [unwind safety validation], meaning that in case of a panic shared
113/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
114/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
115/// to propagate a panic.
116///
117/// [unwind safety validation]: std::panic::UnwindSafe
118/// [`Waker`]: std::task::Waker
119/// [`rayon`]: https://docs.rs/rayon
120/// [`LocalContext`]: zng_app_context::LocalContext
121/// [`response_var`]: zng_var::response_var
122pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
123where
124 F: Future<Output = ()> + Send + 'static,
125{
126 Arc::new(RayonTask {
127 ctx: LocalContext::capture(),
128 fut: Mutex::new(Some(Box::pin(task.into_future()))),
129 })
130 .poll()
131}
132
133/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
134pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136 F: Future<Output = ()> + Send + 'static,
137{
138 struct PollRayonTask {
139 fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
140 }
141 impl PollRayonTask {
142 // start task in calling thread
143 fn poll(self: Arc<Self>) {
144 let mut task = self.fut.lock();
145 let (mut t, _) = task.take().unwrap();
146
147 let waker = self.clone().into();
148
149 match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
150 Poll::Ready(()) => {}
151 Poll::Pending => {
152 let ctx = LocalContext::capture();
153 *task = Some((t, Some(ctx)));
154 }
155 }
156 }
157 }
158 impl std::task::Wake for PollRayonTask {
159 fn wake(self: Arc<Self>) {
160 // continue task in spawn threads
161 if let Some((task, Some(ctx))) = self.fut.lock().take() {
162 Arc::new(RayonTask {
163 ctx,
164 fut: Mutex::new(Some(Box::pin(task))),
165 })
166 .poll();
167 }
168 }
169 }
170
171 Arc::new(PollRayonTask {
172 fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
173 })
174 .poll()
175}
176
177type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
178
179// A future that is its own waker that polls inside rayon spawn tasks.
180struct RayonTask {
181 ctx: LocalContext,
182 fut: Mutex<Option<RayonSpawnFut>>,
183}
184impl RayonTask {
185 fn poll(self: Arc<Self>) {
186 ::rayon::spawn(move || {
187 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
188 let mut task = self.fut.lock();
189 if let Some(mut t) = task.take() {
190 let waker = self.clone().into();
191
192 // load app context
193 self.ctx.clone().with_context(move || {
194 let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
195 // poll future
196 if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
197 // not done
198 *task = Some(t);
199 }
200 }));
201 if let Err(p) = r {
202 let p = TaskPanicError::new(p);
203 tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
204 on_spawn_panic(p);
205 }
206 });
207 }
208 })
209 }
210}
211impl std::task::Wake for RayonTask {
212 fn wake(self: Arc<Self>) {
213 self.poll()
214 }
215}
216
217/// Rayon join with local context.
218///
219/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
220/// operations.
221///
222/// See `rayon::join` for more details about join.
223///
224/// [`LocalContext`]: zng_app_context::LocalContext
225pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
226where
227 A: FnOnce() -> RA + Send,
228 B: FnOnce() -> RB + Send,
229 RA: Send,
230 RB: Send,
231{
232 self::join_context(move |_| op_a(), move |_| op_b())
233}
234
235/// Rayon join context with local context.
236///
237/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
238/// operations.
239///
240/// See `rayon::join_context` for more details about join.
241///
242/// [`LocalContext`]: zng_app_context::LocalContext
243pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
244where
245 A: FnOnce(::rayon::FnContext) -> RA + Send,
246 B: FnOnce(::rayon::FnContext) -> RB + Send,
247 RA: Send,
248 RB: Send,
249{
250 let ctx = LocalContext::capture();
251 let ctx = &ctx;
252 ::rayon::join_context(
253 move |a| {
254 if a.migrated() {
255 ctx.clone().with_context(|| op_a(a))
256 } else {
257 op_a(a)
258 }
259 },
260 move |b| {
261 if b.migrated() {
262 ctx.clone().with_context(|| op_b(b))
263 } else {
264 op_b(b)
265 }
266 },
267 )
268}
269
270/// Rayon scope with local context.
271///
272/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
273/// operations.
274///
275/// See `rayon::scope` for more details about scope.
276///
277/// [`LocalContext`]: zng_app_context::LocalContext
278pub fn scope<'scope, OP, R>(op: OP) -> R
279where
280 OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
281 R: Send,
282{
283 let ctx = LocalContext::capture();
284
285 // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
286 // Is there a better way to do this? I hope so.
287 //
288 // SAFETY:
289 // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
290 // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
291 let ctx_ref: &'_ LocalContext = &ctx;
292 let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
293
294 let r = ::rayon::scope(move |s| {
295 op(ScopeCtx {
296 scope: s,
297 ctx: ctx_scope_ref,
298 })
299 });
300
301 drop(ctx);
302
303 r
304}
305
306/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
307///
308/// See [`scope`] for more details.
309#[derive(Clone, Copy, Debug)]
310pub struct ScopeCtx<'a, 'scope: 'a> {
311 scope: &'a ::rayon::Scope<'scope>,
312 ctx: &'scope LocalContext,
313}
314impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
315 /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
316 ///
317 /// See `rayon::Scope::spawn` for more details.
318 pub fn spawn<F>(self, f: F)
319 where
320 F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
321 {
322 let ctx = self.ctx;
323 self.scope
324 .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
325 }
326}
327
328/// Spawn a parallel async task that can also be `.await` for the task result.
329///
330/// # Parallel
331///
332/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
333///
334/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
335/// otherwise it will run in a single thread at a time, still not blocking the UI.
336///
337/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
338///
339/// # Async
340///
341/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
342/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
343/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
344/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
345///
346/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
347///
348/// # Examples
349///
350/// ```
351/// # use zng_task::{self as task, rayon::iter::*};
352/// # struct SomeStruct { sum: usize }
353/// # async fn read_numbers() -> Vec<usize> { vec![] }
354/// # impl SomeStruct {
355/// async fn on_event(&mut self) {
356/// self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
357/// }
358/// # }
359/// ```
360///
361/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
362/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
363/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
364///
365/// # Cancellation
366///
367/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
368/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
369/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
370/// result channel is disconnected.
371///
/// If you want to deterministically know that the `task` was cancelled use a cancellation signal.
373///
374/// # Panic Propagation
375///
376/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
377/// can use [`run_catch`] to get the panic as an error instead.
378///
379/// [`resume_unwind`]: panic::resume_unwind
380/// [`Waker`]: std::task::Waker
381/// [`rayon`]: https://docs.rs/rayon
382/// [`LocalContext`]: zng_app_context::LocalContext
383pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
384where
385 R: Send + 'static,
386 T: Future<Output = R> + Send + 'static,
387{
388 match run_catch(task).await {
389 Ok(r) => r,
390 Err(p) => panic::resume_unwind(p.payload),
391 }
392}
393
394/// Like [`run`] but catches panics.
395///
/// This task works the same and has the same utility as [`run`], except it returns panic messages
397/// as an error instead of propagating the panic.
398///
399/// # Unwind Safety
400///
401/// This function disables the [unwind safety validation], meaning that in case of a panic shared
402/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
403/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
404/// if this function returns an error.
405///
406/// [unwind safety validation]: std::panic::UnwindSafe
407pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
408where
409 R: Send + 'static,
410 T: Future<Output = R> + Send + 'static,
411{
412 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
413
414 // A future that is its own waker that polls inside the rayon primary thread-pool.
415 struct RayonCatchTask<R> {
416 ctx: LocalContext,
417 fut: Mutex<Option<Fut<R>>>,
418 sender: flume::Sender<Result<R, TaskPanicError>>,
419 }
420 impl<R: Send + 'static> RayonCatchTask<R> {
421 fn poll(self: Arc<Self>) {
422 let sender = self.sender.clone();
423 if sender.is_disconnected() {
424 return; // cancel.
425 }
426 ::rayon::spawn(move || {
427 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
428 let mut task = self.fut.lock();
429 if let Some(mut t) = task.take() {
430 let waker = self.clone().into();
431 let mut cx = std::task::Context::from_waker(&waker);
432
433 self.ctx.clone().with_context(|| {
434 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
435 match r {
436 Ok(Poll::Ready(r)) => {
437 drop(task);
438 let _ = sender.send(Ok(r));
439 }
440 Ok(Poll::Pending) => {
441 *task = Some(t);
442 }
443 Err(p) => {
444 drop(task);
445 let _ = sender.send(Err(TaskPanicError::new(p)));
446 }
447 }
448 });
449 }
450 })
451 }
452 }
453 impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
454 fn wake(self: Arc<Self>) {
455 self.poll()
456 }
457 }
458
459 let (sender, receiver) = channel::bounded(1);
460
461 Arc::new(RayonCatchTask {
462 ctx: LocalContext::capture(),
463 fut: Mutex::new(Some(Box::pin(task.into_future()))),
464 sender: sender.into(),
465 })
466 .poll();
467
468 receiver.recv().await.unwrap()
469}
470
471/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
472///
473/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
474///
475/// # Examples
476///
477/// ```
478/// # use zng_task::{self as task, rayon::iter::*};
479/// # use zng_var::*;
480/// # struct SomeStruct { sum_response: ResponseVar<usize> }
481/// # async fn read_numbers() -> Vec<usize> { vec![] }
482/// # impl SomeStruct {
483/// fn on_event(&mut self) {
484/// self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
485/// }
486///
487/// fn on_update(&mut self) {
488/// if let Some(result) = self.sum_response.rsp_new() {
489/// println!("sum of squares: {result}");
490/// }
491/// }
492/// # }
493/// ```
494///
/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is sent to
496/// `sum_response` that is a [`ResponseVar<R>`].
497///
498/// # Cancellation
499///
500/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
501///
502/// # Panic Handling
503///
504/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
505///
506/// [`resume_unwind`]: panic::resume_unwind
507/// [`ResponseVar<R>`]: zng_var::ResponseVar
508/// [`response_var`]: zng_var::response_var
509pub fn respond<R, F>(task: F) -> ResponseVar<R>
510where
511 R: VarValue,
512 F: Future<Output = R> + Send + 'static,
513{
514 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
515
516 let (responder, response) = response_var();
517
518 // A future that is its own waker that polls inside the rayon primary thread-pool.
519 struct RayonRespondTask<R: VarValue> {
520 ctx: LocalContext,
521 fut: Mutex<Option<Fut<R>>>,
522 responder: zng_var::ResponderVar<R>,
523 }
524 impl<R: VarValue> RayonRespondTask<R> {
525 fn poll(self: Arc<Self>) {
526 let responder = self.responder.clone();
527 if responder.strong_count() == 2 {
528 return; // cancel.
529 }
530 ::rayon::spawn(move || {
531 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
532 let mut task = self.fut.lock();
533 if let Some(mut t) = task.take() {
534 let waker = self.clone().into();
535 let mut cx = std::task::Context::from_waker(&waker);
536
537 self.ctx.clone().with_context(|| {
538 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
539 match r {
540 Ok(Poll::Ready(r)) => {
541 drop(task);
542
543 responder.respond(r);
544 }
545 Ok(Poll::Pending) => {
546 *task = Some(t);
547 }
548 Err(p) => {
549 let p = TaskPanicError::new(p);
550 tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
551 drop(task);
552 responder.modify(move |_| panic::resume_unwind(p.payload));
553 }
554 }
555 });
556 }
557 })
558 }
559 }
560 impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
561 fn wake(self: Arc<Self>) {
562 self.poll()
563 }
564 }
565
566 Arc::new(RayonRespondTask {
567 ctx: LocalContext::capture(),
568 fut: Mutex::new(Some(Box::pin(task))),
569 responder,
570 })
571 .poll();
572
573 response
574}
575
576/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
577/// if the `task` is pending continues execution like [`respond`].
578pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
579where
580 R: VarValue,
581 F: Future<Output = R> + Send + 'static,
582{
583 enum QuickResponse<R: VarValue> {
584 Quick(Option<R>),
585 Response(zng_var::ResponderVar<R>),
586 }
587 let task = task.into_future();
588 let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
589 poll_spawn(zng_clone_move::async_clmv!(q, {
590 let rsp = task.await;
591
592 match &mut *q.lock() {
593 QuickResponse::Quick(q) => *q = Some(rsp),
594 QuickResponse::Response(r) => r.respond(rsp),
595 }
596 }));
597
598 let mut q = q.lock();
599 match &mut *q {
600 QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
601 _ => {
602 let (responder, response) = response_var();
603 *q = QuickResponse::Response(responder);
604 response
605 }
606 }
607}
608
609/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
610///
611/// # Parallel
612///
613/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
614/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
615/// parts that are blocking.
616///
617/// # Examples
618///
619/// ```
620/// # fn main() { }
621/// # use zng_task as task;
622/// # async fn example() {
623/// task::wait(|| std::fs::read_to_string("file.txt")).await
624/// # ; }
625/// ```
626///
/// The example reads a file, that is a blocking file IO operation, most of the time is spent waiting for the operating system,
/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
/// like in an async event handler.
630///
631/// # Async Read/Write
632///
633/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
634/// have or want the full file in memory or when you want to apply multiple operations to the file.
635///
636/// # Panic Propagation
637///
638/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
639/// can use [`wait_catch`] to get the panic as an error instead.
640///
641/// [`blocking`]: https://docs.rs/blocking
642/// [`resume_unwind`]: panic::resume_unwind
643pub async fn wait<T, F>(task: F) -> T
644where
645 F: FnOnce() -> T + Send + 'static,
646 T: Send + 'static,
647{
648 match wait_catch(task).await {
649 Ok(r) => r,
650 Err(p) => panic::resume_unwind(p.payload),
651 }
652}
653
654/// Like [`wait`] but catches panics.
655///
/// This task works the same and has the same utility as [`wait`], except it returns panic messages
657/// as an error instead of propagating the panic.
658///
659/// # Unwind Safety
660///
661/// This function disables the [unwind safety validation], meaning that in case of a panic shared
662/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
663/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
664/// if this function returns an error.
665///
666/// [unwind safety validation]: std::panic::UnwindSafe
667pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
668where
669 F: FnOnce() -> T + Send + 'static,
670 T: Send + 'static,
671{
672 let mut ctx = LocalContext::capture();
673 blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
674 .await
675 .map_err(TaskPanicError::new)
676}
677
678/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
679///
680/// # Panic Handling
681///
/// If the `task` panics the panic message is logged as an error, and can be observed using [`set_spawn_panic_handler`]. It
683/// is otherwise ignored.
684///
685/// # Unwind Safety
686///
687/// This function disables the [unwind safety validation], meaning that in case of a panic shared
688/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
689/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
690/// to propagate a panic.
691///
692/// [unwind safety validation]: std::panic::UnwindSafe
693pub fn spawn_wait<F>(task: F)
694where
695 F: FnOnce() + Send + 'static,
696{
697 spawn(async move {
698 if let Err(p) = wait_catch(task).await {
699 tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
700 on_spawn_panic(p);
701 }
702 });
703}
704
705/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
706///
707/// # Cancellation
708///
709/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
710///
711/// # Panic Handling
712///
713/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
714pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
715where
716 R: VarValue,
717 F: FnOnce() -> R + Send + 'static,
718{
719 let (responder, response) = response_var();
720 spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
721 Ok(r) => responder.respond(r),
722 Err(p) => {
723 let p = TaskPanicError::new(p);
724 tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
725 responder.modify(move |_| panic::resume_unwind(p.payload));
726 }
727 });
728 response
729}
730
731/// Blocks the thread until the `task` future finishes.
732///
733/// This function is useful for implementing async tests, using it in an app will probably cause
734/// the app to stop responding.
735///
736/// The crate [`futures-lite`] is used to execute the task.
737///
738/// # Examples
739///
740/// Test a [`run`] call:
741///
742/// ```
743/// use zng_task as task;
744/// # use zng_unit::*;
745/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
746///
747/// # #[test]
748/// # fn __() { }
749/// pub fn run_ok() {
750/// let r = task::block_on(task::run(async { foo(32).await }));
751///
752/// # let value =
753/// r.expect("foo(32) was not Ok");
754/// # assert_eq!(32, value);
755/// }
756/// # run_ok();
757/// ```
758///
759/// [`futures-lite`]: https://docs.rs/futures-lite/
760pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
761where
762 F: Future,
763{
764 futures_lite::future::block_on(task.into_future())
765}
766
/// Continuously polls the `task` until it finishes.
768///
769/// This function is useful for implementing some async tests only, futures don't expect to be polled
770/// continuously. This function is only available in test builds.
771#[cfg(any(test, doc, feature = "test_util"))]
772pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
773where
774 F: Future,
775{
776 use std::pin::pin;
777
778 let mut task = pin!(task.into_future());
779 block_on(future_fn(|cx| match task.as_mut().poll(cx) {
780 Poll::Ready(r) => Poll::Ready(r),
781 Poll::Pending => {
782 cx.waker().wake_by_ref();
783 Poll::Pending
784 }
785 }))
786}
787
788/// Executor used in async doc tests.
789///
790/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
/// If `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
792#[cfg(any(test, doc, feature = "test_util"))]
793pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
794where
795 F: Future,
796{
797 use zng_unit::TimeUnits;
798
799 if spin {
800 spin_on(with_deadline(task, 500.ms())).expect("async doc-test timeout")
801 } else {
802 block_on(with_deadline(task, 5.secs())).expect("async doc-test timeout")
803 }
804}
805
806/// A future that is [`Pending`] once and wakes the current task.
807///
808/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
809///
810/// [`Pending`]: std::task::Poll::Pending
811/// [`Ready`]: std::task::Poll::Ready
812/// [`wake`]: std::task::Waker::wake
pub async fn yield_now() {
    // Tracks if the single `Pending` was already returned.
    struct YieldNowFut {
        yielded: bool,
    }
    impl Future for YieldNowFut {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            if !self.yielded {
                // first poll, request another poll right away and give way to other tasks.
                self.yielded = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(())
        }
    }

    YieldNowFut { yielded: false }.await
}
831
832/// A future that is [`Pending`] until the `deadline` is reached.
833///
834/// # Examples
835///
836/// Await 5 seconds in a [`spawn`] parallel task:
837///
838/// ```
839/// use zng_task as task;
840/// use zng_unit::*;
841///
842/// task::spawn(async {
843/// println!("waiting 5 seconds..");
844/// task::deadline(5.secs()).await;
845/// println!("5 seconds elapsed.")
846/// });
847/// ```
848///
/// The future runs on an app provided timer executor, or on the [`futures_timer`] by default.
850///
851/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
852/// not at the moment of the first `.await` call.
853///
854/// [`Pending`]: std::task::Poll::Pending
855/// [`futures_timer`]: https://docs.rs/futures-timer
856pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
857 let deadline = deadline.into();
858 if zng_app_context::LocalContext::current_app().is_some() {
859 DEADLINE_SV.read().0(deadline)
860 } else {
861 default_deadline(deadline)
862 }
863}
864
865app_local! {
866 static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
867}
868
869type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
870
871fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
872 if let Some(timeout) = deadline.time_left() {
873 Box::pin(futures_timer::Delay::new(timeout))
874 } else {
875 Box::pin(std::future::ready(()))
876 }
877}
878
879/// Deadline APP integration.
880#[expect(non_camel_case_types)]
881pub struct DEADLINE_APP;
882
883impl DEADLINE_APP {
884 /// Called by the app implementer to setup the [`deadline`] executor.
885 ///
886 /// If no app calls this the [`futures_timer`] executor is used.
887 ///
888 /// [`futures_timer`]: https://docs.rs/futures-timer
889 ///
890 /// # Panics
891 ///
892 /// Panics if called more than once for the same app.
893 pub fn init_deadline_service(&self, service: DeadlineService) {
894 let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
895 if already_set {
896 *DEADLINE_SV.write() = (prev, true);
897 panic!("deadline service already inited for this app");
898 }
899 }
900}
901
902/// Implements a [`Future`] from a closure.
903///
904/// # Examples
905///
/// A future that is ready when a closure returns `Some(R)`.
907///
908/// ```
909/// use std::task::Poll;
910/// use zng_task as task;
911///
912/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
913/// task::future_fn(|cx| match closure() {
914/// Some(r) => Poll::Ready(r),
915/// None => Poll::Pending,
916/// })
917/// .await
918/// }
919/// ```
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    // Adapter that forwards every poll to the closure.
    struct ClosureFut<F> {
        poll_fn: F,
    }
    // The closure is never pinned structurally, so the adapter can be `Unpin` for any closure.
    impl<F> Unpin for ClosureFut<F> {}
    impl<T, F: FnMut(&mut std::task::Context<'_>) -> Poll<T>> Future for ClosureFut<F> {
        type Output = T;

        fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            (this.poll_fn)(cx)
        }
    }

    ClosureFut { poll_fn: fn_ }.await
}
935
/// Error when [`with_deadline`] reaches a time limit before a task finishes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}

impl std::error::Error for DeadlineError {}

impl fmt::Display for DeadlineError {
    // Writes the fixed message, formatter options like width are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reached deadline")
    }
}
946
947/// Add a [`deadline`] to a future.
948///
949/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
950pub async fn with_deadline<O, F: Future<Output = O>>(
951 fut: impl IntoFuture<IntoFuture = F>,
952 deadline: impl Into<Deadline>,
953) -> Result<F::Output, DeadlineError> {
954 let deadline = deadline.into();
955 any!(async { Ok(fut.await) }, async {
956 self::deadline(deadline).await;
957 Err(DeadlineError {})
958 })
959 .await
960}
961
/// <span data-del-macro-root></span> A future that *zips* other futures.
///
/// The macro input is a comma separated list of future expressions. The macro output is a future
/// that, when awaited, produces a tuple with the output of every input, in the same order as the inputs.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three different futures to complete:
///
/// ```
/// use zng_task as task;
///
/// # task::doc_test(false, async {
/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
/// # });
/// ```
#[macro_export]
macro_rules! all {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__all!` macro.
    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
}
1053
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
    ($($ident:ident: $fut:expr;)+) => {
        {
            // Every input starts as a future and is replaced in place by its output once ready.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut all_ready = true;

                $(
                    // Inputs that already produced an output are not polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and only reaches it through the exclusive
                        // borrow of a Future::poll call, so it will not move.
                        let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match pinned.poll(cx) {
                            Poll::Ready(output) => $ident = $crate::FutureOrOutput::Output(output),
                            Poll::Pending => all_ready = false,
                        }
                    }
                )+

                if all_ready {
                    // All inputs hold an output at this point, zip them in input order.
                    Poll::Ready(($($ident.take_output()),+))
                } else {
                    Poll::Pending
                }
            })
        }
    }
}
1087
/// State of one input of the `all`/`any` family of combinators.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    /// The input is still running.
    Future(F),
    /// The input finished and its output was not taken yet.
    Output(F::Output),
    /// The output was already moved out by [`take_output`](Self::take_output).
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    /// Moves the stored output out, leaving `Taken` in its place.
    ///
    /// # Panics
    ///
    /// Panics if the current state is not `Output`.
    pub fn take_output(&mut self) -> F::Output {
        if let Self::Output(output) = std::mem::replace(self, Self::Taken) {
            output
        } else {
            unreachable!()
        }
    }
}
1102
1103/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1104///
1105/// This is the dynamic version of [`all!`].
1106pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1107 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1108 future_fn(move |cx| {
1109 let mut pending = false;
1110 for input in &mut futures {
1111 if let FutureOrOutput::Future(fut) = input {
1112 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1113 // Future::poll call, so it will not move.
1114 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1115 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1116 *input = FutureOrOutput::Output(r);
1117 } else {
1118 pending = true;
1119 }
1120 }
1121 }
1122
1123 if pending {
1124 Poll::Pending
1125 } else {
1126 Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1127 }
1128 })
1129 .await
1130}
1131
/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output type. The macro output is a future that, when awaited, produces
/// the output of the first input future that completes.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready at the same time the result of the first future in the input list is used.
/// After one future is ready the other futures are not polled again and are dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any`] async function to await on the first of a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete:
///
/// ```
/// use zng_task as task;
/// use zng_unit::*;
///
/// # task::doc_test(false, async {
/// let r = task::any!(
///     task::run(async {
///         task::deadline(300.ms()).await;
///         'a'
///     }),
///     task::wait(|| 'b'),
///     async {
///         task::deadline(300.ms()).await;
///         'c'
///     }
/// )
/// .await;
///
/// assert_eq!('b', r);
/// # });
/// ```
#[macro_export]
macro_rules! any {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__any!` macro.
    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
}
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
    ($($ident:ident: $fut:expr;)+) => {
        {
            $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                // Inputs are polled in declaration order, the first one that is ready wins.
                $(
                    // SAFETY: the closure owns $ident and only reaches it through the exclusive
                    // borrow of a Future::poll call, so it will not move.
                    let pinned = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
                    if let Poll::Ready(output) = pinned.poll(cx) {
                        return Poll::Ready(output);
                    }
                )+

                Poll::Pending
            })
        }
    }
}
1263#[doc(hidden)]
1264pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1265
1266/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1267///
1268/// This is the dynamic version of [`any!`].
1269pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1270 let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1271 future_fn(move |cx| {
1272 for fut in &mut futures {
1273 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1274 // Future::poll call, so it will not move.
1275 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1276 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1277 return Poll::Ready(r);
1278 }
1279 }
1280 Poll::Pending
1281 })
1282 .await
1283}
1284
/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have a `Result<T, E>` output with the same `T`, but each can have a different `E`. The macro output is a future
/// that, when awaited, produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
/// is `Ok(T)` or is `Err((E0, E1, ..))` if all futures are `Err`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `Err(E)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result` output of the same `Ok` type. Note that each input must be
/// known at compile time, use the [`fn@any_ok`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Ok`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # pub struct FooError;
/// # task::doc_test(false, async {
/// let r = task::any_ok!(
///     task::run(async { Err::<char, _>("error") }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Err::<char, _>(FooError) }
/// )
/// .await;
///
/// assert_eq!(Ok('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_ok {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__any_ok!` macro.
    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
}
1390
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // Every input starts as a future and is replaced in place by its error if it fails.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    // Inputs that already failed are not polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and only reaches it through the exclusive
                        // borrow of a Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                // The first `Ok` completes the whole future.
                                Ok(r) => return Poll::Ready(Ok(r)),
                                Err(e) => {
                                    $ident = $crate::FutureOrOutput::Output(Err(e));
                                }
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // Every input is `Output(Err(_))` here. A `match` is used instead of
                    // `Result::unwrap_err` so the `Ok` types are not required to be `Debug`.
                    Poll::Ready(Err((
                        $(match $ident.take_output() {
                            Err(e) => e,
                            Ok(_) => unreachable!(),
                        }),+
                    )))
                }
            })
        }
    }
}
1431
1432/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1433///
1434/// This is the dynamic version of [`all_some!`].
1435pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1436 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1437 future_fn(move |cx| {
1438 let mut pending = false;
1439 for input in &mut futures {
1440 if let FutureOrOutput::Future(fut) = input {
1441 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1442 // Future::poll call, so it will not move.
1443 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1444 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1445 match r {
1446 Ok(r) => return Poll::Ready(Ok(r)),
1447 Err(e) => *input = FutureOrOutput::Output(Err(e)),
1448 }
1449 } else {
1450 pending = true;
1451 }
1452 }
1453 }
1454
1455 if pending {
1456 Poll::Pending
1457 } else {
1458 Poll::Ready(Err(futures
1459 .iter_mut()
1460 .map(|f| match f.take_output() {
1461 Ok(_) => unreachable!(),
1462 Err(e) => e,
1463 })
1464 .collect()))
1465 }
1466 })
1467 .await
1468}
1469
/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output `Option<T>` type. The macro output is a future that, when awaited, produces
/// the output of the first input future that completes with a `Some`.
/// If all futures complete with a `None` the output is `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `None` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any_some`] async function to await on a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
///
/// assert_eq!(Some('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_some {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__any_some!` macro.
    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
}
1568
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // An input is `Some(future)` while running and `None` after it completed with `None`.
            $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut any_pending = false;

                $(
                    if let Some(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and only reaches it through the exclusive
                        // borrow of a Future::poll call, so it will not move.
                        let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match pinned.poll(cx) {
                            // The first `Some` completes the whole future.
                            Poll::Ready(Some(value)) => return Poll::Ready(Some(value)),
                            // Drop the finished input so it is not polled again.
                            Poll::Ready(None) => $ident = None,
                            Poll::Pending => any_pending = true,
                        }
                    }
                )+

                if any_pending {
                    Poll::Pending
                } else {
                    Poll::Ready(None)
                }
            })
        }
    }
}
1605
1606/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1607///
1608/// This is the dynamic version of [`all_some!`].
1609pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1610 let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1611 future_fn(move |cx| {
1612 let mut pending = false;
1613 for input in &mut futures {
1614 if let Some(fut) = input {
1615 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1616 // Future::poll call, so it will not move.
1617 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1618 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1619 match r {
1620 Some(r) => return Poll::Ready(Some(r)),
1621 None => *input = None,
1622 }
1623 } else {
1624 pending = true;
1625 }
1626 }
1627 }
1628
1629 if pending { Poll::Pending } else { Poll::Ready(None) }
1630 })
1631 .await
1632}
1633
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
/// any future is ready with an `Err(E)` result.
///
/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
/// type is the first error encountered, the input futures must have the same `Err` type but can have different
/// `Ok` types.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Err(E)` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result` output of the same `Err` type. Note that each input must be
/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Ok(T)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(
///     task::run(async { Ok::<_, FooError>('a') }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Ok::<_, FooError>('c') }
/// )
/// .await;
///
/// assert_eq!(Ok(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// And if any completes with `Err(E)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
///     Ok('c')
/// })
/// .await;
///
/// assert_eq!(Err(FooError), r);
/// # });
/// ```
#[macro_export]
macro_rules! all_ok {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__all_ok!` macro.
    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
}
1755
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // Every input starts as a future and is replaced in place by its `Ok` result once ready.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    // Inputs that already produced an `Ok` are not polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and only reaches it through the exclusive
                        // borrow of a Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                Ok(r) => {
                                    $ident = $crate::FutureOrOutput::Output(Ok(r))
                                },
                                // The first error completes the whole future.
                                Err(e) => return Poll::Ready(Err(e)),
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // Every input is `Output(Ok(_))` here. A `match` is used instead of
                    // `Result::unwrap` so the error type is not required to be `Debug`.
                    Poll::Ready(Ok((
                        $(match $ident.take_output() {
                            Ok(r) => r,
                            Err(_) => unreachable!(),
                        }),+
                    )))
                }
            })
        }
    }
}
1796
1797/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1798///
1799/// This is the dynamic version of [`all_ok!`].
1800pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1801 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1802 future_fn(move |cx| {
1803 let mut pending = false;
1804 for input in &mut futures {
1805 if let FutureOrOutput::Future(fut) = input {
1806 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1807 // Future::poll call, so it will not move.
1808 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1809 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1810 match r {
1811 Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1812 Err(e) => return Poll::Ready(Err(e)),
1813 }
1814 } else {
1815 pending = true;
1816 }
1817 }
1818 }
1819
1820 if pending {
1821 Poll::Pending
1822 } else {
1823 Poll::Ready(Ok(futures
1824 .iter_mut()
1825 .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1826 .collect()))
1827 }
1828 })
1829 .await
1830}
1831
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
/// future is ready with `None`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have an `Option<T>` output type, but each can have a different `T`. The macro output is a future that, when awaited,
/// produces `Some((T0, T1, ..))` if all futures were `Some(T)` or `None` if any of the futures was `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with an `Option` output. Note that each input must be
/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
///
/// assert_eq!(Some(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// Completes with `None` if any future completes with `None`:
///
/// ```
/// # use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
///
/// assert_eq!(None, r);
/// # });
/// ```
#[macro_export]
macro_rules! all_some {
    // One to eight inputs are matched by explicit arms that give each future a fixed
    // identifier (`fut0`..`fut7`) before forwarding to the hidden `__all_some!` macro.
    ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of inputs is forwarded to the `__proc_any_all!` proc-macro together
    // with the path of the hidden macro to expand.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
}
1940
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            // Every input starts as a future and is replaced in place by its `Some` output once ready.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut all_ready = true;

                $(
                    // Inputs that already produced a `Some` are not polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and only reaches it through the exclusive
                        // borrow of a Future::poll call, so it will not move.
                        let pinned = unsafe { std::pin::Pin::new_unchecked(fut) };
                        match pinned.poll(cx) {
                            // The first `None` completes the whole future.
                            Poll::Ready(None) => return Poll::Ready(None),
                            Poll::Ready(some) => $ident = $crate::FutureOrOutput::Output(some),
                            Poll::Pending => all_ready = false,
                        }
                    }
                )+

                if all_ready {
                    // Every input is `Output(Some(_))` here.
                    Poll::Ready(Some((
                        $($ident.take_output().unwrap()),+
                    )))
                } else {
                    Poll::Pending
                }
            })
        }
    }
}
1980
1981/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1982///
1983/// This is the dynamic version of [`all_some!`].
1984pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1985 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1986 future_fn(move |cx| {
1987 let mut pending = false;
1988 for input in &mut futures {
1989 if let FutureOrOutput::Future(fut) = input {
1990 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1991 // Future::poll call, so it will not move.
1992 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1993 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1994 match r {
1995 Some(r) => *input = FutureOrOutput::Output(Some(r)),
1996 None => return Poll::Ready(None),
1997 }
1998 } else {
1999 pending = true;
2000 }
2001 }
2002 }
2003
2004 if pending {
2005 Poll::Pending
2006 } else {
2007 Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2008 }
2009 })
2010 .await
2011}
2012
/// A future that will await until [`set`] is called.
///
/// Clones of a signal share the same state, setting any clone awakes the listeners registered
/// by all clones. Equality and hashing are by shared instance, not by signal state.
///
/// # Examples
///
/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
///
/// ```
/// use zng_clone_move::async_clmv;
/// use zng_task::{self as task, *};
///
/// let signal = SignalOnce::default();
///
/// task::spawn(async_clmv!(signal, {
///     signal.await;
///     println!("After Signal!");
/// }));
///
/// signal.set();
/// ```
///
/// [`set`]: SignalOnce::set
#[derive(Default, Clone)]
pub struct SignalOnce(Arc<SignalInner>);
2036impl fmt::Debug for SignalOnce {
2037 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2038 write!(f, "SignalOnce({})", self.is_set())
2039 }
2040}
2041impl PartialEq for SignalOnce {
2042 fn eq(&self, other: &Self) -> bool {
2043 Arc::ptr_eq(&self.0, &other.0)
2044 }
2045}
2046impl Eq for SignalOnce {}
2047impl Hash for SignalOnce {
2048 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2049 Arc::as_ptr(&self.0).hash(state)
2050 }
2051}
2052impl SignalOnce {
2053 /// New unsigned.
2054 pub fn new() -> Self {
2055 Self::default()
2056 }
2057
2058 /// New signaled.
2059 pub fn new_set() -> Self {
2060 let s = Self::new();
2061 s.set();
2062 s
2063 }
2064
2065 /// If the signal was set.
2066 pub fn is_set(&self) -> bool {
2067 self.0.signaled.load(Ordering::Relaxed)
2068 }
2069
2070 /// Sets the signal and awakes listeners.
2071 pub fn set(&self) {
2072 if !self.0.signaled.swap(true, Ordering::Relaxed) {
2073 let listeners = mem::take(&mut *self.0.listeners.lock());
2074 for listener in listeners {
2075 listener.wake();
2076 }
2077 }
2078 }
2079}
2080impl Future for SignalOnce {
2081 type Output = ();
2082
2083 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2084 if self.0.signaled.load(Ordering::Relaxed) {
2085 return Poll::Ready(());
2086 }
2087
2088 let mut listeners = self.0.listeners.lock();
2089 if self.0.signaled.load(Ordering::Relaxed) {
2090 return Poll::Ready(());
2091 }
2092
2093 let waker = cx.waker();
2094 if !listeners.iter().any(|w| w.will_wake(waker)) {
2095 listeners.push(waker.clone());
2096 }
2097
2098 Poll::Pending
2099 }
2100}
2101
/// State shared by all clones of a [`SignalOnce`].
#[derive(Default)]
struct SignalInner {
    /// Set to `true` by the first `SignalOnce::set` call, never reset.
    signaled: AtomicBool,
    /// Wakers of the tasks that polled the signal before it was set, taken and awoken by `SignalOnce::set`.
    listeners: Mutex<Vec<std::task::Waker>>,
}
2107
/// A [`Waker`] that dispatches a wake call to multiple other wakers.
///
/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
/// known at the moment the first request is made.
///
/// Clones share the same list of registered wakers.
///
/// [`Waker`]: std::task::Waker
#[derive(Clone)]
pub struct McWaker(Arc<WakeVec>);
2116
2117#[derive(Default)]
2118struct WakeVec(Mutex<Vec<std::task::Waker>>);
2119impl WakeVec {
2120 fn push(&self, waker: std::task::Waker) -> bool {
2121 let mut v = self.0.lock();
2122
2123 let return_waker = v.is_empty();
2124
2125 v.push(waker);
2126
2127 return_waker
2128 }
2129
2130 fn cancel(&self) {
2131 let mut v = self.0.lock();
2132
2133 debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2134
2135 v.clear();
2136 }
2137}
2138impl std::task::Wake for WakeVec {
2139 fn wake(self: Arc<Self>) {
2140 for w in mem::take(&mut *self.0.lock()) {
2141 w.wake();
2142 }
2143 }
2144}
2145impl McWaker {
2146 /// New empty waker.
2147 pub fn empty() -> Self {
2148 Self(Arc::new(WakeVec::default()))
2149 }
2150
2151 /// Register a `waker` to wake once when `self` awakes.
2152 ///
2153 /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2154 /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2155 /// is ready [`cancel`] must be called.
2156 ///
2157 /// [`cancel`]: Self::cancel
2158 pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2159 if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2160 }
2161
2162 /// Clear current registered wakers.
2163 pub fn cancel(&self) {
2164 self.0.cancel()
2165 }
2166}
2167
/// Panic payload, captured by [`std::panic::catch_unwind`].
#[non_exhaustive]
pub struct TaskPanicError {
    /// Panic payload.
    pub payload: Box<dyn Any + Send + 'static>,
}
impl TaskPanicError {
    /// New from panic payload.
    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
        Self { payload }
    }

    /// Get the panic string if the `payload` is string like.
    ///
    /// Returns `Some(_)` if the payload is a `&'static str` or a `String`, otherwise returns `None`.
    pub fn panic_str(&self) -> Option<&str> {
        // `panic!("literal")` payload.
        if let Some(s) = self.payload.downcast_ref::<&str>() {
            return Some(*s);
        }
        // `panic!("formatted {}", args)` payload.
        self.payload.downcast_ref::<String>().map(String::as_str)
    }
}
impl fmt::Debug for TaskPanicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let panic_str = self.panic_str();
        let mut s = f.debug_struct("TaskPanicError");
        s.field("panic_str()", &panic_str);
        s.finish()
    }
}
impl fmt::Display for TaskPanicError {
    /// Writes the panic string, writes nothing if the payload is not string like.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.panic_str() {
            Some(s) => f.write_str(s),
            None => Ok(()),
        }
    }
}
impl std::error::Error for TaskPanicError {}
2202
// Boxed callback that receives the captured panic of a spawn task.
type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;

app_local! {
    // Handler set by `set_spawn_panic_handler`, `None` until a handler is set.
    //
    // Mutex for Sync only
    static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
}
2209
2210/// Set a `handler` that is called when spawn tasks panic.
2211///
2212/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2213///
2214/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2215/// and thread the task that panicked was called in.
2216///
2217/// ```
2218/// # macro_rules! example { () => {
2219/// task::set_spawn_panic_handler(|p| {
2220/// UPDATES
2221/// .run_hn_once(hn_once!(|_| {
2222/// std::panic::resume_unwind(p.payload);
2223/// }))
2224/// .perm();
2225/// });
2226/// # }}
2227/// ```
2228///
2229/// The example above shows how to set a handler that propagates the panic to the app main thread.
2230///
2231/// # Panics
2232///
2233/// Panics if another handler is already set in the same app.
2234///
2235/// Panics if no app is running in the caller thread.
2236pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2237 let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2238 assert!(h.is_none(), "a spawn panic handler is already set");
2239 *h = Some(Mutex::new(Box::new(handler)));
2240}
2241
2242fn on_spawn_panic(p: TaskPanicError) {
2243 if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2244 f.get_mut()(p)
2245 }
2246}