yield_progress/
lib.rs

1//! This library provides the `YieldProgress` type, which allows a long-running async task
2//! to report its progress, while also yielding to the scheduler (e.g. for the
3//! single-threaded web/Wasm environment) and introducing cancellation points.
4//!
5//! These things go together because the rate at which it makes sense to yield (to avoid
6//! event loop hangs) is similar to the rate at which it makes sense to report progress.
7//!
8//! `YieldProgress` is executor-independent; when it is constructed, the caller provides a
9//! function for yielding.
10//!
11//! # Crate feature flags
12//!
13//! * `sync` (default): Adds `YieldProgress::split_evenly_concurrent()`.
14//!
15//!   Requires `std` to be available for the compilation target.
16//!
17//! * `log_hiccups`: Log intervals between yields longer than 100 ms, via the [`log`] library.
18//!
19//!   Requires `std` to be available for the compilation target.
20//!   This might be removed in favor of something more configurable in future versions,
21//!   in which case the feature flag may still exist but do nothing.
22//!
23//! [`log`]: https://docs.rs/log/0.4/
24
25#![no_std]
26#![deny(elided_lifetimes_in_paths)]
27#![forbid(unsafe_code)]
28#![warn(clippy::cast_lossless)]
29#![warn(clippy::exhaustive_enums)]
30#![warn(clippy::exhaustive_structs)]
31#![warn(clippy::missing_panics_doc)]
32#![warn(clippy::return_self_not_must_use)]
33#![warn(clippy::wrong_self_convention)]
34#![warn(missing_docs)]
35#![warn(unused_lifetimes)]
36#![warn(unused_qualifications)]
37
38extern crate alloc;
39
40#[cfg(any(feature = "sync", feature = "log_hiccups"))]
41extern crate std;
42
43use core::fmt;
44use core::future::Future;
45use core::iter::FusedIterator;
46use core::panic::Location;
47use core::pin::Pin;
48
49use alloc::boxed::Box;
50use alloc::string::ToString as _;
51use alloc::sync::Arc;
52
53#[cfg(doc)]
54use core::task::Poll;
55
56#[cfg(feature = "log_hiccups")]
57use web_time::Instant;
58
59mod basic_yield;
60pub use basic_yield::basic_yield_now;
61
62mod builder;
63pub use builder::Builder;
64
65#[cfg(feature = "sync")]
66mod concurrent;
67
68mod info;
69pub use info::{ProgressInfo, YieldInfo};
70
/// A pinned, heap-allocated, `Send` future producing `T`.
///
/// We could import this alias from `futures-core` but that would be another non-dev dependency.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Type of the progress-reporting callback: it receives a borrowed [`ProgressInfo`]
/// describing one progress report and returns nothing.
type ProgressFn = dyn for<'a> Fn(&'a ProgressInfo<'a>) + Send + Sync + 'static;

/// Type of the yield callback: it receives a borrowed [`YieldInfo`] describing the yield
/// point and returns a boxed future, which is awaited to perform the yield.
type YieldFn = dyn for<'a> Fn(&'a YieldInfo<'a>) -> BoxFuture<'static, ()> + Send + Sync;

/// Label text; stored in an `Arc` so that cloning it does not copy the string.
type Label = Arc<str>;
79
/// Allows a long-running async task to report its progress, while also yielding to the
/// scheduler (e.g. for the single-threaded web environment) and introducing cancellation
/// points.
///
/// These things go together because the rate at which it makes sense to yield (to avoid event
/// loop hangs) is similar to the rate at which it makes sense to report progress.
///
/// ---
///
/// To construct a [`YieldProgress`], use the [`Builder`], or [`noop()`](YieldProgress::noop).
pub struct YieldProgress {
    /// Value reported to the progress function when this instance is given a fraction of 0.
    start: f32,
    /// Value reported to the progress function when this instance is given a fraction of 1.
    end: f32,

    /// Name given to this specific portion of work. Inherited from the parent if not
    /// overridden.
    ///
    /// TODO: Eventually we will want to have things like "label this segment as a
    /// fallback if it has no better label", which will require some notion of distinguishing
    /// inheritance from having been explicitly set.
    label: Option<Label>,

    /// The yield function (plus its `log_hiccups` bookkeeping, when that feature is
    /// enabled). Shared by reference count with instances derived from this one.
    yielding: BoxYielding,
    // TODO: change progress reporting interface to support efficient handling of
    // the label string being the same as last time.
    /// The progress function, called with a [`ProgressInfo`] for each progress report.
    /// Shared by reference count with instances derived from this one.
    progressor: Arc<ProgressFn>,
}
107
/// Piggyback on the `Arc` we need to store the `dyn Fn` anyway to also store some state.
///
/// `F` may be unsized so that this struct can hold a `dyn` yield function directly
/// (see [`BoxYielding`]).
struct Yielding<F: ?Sized> {
    /// Record of the most recent completed yield; only present with `log_hiccups`.
    #[cfg(feature = "log_hiccups")]
    state: std::sync::Mutex<YieldState>,

    /// The yield function itself. It is the last field, as required for a field whose
    /// type may be unsized.
    yielder: F,
}

/// Shared handle to a [`Yielding`] holding a type-erased yield function.
type BoxYielding = Arc<Yielding<YieldFn>>;
117
/// Information about the last yield performed.
/// Compared with the current state when the `log_hiccups` feature is enabled.
#[cfg(feature = "log_hiccups")]
#[derive(Clone)]
struct YieldState {
    /// The most recent instant at which `yielder`'s future completed.
    /// Used to detect overlong time periods between yields.
    last_finished_yielding: Instant,

    /// Source location that was passed to the most recent completed yield.
    /// Printed as the "old" location in the hiccup log message.
    last_yield_location: &'static Location<'static>,

    /// Label that was passed to the most recent completed yield, if any.
    /// Compared against the current label to word the hiccup log message.
    last_yield_label: Option<Label>,
}
131
132impl fmt::Debug for YieldProgress {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("YieldProgress")
135            .field("start", &self.start)
136            .field("end", &self.end)
137            .field("label", &self.label)
138            .finish_non_exhaustive()
139    }
140}
141
142impl YieldProgress {
143    /// Returns a [`YieldProgress`] that does no progress reporting **and no yielding at all**.
144    ///
145    /// This may be used, for example, to call a function that accepts [`YieldProgress`] and
146    /// is not `async` for any other reason.
147    /// It should not be used merely because no progress reporting is desired; in that case
148    /// use [`Builder`] instead so that a yield function can be provided.
149    ///
150    /// # Example
151    ///
152    /// ```
153    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
154    /// # use yield_progress::YieldProgress;
155    /// let mut progress = YieldProgress::noop();
156    /// // These calls will have no effect.
157    /// progress.set_label("a tree falls in a forest");
158    /// progress.progress(0.12345).await;
159    /// # }
160    /// ```
161    pub fn noop() -> Self {
162        Builder::new()
163            .yield_using(|_| core::future::ready(()))
164            .build()
165    }
166
167    /// Add a name for the portion of work this [`YieldProgress`] covers, which will be
168    /// used by all future progress updates.
169    ///
170    /// If there is already a label, it will be overwritten.
171    ///
172    /// This does not immediately report progress; that is, the label will not be visible
173    /// anywhere until the next operation that does. Future versions may report it immediately.
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
179    /// # use yield_progress::YieldProgress;
180    /// async fn process_things(progress: YieldProgress, things: &[String]) {
181    ///     let len = things.len();
182    ///     for ((mut progress, thing), i) in progress.split_evenly(len).zip(things).zip(1..) {
183    ///         progress.set_label(format_args!("Processing {i}/{len}: {thing}"));
184    ///         progress.progress(0.0).await;
185    ///         // ... Do actual work here ...
186    ///         progress.finish().await;
187    ///     }
188    /// }
189    /// # let expected_label = &*Box::leak(Box::new(std::sync::OnceLock::<String>::new()));
190    /// # process_things(
191    /// #     yield_progress::Builder::new()
192    /// #         .progress_using(move |info| {
193    /// #             if !info.label_str().is_empty() {
194    /// #                 expected_label.set(info.label_str().to_owned());
195    /// #             }
196    /// #         })
197    /// #         .build(),
198    /// #     &[String::from("hello world")],
199    /// # ).await;
200    /// # assert_eq!(
201    /// #     expected_label.get().map(|s| &**s),
202    /// #     Some::<&str>("Processing 1/1: hello world"),
203    /// # );
204    /// # }
205    /// ```
206    pub fn set_label(&mut self, label: impl fmt::Display) {
207        self.set_label_internal(Some(Arc::from(label.to_string())))
208    }
209
    /// Replace the stored label with an already-constructed value; `None` removes it.
    ///
    /// Unlike [`set_label()`](Self::set_label), this performs no string formatting.
    fn set_label_internal(&mut self, label: Option<Label>) {
        self.label = label;
    }
213
214    /// Map a `0..=1` value to `self.start..=self.end`.
215    #[track_caller]
216    fn point_in_range(&self, mut x: f32) -> f32 {
217        x = x.clamp(0.0, 1.0);
218        if !x.is_finite() {
219            if cfg!(debug_assertions) {
220                panic!("NaN progress value");
221            } else {
222                x = 0.5;
223            }
224        }
225        self.start + (x * (self.end - self.start))
226    }
227
228    /// Report the current amount of progress (a number from 0 to 1) and yield.
229    ///
230    /// The value *may* be less than previously given values.
231    ///
232    /// # Example
233    ///
234    /// ```
235    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
236    /// # use yield_progress::YieldProgress;
237    /// # pub fn first_half_of_work() {}
238    /// # pub fn second_half_of_work() {}
239    /// async fn do_work(progress: YieldProgress) {
240    ///     first_half_of_work();
241    ///     progress.progress(0.5).await;
242    ///     second_half_of_work();
243    ///     progress.finish().await;
244    /// }
245    /// # do_work(YieldProgress::noop()).await;
246    /// # }
247    /// ```
248    #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
249    pub fn progress(&self, progress_fraction: f32) -> impl Future<Output = ()> + use<> {
250        let location = Location::caller();
251
252        self.progress_without_yield(progress_fraction);
253
254        self.yielding.clone().yield_only(
255            location,
256            #[cfg(feature = "log_hiccups")]
257            self.label.clone(),
258        )
259    }
260
261    /// Report the current amount of progress (a number from 0 to 1) without yielding.
262    ///
263    /// Caution: Not yielding may mean that the display of progress to the user does not
264    /// update. This should be used only when necessary for non-async code.
265    #[track_caller]
266    pub fn progress_without_yield(&self, progress_fraction: f32) {
267        let location = Location::caller();
268        self.send_progress(progress_fraction, self.label.as_ref(), location);
269    }
270
271    /// Yield only; that is, call the yield function contained within this [`YieldProgress`].
272    #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
273    pub fn yield_without_progress(&self) -> impl Future<Output = ()> + Send + use<> {
274        let location = Location::caller();
275
276        self.yielding.clone().yield_only(
277            location,
278            #[cfg(feature = "log_hiccups")]
279            self.label.clone(),
280        )
281    }
282
283    /// Assemble a [`ProgressInfo`] using self's range and send it to the progress function.
284    /// This differs from `progress_without_yield()` by taking an explicit label and location;
285    /// only the range and destination from `self` is used.
286    fn send_progress(
287        &self,
288        progress_fraction: f32,
289        label: Option<&Label>,
290        location: &Location<'_>,
291    ) {
292        (self.progressor)(&ProgressInfo {
293            fraction: self.point_in_range(progress_fraction),
294            label,
295            location,
296        });
297    }
298
299    /// Report that 100% of progress has been made.
300    ///
301    /// This is identical to `.progress(1.0)` but consumes the `YieldProgress` object.
302    #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
303    pub fn finish(self) -> impl Future<Output = ()> + Send + use<> {
304        self.progress(1.0)
305    }
306
307    /// Report that the given amount of progress has been made, then return
308    /// a [`YieldProgress`] covering the remaining range.
309    ///
310    /// # Example
311    ///
312    /// ```
313    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
314    /// # use yield_progress::YieldProgress;
315    /// # pub fn first_half_of_work() {}
316    /// # pub async fn second_half_of_work(progress: YieldProgress) {
317    /// #     progress.finish().await;
318    /// # }
319    /// async fn do_work(progress: YieldProgress) {
320    ///     first_half_of_work();
321    ///     second_half_of_work(progress.finish_and_cut(0.5).await).await;
322    /// }
323    /// # do_work(YieldProgress::noop()).await;
324    /// # }
325    /// ```
326    #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
327    pub fn finish_and_cut(
328        self,
329        progress_fraction: f32,
330    ) -> impl Future<Output = Self> + Send + use<> {
331        // Efficiency note: this is structured so that `a` can be dropped immediately
332        // and does not live on in the future.
333        let [a, b] = self.split(progress_fraction);
334        a.progress_without_yield(1.0);
335        async move {
336            b.yield_without_progress().await;
337            b
338        }
339    }
340
341    /// Report the _beginning_ of a unit of work of size `progress_fraction` and described
342    /// by `label`. That fraction is cut off of the beginning range of `self`, and returned
343    /// as a separate [`YieldProgress`].
344    ///
345    /// # Example
346    ///
347    /// ```
348    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
349    /// # use yield_progress::YieldProgress;
350    /// # pub async fn first_half_of_work(progress: YieldProgress) {
351    /// #     progress.finish().await;
352    /// # }
353    /// # pub fn second_half_of_work() {}
354    /// async fn do_work(mut progress: YieldProgress) {
355    ///     first_half_of_work(progress.start_and_cut(0.5, "first half").await).await;
356    ///     second_half_of_work();
357    ///     progress.finish().await;
358    /// }
359    /// # do_work(YieldProgress::noop()).await;
360    /// # }
361    /// ```
362    #[track_caller]
363    pub fn start_and_cut(
364        &mut self,
365        cut: f32,
366        label: impl fmt::Display,
367    ) -> impl Future<Output = Self> + Send + 'static {
368        // Note: The `+ 'static` bound is the only currently available way to express that the
369        // returned future does not capture `label`.
370
371        let cut_abs = self.point_in_range(cut);
372        let mut portion = self.with_new_range(self.start, cut_abs);
373        self.start = cut_abs;
374
375        portion.set_label(label);
376        async {
377            portion.progress(0.0).await;
378            portion
379        }
380    }
381
382    fn with_new_range(&self, start: f32, end: f32) -> Self {
383        Self {
384            start,
385            end,
386            label: self.label.clone(),
387            yielding: Arc::clone(&self.yielding),
388            progressor: Arc::clone(&self.progressor),
389        }
390    }
391
392    /// Construct two new [`YieldProgress`] which divide the progress value into two
393    /// subranges.
394    ///
395    /// The returned instances should be used in sequence, but this is not enforced.
396    /// Using them concurrently will result in the progress bar jumping backwards.
397    ///
398    /// # Example
399    ///
400    /// ```
401    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
402    /// # use yield_progress::YieldProgress;
403    /// # pub async fn first_half_of_work(progress: YieldProgress) {
404    /// #     progress.finish().await;
405    /// # }
406    /// # pub async fn second_half_of_work(progress: YieldProgress) {
407    /// #     progress.finish().await;
408    /// # }
409    /// async fn do_work(mut progress: YieldProgress) {
410    ///     let [p1, p2] = progress.split(0.5);
411    ///     first_half_of_work(p1).await;
412    ///     second_half_of_work(p2).await;
413    /// }
414    /// # do_work(YieldProgress::noop()).await;
415    /// # }
416    /// ```
417    pub fn split(self, cut: f32) -> [Self; 2] {
418        let cut_abs = self.point_in_range(cut);
419        [
420            self.with_new_range(self.start, cut_abs),
421            self.with_new_range(cut_abs, self.end),
422        ]
423    }
424
425    /// Construct many new [`YieldProgress`] which together divide the progress value into
426    /// `count` subranges.
427    ///
428    /// The returned instances should be used in sequence, but this is not enforced.
429    /// Using them concurrently will result in the progress bar jumping backwards.
430    ///
431    /// # Example
432    ///
433    /// ```
434    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
435    /// # use yield_progress::YieldProgress;
436    /// # struct Thing;
437    /// # fn process_one_thing(thing: Thing) {}
438    /// async fn process_things(progress: YieldProgress, things: Vec<Thing>) {
439    ///     for (mut progress, thing) in progress.split_evenly(things.len()).zip(things) {
440    ///         process_one_thing(thing);
441    ///         progress.finish().await;
442    ///     }
443    /// }
444    /// # process_things(YieldProgress::noop(), vec![Thing]).await;
445    /// # }
446    /// ```
447    pub fn split_evenly(
448        self,
449        count: usize,
450    ) -> impl DoubleEndedIterator<Item = YieldProgress> + ExactSizeIterator + FusedIterator + use<>
451    {
452        (0..count).map(move |index| {
453            self.with_new_range(
454                self.point_in_range(index as f32 / count as f32),
455                self.point_in_range((index as f32 + 1.0) / count as f32),
456            )
457        })
458    }
459
460    /// Construct many new [`YieldProgress`] which will collectively advance `self` to completion
461    /// when they have all been advanced to completion, and which may be used concurrently.
462    ///
463    /// This is identical in effect to [`YieldProgress::split_evenly()`], except that it comprehends
464    /// concurrent operations — the progress of `self` is the sum of the progress of the subtasks.
465    /// To support this, it must allocate storage for the state tracking and synchronization, and
466    /// every progress update must calculate the sum from all subtasks. Therefore, for efficiency,
467    /// do not use this except when concurrency is actually present.
468    ///
469    /// The label passed through will be the label from the first subtask that has a progress
470    /// value less than 1.0. This choice may be changed in the future if the label system is
471    /// elaborated.
472    ///
473    /// # Example
474    ///
475    /// ```
476    /// # #[tokio::main(flavor = "current_thread")] async fn main() {
477    /// use yield_progress::YieldProgress;
478    /// use tokio::task::JoinSet;
479    /// # struct Thing;
480    /// # async fn process_one_thing(progress: YieldProgress, thing: Thing) {
481    /// #     progress.finish().await;
482    /// # }
483    ///
484    /// async fn process_things(progress: YieldProgress, things: Vec<Thing>) {
485    ///     let mut join_set = tokio::task::JoinSet::new();
486    ///     for (mut progress, thing) in progress.split_evenly_concurrent(things.len()).zip(things) {
487    ///         join_set.spawn(process_one_thing(progress, thing));
488    ///     }
489    ///     join_set.join_all().await;
490    /// }
491    /// # process_things(YieldProgress::noop(), vec![Thing]).await;
492    /// # }
493    /// ```
494    #[cfg(feature = "sync")]
495    pub fn split_evenly_concurrent(
496        self,
497        count: usize,
498    ) -> impl DoubleEndedIterator<Item = YieldProgress> + ExactSizeIterator + FusedIterator + use<>
499    {
500        let yielding = self.yielding.clone();
501        let conc = concurrent::ConcurrentProgress::new(self, count);
502        (0..count).map(move |index| {
503            Builder::new()
504                .yielding_internal(yielding.clone())
505                .progress_using(Arc::clone(&conc).progressor(index))
506                .build()
507        })
508    }
509}
510
/// The yield operation itself, shared by every public method that yields.
impl<F> Yielding<F>
where
    F: ?Sized + for<'a> Fn(&'a YieldInfo<'a>) -> BoxFuture<'static, ()> + Send + Sync,
{
    /// Returns a future which calls the yield function with `location` and awaits the
    /// future it produces.
    ///
    /// When the `log_hiccups` feature is enabled:
    ///
    /// * Synchronously, when this function is called (not when the returned future is
    ///   polled), the time since the previous yield finished is checked, and a
    ///   `log::trace!` message is emitted if it exceeds 100 ms.
    /// * After the yield completes, `location`, `label`, and the current time are recorded
    ///   for the next such comparison.
    /// * Both steps lock `self.state` and `unwrap()` the result, so they panic if that
    ///   mutex has been poisoned.
    ///
    /// The `label` parameter exists only when `log_hiccups` is enabled.
    #[allow(clippy::manual_async_fn)] // false positive from cfg
    fn yield_only(
        self: Arc<Self>,
        location: &'static Location<'static>,
        #[cfg(feature = "log_hiccups")] mut label: Option<Label>,
    ) -> impl Future<Output = ()> + use<F> {
        #[cfg(feature = "log_hiccups")]
        {
            #[allow(unused)] // may be redundant depending on other features
            use alloc::format;
            use core::time::Duration;

            // Note that we avoid holding the lock while calling yielder().
            // The worst outcome of an inconsistency is that we will output a meaningless
            // "between {location} and {location}" message, but none should occur because
            // [`YieldProgress`] is intended to be used in a sequential manner.
            let previous_state: YieldState = { self.state.lock().unwrap().clone() };

            // Time elapsed since the previous yield's future completed.
            let delta = Instant::now().duration_since(previous_state.last_finished_yielding);
            if delta > Duration::from_millis(100) {
                let last_label = previous_state.last_yield_label;
                log::trace!(
                    "Yielding after {delta} ms between {old_location} and {new_location} {rel}",
                    delta = delta.as_millis(),
                    old_location = previous_state.last_yield_location,
                    new_location = location,
                    rel = if label == last_label {
                        format!("during {label:?}")
                    } else {
                        format!("between {last_label:?} and {label:?}")
                    }
                );
            }
        }

        // TODO: Since we're tracking time, we might as well decide whether to not bother
        // yielding if it has been a short time ... except that different yielders might
        // want different granularities/policies.

        // Efficiency: This explicit `async` block somehow improves the future data size,
        // compared to `async fn`, by not allocating both a local and a capture for all of
        // `self`, `location`, and `label`. Seems odd that this helps...
        async move {
            let yield_future = {
                // Efficiency: This block avoids holding the temp `YieldInfo` across the await.
                (self.yielder)(&YieldInfo { location })
            };
            yield_future.await;

            #[cfg(feature = "log_hiccups")]
            {
                // The lock is taken only after the await has finished, so the guard is
                // never held across a suspension point.
                let mut state = self.state.lock().unwrap();

                state.last_yield_location = location;
                // Efficiency: this `Option::take()` avoids generating a drop flag.
                state.last_yield_label = label.take();

                state.last_finished_yielding = Instant::now();
            }
        }
    }
}