tokio_util/task/
task_tracker.rs

1//! Types related to the [`TaskTracker`] collection.
2//!
3//! See the documentation of [`TaskTracker`] for more information.
4
5use pin_project_lite::pin_project;
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use tokio::sync::{futures::Notified, Notify};
13
14#[cfg(feature = "rt")]
15use tokio::{
16    runtime::Handle,
17    task::{JoinHandle, LocalSet},
18};
19
20/// A task tracker used for waiting until tasks exit.
21///
22/// This is usually used together with [`CancellationToken`] to implement [graceful shutdown]. The
23/// `CancellationToken` is used to signal to tasks that they should shut down, and the
24/// `TaskTracker` is used to wait for them to finish shutting down.
25///
26/// The `TaskTracker` will also keep track of a `closed` boolean. This is used to handle the case
27/// where the `TaskTracker` is empty, but we don't want to shut down yet. This means that the
28/// [`wait`] method will wait until *both* of the following happen at the same time:
29///
30///  * The `TaskTracker` must be closed using the [`close`] method.
31///  * The `TaskTracker` must be empty, that is, all tasks that it is tracking must have exited.
32///
33/// When a call to [`wait`] returns, it is guaranteed that all tracked tasks have exited and that
34/// the destructor of the future has finished running. However, there might be a short amount of
35/// time where [`JoinHandle::is_finished`] returns false.
36///
37/// # Comparison to `JoinSet`
38///
39/// The main Tokio crate has a similar collection known as [`JoinSet`]. The `JoinSet` type has a
40/// lot more features than `TaskTracker`, so `TaskTracker` should only be used when one of its
41/// unique features is required:
42///
43///  1. When tasks exit, a `TaskTracker` will allow the task to immediately free its memory.
44///  2. By not closing the `TaskTracker`, [`wait`] will be prevented from returning even if
45///     the `TaskTracker` is empty.
46///  3. A `TaskTracker` does not require mutable access to insert tasks.
47///  4. A `TaskTracker` can be cloned to share it with many tasks.
48///
49/// The first point is the most important one. A [`JoinSet`] keeps track of the return value of
50/// every inserted task. This means that if the caller keeps inserting tasks and never calls
51/// [`join_next`], then their return values will keep building up and consuming memory, _even if_
52/// most of the tasks have already exited. This can cause the process to run out of memory. With a
53/// `TaskTracker`, this does not happen. Once tasks exit, they are immediately removed from the
54/// `TaskTracker`.
55///
56/// Note that unlike [`JoinSet`], dropping a `TaskTracker` does not abort the tasks.
57///
58/// # Examples
59///
60/// For more examples, please see the topic page on [graceful shutdown].
61///
62/// ## Spawn tasks and wait for them to exit
63///
64/// This is a simple example. For this case, [`JoinSet`] should probably be used instead.
65///
66/// ```
67/// use tokio_util::task::TaskTracker;
68///
69/// # #[tokio::main(flavor = "current_thread")]
70/// # async fn main() {
71/// let tracker = TaskTracker::new();
72///
73/// for i in 0..10 {
74///     tracker.spawn(async move {
75///         println!("Task {} is running!", i);
76///     });
77/// }
78/// // Once we spawned everything, we close the tracker.
79/// tracker.close();
80///
81/// // Wait for everything to finish.
82/// tracker.wait().await;
83///
84/// println!("This is printed after all of the tasks.");
85/// # }
86/// ```
87///
88/// ## Wait for tasks to exit
89///
90/// This example shows the intended use-case of `TaskTracker`. It is used together with
91/// [`CancellationToken`] to implement graceful shutdown.
92/// ```
93/// use tokio_util::sync::CancellationToken;
94/// use tokio_util::task::TaskTracker;
95/// use tokio_util::time::FutureExt;
96///
97/// use tokio::time::{self, Duration};
98///
99/// async fn background_task(num: u64) {
100///     for i in 0..10 {
101///         time::sleep(Duration::from_millis(100*num)).await;
102///         println!("Background task {} in iteration {}.", num, i);
103///     }
104/// }
105///
106/// #[tokio::main]
107/// # async fn _hidden() {}
108/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
109/// async fn main() {
110///     let tracker = TaskTracker::new();
111///     let token = CancellationToken::new();
112///
113///     for i in 0..10 {
114///         let token = token.clone();
115///         tracker.spawn(async move {
116///             // Use a `with_cancellation_token_owned` to kill the background task
117///             // if the token is cancelled.
118///             match background_task(i)
119///                 .with_cancellation_token_owned(token)
120///                 .await
121///             {
122///                 Some(()) => println!("Task {} exiting normally.", i),
123///                 None => {
124///                     // Do some cleanup before we really exit.
125///                     time::sleep(Duration::from_millis(50)).await;
126///                     println!("Task {} finished cleanup.", i);
127///                 }
128///             }
129///         });
130///     }
131///
132///     // Spawn a background task that will send the shutdown signal.
133///     {
134///         let tracker = tracker.clone();
135///         tokio::spawn(async move {
136///             // Normally you would use something like ctrl-c instead of
137///             // sleeping.
138///             time::sleep(Duration::from_secs(2)).await;
139///             tracker.close();
140///             token.cancel();
141///         });
142///     }
143///
144///     // Wait for all tasks to exit.
145///     tracker.wait().await;
146///
147///     println!("All tasks have exited now.");
148/// }
149/// ```
150///
151/// [`CancellationToken`]: crate::sync::CancellationToken
152/// [`JoinHandle::is_finished`]: tokio::task::JoinHandle::is_finished
153/// [`JoinSet`]: tokio::task::JoinSet
154/// [`close`]: Self::close
155/// [`join_next`]: tokio::task::JoinSet::join_next
156/// [`wait`]: Self::wait
157/// [graceful shutdown]: https://tokio.rs/tokio/topics/shutdown
pub struct TaskTracker {
    // Shared state: the closed flag, the task count, and the notifier used by `wait`.
    // Clones of a `TaskTracker` hold the same `Arc`, which is what `ptr_eq` compares.
    inner: Arc<TaskTrackerInner>,
}
161
/// Represents a task tracked by a [`TaskTracker`].
///
/// A token is created by [`TaskTracker::token`], which increments the task count of the
/// tracker. Dropping the token decrements the count again. Cloning a token creates a new
/// token that is counted separately.
#[must_use]
#[derive(Debug)]
pub struct TaskTrackerToken {
    // The tracker whose task count this token contributes to.
    task_tracker: TaskTracker,
}
168
/// State shared by all clones of a `TaskTracker` and by the tokens created from it.
struct TaskTrackerInner {
    /// Keeps track of the state.
    ///
    /// The lowest bit is whether the task tracker is closed.
    ///
    /// The rest of the bits count the number of tracked tasks. Because of this layout, the
    /// task count changes in steps of 2, and a value of exactly 1 means "closed and empty".
    state: AtomicUsize,
    /// Used to notify when the last task exits.
    ///
    /// It is also notified when the tracker is closed while it has no tasks.
    on_last_exit: Notify,
}
179
pin_project! {
    /// A future that is tracked as a task by a [`TaskTracker`].
    ///
    /// The associated [`TaskTracker`] cannot complete until this future is dropped.
    ///
    /// Polling this future polls the wrapped future and passes its output through unchanged.
    ///
    /// This future is returned by [`TaskTracker::track_future`].
    #[must_use = "futures do nothing unless polled"]
    pub struct TrackedFuture<F> {
        // The wrapped future; pinned structurally so that it can be polled in place.
        #[pin]
        future: F,
        // Keeps the task counted in the tracker for as long as this struct is alive.
        token: TaskTrackerToken,
    }
}
193
pin_project! {
    /// A future that completes when the [`TaskTracker`] is empty and closed.
    ///
    /// This future is returned by [`TaskTracker::wait`].
    #[must_use = "futures do nothing unless polled"]
    pub struct TaskTrackerWaitFuture<'a> {
        // Notification future obtained from the tracker's `on_last_exit` notifier.
        #[pin]
        future: Notified<'a>,
        // `None` once this future is complete: either the tracker was already closed and
        // empty when `wait` was called, or a previous `poll` returned `Poll::Ready`.
        inner: Option<&'a TaskTrackerInner>,
    }
}
205
impl TaskTrackerInner {
    /// Creates the state for an open tracker with no tasks.
    #[inline]
    fn new() -> Self {
        Self {
            // All bits zero: closed bit clear, task count zero.
            state: AtomicUsize::new(0),
            on_last_exit: Notify::new(),
        }
    }

    /// Returns `true` if the closed bit is set and the task count is zero.
    #[inline]
    fn is_closed_and_empty(&self) -> bool {
        // If empty and closed bit set, then we are done.
        //
        // The acquire load will synchronize with the release store of any previous call to
        // `set_closed` and `drop_task`.
        self.state.load(Ordering::Acquire) == 1
    }

    /// Sets the closed bit, notifying waiters if the tracker has no tasks.
    ///
    /// Returns `true` if the closed bit was not set before this call.
    #[inline]
    fn set_closed(&self) -> bool {
        // The AcqRel ordering makes the closed bit behave like a `Mutex<bool>` for synchronization
        // purposes. We do this because it makes the return value of `TaskTracker::{close,reopen}`
        // more meaningful for the user. Without these orderings, this assert could fail:
        // ```
        // // thread 1
        // some_other_atomic.store(true, Relaxed);
        // tracker.close();
        //
        // // thread 2
        // if tracker.reopen() {
        //     assert!(some_other_atomic.load(Relaxed));
        // }
        // ```
        // However, with the AcqRel ordering, we establish a happens-before relationship from the
        // call to `close` and the later call to `reopen` that returned true.
        let state = self.state.fetch_or(1, Ordering::AcqRel);

        // If there are no tasks, and if it was not already closed:
        if state == 0 {
            self.notify_now();
        }

        // `state` is the value before the `fetch_or`, so this reports whether the bit was clear.
        (state & 1) == 0
    }

    /// Clears the closed bit.
    ///
    /// Returns `true` if the closed bit was set before this call.
    #[inline]
    fn set_open(&self) -> bool {
        // See `set_closed` regarding the AcqRel ordering.
        let state = self.state.fetch_and(!1, Ordering::AcqRel);
        (state & 1) == 1
    }

    /// Increments the task count by one.
    #[inline]
    fn add_task(&self) {
        // The count is stored above the closed bit, so one task corresponds to adding 2.
        self.state.fetch_add(2, Ordering::Relaxed);
    }

    /// Decrements the task count by one, notifying waiters if this was the last task of a
    /// closed tracker.
    #[inline]
    fn drop_task(&self) {
        let state = self.state.fetch_sub(2, Ordering::Release);

        // If this was the last task and we are closed:
        // (`state` is the value before the subtraction: one task (2) plus the closed bit (1).)
        if state == 3 {
            self.notify_now();
        }
    }

    /// Notifies everything currently waiting on `on_last_exit`.
    ///
    /// Marked `#[cold]` and called only from the branches in `set_closed` and `drop_task`.
    #[cold]
    fn notify_now(&self) {
        // Insert an acquire fence. This matters for `drop_task` but doesn't matter for
        // `set_closed` since it already uses AcqRel.
        //
        // This synchronizes with the release store of any other call to `drop_task`, and with the
        // release store in the call to `set_closed`. That ensures that everything that happened
        // before those other calls to `drop_task` or `set_closed` will be visible after this load,
        // and those things will also be visible to anything woken by the call to `notify_waiters`.
        self.state.load(Ordering::Acquire);

        self.on_last_exit.notify_waiters();
    }
}
287
288impl TaskTracker {
289    /// Creates a new `TaskTracker`.
290    ///
291    /// The `TaskTracker` will start out as open.
292    #[must_use]
293    pub fn new() -> Self {
294        Self {
295            inner: Arc::new(TaskTrackerInner::new()),
296        }
297    }
298
    /// Waits until this `TaskTracker` is both closed and empty.
    ///
    /// If the `TaskTracker` is already closed and empty when this method is called, then it
    /// returns immediately.
    ///
    /// The `wait` future is resistant against [ABA problems][aba]. That is, if the `TaskTracker`
    /// becomes both closed and empty for a short amount of time, then it is guaranteed that all
    /// `wait` futures that were created before the short time interval will trigger, even if they
    /// are not polled during that short time interval.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
    ///
    /// However, the resistance against [ABA problems][aba] is lost when using `wait` as the
    /// condition in a `tokio::select!` loop.
    ///
    /// [aba]: https://en.wikipedia.org/wiki/ABA_problem
    #[inline]
    pub fn wait(&self) -> TaskTrackerWaitFuture<'_> {
        TaskTrackerWaitFuture {
            // Struct fields are evaluated in the order written, so the `Notified` future is
            // created before the state is checked below.
            // NOTE(review): presumably this ordering is what provides the ABA resistance
            // described above — confirm against the `Notify::notified` documentation.
            future: self.inner.on_last_exit.notified(),
            // `None` marks the future as already complete, so `poll` returns `Ready` at once.
            inner: if self.inner.is_closed_and_empty() {
                None
            } else {
                Some(&self.inner)
            },
        }
    }
328
    /// Close this `TaskTracker`.
    ///
    /// This allows [`wait`] futures to complete. It does not prevent you from spawning new tasks.
    ///
    /// If this call closes the `TaskTracker` while it has no tasks, waiters are notified
    /// right away.
    ///
    /// Returns `true` if this closed the `TaskTracker`, or `false` if it was already closed.
    ///
    /// [`wait`]: Self::wait
    #[inline]
    pub fn close(&self) -> bool {
        self.inner.set_closed()
    }
340
    /// Reopen this `TaskTracker`.
    ///
    /// This prevents [`wait`] futures from completing even if the `TaskTracker` is empty.
    ///
    /// Only the closed flag is changed; the number of tracked tasks is not affected.
    ///
    /// Returns `true` if this reopened the `TaskTracker`, or `false` if it was already open.
    ///
    /// [`wait`]: Self::wait
    #[inline]
    pub fn reopen(&self) -> bool {
        self.inner.set_open()
    }
352
353    /// Returns `true` if this `TaskTracker` is [closed](Self::close).
354    #[inline]
355    #[must_use]
356    pub fn is_closed(&self) -> bool {
357        (self.inner.state.load(Ordering::Acquire) & 1) != 0
358    }
359
360    /// Returns the number of tasks tracked by this `TaskTracker`.
361    #[inline]
362    #[must_use]
363    pub fn len(&self) -> usize {
364        self.inner.state.load(Ordering::Acquire) >> 1
365    }
366
367    /// Returns `true` if there are no tasks in this `TaskTracker`.
368    #[inline]
369    #[must_use]
370    pub fn is_empty(&self) -> bool {
371        self.inner.state.load(Ordering::Acquire) <= 1
372    }
373
374    /// Spawn the provided future on the current Tokio runtime, and track it in this `TaskTracker`.
375    ///
376    /// This is equivalent to `tokio::spawn(tracker.track_future(task))`.
377    #[inline]
378    #[track_caller]
379    #[cfg(feature = "rt")]
380    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
381    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
382    where
383        F: Future + Send + 'static,
384        F::Output: Send + 'static,
385    {
386        tokio::task::spawn(self.track_future(task))
387    }
388
389    /// Spawn the provided future on the provided Tokio runtime, and track it in this `TaskTracker`.
390    ///
391    /// This is equivalent to `handle.spawn(tracker.track_future(task))`.
392    #[inline]
393    #[track_caller]
394    #[cfg(feature = "rt")]
395    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
396    pub fn spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output>
397    where
398        F: Future + Send + 'static,
399        F::Output: Send + 'static,
400    {
401        handle.spawn(self.track_future(task))
402    }
403
404    /// Spawn the provided future on the current [`LocalSet`] or [`LocalRuntime`]
405    /// and track it in this `TaskTracker`.
406    ///
407    /// This is equivalent to `tokio::task::spawn_local(tracker.track_future(task))`.
408    ///
409    /// # Panics
410    ///
411    /// This method panics if it is called outside of a `LocalSet` or `LocalRuntime`.
412    ///
413    /// [`LocalSet`]: tokio::task::LocalSet
414    /// [`LocalRuntime`]: tokio::runtime::LocalRuntime
415    #[inline]
416    #[track_caller]
417    #[cfg(feature = "rt")]
418    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
419    pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output>
420    where
421        F: Future + 'static,
422        F::Output: 'static,
423    {
424        tokio::task::spawn_local(self.track_future(task))
425    }
426
427    /// Spawn the provided future on the provided [`LocalSet`], and track it in this `TaskTracker`.
428    ///
429    /// This is equivalent to `local_set.spawn_local(tracker.track_future(task))`.
430    ///
431    /// [`LocalSet`]: tokio::task::LocalSet
432    #[inline]
433    #[track_caller]
434    #[cfg(feature = "rt")]
435    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
436    pub fn spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output>
437    where
438        F: Future + 'static,
439        F::Output: 'static,
440    {
441        local_set.spawn_local(self.track_future(task))
442    }
443
    /// Spawn the provided blocking task on the current Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is similar to `tokio::task::spawn_blocking(task)`, except that a [`token`] is
    /// created before the closure is spawned and is dropped once `task` has returned.
    ///
    /// [`token`]: Self::token
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg(not(target_family = "wasm"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        // Create the token here, before spawning, and move it into the closure so that it
        // lives until the closure has run.
        let token = self.token();
        tokio::task::spawn_blocking(move || {
            let res = task();
            drop(token);
            res
        })
    }
465
    /// Spawn the provided blocking task on the provided Tokio runtime, and track it in this `TaskTracker`.
    ///
    /// This is similar to `handle.spawn_blocking(task)`, except that a [`token`] is created
    /// before the closure is spawned and is dropped once `task` has returned.
    ///
    /// [`token`]: Self::token
    #[inline]
    #[track_caller]
    #[cfg(feature = "rt")]
    #[cfg(not(target_family = "wasm"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        // Create the token here, before spawning, and move it into the closure so that it
        // lives until the closure has run.
        let token = self.token();
        handle.spawn_blocking(move || {
            let res = task();
            drop(token);
            res
        })
    }
487
    /// Track the provided future.
    ///
    /// The returned [`TrackedFuture`] will count as a task tracked by this collection, and will
    /// prevent calls to [`wait`] from returning until the task is dropped.
    ///
    /// The task is removed from the collection when it is dropped, not when [`poll`] returns
    /// [`Poll::Ready`].
    ///
    /// # Examples
    ///
    /// Track a future spawned with [`tokio::spawn`].
    ///
    /// ```
    /// # async fn my_async_fn() {}
    /// use tokio_util::task::TaskTracker;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let tracker = TaskTracker::new();
    ///
    /// tokio::spawn(tracker.track_future(my_async_fn()));
    /// # }
    /// ```
    ///
    /// Track a future spawned on a [`JoinSet`].
    ///
    /// ```
    /// # async fn my_async_fn() {}
    /// use tokio::task::JoinSet;
    /// use tokio_util::task::TaskTracker;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let tracker = TaskTracker::new();
    /// let mut join_set = JoinSet::new();
    ///
    /// join_set.spawn(tracker.track_future(my_async_fn()));
    /// # }
    /// ```
    ///
    /// [`JoinSet`]: tokio::task::JoinSet
    /// [`Poll::Ready`]: std::task::Poll::Ready
    /// [`poll`]: std::future::Future::poll
    /// [`wait`]: Self::wait
    #[inline]
    pub fn track_future<F: Future>(&self, future: F) -> TrackedFuture<F> {
        TrackedFuture {
            future,
            // The token is created here, so the task is counted from this call onwards,
            // not from the first poll of the returned future.
            token: self.token(),
        }
    }
538
539    /// Creates a [`TaskTrackerToken`] representing a task tracked by this `TaskTracker`.
540    ///
541    /// This token is a lower-level utility than the spawn methods. Each token is considered to
542    /// correspond to a task. As long as the token exists, the `TaskTracker` cannot complete.
543    /// Furthermore, the count returned by the [`len`] method will include the tokens in the count.
544    ///
545    /// Dropping the token indicates to the `TaskTracker` that the task has exited.
546    ///
547    /// [`len`]: TaskTracker::len
548    #[inline]
549    pub fn token(&self) -> TaskTrackerToken {
550        self.inner.add_task();
551        TaskTrackerToken {
552            task_tracker: self.clone(),
553        }
554    }
555
556    /// Returns `true` if both task trackers correspond to the same set of tasks.
557    ///
558    /// # Examples
559    ///
560    /// ```
561    /// use tokio_util::task::TaskTracker;
562    ///
563    /// let tracker_1 = TaskTracker::new();
564    /// let tracker_2 = TaskTracker::new();
565    /// let tracker_1_clone = tracker_1.clone();
566    ///
567    /// assert!(TaskTracker::ptr_eq(&tracker_1, &tracker_1_clone));
568    /// assert!(!TaskTracker::ptr_eq(&tracker_1, &tracker_2));
569    /// ```
570    #[inline]
571    #[must_use]
572    pub fn ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool {
573        Arc::ptr_eq(&left.inner, &right.inner)
574    }
575}
576
577impl Default for TaskTracker {
578    /// Creates a new `TaskTracker`.
579    ///
580    /// The `TaskTracker` will start out as open.
581    #[inline]
582    fn default() -> TaskTracker {
583        TaskTracker::new()
584    }
585}
586
587impl Clone for TaskTracker {
588    /// Returns a new `TaskTracker` that tracks the same set of tasks.
589    ///
590    /// Since the new `TaskTracker` shares the same set of tasks, changes to one set are visible in
591    /// all other clones.
592    ///
593    /// # Examples
594    ///
595    /// ```
596    /// use tokio_util::task::TaskTracker;
597    ///
598    /// #[tokio::main]
599    /// # async fn _hidden() {}
600    /// # #[tokio::main(flavor = "current_thread")]
601    /// async fn main() {
602    ///     let tracker = TaskTracker::new();
603    ///     let cloned = tracker.clone();
604    ///
605    ///     // Spawns on `tracker` are visible in `cloned`.
606    ///     tracker.spawn(std::future::pending::<()>());
607    ///     assert_eq!(cloned.len(), 1);
608    ///
609    ///     // Spawns on `cloned` are visible in `tracker`.
610    ///     cloned.spawn(std::future::pending::<()>());
611    ///     assert_eq!(tracker.len(), 2);
612    ///
613    ///     // Calling `close` is visible to `cloned`.
614    ///     tracker.close();
615    ///     assert!(cloned.is_closed());
616    ///
617    ///     // Calling `reopen` is visible to `tracker`.
618    ///     cloned.reopen();
619    ///     assert!(!tracker.is_closed());
620    /// }
621    /// ```
622    #[inline]
623    fn clone(&self) -> TaskTracker {
624        Self {
625            inner: self.inner.clone(),
626        }
627    }
628}
629
630fn debug_inner(inner: &TaskTrackerInner, f: &mut fmt::Formatter<'_>) -> fmt::Result {
631    let state = inner.state.load(Ordering::Acquire);
632    let is_closed = (state & 1) != 0;
633    let len = state >> 1;
634
635    f.debug_struct("TaskTracker")
636        .field("len", &len)
637        .field("is_closed", &is_closed)
638        .field("inner", &(inner as *const TaskTrackerInner))
639        .finish()
640}
641
642impl fmt::Debug for TaskTracker {
643    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644        debug_inner(&self.inner, f)
645    }
646}
647
impl TaskTrackerToken {
    /// Returns the [`TaskTracker`] that this token is associated with.
    ///
    /// The tracker is returned by reference; no new token is created by this call.
    #[inline]
    #[must_use]
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.task_tracker
    }
}
656
impl Clone for TaskTrackerToken {
    /// Returns a new `TaskTrackerToken` associated with the same [`TaskTracker`].
    ///
    /// This is equivalent to `token.task_tracker().token()`.
    ///
    /// The clone is counted as an additional task, so the tracker's task count goes up by one.
    #[inline]
    fn clone(&self) -> TaskTrackerToken {
        self.task_tracker.token()
    }
}
666
impl Drop for TaskTrackerToken {
    /// Dropping the token indicates to the [`TaskTracker`] that the task has exited.
    ///
    /// This decrements the tracker's task count. If this was the last task of a closed
    /// tracker, waiters are notified.
    #[inline]
    fn drop(&mut self) {
        self.task_tracker.inner.drop_task();
    }
}
674
675impl<F: Future> Future for TrackedFuture<F> {
676    type Output = F::Output;
677
678    #[inline]
679    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
680        self.project().future.poll(cx)
681    }
682}
683
684impl<F: fmt::Debug> fmt::Debug for TrackedFuture<F> {
685    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
686        f.debug_struct("TrackedFuture")
687            .field("future", &self.future)
688            .field("task_tracker", self.token.task_tracker())
689            .finish()
690    }
691}
692
693impl<'a> Future for TaskTrackerWaitFuture<'a> {
694    type Output = ();
695
696    #[inline]
697    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
698        let me = self.project();
699
700        let inner = match me.inner.as_ref() {
701            None => return Poll::Ready(()),
702            Some(inner) => inner,
703        };
704
705        let ready = inner.is_closed_and_empty() || me.future.poll(cx).is_ready();
706        if ready {
707            *me.inner = None;
708            Poll::Ready(())
709        } else {
710            Poll::Pending
711        }
712    }
713}
714
715impl<'a> fmt::Debug for TaskTrackerWaitFuture<'a> {
716    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
717        struct Helper<'a>(&'a TaskTrackerInner);
718
719        impl fmt::Debug for Helper<'_> {
720            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721                debug_inner(self.0, f)
722            }
723        }
724
725        f.debug_struct("TaskTrackerWaitFuture")
726            .field("future", &self.future)
727            .field("task_tracker", &self.inner.map(Helper))
728            .finish()
729    }
730}