zng_task/lib.rs
1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13 fmt,
14 hash::Hash,
15 mem, panic,
16 pin::Pin,
17 sync::{
18 Arc,
19 atomic::{AtomicBool, Ordering},
20 },
21 task::Poll,
22};
23
24#[doc(no_inline)]
25pub use parking_lot;
26use parking_lot::Mutex;
27
28mod crate_util;
29
30use crate::crate_util::PanicResult;
31use zng_app_context::{LocalContext, app_local};
32use zng_time::Deadline;
33use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
34
35#[cfg(test)]
36mod tests;
37
38#[doc(no_inline)]
39pub use rayon;
40
/// Async filesystem primitives.
///
/// This module is the [async-fs](https://docs.rs/async-fs) crate re-exported for convenience.
pub mod fs {
    // glob re-export of every public item of `async_fs`, `doc(inline)` asks rustdoc to
    // render the items in this module's page.
    #[doc(inline)]
    pub use async_fs::*;
}
48
49pub mod channel;
50pub mod io;
51mod ui;
52
53pub mod http;
54
55pub mod ipc;
56
57mod rayon_ctx;
58
59pub use rayon_ctx::*;
60
61pub use ui::*;
62
63mod progress;
64pub use progress::*;
65
66/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
67///
68/// # Parallel
69///
70/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
71///
72/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
73/// otherwise it will run in a single thread at a time, still not blocking the UI.
74///
75/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
76///
77/// # Async
78///
79/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
80/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
81/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
82/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
83///
84/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
85///
86/// # Examples
87///
88/// ```
89/// # use zng_task::{self as task, *, rayon::iter::*};
90/// # use zng_var::*;
91/// # struct SomeStruct { sum_response: ResponseVar<usize> }
92/// # impl SomeStruct {
93/// fn on_event(&mut self) {
94/// let (responder, response) = response_var();
95/// self.sum_response = response;
96///
97/// task::spawn(async move {
98/// let r = (0..1000).into_par_iter().map(|i| i * i).sum();
99///
100/// responder.respond(r);
101/// });
102/// }
103///
104/// fn on_update(&mut self) {
105/// if let Some(result) = self.sum_response.rsp_new() {
106/// println!("sum of squares 0..1000: {result}");
107/// }
108/// }
109/// # }
110/// ```
111///
112/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
/// The task captures the caller [`LocalContext`] so the response variable will be set correctly.
114///
115/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
116/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
117///
118/// # Panic Handling
119///
120/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
121///
122/// # Unwind Safety
123///
124/// This function disables the [unwind safety validation], meaning that in case of a panic shared
125/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
126/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
127/// to propagate a panic.
128///
129/// [unwind safety validation]: std::panic::UnwindSafe
130/// [`Waker`]: std::task::Waker
131/// [`rayon`]: https://docs.rs/rayon
132/// [`LocalContext`]: zng_app_context::LocalContext
133/// [`response_var`]: zng_var::response_var
134pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136 F: Future<Output = ()> + Send + 'static,
137{
138 Arc::new(RayonTask {
139 ctx: LocalContext::capture(),
140 fut: Mutex::new(Some(Box::pin(task.into_future()))),
141 })
142 .poll()
143}
144
145/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
146pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
147where
148 F: Future<Output = ()> + Send + 'static,
149{
150 struct PollRayonTask {
151 fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
152 }
153 impl PollRayonTask {
154 // start task in calling thread
155 fn poll(self: Arc<Self>) {
156 let mut task = self.fut.lock();
157 let (mut t, _) = task.take().unwrap();
158
159 let waker = self.clone().into();
160
161 match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
162 Poll::Ready(()) => {}
163 Poll::Pending => {
164 let ctx = LocalContext::capture();
165 *task = Some((t, Some(ctx)));
166 }
167 }
168 }
169 }
170 impl std::task::Wake for PollRayonTask {
171 fn wake(self: Arc<Self>) {
172 // continue task in spawn threads
173 if let Some((task, Some(ctx))) = self.fut.lock().take() {
174 Arc::new(RayonTask {
175 ctx,
176 fut: Mutex::new(Some(Box::pin(task))),
177 })
178 .poll();
179 }
180 }
181 }
182
183 Arc::new(PollRayonTask {
184 fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
185 })
186 .poll()
187}
188
189type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
190
191// A future that is its own waker that polls inside rayon spawn tasks.
192struct RayonTask {
193 ctx: LocalContext,
194 fut: Mutex<Option<RayonSpawnFut>>,
195}
196impl RayonTask {
197 fn poll(self: Arc<Self>) {
198 rayon::spawn(move || {
199 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
200 let mut task = self.fut.lock();
201 if let Some(mut t) = task.take() {
202 let waker = self.clone().into();
203
204 // load app context
205 self.ctx.clone().with_context(move || {
206 let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
207 // poll future
208 if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
209 // not done
210 *task = Some(t);
211 }
212 }));
213 if let Err(p) = r {
214 tracing::error!("panic in `task::spawn`: {}", crate_util::panic_str(&p));
215 }
216 });
217 }
218 })
219 }
220}
221impl std::task::Wake for RayonTask {
222 fn wake(self: Arc<Self>) {
223 self.poll()
224 }
225}
226
227/// Rayon join with local context.
228///
229/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
230/// operations.
231///
232/// See `rayon::join` for more details about join.
233///
234/// [`LocalContext`]: zng_app_context::LocalContext
235pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
236where
237 A: FnOnce() -> RA + Send,
238 B: FnOnce() -> RB + Send,
239 RA: Send,
240 RB: Send,
241{
242 self::join_context(move |_| op_a(), move |_| op_b())
243}
244
245/// Rayon join context with local context.
246///
247/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
248/// operations.
249///
250/// See `rayon::join_context` for more details about join.
251///
252/// [`LocalContext`]: zng_app_context::LocalContext
253pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
254where
255 A: FnOnce(rayon::FnContext) -> RA + Send,
256 B: FnOnce(rayon::FnContext) -> RB + Send,
257 RA: Send,
258 RB: Send,
259{
260 let ctx = LocalContext::capture();
261 let ctx = &ctx;
262 rayon::join_context(
263 move |a| {
264 if a.migrated() {
265 ctx.clone().with_context(|| op_a(a))
266 } else {
267 op_a(a)
268 }
269 },
270 move |b| {
271 if b.migrated() {
272 ctx.clone().with_context(|| op_b(b))
273 } else {
274 op_b(b)
275 }
276 },
277 )
278}
279
/// Rayon scope with local context.
///
/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
/// operations.
///
/// The captured context is dropped only after `rayon::scope` returns.
///
/// See `rayon::scope` for more details about scope.
///
/// [`LocalContext`]: zng_app_context::LocalContext
pub fn scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
    R: Send,
{
    let ctx = LocalContext::capture();

    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
    // Is there a better way to do this? I hope so.
    //
    // SAFETY:
    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
    let ctx_ref: &'_ LocalContext = &ctx;
    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };

    // the root `ScopeCtx` carries the extended reference, `ScopeCtx::spawn` passes the same reference to every job.
    let r = rayon::scope(move |s| {
        op(ScopeCtx {
            scope: s,
            ctx: ctx_scope_ref,
        })
    });

    // explicit drop, keeps `ctx` alive until after the scope returned, as required by the SAFETY note above.
    drop(ctx);

    r
}
315
316/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
317///
318/// See [`scope`] for more details.
319#[derive(Clone, Copy, Debug)]
320pub struct ScopeCtx<'a, 'scope: 'a> {
321 scope: &'a rayon::Scope<'scope>,
322 ctx: &'scope LocalContext,
323}
324impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
325 /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
326 ///
327 /// See `rayon::Scope::spawn` for more details.
328 pub fn spawn<F>(self, f: F)
329 where
330 F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
331 {
332 let ctx = self.ctx;
333 self.scope
334 .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
335 }
336}
337
338/// Spawn a parallel async task that can also be `.await` for the task result.
339///
340/// # Parallel
341///
342/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
343///
344/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
345/// otherwise it will run in a single thread at a time, still not blocking the UI.
346///
347/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
348///
349/// # Async
350///
351/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
352/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
353/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
354/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
355///
356/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
357///
358/// # Examples
359///
360/// ```
361/// # use zng_task::{self as task, rayon::iter::*};
362/// # struct SomeStruct { sum: usize }
363/// # async fn read_numbers() -> Vec<usize> { vec![] }
364/// # impl SomeStruct {
365/// async fn on_event(&mut self) {
366/// self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
367/// }
368/// # }
369/// ```
370///
371/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
372/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
373/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
374///
375/// # Cancellation
376///
377/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
378/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
379/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
380/// result channel is disconnected.
381///
/// If you want to deterministically know that the `task` was cancelled use a cancellation signal.
383///
384/// # Panic Propagation
385///
386/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
387/// can use [`run_catch`] to get the panic as an error instead.
388///
389/// [`resume_unwind`]: panic::resume_unwind
390/// [`Waker`]: std::task::Waker
391/// [`rayon`]: https://docs.rs/rayon
392/// [`LocalContext`]: zng_app_context::LocalContext
393pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
394where
395 R: Send + 'static,
396 T: Future<Output = R> + Send + 'static,
397{
398 match run_catch(task).await {
399 Ok(r) => r,
400 Err(p) => panic::resume_unwind(p),
401 }
402}
403
/// Like [`run`] but catches panics.
///
/// This task works the same and has the same utility as [`run`], except it returns the panic
/// as an error instead of propagating the panic.
///
/// # Unwind Safety
///
/// This function disables the [unwind safety validation], meaning that in case of a panic shared
/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
/// if this function returns an error.
///
/// [unwind safety validation]: std::panic::UnwindSafe
pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> PanicResult<R>
where
    R: Send + 'static,
    T: Future<Output = R> + Send + 'static,
{
    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;

    // A future that is its own waker that polls inside the rayon primary thread-pool.
    struct RayonCatchTask<R> {
        // context captured from the caller, loaded around every poll.
        ctx: LocalContext,
        // `None` while a poll is running and after the future completed or panicked.
        fut: Mutex<Option<Fut<R>>>,
        // sends the result or the panic to the future returned by `run_catch`.
        sender: flume::Sender<PanicResult<R>>,
    }
    impl<R: Send + 'static> RayonCatchTask<R> {
        fn poll(self: Arc<Self>) {
            let sender = self.sender.clone();
            // the only receiver is owned by the future returned by `run_catch`,
            // presumably this is `true` after that future is dropped.
            if sender.is_disconnected() {
                return; // cancel.
            }
            rayon::spawn(move || {
                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
                let mut task = self.fut.lock();
                if let Some(mut t) = task.take() {
                    let waker = self.clone().into();
                    let mut cx = std::task::Context::from_waker(&waker);

                    self.ctx.clone().with_context(|| {
                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
                        match r {
                            Ok(Poll::Ready(r)) => {
                                // unlock before sending the result.
                                drop(task);
                                let _ = sender.send(Ok(r));
                            }
                            Ok(Poll::Pending) => {
                                // not done, put the future back for the next wake.
                                *task = Some(t);
                            }
                            Err(p) => {
                                // the future is not put back, it is never polled again after a panic.
                                drop(task);
                                let _ = sender.send(Err(p));
                            }
                        }
                    });
                }
            })
        }
    }
    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
        fn wake(self: Arc<Self>) {
            self.poll()
        }
    }

    let (sender, receiver) = channel::bounded(1);

    // the task starts immediately, before the returned future awaits the result.
    Arc::new(RayonCatchTask {
        ctx: LocalContext::capture(),
        fut: Mutex::new(Some(Box::pin(task.into_future()))),
        sender: sender.into(),
    })
    .poll();

    // NOTE(review): this `unwrap` panics if `recv` fails, presumably only when every sender is dropped
    // without sending a result - confirm against the `channel` module.
    receiver.recv().await.unwrap()
}
480
481/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
482///
483/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
484///
485/// # Examples
486///
487/// ```
488/// # use zng_task::{self as task, rayon::iter::*};
489/// # use zng_var::*;
490/// # struct SomeStruct { sum_response: ResponseVar<usize> }
491/// # async fn read_numbers() -> Vec<usize> { vec![] }
492/// # impl SomeStruct {
493/// fn on_event(&mut self) {
494/// self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
495/// }
496///
497/// fn on_update(&mut self) {
498/// if let Some(result) = self.sum_response.rsp_new() {
499/// println!("sum of squares: {result}");
500/// }
501/// }
502/// # }
503/// ```
504///
/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is sent to
/// `sum_response` that is a [`ResponseVar<R>`].
507///
508/// # Cancellation
509///
510/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
511///
512/// # Panic Handling
513///
514/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
515///
516/// [`resume_unwind`]: panic::resume_unwind
517/// [`ResponseVar<R>`]: zng_var::ResponseVar
518/// [`response_var`]: zng_var::response_var
pub fn respond<R, F>(task: F) -> ResponseVar<R>
where
    R: VarValue,
    F: Future<Output = R> + Send + 'static,
{
    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;

    let (responder, response) = response_var();

    // A future that is its own waker that polls inside the rayon primary thread-pool.
    struct RayonRespondTask<R: VarValue> {
        // context captured from the caller, loaded around every poll.
        ctx: LocalContext,
        // `None` while a poll is running and after the future completed or panicked.
        fut: Mutex<Option<Fut<R>>>,
        // receives the result, or the panic.
        responder: zng_var::ResponderVar<R>,
    }
    impl<R: VarValue> RayonRespondTask<R> {
        fn poll(self: Arc<Self>) {
            let responder = self.responder.clone();
            // NOTE(review): the two references counted here look like `self.responder` and the clone above,
            // that is, no other holder of the variable remains. This stops polling the `task`, the function
            // documentation says that dropping the response var does not cancel - confirm which one is intended.
            if responder.strong_count() == 2 {
                return; // cancel.
            }
            rayon::spawn(move || {
                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
                let mut task = self.fut.lock();
                if let Some(mut t) = task.take() {
                    let waker = self.clone().into();
                    let mut cx = std::task::Context::from_waker(&waker);

                    self.ctx.clone().with_context(|| {
                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
                        match r {
                            Ok(Poll::Ready(r)) => {
                                // unlock before setting the response.
                                drop(task);

                                responder.respond(r);
                            }
                            Ok(Poll::Pending) => {
                                // not done, put the future back for the next wake.
                                *task = Some(t);
                            }
                            Err(p) => {
                                tracing::error!("panic in `task::respond`: {}", crate_util::panic_str(&p));
                                drop(task);
                                // the panic is resumed inside the modify closure of the response var.
                                responder.modify(move |_| panic::resume_unwind(p));
                            }
                        }
                    });
                }
            })
        }
    }
    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
        fn wake(self: Arc<Self>) {
            self.poll()
        }
    }

    // the task starts immediately, the response var is returned still without a response.
    Arc::new(RayonRespondTask {
        ctx: LocalContext::capture(),
        fut: Mutex::new(Some(Box::pin(task))),
        responder,
    })
    .poll();

    response
}
584
585/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
586/// if the `task` is pending continues execution like [`respond`].
587pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
588where
589 R: VarValue,
590 F: Future<Output = R> + Send + 'static,
591{
592 enum QuickResponse<R: VarValue> {
593 Quick(Option<R>),
594 Response(zng_var::ResponderVar<R>),
595 }
596 let task = task.into_future();
597 let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
598 poll_spawn(zng_clone_move::async_clmv!(q, {
599 let rsp = task.await;
600
601 match &mut *q.lock() {
602 QuickResponse::Quick(q) => *q = Some(rsp),
603 QuickResponse::Response(r) => r.respond(rsp),
604 }
605 }));
606
607 let mut q = q.lock();
608 match &mut *q {
609 QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
610 _ => {
611 let (responder, response) = response_var();
612 *q = QuickResponse::Response(responder);
613 response
614 }
615 }
616}
617
618/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
619///
620/// # Parallel
621///
622/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
623/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
624/// parts that are blocking.
625///
626/// # Examples
627///
628/// ```
629/// # fn main() { }
630/// # use zng_task as task;
631/// # async fn example() {
632/// task::wait(|| std::fs::read_to_string("file.txt")).await
633/// # ; }
634/// ```
635///
/// The example reads a file, that is a blocking file IO operation, most of the time is spent waiting for the operating system,
/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
/// like in an async event handler.
639///
640/// # Async Read/Write
641///
642/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
643/// have or want the full file in memory or when you want to apply multiple operations to the file.
644///
645/// # Panic Propagation
646///
647/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
648/// can use [`wait_catch`] to get the panic as an error instead.
649///
650/// [`blocking`]: https://docs.rs/blocking
651/// [`resume_unwind`]: panic::resume_unwind
652pub async fn wait<T, F>(task: F) -> T
653where
654 F: FnOnce() -> T + Send + 'static,
655 T: Send + 'static,
656{
657 match wait_catch(task).await {
658 Ok(r) => r,
659 Err(p) => panic::resume_unwind(p),
660 }
661}
662
663/// Like [`wait`] but catches panics.
664///
665/// This task works the same and has the same utility as [`wait`], except if returns panic messages
666/// as an error instead of propagating the panic.
667///
668/// # Unwind Safety
669///
670/// This function disables the [unwind safety validation], meaning that in case of a panic shared
671/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
672/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
673/// if this function returns an error.
674///
675/// [unwind safety validation]: std::panic::UnwindSafe
676pub async fn wait_catch<T, F>(task: F) -> PanicResult<T>
677where
678 F: FnOnce() -> T + Send + 'static,
679 T: Send + 'static,
680{
681 let mut ctx = LocalContext::capture();
682 blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task)))).await
683}
684
685/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
686///
687/// # Panic Handling
688///
689/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
690///
691/// # Unwind Safety
692///
693/// This function disables the [unwind safety validation], meaning that in case of a panic shared
694/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
695/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
696/// to propagate a panic.
697///
698/// [unwind safety validation]: std::panic::UnwindSafe
699pub fn spawn_wait<F>(task: F)
700where
701 F: FnOnce() + Send + 'static,
702{
703 spawn(async move {
704 if let Err(p) = wait_catch(task).await {
705 tracing::error!("parallel `spawn_wait` task panicked: {}", crate_util::panic_str(&p))
706 }
707 });
708}
709
710/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
711///
712/// # Cancellation
713///
714/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
715///
716/// # Panic Handling
717///
718/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
719pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
720where
721 R: VarValue,
722 F: FnOnce() -> R + Send + 'static,
723{
724 let (responder, response) = response_var();
725 spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
726 Ok(r) => responder.respond(r),
727 Err(p) => {
728 tracing::error!("panic in `task::wait_respond`: {}", crate_util::panic_str(&p));
729 responder.modify(move |_| panic::resume_unwind(p));
730 }
731 });
732 response
733}
734
735/// Blocks the thread until the `task` future finishes.
736///
737/// This function is useful for implementing async tests, using it in an app will probably cause
738/// the app to stop responding.
739///
740/// The crate [`futures-lite`] is used to execute the task.
741///
742/// # Examples
743///
744/// Test a [`run`] call:
745///
746/// ```
747/// use zng_task as task;
748/// # use zng_unit::*;
749/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
750///
751/// #[test]
752/// # fn __() { }
753/// pub fn run_ok() {
754/// let r = task::block_on(task::run(async { foo(32).await }));
755///
756/// # let value =
757/// r.expect("foo(32) was not Ok");
758/// # assert_eq!(32, value);
759/// }
760/// # run_ok();
761/// ```
762///
763/// [`futures-lite`]: https://docs.rs/futures-lite/
764pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
765where
766 F: Future,
767{
768 futures_lite::future::block_on(task.into_future())
769}
770
/// Continuously polls the `task` until it finishes.
///
/// This function is useful for implementing some async tests only, futures don't expect to be polled
/// continuously. This function is only available in test and doc builds, or when the `test_util` feature is enabled.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let mut task = std::pin::pin!(task.into_future());

    block_on(future_fn(|cx| {
        let poll = task.as_mut().poll(cx);
        if poll.is_pending() {
            // wake immediately, so that the executor polls again without waiting for the task's own wake.
            cx.waker().wake_by_ref();
        }
        poll
    }))
}
791
/// Executor used in async doc tests.
///
/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
/// If `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
///
/// # Panics
///
/// Panics if the `task` does not finish before the timeout.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    use zng_unit::TimeUnits;

    let outcome = if spin {
        spin_on(with_deadline(task, 500.ms()))
    } else {
        block_on(with_deadline(task, 5.secs()))
    };
    outcome.expect("async doc-test timeout")
}
809
810/// A future that is [`Pending`] once and wakes the current task.
811///
812/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
813///
814/// [`Pending`]: std::task::Poll::Pending
815/// [`Ready`]: std::task::Poll::Ready
816/// [`wake`]: std::task::Waker::wake
pub async fn yield_now() {
    // Pending on the first poll only.
    struct YieldOnce {
        // `true` after the first poll requested a wake.
        yielded: bool,
    }
    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
            if !self.yielded {
                self.yielded = true;
                // request another poll before returning pending, otherwise the task would never continue.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(())
        }
    }

    YieldOnce { yielded: false }.await
}
835
836/// A future that is [`Pending`] until the `deadline` is reached.
837///
838/// # Examples
839///
840/// Await 5 seconds in a [`spawn`] parallel task:
841///
842/// ```
843/// use zng_task as task;
844/// use zng_unit::*;
845///
846/// task::spawn(async {
847/// println!("waiting 5 seconds..");
848/// task::deadline(5.secs()).await;
849/// println!("5 seconds elapsed.")
850/// });
851/// ```
852///
/// The future runs on an app provided timer executor, or on the [`futures_timer`] by default.
854///
855/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
856/// not at the moment of the first `.await` call.
857///
858/// [`Pending`]: std::task::Poll::Pending
859/// [`futures_timer`]: https://docs.rs/futures-timer
860pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
861 let deadline = deadline.into();
862 if zng_app_context::LocalContext::current_app().is_some() {
863 DEADLINE_SV.read().0(deadline)
864 } else {
865 default_deadline(deadline)
866 }
867}
868
app_local! {
    // Function used by `deadline` inside an app and a flag that is `true` after
    // `DEADLINE_APP::init_deadline_service` replaced the default.
    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
}

// Signature of the function that creates the future returned by `deadline`.
type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
874
875fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
876 if let Some(timeout) = deadline.time_left() {
877 Box::pin(futures_timer::Delay::new(timeout))
878 } else {
879 Box::pin(std::future::ready(()))
880 }
881}
882
883/// Deadline APP integration.
884#[expect(non_camel_case_types)]
885pub struct DEADLINE_APP;
886
887impl DEADLINE_APP {
888 /// Called by the app implementer to setup the [`deadline`] executor.
889 ///
890 /// If no app calls this the [`futures_timer`] executor is used.
891 ///
892 /// [`futures_timer`]: https://docs.rs/futures-timer
893 ///
894 /// # Panics
895 ///
896 /// Panics if called more than once for the same app.
897 pub fn init_deadline_service(&self, service: DeadlineService) {
898 let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
899 if already_set {
900 *DEADLINE_SV.write() = (prev, true);
901 panic!("deadline service already inited for this app");
902 }
903 }
904}
905
906/// Implements a [`Future`] from a closure.
907///
908/// # Examples
909///
/// A future that is ready when a closure returns `Some(R)`.
911///
912/// ```
913/// use std::task::Poll;
914/// use zng_task as task;
915///
916/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
917/// task::future_fn(|cx| match closure() {
918/// Some(r) => Poll::Ready(r),
919/// None => Poll::Pending,
920/// })
921/// .await
922/// }
923/// ```
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    // `std::future::poll_fn` is the standard library implementation of a future
    // that calls the closure on every poll and is ready with the first `Poll::Ready` value.
    std::future::poll_fn(fn_).await
}
939
/// Error when [`with_deadline`] reaches a time limit before a task finishes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}
impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // fixed message, the error carries no data.
        f.write_str("reached deadline")
    }
}
impl std::error::Error for DeadlineError {}
950
951/// Add a [`deadline`] to a future.
952///
953/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
954pub async fn with_deadline<O, F: Future<Output = O>>(
955 fut: impl IntoFuture<IntoFuture = F>,
956 deadline: impl Into<Deadline>,
957) -> Result<F::Output, DeadlineError> {
958 let deadline = deadline.into();
959 any!(async { Ok(fut.await) }, async {
960 self::deadline(deadline).await;
961 Err(DeadlineError {})
962 })
963 .await
964}
965
/// <span data-del-macro-root></span> A future that *zips* other futures.
///
/// The macro input is a comma separated list of future expressions. The macro output is a future
/// that when ".awaited" produces a tuple of results in the same order as the inputs. With a single
/// input the output is the bare result, not a one element tuple.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three different futures to complete:
///
/// ```
/// use zng_task as task;
///
/// # task::doc_test(false, async {
/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
/// # });
/// ```
#[macro_export]
macro_rules! all {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__all!` macro.
    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
}
1057
// Implementation of `all!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the local slots that hold each
// future while it is pending and its output after it completes. The expansion is a future that is
// ready with all the outputs (in input order) after every input future is ready.
//
// All standard library items are referenced by absolute path and `Future::poll` is called by
// path, so the expansion does not depend on what is in scope (or shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
    ($($ident:ident: $fut:expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(::std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                let mut pending = false;

                $(
                    // slots that already hold an output are skipped, a ready future is never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only accessed through this
                        // exclusive borrow, the future is never moved out of the slot, it is dropped
                        // in place when the slot is overwritten with the output.
                        let fut = unsafe { ::std::pin::Pin::new_unchecked(fut) };
                        match ::std::future::Future::poll(fut, cx) {
                            Poll::Ready(r) => $ident = $crate::FutureOrOutput::Output(r),
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(($($ident.take_output()),+))
                }
            })
        }
    }
}
1091
// Slot used by the combinator futures, holds a future while it is pending, then its output
// until the output is taken.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    // Future that is not ready yet.
    Future(F),
    // Output of the future, waiting to be taken.
    Output(F::Output),
    // Output was already taken.
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    // Moves the output out and leaves `Taken` in the slot.
    //
    // Panics if the slot does not hold an output.
    pub fn take_output(&mut self) -> F::Output {
        let FutureOrOutput::Output(output) = std::mem::replace(self, FutureOrOutput::Taken) else {
            unreachable!()
        };
        output
    }
}
1106
1107/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1108///
1109/// This is the dynamic version of [`all!`].
1110pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1111 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1112 future_fn(move |cx| {
1113 let mut pending = false;
1114 for input in &mut futures {
1115 if let FutureOrOutput::Future(fut) = input {
1116 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1117 // Future::poll call, so it will not move.
1118 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1119 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1120 *input = FutureOrOutput::Output(r);
1121 } else {
1122 pending = true;
1123 }
1124 }
1125 }
1126
1127 if pending {
1128 Poll::Pending
1129 } else {
1130 Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1131 }
1132 })
1133 .await
1134}
1135
/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output type. The macro output is a future that when ".awaited" produces
/// a single output type instance returned by the first input future that completes.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready at the same time the result of the first future in the input list is used.
/// After one future is ready the other futures are not polled again and are dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete:
///
/// ```
/// use zng_task as task;
/// use zng_unit::*;
///
/// # task::doc_test(false, async {
/// let r = task::any!(
///     task::run(async {
///         task::deadline(300.ms()).await;
///         'a'
///     }),
///     task::wait(|| 'b'),
///     async {
///         task::deadline(300.ms()).await;
///         'c'
///     }
/// )
/// .await;
///
/// assert_eq!('b', r);
/// # });
/// ```
#[macro_export]
macro_rules! any {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__any!` macro.
    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
}
// Implementation of `any!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the locals that hold each
// future. The expansion is a future that polls the inputs in order and is ready with the output
// of the first input that is ready.
//
// All standard library items are referenced by absolute path and `Future::poll` is called by
// path, so the expansion does not depend on what is in scope (or shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
    ($($ident:ident: $fut:expr;)+) => {
        {
            $(let mut $ident = ::std::future::IntoFuture::into_future($fut);)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                $(
                    // SAFETY: `$ident` is owned by this closure and is only accessed through this
                    // exclusive borrow, the future is never moved out, it is dropped with the closure.
                    let $ident = unsafe { ::std::pin::Pin::new_unchecked(&mut $ident) };
                    if let Poll::Ready(r) = ::std::future::Future::poll($ident, cx) {
                        // first ready future in input order wins, the others are not polled.
                        return Poll::Ready(r);
                    }
                )+

                Poll::Pending
            })
        }
    }
}
1267#[doc(hidden)]
1268pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1269
1270/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1271///
1272/// This is the dynamic version of [`any!`].
1273pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1274 let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1275 future_fn(move |cx| {
1276 for fut in &mut futures {
1277 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1278 // Future::poll call, so it will not move.
1279 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1280 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1281 return Poll::Ready(r);
1282 }
1283 }
1284 Poll::Pending
1285 })
1286 .await
1287}
1288
/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have a `Result<T, E>` output with the same `T`, but each can have a different `E`. The macro output is a future
/// that when ".awaited" produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
/// is `Ok(T)` or is `Err((E0, E1, ..))` if all futures are `Err`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `Err(E)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Ok` type. Note that each input must be
/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Ok`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # pub struct FooError;
/// # task::doc_test(false, async {
/// let r = task::any_ok!(
///     task::run(async { Err::<char, _>("error") }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Err::<char, _>(FooError) }
/// )
/// .await;
///
/// assert_eq!(Ok('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_ok {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__any_ok!` macro.
    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
}
1394
// Implementation of `any_ok!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the local slots that hold each
// future while it is pending and its error after it completes with `Err`. The expansion is a
// future that is ready with the first `Ok` value or with all the errors (in input order) after
// every input completed with `Err`.
//
// The errors are extracted with a `match`, not `unwrap_err`, so the `Ok` type is not required to
// implement `Debug`. All standard library items are referenced by absolute path and
// `Future::poll` is called by path, so the expansion does not depend on what is in scope (or
// shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(::std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                let mut pending = false;

                $(
                    // slots that already hold an error are skipped, a ready future is never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only accessed through this
                        // exclusive borrow, the future is never moved out of the slot, it is dropped
                        // in place when the slot is overwritten with the error.
                        let fut = unsafe { ::std::pin::Pin::new_unchecked(fut) };
                        match ::std::future::Future::poll(fut, cx) {
                            Poll::Ready(Ok(r)) => return Poll::Ready(Ok(r)),
                            Poll::Ready(Err(e)) => {
                                $ident = $crate::FutureOrOutput::Output(Err(e));
                            }
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(Err((
                        $(match $ident.take_output() {
                            // an `Ok` output returns early above, it is never stored in a slot.
                            Ok(_) => unreachable!(),
                            Err(e) => e,
                        }),+
                    )))
                }
            })
        }
    }
}
1435
1436/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1437///
1438/// This is the dynamic version of [`all_some!`].
1439pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1440 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1441 future_fn(move |cx| {
1442 let mut pending = false;
1443 for input in &mut futures {
1444 if let FutureOrOutput::Future(fut) = input {
1445 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1446 // Future::poll call, so it will not move.
1447 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1448 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1449 match r {
1450 Ok(r) => return Poll::Ready(Ok(r)),
1451 Err(e) => *input = FutureOrOutput::Output(Err(e)),
1452 }
1453 } else {
1454 pending = true;
1455 }
1456 }
1457 }
1458
1459 if pending {
1460 Poll::Pending
1461 } else {
1462 Poll::Ready(Err(futures
1463 .iter_mut()
1464 .map(|f| match f.take_output() {
1465 Ok(_) => unreachable!(),
1466 Err(e) => e,
1467 })
1468 .collect()))
1469 }
1470 })
1471 .await
1472}
1473
/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output `Option<T>` type. The macro output is a future that when ".awaited" produces
/// a single output type instance returned by the first input future that completes with a `Some`.
/// If all futures complete with a `None` the output is `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `None` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
///
/// assert_eq!(Some('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_some {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__any_some!` macro.
    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
}
1572
// Implementation of `any_some!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the local slots that hold each
// future while it is pending, a slot is set to `None` after its future completes with `None`. The
// expansion is a future that is ready with the first `Some` value or with `None` after every input
// completed with `None`.
//
// All standard library items are referenced by absolute path and `Future::poll` is called by
// path, so the expansion does not depend on what is in scope (or shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = Some(::std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                let mut pending = false;

                $(
                    // empty slots are skipped, a ready future is never polled again.
                    if let Some(fut) = $ident.as_mut() {
                        // SAFETY: `$ident` is owned by this closure and is only accessed through this
                        // exclusive borrow, the future is never moved out of the slot, it is dropped
                        // in place when the slot is set to `None`.
                        let fut = unsafe { ::std::pin::Pin::new_unchecked(fut) };
                        match ::std::future::Future::poll(fut, cx) {
                            Poll::Ready(Some(r)) => return Poll::Ready(Some(r)),
                            Poll::Ready(None) => $ident = None,
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(None)
                }
            })
        }
    }
}
1609
1610/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1611///
1612/// This is the dynamic version of [`all_some!`].
1613pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1614 let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1615 future_fn(move |cx| {
1616 let mut pending = false;
1617 for input in &mut futures {
1618 if let Some(fut) = input {
1619 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1620 // Future::poll call, so it will not move.
1621 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1622 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1623 match r {
1624 Some(r) => return Poll::Ready(Some(r)),
1625 None => *input = None,
1626 }
1627 } else {
1628 pending = true;
1629 }
1630 }
1631 }
1632
1633 if pending { Poll::Pending } else { Poll::Ready(None) }
1634 })
1635 .await
1636}
1637
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
/// any future is ready with an `Err(E)` result.
///
/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
/// type is the first error encountered, the input futures must have the same `Err` type but can have different
/// `Ok` types.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Err(E)` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Err` type. Note that each input must be
/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Ok(T)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(
///     task::run(async { Ok::<_, FooError>('a') }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Ok::<_, FooError>('c') }
/// )
/// .await;
///
/// assert_eq!(Ok(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// And if any future completes with `Err(E)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
///     Ok('c')
/// })
/// .await;
///
/// assert_eq!(Err(FooError), r);
/// # });
/// ```
#[macro_export]
macro_rules! all_ok {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__all_ok!` macro.
    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
}
1759
// Implementation of `all_ok!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the local slots that hold each
// future while it is pending and its value after it completes with `Ok`. The expansion is a
// future that is ready with the first `Err` or with all the `Ok` values (in input order) after
// every input completed with `Ok`.
//
// The values are extracted with a `match`, not `unwrap`, so the error type is not required to
// implement `Debug`. All standard library items are referenced by absolute path and
// `Future::poll` is called by path, so the expansion does not depend on what is in scope (or
// shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(::std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                let mut pending = false;

                $(
                    // slots that already hold a value are skipped, a ready future is never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only accessed through this
                        // exclusive borrow, the future is never moved out of the slot, it is dropped
                        // in place when the slot is overwritten with the value.
                        let fut = unsafe { ::std::pin::Pin::new_unchecked(fut) };
                        match ::std::future::Future::poll(fut, cx) {
                            Poll::Ready(Ok(r)) => {
                                $ident = $crate::FutureOrOutput::Output(Ok(r));
                            }
                            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(Ok((
                        $(match $ident.take_output() {
                            Ok(r) => r,
                            // an `Err` output returns early above, it is never stored in a slot.
                            Err(_) => unreachable!(),
                        }),+
                    )))
                }
            })
        }
    }
}
1800
1801/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1802///
1803/// This is the dynamic version of [`all_ok!`].
1804pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1805 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1806 future_fn(move |cx| {
1807 let mut pending = false;
1808 for input in &mut futures {
1809 if let FutureOrOutput::Future(fut) = input {
1810 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1811 // Future::poll call, so it will not move.
1812 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1813 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1814 match r {
1815 Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1816 Err(e) => return Poll::Ready(Err(e)),
1817 }
1818 } else {
1819 pending = true;
1820 }
1821 }
1822 }
1823
1824 if pending {
1825 Poll::Pending
1826 } else {
1827 Poll::Ready(Ok(futures
1828 .iter_mut()
1829 .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1830 .collect()))
1831 }
1832 })
1833 .await
1834}
1835
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
/// future is ready with `None`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
/// produces `Some((T0, T1, ..))` if all futures were `Some(T)` or `None` if any of the futures was `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with an `Option<T>` output. Note that each input must be
/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
///
/// assert_eq!(Some(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// Completes with `None` if any future completes with `None`:
///
/// ```
/// # use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
///
/// assert_eq!(None, r);
/// # });
/// ```
#[macro_export]
macro_rules! all_some {
    // One explicit arm for each input count from one to eight, each arm names the futures
    // `fut0..fut7` and forwards them to the internal `__all_some!` macro.
    ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other input count is forwarded to the proc-macro together with the path of the
    // internal macro to use.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
}
1944
// Implementation of `all_some!`.
//
// Input is a list of `ident: future_expr;` pairs, the idents name the local slots that hold each
// future while it is pending and its value after it completes with `Some`. The expansion is a
// future that is ready with `None` when any input completes with `None` or with all the `Some`
// values (in input order) after every input completed with `Some`.
//
// All standard library items are referenced by absolute path and `Future::poll` is called by
// path, so the expansion does not depend on what is in scope (or shadowed) at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(::std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use ::std::task::Poll;

                let mut pending = false;

                $(
                    // slots that already hold a value are skipped, a ready future is never polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only accessed through this
                        // exclusive borrow, the future is never moved out of the slot, it is dropped
                        // in place when the slot is overwritten with the value.
                        let fut = unsafe { ::std::pin::Pin::new_unchecked(fut) };
                        match ::std::future::Future::poll(fut, cx) {
                            Poll::Ready(Some(r)) => {
                                $ident = $crate::FutureOrOutput::Output(Some(r));
                            }
                            Poll::Ready(None) => return Poll::Ready(None),
                            Poll::Pending => pending = true,
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(Some((
                        // only `Some` outputs are stored in the slots.
                        $($ident.take_output().unwrap()),+
                    )))
                }
            })
        }
    }
}
1984
1985/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1986///
1987/// This is the dynamic version of [`all_some!`].
1988pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1989 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1990 future_fn(move |cx| {
1991 let mut pending = false;
1992 for input in &mut futures {
1993 if let FutureOrOutput::Future(fut) = input {
1994 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1995 // Future::poll call, so it will not move.
1996 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1997 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1998 match r {
1999 Some(r) => *input = FutureOrOutput::Output(Some(r)),
2000 None => return Poll::Ready(None),
2001 }
2002 } else {
2003 pending = true;
2004 }
2005 }
2006 }
2007
2008 if pending {
2009 Poll::Pending
2010 } else {
2011 Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2012 }
2013 })
2014 .await
2015}
2016
2017/// A future that will await until [`set`] is called.
2018///
2019/// # Examples
2020///
2021/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2022///
2023/// ```
2024/// use zng_clone_move::async_clmv;
2025/// use zng_task::{self as task, *};
2026///
2027/// let signal = SignalOnce::default();
2028///
2029/// task::spawn(async_clmv!(signal, {
2030/// signal.await;
2031/// println!("After Signal!");
2032/// }));
2033///
2034/// signal.set();
2035/// ```
2036///
2037/// [`set`]: SignalOnce::set
2038#[derive(Default, Clone)]
2039pub struct SignalOnce(Arc<SignalInner>);
2040impl fmt::Debug for SignalOnce {
2041 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2042 write!(f, "SignalOnce({})", self.is_set())
2043 }
2044}
2045impl PartialEq for SignalOnce {
2046 fn eq(&self, other: &Self) -> bool {
2047 Arc::ptr_eq(&self.0, &other.0)
2048 }
2049}
2050impl Eq for SignalOnce {}
2051impl Hash for SignalOnce {
2052 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2053 Arc::as_ptr(&self.0).hash(state)
2054 }
2055}
2056impl SignalOnce {
2057 /// New unsigned.
2058 pub fn new() -> Self {
2059 Self::default()
2060 }
2061
2062 /// New signaled.
2063 pub fn new_set() -> Self {
2064 let s = Self::new();
2065 s.set();
2066 s
2067 }
2068
2069 /// If the signal was set.
2070 pub fn is_set(&self) -> bool {
2071 self.0.signaled.load(Ordering::Relaxed)
2072 }
2073
2074 /// Sets the signal and awakes listeners.
2075 pub fn set(&self) {
2076 if !self.0.signaled.swap(true, Ordering::Relaxed) {
2077 let listeners = mem::take(&mut *self.0.listeners.lock());
2078 for listener in listeners {
2079 listener.wake();
2080 }
2081 }
2082 }
2083}
2084impl Future for SignalOnce {
2085 type Output = ();
2086
2087 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2088 if self.as_ref().is_set() {
2089 Poll::Ready(())
2090 } else {
2091 let mut listeners = self.0.listeners.lock();
2092 let waker = cx.waker();
2093 if !listeners.iter().any(|w| w.will_wake(waker)) {
2094 listeners.push(waker.clone());
2095 }
2096 Poll::Pending
2097 }
2098 }
2099}
2100
/// State shared by a [`SignalOnce`] and all its clones.
#[derive(Default)]
struct SignalInner {
    /// Signal flag, `false` by default, changed to `true` by `SignalOnce::set`.
    signaled: AtomicBool,
    /// Wakers registered by tasks that polled the signal while it was not set.
    listeners: Mutex<Vec<std::task::Waker>>,
}
2106
2107/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2108///
2109/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2110/// known at the moment the first request is made.
2111///
2112/// [`Waker`]: std::task::Waker
2113#[derive(Clone)]
2114pub struct McWaker(Arc<WakeVec>);
2115
2116#[derive(Default)]
2117struct WakeVec(Mutex<Vec<std::task::Waker>>);
2118impl WakeVec {
2119 fn push(&self, waker: std::task::Waker) -> bool {
2120 let mut v = self.0.lock();
2121
2122 let return_waker = v.is_empty();
2123
2124 v.push(waker);
2125
2126 return_waker
2127 }
2128
2129 fn cancel(&self) {
2130 let mut v = self.0.lock();
2131
2132 debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2133
2134 v.clear();
2135 }
2136}
2137impl std::task::Wake for WakeVec {
2138 fn wake(self: Arc<Self>) {
2139 for w in mem::take(&mut *self.0.lock()) {
2140 w.wake();
2141 }
2142 }
2143}
2144impl McWaker {
2145 /// New empty waker.
2146 pub fn empty() -> Self {
2147 Self(Arc::new(WakeVec::default()))
2148 }
2149
2150 /// Register a `waker` to wake once when `self` awakes.
2151 ///
2152 /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2153 /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2154 /// is ready [`cancel`] must be called.
2155 ///
2156 /// [`cancel`]: Self::cancel
2157 pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2158 if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2159 }
2160
2161 /// Clear current registered wakers.
2162 pub fn cancel(&self) {
2163 self.0.cancel()
2164 }
2165}