yield_progress/lib.rs
1//! This library provides the `YieldProgress` type, which allows a long-running async task
2//! to report its progress, while also yielding to the scheduler (e.g. for the
3//! single-threaded web/Wasm environment) and introducing cancellation points.
4//!
5//! These things go together because the rate at which it makes sense to yield (to avoid
6//! event loop hangs) is similar to the rate at which it makes sense to report progress.
7//!
8//! `YieldProgress` is executor-independent; when it is constructed, the caller provides a
9//! function for yielding.
10//!
11//! # Crate feature flags
12//!
13//! * `sync` (default): Adds `YieldProgress::split_evenly_concurrent()`.
14//!
15//! Requires `std` to be available for the compilation target.
16//!
17//! * `log_hiccups`: Log intervals between yields longer than 100 ms, via the [`log`] library.
18//!
19//! Requires `std` to be available for the compilation target.
20//! This might be removed in favor of something more configurable in future versions,
21//! in which case the feature flag may still exist but do nothing.
22//!
23//! [`log`]: https://docs.rs/log/0.4/
24
25#![no_std]
26#![deny(elided_lifetimes_in_paths)]
27#![forbid(unsafe_code)]
28#![warn(clippy::cast_lossless)]
29#![warn(clippy::exhaustive_enums)]
30#![warn(clippy::exhaustive_structs)]
31#![warn(clippy::missing_panics_doc)]
32#![warn(clippy::return_self_not_must_use)]
33#![warn(clippy::wrong_self_convention)]
34#![warn(missing_docs)]
35#![warn(unused_lifetimes)]
36#![warn(unused_qualifications)]
37
38extern crate alloc;
39
40#[cfg(any(feature = "sync", feature = "log_hiccups"))]
41extern crate std;
42
43use core::fmt;
44use core::future::Future;
45use core::iter::FusedIterator;
46use core::panic::Location;
47use core::pin::Pin;
48
49use alloc::boxed::Box;
50use alloc::string::ToString as _;
51use alloc::sync::Arc;
52
53#[cfg(doc)]
54use core::task::Poll;
55
56#[cfg(feature = "log_hiccups")]
57use web_time::Instant;
58
59mod basic_yield;
60pub use basic_yield::basic_yield_now;
61
62mod builder;
63pub use builder::Builder;
64
65#[cfg(feature = "sync")]
66mod concurrent;
67
68mod info;
69pub use info::{ProgressInfo, YieldInfo};
70
/// A heap-allocated, pinned, `Send` future producing `T`.
///
/// We could import this alias from `futures-core` but that would be another non-dev dependency.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Type-erased progress callback: receives a borrowed [`ProgressInfo`] and returns nothing.
type ProgressFn = dyn for<'a> Fn(&'a ProgressInfo<'a>) + Send + Sync + 'static;

/// Type-erased yield callback: receives a borrowed [`YieldInfo`] and returns a boxed
/// `'static` future, which is awaited to perform the yield.
type YieldFn = dyn for<'a> Fn(&'a YieldInfo<'a>) -> BoxFuture<'static, ()> + Send + Sync;

/// Label text, reference-counted so that cloning it does not copy the string.
type Label = Arc<str>;
79
/// Allows a long-running async task to report its progress, while also yielding to the
/// scheduler (e.g. for a single-threaded web environment) and introducing cancellation
/// points.
///
/// These things go together because the rate at which it makes sense to yield (to avoid event
/// loop hangs) is similar to the rate at which it makes sense to report progress.
///
/// ---
///
/// To construct a [`YieldProgress`], use the [`Builder`], or [`noop()`](YieldProgress::noop).
pub struct YieldProgress {
    /// Value sent to the progress function when this instance is given a fraction of 0.
    start: f32,
    /// Value sent to the progress function when this instance is given a fraction of 1.
    end: f32,

    /// Name given to this specific portion of work. Inherited from the parent if not
    /// overridden.
    ///
    /// TODO: Eventually we will want to have things like "label this segment as a
    /// fallback if it has no better label", which will require some notion of distinguishing
    /// inheritance from having been explicitly set.
    label: Option<Label>,

    /// The yield function (and, with the `log_hiccups` feature, the record of the last
    /// yield). The same `Arc` is handed to every [`YieldProgress`] split off from this one.
    yielding: BoxYielding,
    // TODO: change progress reporting interface to support efficient handling of
    // the label string being the same as last time.
    /// Function that is called with each progress report.
    progressor: Arc<ProgressFn>,
}
107
/// The yield function together with, when the `log_hiccups` feature is enabled, the
/// mutable record of the most recent yield.
///
/// Piggyback on the `Arc` we need to store the `dyn Fn` anyway to also store some state.
struct Yielding<F: ?Sized> {
    /// Record of the most recent yield. It is locked once to read before yielding and once
    /// to write after the yield future completes, never across the await.
    #[cfg(feature = "log_hiccups")]
    state: std::sync::Mutex<YieldState>,

    /// The yield function itself. It must be the last field because `F` may be unsized.
    yielder: F,
}

/// The type-erased, shared form of [`Yielding`] that [`YieldProgress`] stores.
type BoxYielding = Arc<Yielding<YieldFn>>;
117
/// Information about the last yield performed.
/// Compared with the current state when the `log_hiccups` feature is enabled.
#[cfg(feature = "log_hiccups")]
#[derive(Clone)]
struct YieldState {
    /// The most recent instant at which `yielder`'s future completed.
    /// Used to detect overlong time periods between yields.
    last_finished_yielding: Instant,

    /// Source location that was passed to the most recent completed yield.
    /// Appears as the "old" location in the hiccup log message.
    last_yield_location: &'static Location<'static>,

    /// Label that was passed to the most recent completed yield, if any.
    /// Compared against the current label to choose the wording of the hiccup log message.
    last_yield_label: Option<Label>,
}
131
132impl fmt::Debug for YieldProgress {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("YieldProgress")
135 .field("start", &self.start)
136 .field("end", &self.end)
137 .field("label", &self.label)
138 .finish_non_exhaustive()
139 }
140}
141
142impl YieldProgress {
143 /// Returns a [`YieldProgress`] that does no progress reporting **and no yielding at all**.
144 ///
145 /// This may be used, for example, to call a function that accepts [`YieldProgress`] and
146 /// is not `async` for any other reason.
147 /// It should not be used merely because no progress reporting is desired; in that case
148 /// use [`Builder`] instead so that a yield function can be provided.
149 ///
150 /// # Example
151 ///
152 /// ```
153 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
154 /// # use yield_progress::YieldProgress;
155 /// let mut progress = YieldProgress::noop();
156 /// // These calls will have no effect.
157 /// progress.set_label("a tree falls in a forest");
158 /// progress.progress(0.12345).await;
159 /// # }
160 /// ```
161 pub fn noop() -> Self {
162 Builder::new()
163 .yield_using(|_| core::future::ready(()))
164 .build()
165 }
166
167 /// Add a name for the portion of work this [`YieldProgress`] covers, which will be
168 /// used by all future progress updates.
169 ///
170 /// If there is already a label, it will be overwritten.
171 ///
172 /// This does not immediately report progress; that is, the label will not be visible
173 /// anywhere until the next operation that does. Future versions may report it immediately.
174 ///
175 /// # Example
176 ///
177 /// ```
178 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
179 /// # use yield_progress::YieldProgress;
180 /// async fn process_things(progress: YieldProgress, things: &[String]) {
181 /// let len = things.len();
182 /// for ((mut progress, thing), i) in progress.split_evenly(len).zip(things).zip(1..) {
183 /// progress.set_label(format_args!("Processing {i}/{len}: {thing}"));
184 /// progress.progress(0.0).await;
185 /// // ... Do actual work here ...
186 /// progress.finish().await;
187 /// }
188 /// }
189 /// # let expected_label = &*Box::leak(Box::new(std::sync::OnceLock::<String>::new()));
190 /// # process_things(
191 /// # yield_progress::Builder::new()
192 /// # .progress_using(move |info| {
193 /// # if !info.label_str().is_empty() {
194 /// # expected_label.set(info.label_str().to_owned());
195 /// # }
196 /// # })
197 /// # .build(),
198 /// # &[String::from("hello world")],
199 /// # ).await;
200 /// # assert_eq!(
201 /// # expected_label.get().map(|s| &**s),
202 /// # Some::<&str>("Processing 1/1: hello world"),
203 /// # );
204 /// # }
205 /// ```
206 pub fn set_label(&mut self, label: impl fmt::Display) {
207 self.set_label_internal(Some(Arc::from(label.to_string())))
208 }
209
    /// Replaces the stored label with `label`; `None` leaves this instance without a label.
    ///
    /// Like [`YieldProgress::set_label()`], this only stores the value and reports nothing.
    fn set_label_internal(&mut self, label: Option<Label>) {
        self.label = label;
    }
213
214 /// Map a `0..=1` value to `self.start..=self.end`.
215 #[track_caller]
216 fn point_in_range(&self, mut x: f32) -> f32 {
217 x = x.clamp(0.0, 1.0);
218 if !x.is_finite() {
219 if cfg!(debug_assertions) {
220 panic!("NaN progress value");
221 } else {
222 x = 0.5;
223 }
224 }
225 self.start + (x * (self.end - self.start))
226 }
227
228 /// Report the current amount of progress (a number from 0 to 1) and yield.
229 ///
230 /// The value *may* be less than previously given values.
231 ///
232 /// # Example
233 ///
234 /// ```
235 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
236 /// # use yield_progress::YieldProgress;
237 /// # pub fn first_half_of_work() {}
238 /// # pub fn second_half_of_work() {}
239 /// async fn do_work(progress: YieldProgress) {
240 /// first_half_of_work();
241 /// progress.progress(0.5).await;
242 /// second_half_of_work();
243 /// progress.finish().await;
244 /// }
245 /// # do_work(YieldProgress::noop()).await;
246 /// # }
247 /// ```
248 #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
249 pub fn progress(&self, progress_fraction: f32) -> impl Future<Output = ()> + use<> {
250 let location = Location::caller();
251
252 self.progress_without_yield(progress_fraction);
253
254 self.yielding.clone().yield_only(
255 location,
256 #[cfg(feature = "log_hiccups")]
257 self.label.clone(),
258 )
259 }
260
261 /// Report the current amount of progress (a number from 0 to 1) without yielding.
262 ///
263 /// Caution: Not yielding may mean that the display of progress to the user does not
264 /// update. This should be used only when necessary for non-async code.
265 #[track_caller]
266 pub fn progress_without_yield(&self, progress_fraction: f32) {
267 let location = Location::caller();
268 self.send_progress(progress_fraction, self.label.as_ref(), location);
269 }
270
271 /// Yield only; that is, call the yield function contained within this [`YieldProgress`].
272 #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
273 pub fn yield_without_progress(&self) -> impl Future<Output = ()> + Send + use<> {
274 let location = Location::caller();
275
276 self.yielding.clone().yield_only(
277 location,
278 #[cfg(feature = "log_hiccups")]
279 self.label.clone(),
280 )
281 }
282
283 /// Assemble a [`ProgressInfo`] using self's range and send it to the progress function.
284 /// This differs from `progress_without_yield()` by taking an explicit label and location;
285 /// only the range and destination from `self` is used.
286 fn send_progress(
287 &self,
288 progress_fraction: f32,
289 label: Option<&Label>,
290 location: &Location<'_>,
291 ) {
292 (self.progressor)(&ProgressInfo {
293 fraction: self.point_in_range(progress_fraction),
294 label,
295 location,
296 });
297 }
298
    /// Report that 100% of progress has been made.
    ///
    /// This is identical to `.progress(1.0)` but consumes the `YieldProgress` object.
    ///
    /// As with [`YieldProgress::progress()`], the report is delivered when this function
    /// is called, and the returned future performs the yield.
    #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
    pub fn finish(self) -> impl Future<Output = ()> + Send + use<> {
        // `progress()` is also `#[track_caller]`, so it reports the location of the code
        // that called `finish()`, not this line.
        self.progress(1.0)
    }
306
307 /// Report that the given amount of progress has been made, then return
308 /// a [`YieldProgress`] covering the remaining range.
309 ///
310 /// # Example
311 ///
312 /// ```
313 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
314 /// # use yield_progress::YieldProgress;
315 /// # pub fn first_half_of_work() {}
316 /// # pub async fn second_half_of_work(progress: YieldProgress) {
317 /// # progress.finish().await;
318 /// # }
319 /// async fn do_work(progress: YieldProgress) {
320 /// first_half_of_work();
321 /// second_half_of_work(progress.finish_and_cut(0.5).await).await;
322 /// }
323 /// # do_work(YieldProgress::noop()).await;
324 /// # }
325 /// ```
326 #[track_caller] // This is not an `async fn` because `track_caller` is not compatible
327 pub fn finish_and_cut(
328 self,
329 progress_fraction: f32,
330 ) -> impl Future<Output = Self> + Send + use<> {
331 // Efficiency note: this is structured so that `a` can be dropped immediately
332 // and does not live on in the future.
333 let [a, b] = self.split(progress_fraction);
334 a.progress_without_yield(1.0);
335 async move {
336 b.yield_without_progress().await;
337 b
338 }
339 }
340
341 /// Report the _beginning_ of a unit of work of size `progress_fraction` and described
342 /// by `label`. That fraction is cut off of the beginning range of `self`, and returned
343 /// as a separate [`YieldProgress`].
344 ///
345 /// # Example
346 ///
347 /// ```
348 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
349 /// # use yield_progress::YieldProgress;
350 /// # pub async fn first_half_of_work(progress: YieldProgress) {
351 /// # progress.finish().await;
352 /// # }
353 /// # pub fn second_half_of_work() {}
354 /// async fn do_work(mut progress: YieldProgress) {
355 /// first_half_of_work(progress.start_and_cut(0.5, "first half").await).await;
356 /// second_half_of_work();
357 /// progress.finish().await;
358 /// }
359 /// # do_work(YieldProgress::noop()).await;
360 /// # }
361 /// ```
362 #[track_caller]
363 pub fn start_and_cut(
364 &mut self,
365 cut: f32,
366 label: impl fmt::Display,
367 ) -> impl Future<Output = Self> + Send + 'static {
368 // Note: The `+ 'static` bound is the only currently available way to express that the
369 // returned future does not capture `label`.
370
371 let cut_abs = self.point_in_range(cut);
372 let mut portion = self.with_new_range(self.start, cut_abs);
373 self.start = cut_abs;
374
375 portion.set_label(label);
376 async {
377 portion.progress(0.0).await;
378 portion
379 }
380 }
381
382 fn with_new_range(&self, start: f32, end: f32) -> Self {
383 Self {
384 start,
385 end,
386 label: self.label.clone(),
387 yielding: Arc::clone(&self.yielding),
388 progressor: Arc::clone(&self.progressor),
389 }
390 }
391
392 /// Construct two new [`YieldProgress`] which divide the progress value into two
393 /// subranges.
394 ///
395 /// The returned instances should be used in sequence, but this is not enforced.
396 /// Using them concurrently will result in the progress bar jumping backwards.
397 ///
398 /// # Example
399 ///
400 /// ```
401 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
402 /// # use yield_progress::YieldProgress;
403 /// # pub async fn first_half_of_work(progress: YieldProgress) {
404 /// # progress.finish().await;
405 /// # }
406 /// # pub async fn second_half_of_work(progress: YieldProgress) {
407 /// # progress.finish().await;
408 /// # }
409 /// async fn do_work(mut progress: YieldProgress) {
410 /// let [p1, p2] = progress.split(0.5);
411 /// first_half_of_work(p1).await;
412 /// second_half_of_work(p2).await;
413 /// }
414 /// # do_work(YieldProgress::noop()).await;
415 /// # }
416 /// ```
417 pub fn split(self, cut: f32) -> [Self; 2] {
418 let cut_abs = self.point_in_range(cut);
419 [
420 self.with_new_range(self.start, cut_abs),
421 self.with_new_range(cut_abs, self.end),
422 ]
423 }
424
425 /// Construct many new [`YieldProgress`] which together divide the progress value into
426 /// `count` subranges.
427 ///
428 /// The returned instances should be used in sequence, but this is not enforced.
429 /// Using them concurrently will result in the progress bar jumping backwards.
430 ///
431 /// # Example
432 ///
433 /// ```
434 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
435 /// # use yield_progress::YieldProgress;
436 /// # struct Thing;
437 /// # fn process_one_thing(thing: Thing) {}
438 /// async fn process_things(progress: YieldProgress, things: Vec<Thing>) {
439 /// for (mut progress, thing) in progress.split_evenly(things.len()).zip(things) {
440 /// process_one_thing(thing);
441 /// progress.finish().await;
442 /// }
443 /// }
444 /// # process_things(YieldProgress::noop(), vec![Thing]).await;
445 /// # }
446 /// ```
447 pub fn split_evenly(
448 self,
449 count: usize,
450 ) -> impl DoubleEndedIterator<Item = YieldProgress> + ExactSizeIterator + FusedIterator + use<>
451 {
452 (0..count).map(move |index| {
453 self.with_new_range(
454 self.point_in_range(index as f32 / count as f32),
455 self.point_in_range((index as f32 + 1.0) / count as f32),
456 )
457 })
458 }
459
460 /// Construct many new [`YieldProgress`] which will collectively advance `self` to completion
461 /// when they have all been advanced to completion, and which may be used concurrently.
462 ///
463 /// This is identical in effect to [`YieldProgress::split_evenly()`], except that it comprehends
464 /// concurrent operations — the progress of `self` is the sum of the progress of the subtasks.
465 /// To support this, it must allocate storage for the state tracking and synchronization, and
466 /// every progress update must calculate the sum from all subtasks. Therefore, for efficiency,
467 /// do not use this except when concurrency is actually present.
468 ///
469 /// The label passed through will be the label from the first subtask that has a progress
470 /// value less than 1.0. This choice may be changed in the future if the label system is
471 /// elaborated.
472 ///
473 /// # Example
474 ///
475 /// ```
476 /// # #[tokio::main(flavor = "current_thread")] async fn main() {
477 /// use yield_progress::YieldProgress;
478 /// use tokio::task::JoinSet;
479 /// # struct Thing;
480 /// # async fn process_one_thing(progress: YieldProgress, thing: Thing) {
481 /// # progress.finish().await;
482 /// # }
483 ///
484 /// async fn process_things(progress: YieldProgress, things: Vec<Thing>) {
485 /// let mut join_set = tokio::task::JoinSet::new();
486 /// for (mut progress, thing) in progress.split_evenly_concurrent(things.len()).zip(things) {
487 /// join_set.spawn(process_one_thing(progress, thing));
488 /// }
489 /// join_set.join_all().await;
490 /// }
491 /// # process_things(YieldProgress::noop(), vec![Thing]).await;
492 /// # }
493 /// ```
494 #[cfg(feature = "sync")]
495 pub fn split_evenly_concurrent(
496 self,
497 count: usize,
498 ) -> impl DoubleEndedIterator<Item = YieldProgress> + ExactSizeIterator + FusedIterator + use<>
499 {
500 let yielding = self.yielding.clone();
501 let conc = concurrent::ConcurrentProgress::new(self, count);
502 (0..count).map(move |index| {
503 Builder::new()
504 .yielding_internal(yielding.clone())
505 .progress_using(Arc::clone(&conc).progressor(index))
506 .build()
507 })
508 }
509}
510
impl<F> Yielding<F>
where
    F: ?Sized + for<'a> Fn(&'a YieldInfo<'a>) -> BoxFuture<'static, ()> + Send + Sync,
{
    /// Returns a future which calls the yield function and completes when the yield
    /// function's future does.
    ///
    /// * `location` is given to the yield function in its [`YieldInfo`].
    /// * `label` exists only with the `log_hiccups` feature, and is used only for logging.
    ///
    /// With the `log_hiccups` feature enabled:
    ///
    /// * The time since the previous yield finished is checked immediately when this
    ///   function is called (not when the future is polled), and a `trace`-level message
    ///   is logged if it exceeds 100 ms.
    /// * After the yield function's future completes, `location`, `label`, and the
    ///   current time are recorded for the next such check.
    /// * This panics if the state mutex is poisoned.
    #[allow(clippy::manual_async_fn)] // false positive from cfg
    fn yield_only(
        self: Arc<Self>,
        location: &'static Location<'static>,
        #[cfg(feature = "log_hiccups")] mut label: Option<Label>,
    ) -> impl Future<Output = ()> + use<F> {
        #[cfg(feature = "log_hiccups")]
        {
            #[allow(unused)] // may be redundant depending on other features
            use alloc::format;
            use core::time::Duration;

            // Note that we avoid holding the lock while calling yielder().
            // The worst outcome of an inconsistency is that we will output a meaningless
            // "between {location} and {location}" message, but none should occur because
            // [`YieldProgress`] is intended to be used in a sequential manner.
            // The guard is a temporary inside this block, so the lock is released as soon
            // as the state has been cloned.
            let previous_state: YieldState = { self.state.lock().unwrap().clone() };

            // Time elapsed since the previous yield finished.
            let delta = Instant::now().duration_since(previous_state.last_finished_yielding);
            if delta > Duration::from_millis(100) {
                let last_label = previous_state.last_yield_label;
                log::trace!(
                    "Yielding after {delta} ms between {old_location} and {new_location} {rel}",
                    delta = delta.as_millis(),
                    old_location = previous_state.last_yield_location,
                    new_location = location,
                    rel = if label == last_label {
                        format!("during {label:?}")
                    } else {
                        format!("between {last_label:?} and {label:?}")
                    }
                );
            }
        }

        // TODO: Since we're tracking time, we might as well decide whether to not bother
        // yielding if it has been a short time ... except that different yielders might
        // want different granularities/policies.

        // Efficiency: This explicit `async` block somehow improves the future data size,
        // compared to `async fn`, by not allocating both a local and a capture for all of
        // `self`, `location`, and `label`. Seems odd that this helps...
        async move {
            let yield_future = {
                // Efficiency: This block avoids holding the temp `YieldInfo` across the await.
                (self.yielder)(&YieldInfo { location })
            };
            yield_future.await;

            #[cfg(feature = "log_hiccups")]
            {
                // Locked only after the await, so the guard is never held across a
                // suspension point.
                let mut state = self.state.lock().unwrap();

                state.last_yield_location = location;
                // Efficiency: this `Option::take()` avoids generating a drop flag.
                state.last_yield_label = label.take();

                // Recorded last, so the next interval is measured from the end of this yield.
                state.last_finished_yielding = Instant::now();
            }
        }
    }
}