Skip to main content

tokio_metrics/
task.rs

1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use tokio_stream::Stream;
10
11#[cfg(feature = "rt")]
12use tokio::time::{Duration, Instant};
13
14use crate::derived_metrics::derived_metrics;
15#[cfg(not(feature = "rt"))]
16use std::time::{Duration, Instant};
17
18#[cfg(feature = "metrics-rs-integration")]
19pub(crate) mod metrics_rs_integration;
20
21/// Monitors key metrics of instrumented tasks.
22///
23/// This struct is preferred for generating a variable number of monitors at runtime.
24/// If you can construct a fixed count of `static` monitors instead, see [`TaskMonitorCore`].
25///
26/// ### Basic Usage
27/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
28/// [instrumented][`TaskMonitor::instrument`] with the monitor.
29///
30/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
31/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
32/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
33/// ```
34/// use std::time::Duration;
35///
36/// #[tokio::main]
37/// async fn main() {
38///     // construct a metrics monitor
39///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
40///
41///     // print task metrics every 500ms
42///     {
43///         let metrics_monitor = metrics_monitor.clone();
44///         tokio::spawn(async move {
45///             for interval in metrics_monitor.intervals() {
46///                 // pretty-print the metric interval
47///                 println!("{:?}", interval);
48///                 // wait 500ms
49///                 tokio::time::sleep(Duration::from_millis(500)).await;
50///             }
51///         });
52///     }
53///
54///     // instrument some tasks and await them
55///     // note that the same TaskMonitor can be used for multiple tasks
56///     tokio::join![
57///         metrics_monitor.instrument(do_work()),
58///         metrics_monitor.instrument(do_work()),
59///         metrics_monitor.instrument(do_work())
60///     ];
61/// }
62///
63/// async fn do_work() {
64///     for _ in 0..25 {
65///         tokio::task::yield_now().await;
66///         tokio::time::sleep(Duration::from_millis(100)).await;
67///     }
68/// }
69/// ```
70///
71/// ### What should I instrument?
72/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
73///
74/// #### Instrumenting a web application
75/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
76/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
77/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
78/// debugging scenario for a web service that takes this approach to instrumentation. This
79/// approach is exemplified in the below example:
80/// ```no_run
81/// // The unabridged version of this snippet is in the examples directory of this crate.
82///
83/// #[tokio::main]
84/// async fn main() {
85///     // construct a TaskMonitor for root endpoint
86///     let monitor_root = tokio_metrics::TaskMonitor::new();
87///
88///     // construct TaskMonitors for create_users endpoint
89///     let monitor_create_user = CreateUserMonitors {
90///         // monitor for the entire endpoint
91///         route: tokio_metrics::TaskMonitor::new(),
92///         // monitor for database insertion subtask
93///         insert: tokio_metrics::TaskMonitor::new(),
94///     };
95///
96///     // build our application with two instrumented endpoints
97///     let app = axum::Router::new()
98///         // `GET /` goes to `root`
99///         .route("/", axum::routing::get({
100///             let monitor = monitor_root.clone();
101///             move || monitor.instrument(async { "Hello, World!" })
102///         }))
103///         // `POST /users` goes to `create_user`
104///         .route("/users", axum::routing::post({
105///             let monitors = monitor_create_user.clone();
106///             let route = monitors.route.clone();
107///             move |payload| {
108///                 route.instrument(create_user(payload, monitors))
109///             }
110///         }));
111///
112///     // print task metrics for each endpoint every 1s
113///     let metrics_frequency = std::time::Duration::from_secs(1);
114///     tokio::spawn(async move {
115///         let root_intervals = monitor_root.intervals();
116///         let create_user_route_intervals =
117///             monitor_create_user.route.intervals();
118///         let create_user_insert_intervals =
119///             monitor_create_user.insert.intervals();
120///         let create_user_intervals =
121///             create_user_route_intervals.zip(create_user_insert_intervals);
122///
123///         let intervals = root_intervals.zip(create_user_intervals);
124///         for (root_route, (create_user_route, create_user_insert)) in intervals {
125///             println!("root_route = {:#?}", root_route);
126///             println!("create_user_route = {:#?}", create_user_route);
127///             println!("create_user_insert = {:#?}", create_user_insert);
128///             tokio::time::sleep(metrics_frequency).await;
129///         }
130///     });
131///
132///     // run the server
133///     let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
134///     let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
135///     axum::serve(listener, app)
136///         .await
137///         .unwrap();
138/// }
139///
140/// async fn create_user(
141///     axum::Json(payload): axum::Json<CreateUser>,
142///     monitors: CreateUserMonitors,
143/// ) -> impl axum::response::IntoResponse {
144///     let user = User { id: 1337, username: payload.username, };
145///     // instrument inserting the user into the db:
146///     let _ = monitors.insert.instrument(insert_user(user.clone())).await;
147///     (axum::http::StatusCode::CREATED, axum::Json(user))
148/// }
149///
150/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
151///
152/// #
153/// # #[derive(Clone)]
154/// # struct CreateUserMonitors {
155/// #     // monitor for the entire endpoint
156/// #     route: tokio_metrics::TaskMonitor,
157/// #     // monitor for database insertion subtask
158/// #     insert: tokio_metrics::TaskMonitor,
159/// # }
160/// #
161/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
162/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
163/// #
164/// // insert the user into the database
165/// async fn insert_user(_: User) {
166///     /* implementation details elided */
167///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
168/// }
169/// ```
170///
171/// ### Why are my tasks slow?
172/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
173/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
174///
175/// #### Identifying the high-level culprits
176/// A set of tasks will appear to execute more slowly if:
177/// - they are taking longer to poll (i.e., they consume too much CPU time)
178/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
179///   queues)
180/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
181///
182/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
183/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
184/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
185/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
186/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
187/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
188///
189/// ##### Are my tasks taking longer to poll?
190/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
191///   This metric reflects the mean poll duration. If it increased, it means that, on average,
192///   individual polls tended to take longer. However, this does not necessarily imply increased
193///   task latency: An increase in poll durations could be offset by fewer polls.
194/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
195///   This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
196///   a greater proportion of polls performed excessive computation before yielding. This does not
197///   necessarily imply increased task latency: An increase in the proportion of slow polls could be
198///   offset by fewer or faster polls.
199/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
200///   This metric reflects the mean duration of slow polls. If it increased, it means that, on
201///   average, slow polls got slower. This does not necessarily imply increased task latency: An
202///   increase in average slow poll duration could be offset by fewer or faster polls.
203///
204/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
205///
206/// ##### Are my tasks spending more time waiting to be polled?
207/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
208///   This metric reflects the mean delay between the instant a task is first instrumented and the
209///   instant it is first polled. If it increases, it means that, on average, tasks spent longer
210///   waiting to be initially run.
211/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
212///   This metric reflects the mean duration that tasks spent in the scheduled state. The
213///   'scheduled' state of a task is the duration between the instant a task is awoken and the
214///   instant it is subsequently polled. If this metric increases, it means that, on average, tasks
215///   spent longer in tokio's queues before being polled.
216/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
217///   This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
218///   it means that a greater proportion of tasks experienced excessive delays before they could
219///   execute after being woken. This does not necessarily indicate an increase in latency, as this
220///   could be offset by fewer or faster task polls.
221/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
222///   This metric reflects the mean duration of long delays. If it increased, it means that, on
223///   average, long delays got even longer. This does not necessarily imply increased task latency:
224///   an increase in average long delay duration could be offset by fewer or faster polls or more
225///   short schedules.
226///
227/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
228///
229/// ##### Are my tasks spending more time waiting on external events to complete?
230/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
231///   This metric reflects the mean duration that tasks spent in the idle state. The idle state is
232///   the duration spanning the instant a task completes a poll, and the instant that it is next
233///   awoken. Tasks inhabit this state when they are waiting for task-external events to complete
234///   (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
235///   tasks, in aggregate, spent more time waiting for task-external events to complete.
236///
237/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
238///
239/// #### Digging deeper
240/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
241/// search for further explanation...
242///
243/// ##### Why are my tasks taking longer to poll?
244/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
245/// The culprit is likely some combination of:
246/// - **Your tasks are accidentally blocking.** Common culprits include:
247///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
248///        [networking](https://doc.rust-lang.org/std/net/) APIs.
249///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
250///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.
///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
256///     1. TLS/cryptographic routines
257///     2. doing a lot of processing on bytes
258///     3. calling non-Tokio resources
259///
260/// ##### Why are my tasks spending more time waiting to be polled?
261/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
262/// suggesting some combination of:
263/// - Your application is inflating the time elapsed between instrumentation and first poll.
264/// - Your tasks are being scheduled into tokio's global queue.
265/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
266///
267/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
268///
269/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, [*you try to identify
/// the performance culprits...*](#identifying-the-high-level-culprits)
275///
276/// #### Digging even deeper
277///
278/// ##### Is time-to-first-poll unusually high?
279/// Contrast these two metrics:
280/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
281///   This metric reflects the mean delay between the instant a task is first instrumented and the
282///   instant it is *first* polled.
283/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
284///   This metric reflects the mean delay between the instant when tasks were awoken and the
285///   instant they were subsequently polled.
286///
287/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
288/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
289///
290/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
291///
292/// ##### Is my application delaying the time-to-first-poll?
293/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
294/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
295/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
296/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
297///
/// For instance, in the below example, the application induces a 1-second delay between when
/// `task` is instrumented and when it is awaited:
300/// ```rust
301/// #[tokio::main]
302/// async fn main() {
303///     use tokio::time::Duration;
304///     let monitor = tokio_metrics::TaskMonitor::new();
305///
306///     let task = monitor.instrument(async move {});
307///
308///     let one_sec = Duration::from_secs(1);
309///     tokio::time::sleep(one_sec).await;
310///
311///     let _ = tokio::spawn(task).await;
312///
313///     assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
314/// }
315/// ```
316///
317/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
318/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokio’s-global-queue)
319///
320/// ##### Is my application spawning more tasks into tokio's global queue?
321/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
322/// global "injection" queue.
323///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
325/// ```ignore
326/// #[tokio::main]
327/// async fn main() {
328///     for _ in 0..100 {
329///         let (tx, rx) = oneshot::channel();
330///         tokio::spawn(async move {
331///             tx.send(());
332///         })
333///
334///         rx.await;
335///     }
336/// }
337/// ```
/// One would expect this to run efficiently; however, the main task is run *off* the main runtime
/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
340/// ```ignore
341/// #[tokio::main]
342/// async fn main() {
343///     tokio::spawn(async {
344///         for _ in 0..100 {
345///             let (tx, rx) = oneshot::channel();
346///             tokio::spawn(async move {
347///                 tx.send(());
348///             })
349///
350///             rx.await;
351///         }
352///     }).await;
353/// }
354/// ```
355/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
356/// and the task being polled.
357///
358/// ##### Are other tasks polling too long without yielding?
359/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
360/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
361/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
362///
363/// ### Limitations
364/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
365/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
366/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
367///
368/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
369/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
370/// counters and durations wrap.
371///
372/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
373/// calculated by computing the difference of metrics in successive invocations of
374/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
375/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
376/// that interval will overflow and not be accurate.
377///
378/// ##### Examples at the limits
379/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
380/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
381/// ```
382/// use tokio::time::Duration;
383///
384/// #[tokio::main(flavor = "current_thread", start_paused = true)]
385/// async fn main() {
386///     let monitor = tokio_metrics::TaskMonitor::new();
387///     let mut interval = monitor.intervals();
388///     let mut next_interval = || interval.next().unwrap();
389///
390///     // construct and instrument a task, but do not `await` it
391///     let task = monitor.instrument(async {});
392///
393///     // this is the maximum duration representable by tokio_metrics
394///     let max_duration = Duration::from_nanos(u64::MAX);
395///
396///     // let's advance the clock by this amount and poll `task`
397///     let _ = tokio::time::advance(max_duration).await;
398///     task.await;
399///
400///     // durations ≤ `max_duration` are accurately reflected in this metric
401///     assert_eq!(next_interval().total_first_poll_delay, max_duration);
402///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
403/// }
404/// ```
405/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
406/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
407/// ```
408/// # use tokio::time::Duration;
409/// #
410/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
411/// # async fn main() {
412/// #    let monitor = tokio_metrics::TaskMonitor::new();
413/// #
414///  // construct and instrument a task, but do not `await` it
415///  let task_a = monitor.instrument(async {});
416///  let task_b = monitor.instrument(async {});
417///
418///  // this is the maximum duration representable by tokio_metrics
419///  let max_duration = Duration::from_nanos(u64::MAX);
420///
///  // let's advance the clock by 1.5x this amount and await both tasks
422///  let _ = tokio::time::advance(3 * (max_duration / 2)).await;
423///  task_a.await;
424///  task_b.await;
425///
426///  // the `total_first_poll_delay` has overflowed
427///  assert!(monitor.cumulative().total_first_poll_delay < max_duration);
428/// # }
429/// ```
430/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
431/// this metric to the precipice of overflow:
432/// ```
433/// # use tokio::time::Duration;
434/// #
435/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
436/// # async fn main() {
437/// #     let monitor = tokio_metrics::TaskMonitor::new();
438/// #     let mut interval = monitor.intervals();
439/// #     let mut next_interval = || interval.next().unwrap();
440/// #
441/// // construct and instrument u16::MAX tasks, but do not `await` them
442/// let first_poll_count = u16::MAX as u64;
443/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
444/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
445///
446/// // this is the maximum duration representable by tokio_metrics
447/// let max_duration = u64::MAX;
448///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
451/// let iffy_delay = max_duration / (first_poll_count as u64);
452/// let small_remainder = max_duration % first_poll_count;
453/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
454///
455/// // ...then poll all of the instrumented tasks:
456/// for task in tasks { task.await; }
457///
458/// // `total_first_poll_delay` is at the precipice of overflowing!
459/// assert_eq!(
460///     next_interval().total_first_poll_delay.as_nanos(),
461///     (max_duration - small_remainder) as u128
462/// );
463/// assert_eq!(
464///     monitor.cumulative().total_first_poll_delay.as_nanos(),
465///     (max_duration - small_remainder) as u128
466/// );
467/// # }
468/// ```
469/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
470/// metrics counter overflows at most once in the midst of an interval:
471/// ```
472/// # use tokio::time::Duration;
473/// # use tokio_metrics::TaskMonitor;
474/// #
475/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
476/// # async fn main() {
477/// #     let monitor = TaskMonitor::new();
478/// #     let mut interval = monitor.intervals();
479/// #     let mut next_interval = || interval.next().unwrap();
480/// #
481///  let first_poll_count = u16::MAX as u64;
482///  let batch_size = first_poll_count / 3;
483///
484///  let max_duration_ns = u64::MAX;
485///  let iffy_delay_ns = max_duration_ns / first_poll_count;
486///
487///  // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
488///  // then await the instrumented tasks.
489///  async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
490///      let mut tasks = Vec::with_capacity(batch_size);
491///      for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
492///      let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
493///      for task in tasks { task.await; }
494///  }
495///
///  // this is how much `total_first_poll_delay` will
///  // increase with each batch we run
498///  let batch_delay = iffy_delay_ns * batch_size;
499///
500///  // run batches 1, 2, and 3
501///  for i in 1..=3 {
502///      run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
503///      assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
504///      assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
505///  }
506///
///  /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
508///  assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
509///
510///  // run batch 4
511///  run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
512///  // the interval counter remains accurate
513///  assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
514///  // but the cumulative counter has overflowed
515///  assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
516/// # }
517/// ```
518/// If a cumulative metric overflows *more than once* in the midst of an interval,
519/// its interval-sampled counterpart will also overflow.
520#[derive(Clone, Debug)]
521pub struct TaskMonitor {
522    base: Arc<TaskMonitorCore>,
523}
524
525impl Deref for TaskMonitor {
526    type Target = TaskMonitorCore;
527
528    fn deref(&self) -> &Self::Target {
529        &self.base
530    }
531}
532
533impl AsRef<TaskMonitorCore> for TaskMonitor {
534    fn as_ref(&self) -> &TaskMonitorCore {
535        &self.base
536    }
537}
538
539/// A non-`Clone`, non-allocated, static-friendly version of [`TaskMonitor`].
540/// See full docs on the [`TaskMonitor`] struct.
541///
542/// You should use [`TaskMonitorCore`] if you have a known count of monitors
543/// that you want to initialize as compile-time `static` structs.
544///
545/// You can also use [`TaskMonitorCore`] if you are already passing around an `Arc`-wrapped
546/// struct that you want to store your monitor in. This way, you can avoid double-`Arc`'ing it.
547///
548/// For other most other non-static usage, [`TaskMonitor`] will be more ergonomic.
549///
550/// ##### Examples
551///
552/// Static usage:
553/// ```
554/// use tokio_metrics::TaskMonitorCore;
555///
556/// static MONITOR: TaskMonitorCore = TaskMonitorCore::new();
557///
558/// #[tokio::main]
559/// async fn main() {
560///     assert_eq!(MONITOR.cumulative().first_poll_count, 0);
561///
562///     MONITOR.instrument(async {}).await;
563///     assert_eq!(MONITOR.cumulative().first_poll_count, 1);
564/// }
565/// ```
566///
567/// Usage with wrapper struct and [`TaskMonitorCore::instrument_with`]:
568/// ```
569/// use std::sync::Arc;
570/// use tokio_metrics::TaskMonitorCore;
571///
572/// #[derive(Clone)]
573/// struct SharedState(Arc<SharedStateInner>);
574/// struct SharedStateInner {
575///     monitor: TaskMonitorCore,
576///     other_state: SomeOtherSharedState,
577/// }
578/// /// Imagine: a type that wasn't `Clone` that you want to pass around
579/// /// in a similar way as the monitor
580/// struct SomeOtherSharedState;
581///
582/// impl AsRef<TaskMonitorCore> for SharedState {
583///     fn as_ref(&self) -> &TaskMonitorCore {
584///         &self.0.monitor
585///     }
586/// }
587///
588/// #[tokio::main]
589/// async fn main() {
590///     let state = SharedState(Arc::new(SharedStateInner {
591///         monitor: TaskMonitorCore::new(),
592///         other_state: SomeOtherSharedState,
593///     }));
594///
595///     assert_eq!(state.0.monitor.cumulative().first_poll_count, 0);
596///
597///     TaskMonitorCore::instrument_with(async {}, state.clone()).await;
598///     assert_eq!(state.0.monitor.cumulative().first_poll_count, 1);
599/// }
600/// ```
601#[derive(Debug)]
602pub struct TaskMonitorCore {
603    metrics: RawMetrics,
604}
605
606/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
607/// parameters.
608#[derive(Clone, Debug, Default)]
609pub struct TaskMonitorBuilder(TaskMonitorCoreBuilder);
610
611impl TaskMonitorBuilder {
612    /// Creates a new [`TaskMonitorBuilder`].
613    pub fn new() -> Self {
614        Self(TaskMonitorCoreBuilder::new())
615    }
616
617    /// Specifies the threshold at which polls are considered 'slow'.
618    pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
619        self.0.slow_poll_threshold = Some(threshold);
620        self
621    }
622
623    /// Specifies the threshold at which schedules are considered 'long'.
624    pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
625        self.0.long_delay_threshold = Some(threshold);
626        self
627    }
628
629    /// Consume the builder, producing a [`TaskMonitor`].
630    pub fn build(self) -> TaskMonitor {
631        TaskMonitor {
632            base: Arc::new(self.0.build()),
633        }
634    }
635}
636
/// A builder for [`TaskMonitorCore`]s that need specialized configuration parameters.
///
/// All of its methods are `const`, so it can be used to initialize a `static` monitor:
///
/// ```
/// use std::time::Duration;
/// use tokio_metrics::TaskMonitorCoreBuilder;
///
/// static MONITOR: tokio_metrics::TaskMonitorCore = TaskMonitorCoreBuilder::new()
///     .with_slow_poll_threshold(Duration::from_micros(100))
///     .build();
/// ```
#[derive(Debug, Default, Clone)]
pub struct TaskMonitorCoreBuilder {
    /// Threshold at which polls are considered 'slow'; `None` selects the default at build time.
    slow_poll_threshold: Option<Duration>,
    /// Threshold at which schedules are considered 'long'; `None` selects the default at build
    /// time.
    long_delay_threshold: Option<Duration>,
}
653
654impl TaskMonitorCoreBuilder {
655    /// Creates a new [`TaskMonitorCoreBuilder`].
656    pub const fn new() -> Self {
657        Self {
658            slow_poll_threshold: None,
659            long_delay_threshold: None,
660        }
661    }
662
663    /// Specifies the threshold at which polls are considered 'slow'.
664    pub const fn with_slow_poll_threshold(self, threshold: Duration) -> Self {
665        Self {
666            slow_poll_threshold: Some(threshold),
667            ..self
668        }
669    }
670
671    /// Specifies the threshold at which schedules are considered 'long'.
672    pub const fn with_long_delay_threshold(self, threshold: Duration) -> Self {
673        Self {
674            long_delay_threshold: Some(threshold),
675            ..self
676        }
677    }
678
679    /// Consume the builder, producing a [`TaskMonitorCore`].
680    pub const fn build(self) -> TaskMonitorCore {
681        let slow = match self.slow_poll_threshold {
682            Some(v) => v,
683            None => TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD,
684        };
685        let long = match self.long_delay_threshold {
686            Some(v) => v,
687            None => TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD,
688        };
689        TaskMonitorCore::create(slow, long)
690    }
691}
692
693pin_project! {
694    /// An async task that has been instrumented with [`TaskMonitor::instrument`].
695    #[derive(Debug)]
696    pub struct Instrumented<T, M: AsRef<TaskMonitorCore> = TaskMonitor> {
697        // The task being instrumented
698        #[pin]
699        task: T,
700
701        // True when the task is polled for the first time
702        did_poll_once: bool,
703
704        // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
705        // its last poll.
706        idled_at: u64,
707
708        // State shared between the task and its instrumented waker.
709        state: Arc<State<M>>,
710    }
711
712    impl<T, M: AsRef<TaskMonitorCore>> PinnedDrop for Instrumented<T, M> {
713        fn drop(this: Pin<&mut Self>) {
714            this.state.monitor.as_ref().metrics.dropped_count.fetch_add(1, SeqCst);
715        }
716    }
717}
718
719/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
720#[non_exhaustive]
721#[derive(Debug, Clone, Copy, Default)]
722pub struct TaskMetrics {
723    /// The number of tasks instrumented.
724    ///
725    /// ##### Examples
726    /// ```
727    /// #[tokio::main]
728    /// async fn main() {
729    ///     let monitor = tokio_metrics::TaskMonitor::new();
730    ///     let mut interval = monitor.intervals();
731    ///     let mut next_interval = || interval.next().unwrap();
732    ///
733    ///     // 0 tasks have been instrumented
734    ///     assert_eq!(next_interval().instrumented_count, 0);
735    ///
736    ///     monitor.instrument(async {});
737    ///
738    ///     // 1 task has been instrumented
739    ///     assert_eq!(next_interval().instrumented_count, 1);
740    ///
741    ///     monitor.instrument(async {});
742    ///     monitor.instrument(async {});
743    ///
744    ///     // 2 tasks have been instrumented
745    ///     assert_eq!(next_interval().instrumented_count, 2);
746    ///
747    ///     // since the last interval was produced, 0 tasks have been instrumented
748    ///     assert_eq!(next_interval().instrumented_count, 0);
749    /// }
750    /// ```
751    pub instrumented_count: u64,
752
753    /// The number of tasks dropped.
754    ///
755    /// ##### Examples
756    /// ```
757    /// #[tokio::main]
758    /// async fn main() {
759    ///     let monitor = tokio_metrics::TaskMonitor::new();
760    ///     let mut interval = monitor.intervals();
761    ///     let mut next_interval = || interval.next().unwrap();
762    ///
763    ///     // 0 tasks have been dropped
764    ///     assert_eq!(next_interval().dropped_count, 0);
765    ///
766    ///     let _task = monitor.instrument(async {});
767    ///
768    ///     // 0 tasks have been dropped
769    ///     assert_eq!(next_interval().dropped_count, 0);
770    ///
771    ///     monitor.instrument(async {}).await;
772    ///     drop(monitor.instrument(async {}));
773    ///
774    ///     // 2 tasks have been dropped
775    ///     assert_eq!(next_interval().dropped_count, 2);
776    ///
777    ///     // since the last interval was produced, 0 tasks have been dropped
778    ///     assert_eq!(next_interval().dropped_count, 0);
779    /// }
780    /// ```
781    pub dropped_count: u64,
782
783    /// The number of tasks polled for the first time.
784    ///
785    /// ##### Derived metrics
786    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
787    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
788    ///   are first polled.
789    ///
790    /// ##### Examples
791    /// In the below example, no tasks are instrumented or polled in the first sampling interval;
792    /// one task is instrumented (but not polled) in the second sampling interval; that task is
793    /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
794    /// additional tasks are polled for the first time within the fourth sampling interval:
795    /// ```
796    /// #[tokio::main]
797    /// async fn main() {
798    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
799    ///     let mut interval = metrics_monitor.intervals();
800    ///     let mut next_interval = || interval.next().unwrap();
801    ///
802    ///     // no tasks have been constructed, instrumented, and polled at least once
803    ///     assert_eq!(next_interval().first_poll_count, 0);
804    ///
805    ///     let task = metrics_monitor.instrument(async {});
806    ///
807    ///     // `task` has been constructed and instrumented, but has not yet been polled
808    ///     assert_eq!(next_interval().first_poll_count, 0);
809    ///
810    ///     // poll `task` to completion
811    ///     task.await;
812    ///
813    ///     // `task` has been constructed, instrumented, and polled at least once
814    ///     assert_eq!(next_interval().first_poll_count, 1);
815    ///
816    ///     // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
817    ///     assert_eq!(next_interval().first_poll_count, 0);
818    ///
819    /// }
820    /// ```
821    pub first_poll_count: u64,
822
823    /// The total duration elapsed between the instant tasks are instrumented, and the instant they
824    /// are first polled.
825    ///
826    /// ##### Derived metrics
827    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
828    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
829    ///   are first polled.
830    ///
831    /// ##### Examples
832    /// In the below example, 0 tasks have been instrumented or polled within the first sampling
833    /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
834    /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
835    /// polling of tasks within the third sampling interval:
836    /// ```
837    /// use tokio::time::Duration;
838    ///
839    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
840    /// async fn main() {
841    ///     let monitor = tokio_metrics::TaskMonitor::new();
842    ///     let mut interval = monitor.intervals();
843    ///     let mut next_interval = || interval.next().unwrap();
844    ///
845    ///     // no tasks have yet been created, instrumented, or polled
846    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
847    ///     assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
848    ///
849    ///     // constructs and instruments a task, pauses a given duration, then awaits the task
850    ///     async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
851    ///         let task = monitor.instrument(async move {});
852    ///         tokio::time::sleep(pause).await;
853    ///         task.await;
854    ///     }
855    ///
856    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
857    ///     let task_a_pause_time = Duration::from_millis(500);
858    ///     instrument_pause_await(&monitor, task_a_pause_time).await;
859    ///
860    ///     assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
861    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
862    ///
863    ///     // construct and await a task that pauses for 250ms between instrumentation and first poll
864    ///     let task_b_pause_time = Duration::from_millis(250);
865    ///     instrument_pause_await(&monitor, task_b_pause_time).await;
866    ///
867    ///     // construct and await a task that pauses for 100ms between instrumentation and first poll
868    ///     let task_c_pause_time = Duration::from_millis(100);
869    ///     instrument_pause_await(&monitor, task_c_pause_time).await;
870    ///
871    ///     assert_eq!(
872    ///         next_interval().total_first_poll_delay,
873    ///         task_b_pause_time + task_c_pause_time
874    ///     );
875    ///     assert_eq!(
876    ///         monitor.cumulative().total_first_poll_delay,
877    ///         task_a_pause_time + task_b_pause_time + task_c_pause_time
878    ///     );
879    /// }
880    /// ```
881    ///
882    /// ##### When is this metric recorded?
883    /// The delay between instrumentation and first poll is not recorded until the first poll
884    /// actually occurs:
885    /// ```
886    /// # use tokio::time::Duration;
887    /// #
888    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
889    /// # async fn main() {
890    /// #     let monitor = tokio_metrics::TaskMonitor::new();
891    /// #     let mut interval = monitor.intervals();
892    /// #     let mut next_interval = || interval.next().unwrap();
893    /// #
894    /// // we construct and instrument a task, but do not `await` it
895    /// let task = monitor.instrument(async {});
896    ///
897    /// // let's sleep for 1s before we poll `task`
898    /// let one_sec = Duration::from_secs(1);
899    /// let _ = tokio::time::sleep(one_sec).await;
900    ///
901    /// // although 1s has now elapsed since the instrumentation of `task`,
902    /// // this is not reflected in `total_first_poll_delay`...
903    /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
904    /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
905    ///
906    /// // ...and won't be until `task` is actually polled
907    /// task.await;
908    ///
909    /// // now, the 1s delay is reflected in `total_first_poll_delay`:
910    /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
911    /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
912    /// # }
913    /// ```
914    ///
915    /// ##### What if first-poll-delay is very large?
916    /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
917    /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
918    /// metric will wrap around:
919    /// ```
920    /// use tokio::time::Duration;
921    ///
922    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
923    /// async fn main() {
924    ///     let monitor = tokio_metrics::TaskMonitor::new();
925    ///
926    ///     // construct and instrument a task, but do not `await` it
927    ///     let task = monitor.instrument(async {});
928    ///
929    ///     // this is the maximum duration representable by tokio_metrics
930    ///     let max_duration = Duration::from_nanos(u64::MAX);
931    ///
932    ///     // let's advance the clock by double this amount and await `task`
933    ///     let _ = tokio::time::advance(max_duration * 2).await;
934    ///     task.await;
935    ///
936    ///     // the time-to-first-poll of `task` saturates at `max_duration`
937    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
938    ///
939    ///     // ...but note that the metric *will* wrap around if more tasks are involved
940    ///     let task = monitor.instrument(async {});
941    ///     let _ = tokio::time::advance(Duration::from_nanos(1)).await;
942    ///     task.await;
943    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
944    /// }
945    /// ```
946    pub total_first_poll_delay: Duration,
947
948    /// The total number of times that tasks idled, waiting to be awoken.
949    ///
950    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
951    /// task completes a poll, and the instant that it is next awoken.
952    ///
953    /// ##### Derived metrics
954    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
955    ///   The mean duration of idles.
956    ///
957    /// ##### Examples
958    /// ```
959    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
960    /// async fn main() {
961    ///     let monitor = tokio_metrics::TaskMonitor::new();
962    ///     let mut interval = monitor.intervals();
963    ///     let mut next_interval = move || interval.next().unwrap();
964    ///     let one_sec = std::time::Duration::from_secs(1);
965    ///
966    ///     monitor.instrument(async {}).await;
967    ///
968    ///     assert_eq!(next_interval().total_idled_count, 0);
969    ///     assert_eq!(monitor.cumulative().total_idled_count, 0);
970    ///
971    ///     monitor.instrument(async move {
972    ///         tokio::time::sleep(one_sec).await;
973    ///     }).await;
974    ///
975    ///     assert_eq!(next_interval().total_idled_count, 1);
976    ///     assert_eq!(monitor.cumulative().total_idled_count, 1);
977    ///
978    ///     monitor.instrument(async {
979    ///         tokio::time::sleep(one_sec).await;
980    ///         tokio::time::sleep(one_sec).await;
981    ///     }).await;
982    ///
983    ///     assert_eq!(next_interval().total_idled_count, 2);
984    ///     assert_eq!(monitor.cumulative().total_idled_count, 3);
985    /// }
986    /// ```
987    pub total_idled_count: u64,
988
989    /// The total duration that tasks idled.
990    ///
991    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
992    /// task completes a poll, and the instant that it is next awoken.
993    ///
994    /// ##### Derived metrics
995    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
996    ///   The mean duration of idles.
997    ///
998    /// ##### Examples
999    /// ```
1000    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1001    /// async fn main() {
1002    ///     let monitor = tokio_metrics::TaskMonitor::new();
1003    ///     let mut interval = monitor.intervals();
1004    ///     let mut next_interval = move || interval.next().unwrap();
1005    ///     let one_sec = std::time::Duration::from_secs(1);
1006    ///     let two_sec = std::time::Duration::from_secs(2);
1007    ///
1008    ///     assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
1009    ///     assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
1010    ///
1011    ///     monitor.instrument(async move {
1012    ///         tokio::time::sleep(one_sec).await;
1013    ///     }).await;
1014    ///
1015    ///     assert_eq!(next_interval().total_idle_duration, one_sec);
1016    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
1017    ///
1018    ///     monitor.instrument(async move {
1019    ///         tokio::time::sleep(two_sec).await;
1020    ///     }).await;
1021    ///
1022    ///     assert_eq!(next_interval().total_idle_duration, two_sec);
1023    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
1024    /// }
1025    /// ```
1026    pub total_idle_duration: Duration,
1027
1028    /// The maximum idle duration that a task took.
1029    ///
1030    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
1031    /// task completes a poll, and the instant that it is next awoken.
1032    ///
1033    /// ##### Examples
1034    /// ```
1035    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1036    /// async fn main() {
1037    ///     let monitor = tokio_metrics::TaskMonitor::new();
1038    ///     let mut interval = monitor.intervals();
1039    ///     let mut next_interval = move || interval.next().unwrap();
1040    ///     let one_sec = std::time::Duration::from_secs(1);
1041    ///     let two_sec = std::time::Duration::from_secs(2);
1042    ///
1043    ///     assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
1044    ///     assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
1045    ///
1046    ///     monitor.instrument(async move {
1047    ///         tokio::time::sleep(one_sec).await;
1048    ///     }).await;
1049    ///
1050    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
1051    ///     assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
1052    ///
1053    ///     monitor.instrument(async move {
1054    ///         tokio::time::sleep(two_sec).await;
1055    ///     }).await;
1056    ///
1057    ///     assert_eq!(next_interval().max_idle_duration, two_sec);
1058    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
1059    ///
1060    ///     monitor.instrument(async move {
1061    ///         tokio::time::sleep(one_sec).await;
1062    ///     }).await;
1063    ///
1064    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
1065    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
1066    /// }
1067    /// ```
1068    pub max_idle_duration: Duration,
1069
1070    /// The total number of times that tasks were awoken (and then, presumably, scheduled for
1071    /// execution).
1072    ///
1073    /// ##### Definition
1074    /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
1075    /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
1076    ///
1077    /// ##### Derived metrics
1078    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1079    ///   The mean duration that tasks spent waiting to be executed after awakening.
1080    ///
1081    /// ##### Examples
1082    /// In the below example, a task yields to the scheduler a varying number of times between
1083    /// sampling intervals; this metric is equal to the number of times the task yielded:
1084    /// ```
1085    /// #[tokio::main]
1086    /// async fn main(){
1087    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1088    ///
1089    ///     // [A] no tasks have been created, instrumented, and polled more than once
1090    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
1091    ///
1092    ///     // [B] a `task` is created and instrumented
1093    ///     let task = {
1094    ///         let monitor = metrics_monitor.clone();
1095    ///         metrics_monitor.instrument(async move {
1096    ///             let mut interval = monitor.intervals();
1097    ///             let mut next_interval = move || interval.next().unwrap();
1098    ///
1099    ///             // [E] `task` has not yet yielded to the scheduler, and
1100    ///             // thus has not yet been scheduled since its first `poll`
1101    ///             assert_eq!(next_interval().total_scheduled_count, 0);
1102    ///
1103    ///             tokio::task::yield_now().await; // yield to the scheduler
1104    ///
1105    ///             // [F] `task` has yielded to the scheduler once (and thus been
1106    ///             // scheduled once) since the last sampling interval
1107    ///             assert_eq!(next_interval().total_scheduled_count, 1);
1108    ///
1109    ///             tokio::task::yield_now().await; // yield to the scheduler
1110    ///             tokio::task::yield_now().await; // yield to the scheduler
1111    ///             tokio::task::yield_now().await; // yield to the scheduler
1112    ///
1113    ///             // [G] `task` has yielded to the scheduler thrice (and thus been
1114    ///             // scheduled thrice) since the last sampling interval
1115    ///             assert_eq!(next_interval().total_scheduled_count, 3);
1116    ///
1117    ///             tokio::task::yield_now().await; // yield to the scheduler
1118    ///
1119    ///             next_interval
1120    ///         })
1121    ///     };
1122    ///
1123    ///     // [C] `task` has not yet been polled at all
1124    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1125    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
1126    ///
1127    ///     // [D] poll `task` to completion
1128    ///     let mut next_interval = task.await;
1129    ///
    ///     // [H] `task` has been scheduled 1 time since the last sample
1131    ///     assert_eq!(next_interval().total_scheduled_count, 1);
1132    ///
    ///     // [I] `task` has been scheduled 0 times since the last sample
1134    ///     assert_eq!(next_interval().total_scheduled_count, 0);
1135    ///
1136    ///     // [J] `task` has yielded to the scheduler a total of five times
1137    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1138    /// }
1139    /// ```
1140    #[doc(alias = "total_delay_count")]
1141    pub total_scheduled_count: u64,
1142
1143    /// The total duration that tasks spent waiting to be polled after awakening.
1144    ///
1145    /// ##### Definition
1146    /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1147    /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1148    ///
1149    /// ##### Derived metrics
1150    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1151    ///   The mean duration that tasks spent waiting to be executed after awakening.
1152    ///
1153    /// ##### Examples
1154    /// ```
1155    /// use tokio::time::Duration;
1156    ///
1157    /// #[tokio::main(flavor = "current_thread")]
1158    /// async fn main() {
1159    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1160    ///     let mut interval = metrics_monitor.intervals();
1161    ///     let mut next_interval = || interval.next().unwrap();
1162    ///
1163    ///     // construct and instrument and spawn a task that yields endlessly
1164    ///     tokio::spawn(metrics_monitor.instrument(async {
1165    ///         loop { tokio::task::yield_now().await }
1166    ///     }));
1167    ///
1168    ///     tokio::task::yield_now().await;
1169    ///
1170    ///     // block the executor for 1 second
1171    ///     std::thread::sleep(Duration::from_millis(1000));
1172    ///
1173    ///     tokio::task::yield_now().await;
1174    ///
    ///     // the endlessly-yielding task will have spent approximately one second waiting
1176    ///     let total_scheduled_duration = next_interval().total_scheduled_duration;
1177    ///     assert!(total_scheduled_duration >= Duration::from_millis(1000));
1178    ///     assert!(total_scheduled_duration <= Duration::from_millis(1100));
1179    /// }
1180    /// ```
1181    #[doc(alias = "total_delay_duration")]
1182    pub total_scheduled_duration: Duration,
1183
1184    /// The total number of times that tasks were polled.
1185    ///
1186    /// ##### Definition
1187    /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1188    /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1189    ///
1190    /// ##### Derived metrics
1191    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1192    ///   The mean duration of polls.
1193    ///
1194    /// ##### Examples
1195    /// In the below example, a task with multiple yield points is await'ed to completion; this
1196    /// metric reflects the number of `await`s within each sampling interval:
1197    /// ```
1198    /// #[tokio::main]
1199    /// async fn main() {
1200    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1201    ///
1202    ///     // [A] no tasks have been created, instrumented, and polled more than once
1203    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1204    ///
1205    ///     // [B] a `task` is created and instrumented
1206    ///     let task = {
1207    ///         let monitor = metrics_monitor.clone();
1208    ///         metrics_monitor.instrument(async move {
1209    ///             let mut interval = monitor.intervals();
1210    ///             let mut next_interval = move || interval.next().unwrap();
1211    ///
1212    ///             // [E] task is in the midst of its first poll
1213    ///             assert_eq!(next_interval().total_poll_count, 0);
1214    ///
1215    ///             tokio::task::yield_now().await; // poll 1
1216    ///
1217    ///             // [F] task has been polled 1 time
1218    ///             assert_eq!(next_interval().total_poll_count, 1);
1219    ///
1220    ///             tokio::task::yield_now().await; // poll 2
1221    ///             tokio::task::yield_now().await; // poll 3
1222    ///             tokio::task::yield_now().await; // poll 4
1223    ///
1224    ///             // [G] task has been polled 3 times
1225    ///             assert_eq!(next_interval().total_poll_count, 3);
1226    ///
1227    ///             tokio::task::yield_now().await; // poll 5
1228    ///
1229    ///             next_interval                   // poll 6
1230    ///         })
1231    ///     };
1232    ///
1233    ///     // [C] `task` has not yet been polled at all
1234    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1235    ///
1236    ///     // [D] poll `task` to completion
1237    ///     let mut next_interval = task.await;
1238    ///
1239    ///     // [H] `task` has been polled 2 times since the last sample
1240    ///     assert_eq!(next_interval().total_poll_count, 2);
1241    ///
1242    ///     // [I] `task` has been polled 0 times since the last sample
1243    ///     assert_eq!(next_interval().total_poll_count, 0);
1244    ///
1245    ///     // [J] `task` has been polled 6 times
1246    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1247    /// }
1248    /// ```
1249    pub total_poll_count: u64,
1250
1251    /// The total duration elapsed during polls.
1252    ///
1253    /// ##### Definition
1254    /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1255    /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1256    ///
1257    /// ##### Derived metrics
1258    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1259    ///   The mean duration of polls.
1260    ///
    /// ##### Examples
1262    /// ```
1263    /// use tokio::time::Duration;
1264    ///
1265    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1266    /// async fn main() {
1267    ///     let monitor = tokio_metrics::TaskMonitor::new();
1268    ///     let mut interval = monitor.intervals();
1269    ///     let mut next_interval = move || interval.next().unwrap();
1270    ///
1271    ///     assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1272    ///
1273    ///     monitor.instrument(async {
1274    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1275    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1276    ///         ()                                                  // poll 3 (0s)
1277    ///     }).await;
1278    ///
1279    ///     assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1280    /// }
1281    /// ```
1282    pub total_poll_duration: Duration,
1283
1284    /// The total number of times that polling tasks completed swiftly.
1285    ///
1286    /// Here, 'swiftly' is defined as completing in strictly less time than
1287    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1288    ///
1289    /// ##### Derived metrics
1290    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1291    ///   The mean duration of fast polls.
1292    ///
1293    /// ##### Examples
1294    /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1295    /// within the second sampling interval, and 2 fast polls occur within the third sampling
1296    /// interval:
1297    /// ```
1298    /// use std::future::Future;
1299    /// use std::time::Duration;
1300    ///
1301    /// #[tokio::main]
1302    /// async fn main() {
1303    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1304    ///     let mut interval = metrics_monitor.intervals();
1305    ///     let mut next_interval = || interval.next().unwrap();
1306    ///
1307    ///     // no tasks have been constructed, instrumented, or polled
1308    ///     assert_eq!(next_interval().total_fast_poll_count, 0);
1309    ///
1310    ///     let fast = Duration::ZERO;
1311    ///
1312    ///     // this task completes in three fast polls
1313    ///     let _ = metrics_monitor.instrument(async {
1314    ///         spin_for(fast).await; // fast poll 1
1315    ///         spin_for(fast).await; // fast poll 2
1316    ///         spin_for(fast)        // fast poll 3
1317    ///     }).await;
1318    ///
1319    ///     assert_eq!(next_interval().total_fast_poll_count, 3);
1320    ///
1321    ///     // this task completes in two fast polls
1322    ///     let _ = metrics_monitor.instrument(async {
1323    ///         spin_for(fast).await; // fast poll 1
1324    ///         spin_for(fast)        // fast poll 2
1325    ///     }).await;
1326    ///
1327    ///     assert_eq!(next_interval().total_fast_poll_count, 2);
1328    /// }
1329    ///
1330    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1331    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1332    ///     let start = tokio::time::Instant::now();
1333    ///     while start.elapsed() <= duration {}
1334    ///     tokio::task::yield_now()
1335    /// }
1336    /// ```
1337    pub total_fast_poll_count: u64,
1338
1339    /// The total duration of fast polls.
1340    ///
1341    /// Here, 'fast' is defined as completing in strictly less time than
1342    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1343    ///
1344    /// ##### Derived metrics
1345    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1346    ///   The mean duration of fast polls.
1347    ///
1348    /// ##### Examples
1349    /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1350    /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1351    /// total of 2μs time in the third sampling interval:
1352    /// ```
1353    /// use std::future::Future;
1354    /// use std::time::Duration;
1355    ///
1356    /// #[tokio::main]
1357    /// async fn main() {
1358    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1359    ///     let mut interval = metrics_monitor.intervals();
1360    ///     let mut next_interval = || interval.next().unwrap();
1361    ///
1362    ///     // no tasks have been constructed, instrumented, or polled
1363    ///     let interval = next_interval();
1364    ///     assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1365    ///
1366    ///     let fast = Duration::from_micros(1);
1367    ///
1368    ///     // this task completes in three fast polls
1369    ///     let task_a_time = time(metrics_monitor.instrument(async {
1370    ///         spin_for(fast).await; // fast poll 1
1371    ///         spin_for(fast).await; // fast poll 2
1372    ///         spin_for(fast)        // fast poll 3
1373    ///     })).await;
1374    ///
1375    ///     let interval = next_interval();
1376    ///     assert!(interval.total_fast_poll_duration >= fast * 3);
1377    ///     assert!(interval.total_fast_poll_duration <= task_a_time);
1378    ///
1379    ///     // this task completes in two fast polls
1380    ///     let task_b_time = time(metrics_monitor.instrument(async {
1381    ///         spin_for(fast).await; // fast poll 1
1382    ///         spin_for(fast)        // fast poll 2
1383    ///     })).await;
1384    ///
1385    ///     let interval = next_interval();
1386    ///     assert!(interval.total_fast_poll_duration >= fast * 2);
1387    ///     assert!(interval.total_fast_poll_duration <= task_b_time);
1388    /// }
1389    ///
1390    /// /// Produces the amount of time it took to await a given async task.
1391    /// async fn time(task: impl Future) -> Duration {
1392    ///     let start = tokio::time::Instant::now();
1393    ///     task.await;
1394    ///     start.elapsed()
1395    /// }
1396    ///
1397    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1398    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1399    ///     let start = tokio::time::Instant::now();
1400    ///     while start.elapsed() <= duration {}
1401    ///     tokio::task::yield_now()
1402    /// }
1403    /// ```
1404    pub total_fast_poll_duration: Duration,
1405
1406    /// The total number of times that polling tasks completed slowly.
1407    ///
1408    /// Here, 'slowly' is defined as completing in at least as much time as
1409    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1410    ///
1411    /// ##### Derived metrics
1412    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1413    ///   The mean duration of slow polls.
1414    ///
1415    /// ##### Examples
1416    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1417    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1418    /// interval:
1419    /// ```
1420    /// use std::future::Future;
1421    /// use std::time::Duration;
1422    ///
1423    /// #[tokio::main]
1424    /// async fn main() {
1425    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1426    ///     let mut interval = metrics_monitor.intervals();
1427    ///     let mut next_interval = || interval.next().unwrap();
1428    ///
1429    ///     // no tasks have been constructed, instrumented, or polled
1430    ///     assert_eq!(next_interval().total_slow_poll_count, 0);
1431    ///
1432    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1433    ///
1434    ///     // this task completes in three slow polls
1435    ///     let _ = metrics_monitor.instrument(async {
1436    ///         spin_for(slow).await; // slow poll 1
1437    ///         spin_for(slow).await; // slow poll 2
1438    ///         spin_for(slow)        // slow poll 3
1439    ///     }).await;
1440    ///
1441    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1442    ///
1443    ///     // this task completes in two slow polls
1444    ///     let _ = metrics_monitor.instrument(async {
1445    ///         spin_for(slow).await; // slow poll 1
1446    ///         spin_for(slow)        // slow poll 2
1447    ///     }).await;
1448    ///
1449    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1450    /// }
1451    ///
1452    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1453    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1454    ///     let start = tokio::time::Instant::now();
1455    ///     while start.elapsed() <= duration {}
1456    ///     tokio::task::yield_now()
1457    /// }
1458    /// ```
1459    pub total_slow_poll_count: u64,
1460
1461    /// The total duration of slow polls.
1462    ///
1463    /// Here, 'slowly' is defined as completing in at least as much time as
1464    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1465    ///
1466    /// ##### Derived metrics
1467    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1468    ///   The mean duration of slow polls.
1469    ///
1470    /// ##### Examples
1471    /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1472    /// consume a total of
1473    /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1474    /// time in the second sampling interval; and two slow polls consume a total of
1475    /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1476    /// third sampling interval:
1477    /// ```
1478    /// use std::future::Future;
1479    /// use std::time::Duration;
1480    ///
1481    /// #[tokio::main]
1482    /// async fn main() {
1483    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1484    ///     let mut interval = metrics_monitor.intervals();
1485    ///     let mut next_interval = || interval.next().unwrap();
1486    ///
1487    ///     // no tasks have been constructed, instrumented, or polled
1488    ///     let interval = next_interval();
1489    ///     assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1490    ///
1491    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1492    ///
1493    ///     // this task completes in three slow polls
1494    ///     let task_a_time = time(metrics_monitor.instrument(async {
1495    ///         spin_for(slow).await; // slow poll 1
1496    ///         spin_for(slow).await; // slow poll 2
1497    ///         spin_for(slow)        // slow poll 3
1498    ///     })).await;
1499    ///
1500    ///     let interval = next_interval();
1501    ///     assert!(interval.total_slow_poll_duration >= slow * 3);
1502    ///     assert!(interval.total_slow_poll_duration <= task_a_time);
1503    ///
1504    ///     // this task completes in two slow polls
1505    ///     let task_b_time = time(metrics_monitor.instrument(async {
1506    ///         spin_for(slow).await; // slow poll 1
1507    ///         spin_for(slow)        // slow poll 2
1508    ///     })).await;
1509    ///
1510    ///     let interval = next_interval();
1511    ///     assert!(interval.total_slow_poll_duration >= slow * 2);
1512    ///     assert!(interval.total_slow_poll_duration <= task_b_time);
1513    /// }
1514    ///
1515    /// /// Produces the amount of time it took to await a given async task.
1516    /// async fn time(task: impl Future) -> Duration {
1517    ///     let start = tokio::time::Instant::now();
1518    ///     task.await;
1519    ///     start.elapsed()
1520    /// }
1521    ///
1522    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1523    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1524    ///     let start = tokio::time::Instant::now();
1525    ///     while start.elapsed() <= duration {}
1526    ///     tokio::task::yield_now()
1527    /// }
1528    /// ```
1529    pub total_slow_poll_duration: Duration,
1530
1531    /// The total count of tasks with short scheduling delays.
1532    ///
1533    /// This is defined as tasks taking strictly less than
1534    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1535    /// scheduled.
1536    ///
1537    /// ##### Derived metrics
1538    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1539    ///   The mean duration of short scheduling delays.
1540    pub total_short_delay_count: u64,
1541
1542    /// The total duration of tasks with short scheduling delays.
1543    ///
1544    /// This is defined as tasks taking strictly less than
1545    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1546    /// scheduled.
1547    ///
1548    /// ##### Derived metrics
1549    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1550    ///   The mean duration of short scheduling delays.
1551    pub total_short_delay_duration: Duration,
1552
1553    /// The total count of tasks with long scheduling delays.
1554    ///
1555    /// This is defined as tasks taking
1556    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1557    /// after being scheduled.
1558    ///
1559    /// ##### Derived metrics
1560    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1561    ///   The mean duration of long scheduling delays.
1562    pub total_long_delay_count: u64,
1563
1564    /// The total duration of tasks with long scheduling delays.
1565    ///
1566    /// This is defined as tasks taking
1567    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1568    /// after being scheduled.
1569    ///
1570    /// ##### Derived metrics
1571    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1572    ///   The mean duration of long scheduling delays.
1573    pub total_long_delay_duration: Duration,
1574}
1575
1576/// Tracks the raw metrics — thresholds plus atomic counters — shared across the various types.
1577#[derive(Debug)]
1578struct RawMetrics {
1579    /// A task poll that takes at least this long is considered a slow poll.
1580    slow_poll_threshold: Duration,
1581
1582    /// A scheduling delay of at least this long is considered a long delay.
1583    long_delay_threshold: Duration,
1584
1585    /// Total number of instrumented tasks.
1586    instrumented_count: AtomicU64,
1587
1588    /// Total number of instrumented tasks polled at least once.
1589    first_poll_count: AtomicU64,
1590
1591    /// Total number of times tasks entered the `idle` state.
1592    total_idled_count: AtomicU64,
1593
1594    /// Total number of times tasks were scheduled.
1595    total_scheduled_count: AtomicU64,
1596
1597    /// Total number of times tasks were polled fast.
1598    total_fast_poll_count: AtomicU64,
1599
1600    /// Total number of times tasks were polled slow (taking at least `slow_poll_threshold`).
1601    total_slow_poll_count: AtomicU64,
1602
1603    /// Total number of times tasks had a long scheduling delay (at least `long_delay_threshold`).
1604    total_long_delay_count: AtomicU64,
1605
1606    /// Total number of times tasks had a short scheduling delay (below `long_delay_threshold`).
1607    total_short_delay_count: AtomicU64,
1608
1609    /// Total number of times tasks were dropped.
1610    dropped_count: AtomicU64,
1611
1612    /// Total amount of time until the first poll, in nanoseconds.
1613    total_first_poll_delay_ns: AtomicU64,
1614
1615    /// Total amount of time tasks spent in the `idle` state, in nanoseconds.
1616    total_idle_duration_ns: AtomicU64,
1617
1618    /// The longest time tasks spent in the `idle` state since this value was last reset.
1619    /// `get_and_reset_local_max_idle_duration` reads it (as nanoseconds) and zeroes it, which
1620    /// tracks the local max between interval metric snapshots.
1621    local_max_idle_duration_ns: AtomicU64,
1622
1623    /// The longest time tasks spent in the `idle` state, in nanoseconds.
1624    global_max_idle_duration_ns: AtomicU64,
1625
1626    /// Total amount of time tasks spent in the waking state, in nanoseconds.
1627    total_scheduled_duration_ns: AtomicU64,
1628
1629    /// Total amount of time tasks spent being polled below the slow cut off, in nanoseconds.
1630    total_fast_poll_duration_ns: AtomicU64,
1631
1632    /// Total amount of time tasks spent being polled at or above the slow cut off.
    /// NOTE(review): lacks the `_ns` suffix of its siblings — presumably nanoseconds too; confirm.
1633    total_slow_poll_duration: AtomicU64,
1634
1635    /// Total amount of time tasks spent in scheduling delays below the long delay cut off.
1636    total_short_delay_duration_ns: AtomicU64,
1637
1638    /// Total amount of time tasks spent in scheduling delays at or above the long delay cut off.
1639    total_long_delay_duration_ns: AtomicU64,
1640}
1641
/// Per-task state that `TaskMonitorCore::instrument_with` creates and stores in an `Arc`
/// inside each `Instrumented` wrapper.
1642#[derive(Debug)]
1643struct State<M> {
1644    /// Where metrics should be recorded; `instrument_with` requires `M: AsRef<TaskMonitorCore>`.
1645    monitor: M,
1646
1647    /// Instant at which the task was instrumented. This is used to track the time to first poll.
1648    instrumented_at: Instant,
1649
1650    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
1651    /// was last woken. Starts at 0 when the task is instrumented.
1652    woke_at: AtomicU64,
1653
1654    /// Waker to forward notifications to.
1655    waker: AtomicWaker,
1656}
1657
1658impl TaskMonitor {
1659    /// The default duration at which polls cross the threshold into being categorized as 'slow' is
1660    /// 50μs.
1661    #[cfg(not(test))]
1662    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
    // Test-build variant of the constant above, set to 500ms instead of 50μs. `missing_docs` is
    // allowed because the doc comment is attached only to the `cfg(not(test))` definition.
    // NOTE(review): presumably raised so timing-sensitive tests are less flaky — confirm.
1663    #[cfg(test)]
1664    #[allow(missing_docs)]
1665    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1666
1667    /// The default duration at which schedules cross the threshold into being categorized as 'long'
1668    /// is 50μs.
1669    #[cfg(not(test))]
1670    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
    // Test-build variant of the constant above, set to 500ms instead of 50μs. `missing_docs` is
    // allowed because the doc comment is attached only to the `cfg(not(test))` definition.
    // NOTE(review): presumably raised so timing-sensitive tests are less flaky — confirm.
1671    #[cfg(test)]
1672    #[allow(missing_docs)]
1673    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1674
1675    /// Constructs a new task monitor.
1676    ///
1677    /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1678    /// considered 'slow'.
1679    ///
1680    /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1681    /// considered 'long'.
1682    pub fn new() -> TaskMonitor {
1683        TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1684    }
1685
1686    /// Constructs a builder for a task monitor.
1687    pub fn builder() -> TaskMonitorBuilder {
1688        TaskMonitorBuilder::new()
1689    }
1690
1691    /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1692    ///
1693    /// ##### Selecting an appropriate threshold
1694    /// TODO. What advice can we give here?
1695    ///
1696    /// ##### Examples
1697    /// In the below example, low-threshold and high-threshold monitors are constructed and
1698    /// instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the
1699    /// high-threshold monitor reports only 2 slow polls:
1700    /// ```
1701    /// use std::future::Future;
1702    /// use std::time::Duration;
1703    /// use tokio_metrics::TaskMonitor;
1704    ///
1705    /// #[tokio::main]
1706    /// async fn main() {
1707    ///     let lo_threshold = Duration::from_micros(10);
1708    ///     let hi_threshold = Duration::from_millis(10);
1709    ///
1710    ///     let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1711    ///     let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1712    ///
1713    ///     let make_task = || async {
1714    ///         spin_for(lo_threshold).await; // faster poll 1
1715    ///         spin_for(lo_threshold).await; // faster poll 2
1716    ///         spin_for(hi_threshold).await; // slower poll 3
1717    ///         spin_for(hi_threshold).await  // slower poll 4
1718    ///     };
1719    ///
1720    ///     lo_monitor.instrument(make_task()).await;
1721    ///     hi_monitor.instrument(make_task()).await;
1722    ///
1723    ///     // the low-threshold monitor reported 4 slow polls:
1724    ///     assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1725    ///     // the high-threshold monitor reported only 2 slow polls:
1726    ///     assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1727    /// }
1728    ///
1729    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1730    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1731    ///     let start = tokio::time::Instant::now();
1732    ///     while start.elapsed() <= duration {}
1733    ///     tokio::task::yield_now()
1734    /// }
1735    /// ```
1736    pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1737        let base = TaskMonitorCore::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD);
1738        TaskMonitor {
1739            base: Arc::new(base),
1740        }
1741    }
1742
1743    /// Produces the threshold duration at or above which polls are categorized as slow.
1744    ///
1745    /// ##### Examples
1746    /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1747    /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1748    /// ```
1749    /// use tokio_metrics::TaskMonitor;
1750    ///
1751    /// #[tokio::main]
1752    /// async fn main() {
1753    ///     let metrics_monitor = TaskMonitor::new();
1754    ///
1755    ///     assert_eq!(
1756    ///         metrics_monitor.slow_poll_threshold(),
1757    ///         TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1758    ///     );
1759    /// }
1760    /// ```
1761    pub fn slow_poll_threshold(&self) -> Duration {
1762        self.base.metrics.slow_poll_threshold
1763    }
1764
1765    /// Produces the threshold duration at or above which scheduling delays are categorized
1766    /// as long.
1767    pub fn long_delay_threshold(&self) -> Duration {
1768        self.base.metrics.long_delay_threshold
1769    }
1770
1771    /// Produces an instrumented façade around a given async task.
1772    ///
1773    /// ##### Examples
1774    /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1775    /// ```
1776    /// #[tokio::main]
1777    /// async fn main() {
1778    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1779    ///
1780    ///     // 0 tasks have been instrumented, much less polled
1781    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1782    ///
1783    ///     // instrument a task and poll it to completion
1784    ///     metrics_monitor.instrument(async {}).await;
1785    ///
1786    ///     // 1 task has been instrumented and polled
1787    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1788    ///
1789    ///     // instrument a task and poll it to completion
1790    ///     metrics_monitor.instrument(async {}).await;
1791    ///
1792    ///     // 2 tasks have been instrumented and polled
1793    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1794    /// }
1795    /// ```
1796    /// An async task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1797    /// ```
1798    /// #[tokio::main]
1799    /// async fn main() {
1800    ///     let monitor_a = tokio_metrics::TaskMonitor::new();
1801    ///     let monitor_b = tokio_metrics::TaskMonitor::new();
1802    ///
1803    ///     // 0 tasks have been instrumented, much less polled
1804    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1805    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1806    ///
1807    ///     // instrument a task and poll it to completion
1808    ///     monitor_a.instrument(monitor_b.instrument(async {})).await;
1809    ///
1810    ///     // 1 task has been instrumented and polled
1811    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1812    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1813    /// }
1814    /// ```
1815    /// It is also possible (but probably undesirable) to instrument an async task multiple times
1816    /// with the same [`TaskMonitor`]; e.g.:
1817    /// ```
1818    /// #[tokio::main]
1819    /// async fn main() {
1820    ///     let monitor = tokio_metrics::TaskMonitor::new();
1821    ///
1822    ///     // 0 tasks have been instrumented, much less polled
1823    ///     assert_eq!(monitor.cumulative().first_poll_count, 0);
1824    ///
1825    ///     // instrument a task and poll it to completion
1826    ///     monitor.instrument(monitor.instrument(async {})).await;
1827    ///
1828    ///     // 2 tasks have been instrumented and polled, supposedly
1829    ///     assert_eq!(monitor.cumulative().first_poll_count, 2);
1830    /// }
1831    /// ```
1832    pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1833        TaskMonitorCore::instrument_with(task, self.clone())
1834    }
1835
1836    /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1837    /// the construction of [`TaskMonitor`].
1838    ///
1839    /// ##### See also
1840    /// - [`TaskMonitor::intervals`]:
1841    ///   produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1842    ///
1843    /// ##### Examples
1844    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1845    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1846    /// interval; five slow polls occur across all sampling intervals:
1847    /// ```
1848    /// use std::future::Future;
1849    /// use std::time::Duration;
1850    ///
1851    /// #[tokio::main]
1852    /// async fn main() {
1853    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1854    ///
1855    ///     // initialize a stream of sampling intervals
1856    ///     let mut intervals = metrics_monitor.intervals();
1857    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1858    ///     let mut next_interval = || intervals.next().unwrap();
1859    ///
1860    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1861    ///
1862    ///     // this task completes in three slow polls
1863    ///     let _ = metrics_monitor.instrument(async {
1864    ///         spin_for(slow).await; // slow poll 1
1865    ///         spin_for(slow).await; // slow poll 2
1866    ///         spin_for(slow)        // slow poll 3
1867    ///     }).await;
1868    ///
1869    ///     // in the previous sampling interval, there were 3 slow polls
1870    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1871    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1872    ///
1873    ///     // this task completes in two slow polls
1874    ///     let _ = metrics_monitor.instrument(async {
1875    ///         spin_for(slow).await; // slow poll 1
1876    ///         spin_for(slow)        // slow poll 2
1877    ///     }).await;
1878    ///
1879    ///     // in the previous sampling interval, there were 2 slow polls
1880    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1881    ///
1882    ///     // across all sampling intervals, there were a total of 5 slow polls
1883    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1884    /// }
1885    ///
1886    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1887    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1888    ///     let start = tokio::time::Instant::now();
1889    ///     while start.elapsed() <= duration {}
1890    ///     tokio::task::yield_now()
1891    /// }
1892    /// ```
1893    pub fn cumulative(&self) -> TaskMetrics {
1894        self.base.metrics.metrics()
1895    }
1896
1897    /// Produces an unending iterator of metric sampling intervals.
1898    ///
1899    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1900    /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1901    /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1902    /// interval.
1903    ///
1904    /// ##### Examples
1905    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1906    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1907    /// interval; five slow polls occur across all sampling intervals:
1908    /// ```
1909    /// use std::future::Future;
1910    /// use std::time::Duration;
1911    ///
1912    /// #[tokio::main]
1913    /// async fn main() {
1914    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1915    ///
1916    ///     // initialize a stream of sampling intervals
1917    ///     let mut intervals = metrics_monitor.intervals();
1918    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1919    ///     let mut next_interval = || intervals.next().unwrap();
1920    ///
1921    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1922    ///
1923    ///     // this task completes in three slow polls
1924    ///     let _ = metrics_monitor.instrument(async {
1925    ///         spin_for(slow).await; // slow poll 1
1926    ///         spin_for(slow).await; // slow poll 2
1927    ///         spin_for(slow)        // slow poll 3
1928    ///     }).await;
1929    ///
1930    ///     // in the previous sampling interval, there were 3 slow polls
1931    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1932    ///
1933    ///     // this task completes in two slow polls
1934    ///     let _ = metrics_monitor.instrument(async {
1935    ///         spin_for(slow).await; // slow poll 1
1936    ///         spin_for(slow)        // slow poll 2
1937    ///     }).await;
1938    ///
1939    ///     // in the previous sampling interval, there were 2 slow polls
1940    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1941    ///
1942    ///     // across all sampling intervals, there were a total of 5 slow polls
1943    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1944    /// }
1945    ///
1946    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1947    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1948    ///     let start = tokio::time::Instant::now();
1949    ///     while start.elapsed() <= duration {}
1950    ///     tokio::task::yield_now()
1951    /// }
1952    /// ```
1953    pub fn intervals(&self) -> TaskIntervals {
1954        TaskIntervals {
1955            monitor: self.clone(),
1956            previous: None,
1957        }
1958    }
1959}
1960
1961impl TaskMonitorCore {
1962    /// Returns a const-friendly [`TaskMonitorCoreBuilder`].
1963    pub const fn builder() -> TaskMonitorCoreBuilder {
1964        TaskMonitorCoreBuilder::new()
1965    }
1966
1967    /// Constructs a new [`TaskMonitorCore`]. Refer to the struct documentation for more discussion
1968    /// of benefits compared to [`TaskMonitor`].
1969    ///
1970    /// Uses [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1971    /// considered 'slow'.
1972    ///
1973    /// Uses [`TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1974    /// considered 'long'.
1975    pub const fn new() -> TaskMonitorCore {
1976        TaskMonitorCore::with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
1977    }
1978
1979    /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1980    ///
1981    /// Refer to [`TaskMonitor::with_slow_poll_threshold`] for examples.
1982    pub const fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitorCore {
1983        Self::create(slow_poll_cut_off, TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
1984    }
1985
1986    /// Produces the threshold duration at or above which polls are categorized as slow.
1987    ///
1988    /// Refer to [`TaskMonitor::slow_poll_threshold`] for examples.
1989    pub fn slow_poll_threshold(&self) -> Duration {
1990        self.metrics.slow_poll_threshold
1991    }
1992
1993    /// Produces the threshold duration at or above which scheduling delays are categorized
1994    /// as long.
1995    pub fn long_delay_threshold(&self) -> Duration {
1996        self.metrics.long_delay_threshold
1997    }
1998
1999    /// Produces an instrumented façade around a given async task.
2000    ///
2001    /// ##### Examples
2002    /// ```
2003    /// use tokio_metrics::TaskMonitorCore;
2004    ///
2005    /// static MONITOR: TaskMonitorCore = TaskMonitorCore::new();
2006    ///
2007    /// #[tokio::main]
2008    /// async fn main() {
2009    ///     assert_eq!(MONITOR.cumulative().first_poll_count, 0);
2010    ///
2011    ///     MONITOR.instrument(async {}).await;
2012    ///     assert_eq!(MONITOR.cumulative().first_poll_count, 1);
2013    /// }
2014    /// ```
2015    pub fn instrument<F>(&'static self, task: F) -> Instrumented<F, &'static Self> {
2016        Self::instrument_with(task, self)
2017    }
2018
2019    /// Produces an instrumented façade around a given async task, with an explicit monitor.
2020    ///
2021    /// Use this when you have a non-static monitor reference, such as an `Arc<TaskMonitorCore>`.
2022    ///
2023    /// ##### Examples
2024    /// ```
2025    /// use std::sync::Arc;
2026    /// use tokio_metrics::TaskMonitorCore;
2027    ///
2028    /// #[derive(Clone)]
2029    /// struct SharedState(Arc<SharedStateInner>);
2030    /// struct SharedStateInner {
2031    ///     monitor: TaskMonitorCore,
2032    ///     other_state: SomeOtherSharedState,
2033    /// }
2034    /// /// Imagine: a type that wasn't `Clone` that you want to pass around
2035    /// /// in a similar way as the monitor
2036    /// struct SomeOtherSharedState;
2037    ///
2038    /// impl AsRef<TaskMonitorCore> for SharedState {
2039    ///     fn as_ref(&self) -> &TaskMonitorCore {
2040    ///         &self.0.monitor
2041    ///     }
2042    /// }
2043    ///
2044    /// #[tokio::main]
2045    /// async fn main() {
2046    ///     let state = SharedState(Arc::new(SharedStateInner {
2047    ///         monitor: TaskMonitorCore::new(),
2048    ///         other_state: SomeOtherSharedState,
2049    ///     }));
2050    ///
2051    ///     assert_eq!(state.0.monitor.cumulative().first_poll_count, 0);
2052    ///
2053    ///     TaskMonitorCore::instrument_with(async {}, state.clone()).await;
2054    ///     assert_eq!(state.0.monitor.cumulative().first_poll_count, 1);
2055    /// }
2056    /// ```
2057    pub fn instrument_with<F, M: AsRef<TaskMonitorCore> + Send + Sync + 'static>(
2058        task: F,
2059        monitor: M,
2060    ) -> Instrumented<F, M> {
2061        monitor
2062            .as_ref()
2063            .metrics
2064            .instrumented_count
2065            .fetch_add(1, SeqCst);
2066
2067        let state: State<M> = State {
2068            monitor,
2069            instrumented_at: Instant::now(),
2070            woke_at: AtomicU64::new(0),
2071            waker: AtomicWaker::new(),
2072        };
2073
2074        let instrumented: Instrumented<F, M> = Instrumented {
2075            task,
2076            did_poll_once: false,
2077            idled_at: 0,
2078            state: Arc::new(state),
2079        };
2080
2081        instrumented
2082    }
2083
2084    /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitorCore`], collected since
2085    /// the construction of [`TaskMonitorCore`].
2086    ///
2087    /// ##### See also
2088    /// - [`TaskMonitorCore::intervals`]:
2089    ///   produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
2090    ///
2091    /// See [`TaskMonitor::cumulative`] for examples.
2092    pub fn cumulative(&self) -> TaskMetrics {
2093        self.metrics.metrics()
2094    }
2095
2096    /// Produces an unending iterator of metric sampling intervals.
2097    ///
2098    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
2099    /// produced by [`TaskMonitorCore::intervals`]. The item type of this iterator is [`TaskMetrics`],
2100    /// which is a bundle of task metrics that describe *only* events occurring within that sampling
2101    /// interval.
2102    ///
2103    /// ##### Examples
2104    /// The below example demonstrates construction of [`TaskIntervals`] with [`TaskMonitorCore`].
2105    ///
2106    /// See [`TaskMonitor::intervals`] for more usage examples.
2107    ///
2108    /// ```
2109    /// use std::sync::Arc;
2110    ///
2111    /// fn main() {
2112    ///     let metrics_monitor = Arc::new(tokio_metrics::TaskMonitorCore::new());
2113    ///
2114    ///     let mut _intervals = tokio_metrics::TaskMonitorCore::intervals(metrics_monitor);
2115    /// }
2116    /// ```
2117    pub fn intervals<Monitor: AsRef<TaskMonitorCore> + Send + Sync + 'static>(
2118        monitor: Monitor,
2119    ) -> TaskIntervals<Monitor> {
2120        let intervals: TaskIntervals<Monitor> = TaskIntervals {
2121            monitor,
2122            previous: None,
2123        };
2124
2125        intervals
2126    }
2127}
2128
2129impl AsRef<TaskMonitorCore> for TaskMonitorCore {
2130    fn as_ref(&self) -> &TaskMonitorCore {
2131        self
2132    }
2133}
2134
2135impl TaskMonitorCore {
2136    const fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitorCore {
2137        TaskMonitorCore {
2138            metrics: RawMetrics {
2139                slow_poll_threshold: slow_poll_cut_off,
2140                first_poll_count: AtomicU64::new(0),
2141                total_idled_count: AtomicU64::new(0),
2142                total_scheduled_count: AtomicU64::new(0),
2143                total_fast_poll_count: AtomicU64::new(0),
2144                total_slow_poll_count: AtomicU64::new(0),
2145                total_long_delay_count: AtomicU64::new(0),
2146                instrumented_count: AtomicU64::new(0),
2147                dropped_count: AtomicU64::new(0),
2148                total_first_poll_delay_ns: AtomicU64::new(0),
2149                total_scheduled_duration_ns: AtomicU64::new(0),
2150                local_max_idle_duration_ns: AtomicU64::new(0),
2151                global_max_idle_duration_ns: AtomicU64::new(0),
2152                total_idle_duration_ns: AtomicU64::new(0),
2153                total_fast_poll_duration_ns: AtomicU64::new(0),
2154                total_slow_poll_duration: AtomicU64::new(0),
2155                total_short_delay_duration_ns: AtomicU64::new(0),
2156                long_delay_threshold: long_delay_cut_off,
2157                total_short_delay_count: AtomicU64::new(0),
2158                total_long_delay_duration_ns: AtomicU64::new(0),
2159            },
2160        }
2161    }
2162}
2163
2164impl RawMetrics {
2165    fn get_and_reset_local_max_idle_duration(&self) -> Duration {
2166        Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
2167    }
2168
2169    fn metrics(&self) -> TaskMetrics {
2170        let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
2171        let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
2172
2173        let total_fast_poll_duration =
2174            Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
2175        let total_slow_poll_duration =
2176            Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
2177
2178        let total_poll_count = total_fast_poll_count.saturating_add(total_slow_poll_count);
2179        let total_poll_duration = total_fast_poll_duration.saturating_add(total_slow_poll_duration);
2180
2181        TaskMetrics {
2182            instrumented_count: self.instrumented_count.load(SeqCst),
2183            dropped_count: self.dropped_count.load(SeqCst),
2184
2185            total_poll_count,
2186            total_poll_duration,
2187            first_poll_count: self.first_poll_count.load(SeqCst),
2188            total_idled_count: self.total_idled_count.load(SeqCst),
2189            total_scheduled_count: self.total_scheduled_count.load(SeqCst),
2190            total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
2191            total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
2192            total_short_delay_count: self.total_short_delay_count.load(SeqCst),
2193            total_long_delay_count: self.total_long_delay_count.load(SeqCst),
2194            total_first_poll_delay: Duration::from_nanos(
2195                self.total_first_poll_delay_ns.load(SeqCst),
2196            ),
2197            max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
2198            total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
2199            total_scheduled_duration: Duration::from_nanos(
2200                self.total_scheduled_duration_ns.load(SeqCst),
2201            ),
2202            total_fast_poll_duration: Duration::from_nanos(
2203                self.total_fast_poll_duration_ns.load(SeqCst),
2204            ),
2205            total_slow_poll_duration: Duration::from_nanos(
2206                self.total_slow_poll_duration.load(SeqCst),
2207            ),
2208            total_short_delay_duration: Duration::from_nanos(
2209                self.total_short_delay_duration_ns.load(SeqCst),
2210            ),
2211            total_long_delay_duration: Duration::from_nanos(
2212                self.total_long_delay_duration_ns.load(SeqCst),
2213            ),
2214        }
2215    }
2216}
2217
2218impl Default for TaskMonitor {
2219    fn default() -> TaskMonitor {
2220        TaskMonitor::new()
2221    }
2222}
2223
2224impl Default for TaskMonitorCore {
2225    fn default() -> TaskMonitorCore {
2226        TaskMonitorCore::new()
2227    }
2228}
2229
2230derived_metrics!(
2231    [TaskMetrics] {
2232        stable {
            /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
            /// are first polled.
            ///
            /// ##### Definition
            /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
            /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
            ///
            /// ##### Interpretation
            /// If this metric increases, it means that, on average, tasks spent longer waiting to be
            /// initially polled.
            ///
            /// ##### See also
            /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
            ///   The mean duration that tasks spent waiting to be executed after awakening.
            ///
            /// ##### Examples
            /// In the below example, no tasks are instrumented or polled within the first sampling
            /// interval (so the metric is [`Duration::ZERO`]); in the second sampling interval, 500ms
            /// elapse between the instrumentation of a task and its first poll; in the third sampling
            /// interval, a mean of 750ms elapses between the instrumentation and first poll of two tasks:
            /// ```
            /// use std::time::Duration;
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = metrics_monitor.intervals();
            ///     let mut next_interval = || interval.next().unwrap();
            ///
            ///     // no tasks have yet been created, instrumented, or polled
            ///     assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
            ///
            ///     // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
            ///     // produces the total time it took to do all of the aforementioned
            ///     async fn instrument_pause_await(
            ///         metrics_monitor: &tokio_metrics::TaskMonitor,
            ///         pause_time: Duration
            ///     ) -> Duration
            ///     {
            ///         let before_instrumentation = tokio::time::Instant::now();
            ///         let task = metrics_monitor.instrument(async move {});
            ///         tokio::time::sleep(pause_time).await;
            ///         task.await;
            ///         before_instrumentation.elapsed()
            ///     }
            ///
            ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
            ///     let task_a_pause_time = Duration::from_millis(500);
            ///     let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
            ///
            ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
            ///     // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
            ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
            ///     assert!(mean_first_poll_delay >= task_a_pause_time);
            ///     assert!(mean_first_poll_delay <= task_a_total_time);
            ///
            ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
            ///     let task_b_pause_time = Duration::from_millis(500);
            ///     let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
            ///
            ///     // construct and await a task that pauses for 1000ms between instrumentation and first poll
            ///     let task_c_pause_time = Duration::from_millis(1000);
            ///     let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
            ///
            ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
            ///     // average pause time of 750ms, and less-than-or-equal-to the mean total runtime of
            ///     // `task_b` and `task_c`
            ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
            ///     assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
            ///     assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
            /// }
            /// ```
            pub fn mean_first_poll_delay(&self) -> Duration {
                mean(self.total_first_poll_delay, self.first_poll_count)
            }
2308
            /// The mean duration that tasks spent idle, per idle period.
            ///
            /// ##### Definition
            /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
            /// [`total_idled_count`][TaskMetrics::total_idled_count].
            ///
            /// ##### Interpretation
            /// The idle state is the duration spanning the instant a task completes a poll, and the instant
            /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
            /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
            /// metric increases, it means that, on average, each idle period lasted longer; i.e., tasks
            /// spent more time per wait on task-external events.
            ///
            /// ##### Examples
            /// In the below example, a single instrumented task sleeps for one second, so the mean idle
            /// duration is at least one second:
            /// ```
            /// #[tokio::main]
            /// async fn main() {
            ///     let monitor = tokio_metrics::TaskMonitor::new();
            ///     let one_sec = std::time::Duration::from_secs(1);
            ///
            ///     monitor.instrument(async move {
            ///         tokio::time::sleep(one_sec).await;
            ///     }).await;
            ///
            ///     assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
            /// }
            /// ```
            pub fn mean_idle_duration(&self) -> Duration {
                mean(self.total_idle_duration, self.total_idled_count)
            }
2339
            /// The mean duration that tasks spent waiting to be executed after awakening.
            ///
            /// ##### Definition
            /// This metric is derived from
            /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
            /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
            ///
            /// ##### Interpretation
            /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
            /// queues before being polled.
            ///
            /// ##### See also
            /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
            ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
            ///   are first polled.
            ///
            /// ##### Examples
            /// In the below example, an endlessly yielding task is scheduled twice: once with a delay of
            /// roughly one second (the executor is blocked), and once with almost no delay. The mean of
            /// the two scheduling delays is therefore roughly 500ms:
            /// ```
            /// use tokio::time::Duration;
            ///
            /// #[tokio::main(flavor = "current_thread")]
            /// async fn main() {
            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = metrics_monitor.intervals();
            ///     let mut next_interval = || interval.next().unwrap();
            ///
            ///     // construct and instrument and spawn a task that yields endlessly
            ///     tokio::spawn(metrics_monitor.instrument(async {
            ///         loop { tokio::task::yield_now().await }
            ///     }));
            ///
            ///     tokio::task::yield_now().await;
            ///
            ///     // block the executor for 1 second
            ///     std::thread::sleep(Duration::from_millis(1000));
            ///
            ///     // get the task to run twice
            ///     // the first will have a 1 sec scheduling delay, the second will have almost none
            ///     tokio::task::yield_now().await;
            ///     tokio::task::yield_now().await;
            ///
            ///     // the endless task waited ~1s and then ~0s to be polled, so the mean is ~500ms
            ///     let mean_scheduled_duration = next_interval().mean_scheduled_duration();
            ///     assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
            ///     assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
            /// }
            /// ```
            pub fn mean_scheduled_duration(&self) -> Duration {
                mean(self.total_scheduled_duration, self.total_scheduled_count)
            }
2390
            /// The mean duration of polls.
            ///
            /// ##### Definition
            /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
            /// [`total_poll_count`][TaskMetrics::total_poll_count].
            ///
            /// ##### Interpretation
            /// If this metric increases, it means that, on average, individual polls are tending to take
            /// longer. However, this does not necessarily imply increased task latency: An increase in poll
            /// durations could be offset by fewer polls.
            ///
            /// ##### See also
            /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio between the number of polls categorized as slow and the total number of polls.
            /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
            ///   The mean duration of slow polls.
            ///
            /// ##### Examples
            /// In the below example (run with paused time), a task completes in three polls that
            /// take 1s, 1s and 0s respectively, giving a mean poll duration of ⅔s:
            /// ```
            /// use std::time::Duration;
            ///
            /// #[tokio::main(flavor = "current_thread", start_paused = true)]
            /// async fn main() {
            ///     let monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = monitor.intervals();
            ///     let mut next_interval = move || interval.next().unwrap();
            ///
            ///     assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
            ///
            ///     monitor.instrument(async {
            ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
            ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
            ///         ()                                                  // poll 3 (0s)
            ///     }).await;
            ///
            ///     assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
            /// }
            /// ```
            pub fn mean_poll_duration(&self) -> Duration {
                mean(self.total_poll_duration, self.total_poll_count)
            }
2432
            /// The ratio between the number of polls categorized as slow and the total number of polls.
            ///
            /// ##### Definition
            /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
            /// [`total_poll_count`][TaskMetrics::total_poll_count].
            ///
            /// The division is performed on `f64` values, so the result is `NaN` when no polls were
            /// recorded (`0.0 / 0.0`), as the example below asserts.
            ///
            /// ##### Interpretation
            /// If this metric increases, it means that a greater proportion of polls took excessively long
            /// before yielding to the scheduler. This does not necessarily imply increased task latency:
            /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
            /// However, as a rule, tasks *should* yield to the scheduler frequently.
            ///
            /// ##### See also
            /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
            ///   The mean duration of polls.
            /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
            ///   The mean duration of slow polls.
            ///
            /// ##### Examples
            /// Changes in this metric may be observed by varying the ratio of slow and fast polls within
            /// sampling intervals; for instance:
            /// ```
            /// use std::future::Future;
            /// use std::time::Duration;
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = metrics_monitor.intervals();
            ///     let mut next_interval = || interval.next().unwrap();
            ///
            ///     // no tasks have been constructed, instrumented, or polled
            ///     let interval = next_interval();
            ///     assert_eq!(interval.total_fast_poll_count, 0);
            ///     assert_eq!(interval.total_slow_poll_count, 0);
            ///     assert!(interval.slow_poll_ratio().is_nan());
            ///
            ///     let fast = Duration::ZERO;
            ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
            ///
            ///     // this task completes in three fast polls
            ///     metrics_monitor.instrument(async {
            ///         spin_for(fast).await;   // fast poll 1
            ///         spin_for(fast).await;   // fast poll 2
            ///         spin_for(fast);         // fast poll 3
            ///     }).await;
            ///
            ///     // this task completes in two slow polls
            ///     metrics_monitor.instrument(async {
            ///         spin_for(slow).await;   // slow poll 1
            ///         spin_for(slow);         // slow poll 2
            ///     }).await;
            ///
            ///     let interval = next_interval();
            ///     assert_eq!(interval.total_fast_poll_count, 3);
            ///     assert_eq!(interval.total_slow_poll_count, 2);
            ///     assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
            ///
            ///     // this task completes in three slow polls
            ///     metrics_monitor.instrument(async {
            ///         spin_for(slow).await;   // slow poll 1
            ///         spin_for(slow).await;   // slow poll 2
            ///         spin_for(slow);         // slow poll 3
            ///     }).await;
            ///
            ///     // this task completes in two fast polls
            ///     metrics_monitor.instrument(async {
            ///         spin_for(fast).await; // fast poll 1
            ///         spin_for(fast);       // fast poll 2
            ///     }).await;
            ///
            ///     let interval = next_interval();
            ///     assert_eq!(interval.total_fast_poll_count, 2);
            ///     assert_eq!(interval.total_slow_poll_count, 3);
            ///     assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
            /// }
            ///
            /// /// The share of `a` in `a + b` (here: slow polls among all polls).
            /// fn ratio(a: f64, b: f64) -> f64 {
            ///     a / (a + b)
            /// }
            ///
            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
            ///     let start = tokio::time::Instant::now();
            ///     while start.elapsed() <= duration {}
            ///     tokio::task::yield_now()
            /// }
            /// ```
            pub fn slow_poll_ratio(&self) -> f64 {
                self.total_slow_poll_count as f64 / self.total_poll_count as f64
            }
2524
            /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
            ///
            /// ##### Definition
            /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
            /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
            ///
            /// The division is performed on `f64` values, so the result is `NaN` when
            /// `total_scheduled_count` (and hence the numerator) is zero (`0.0 / 0.0`).
            pub fn long_delay_ratio(&self) -> f64 {
                self.total_long_delay_count as f64 / self.total_scheduled_count as f64
            }
2533
            /// The mean duration of fast polls.
            ///
            /// ##### Definition
            /// This metric is derived from
            /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
            /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
            ///
            /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval (so the metric is
            /// [`Duration::ZERO`]); two fast polls, spinning for at least 1µs and 2µs respectively, occur
            /// in the second sampling interval; and three fast polls, spinning for at least 1µs, 2µs and
            /// 3µs respectively, occur in the third sampling interval:
            /// ```
            /// use std::future::Future;
            /// use std::time::Duration;
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = metrics_monitor.intervals();
            ///     let mut next_interval = || interval.next().unwrap();
            ///
            ///     // no tasks have been constructed, instrumented, or polled
            ///     assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
            ///
            ///     let threshold = metrics_monitor.slow_poll_threshold();
            ///     let fast_1 = 1 * Duration::from_micros(1);
            ///     let fast_2 = 2 * Duration::from_micros(1);
            ///     let fast_3 = 3 * Duration::from_micros(1);
            ///
            ///     // this task completes in two fast polls
            ///     let total_time = time(metrics_monitor.instrument(async {
            ///         spin_for(fast_1).await; // fast poll 1
            ///         spin_for(fast_2)        // fast poll 2
            ///     })).await;
            ///
            ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
            ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
            ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
            ///     assert!(mean_fast_poll_duration <= total_time / 2);
            ///
            ///     // this task completes in three fast polls
            ///     let total_time = time(metrics_monitor.instrument(async {
            ///         spin_for(fast_1).await; // fast poll 1
            ///         spin_for(fast_2).await; // fast poll 2
            ///         spin_for(fast_3)        // fast poll 3
            ///     })).await;
            ///
            ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
            ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
            ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
            ///     assert!(mean_fast_poll_duration <= total_time / 3);
            /// }
            ///
            /// /// Produces the amount of time it took to await a given task.
            /// async fn time(task: impl Future) -> Duration {
            ///     let start = tokio::time::Instant::now();
            ///     task.await;
            ///     start.elapsed()
            /// }
            ///
            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
            ///     let start = tokio::time::Instant::now();
            ///     while start.elapsed() <= duration {}
            ///     tokio::task::yield_now()
            /// }
            /// ```
            pub fn mean_fast_poll_duration(&self) -> Duration {
                mean(self.total_fast_poll_duration, self.total_fast_poll_count)
            }
2607
            /// The mean duration of slow polls.
            ///
            /// ##### Definition
            /// This metric is derived from
            /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
            /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
            ///
            /// ##### Interpretation
            /// If this metric increases, it means that, on average, slow polls got even slower. This does
            /// not necessarily imply increased task latency: An increase in average slow poll duration
            /// could be offset by fewer or faster polls. However, as a rule, tasks *should* yield to the
            /// scheduler frequently.
            ///
            /// ##### See also
            /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
            ///   The mean duration of polls.
            /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio between the number of polls categorized as slow and the total number of polls.
            ///
            /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval (so the metric is
            /// [`Duration::ZERO`]); two slow polls consume a mean of at least
            /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// second sampling interval; and three slow polls consume a mean of at least
            /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// third sampling interval:
            /// ```
            /// use std::future::Future;
            /// use std::time::Duration;
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
            ///     let mut interval = metrics_monitor.intervals();
            ///     let mut next_interval = || interval.next().unwrap();
            ///
            ///     // no tasks have been constructed, instrumented, or polled
            ///     assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
            ///
            ///     let threshold = metrics_monitor.slow_poll_threshold();
            ///     let slow_1 = 1 * threshold;
            ///     let slow_2 = 2 * threshold;
            ///     let slow_3 = 3 * threshold;
            ///
            ///     // this task completes in two slow polls
            ///     let total_time = time(metrics_monitor.instrument(async {
            ///         spin_for(slow_1).await; // slow poll 1
            ///         spin_for(slow_2)        // slow poll 2
            ///     })).await;
            ///
            ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
            ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
            ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
            ///     assert!(mean_slow_poll_duration <= total_time / 2);
            ///
            ///     // this task completes in three slow polls
            ///     let total_time = time(metrics_monitor.instrument(async {
            ///         spin_for(slow_1).await; // slow poll 1
            ///         spin_for(slow_2).await; // slow poll 2
            ///         spin_for(slow_3)        // slow poll 3
            ///     })).await;
            ///
            ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
            ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
            ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
            ///     assert!(mean_slow_poll_duration <= total_time / 3);
            /// }
            ///
            /// /// Produces the amount of time it took to await a given task.
            /// async fn time(task: impl Future) -> Duration {
            ///     let start = tokio::time::Instant::now();
            ///     task.await;
            ///     start.elapsed()
            /// }
            ///
            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
            ///     let start = tokio::time::Instant::now();
            ///     while start.elapsed() <= duration {}
            ///     tokio::task::yield_now()
            /// }
            /// ```
            pub fn mean_slow_poll_duration(&self) -> Duration {
                mean(self.total_slow_poll_duration, self.total_slow_poll_count)
            }
2698
            /// The mean scheduling delay among short delays: the average time taken for a task with a
            /// short scheduling delay to be executed after being scheduled.
            ///
            /// ##### Definition
            /// This metric is derived from
            /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
            /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
            ///
            /// ##### See also
            /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
            ///   The corresponding mean for long scheduling delays.
            pub fn mean_short_delay_duration(&self) -> Duration {
                mean(
                    self.total_short_delay_duration,
                    self.total_short_delay_count,
                )
            }
2712
2713            /// The average scheduling delay for a task which takes a long time to start executing after
2714            /// being scheduled.
2715            ///
2716            /// ##### Definition
2717            /// This metric is derived from
2718            /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2719            /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2720            pub fn mean_long_delay_duration(&self) -> Duration {
2721                mean(self.total_long_delay_duration, self.total_long_delay_count)
2722            }
2723        }
2724        unstable {}
2725    }
2726);
2727
2728impl<T: Future, M: AsRef<TaskMonitorCore> + Send + Sync + 'static> Future for Instrumented<T, M> {
2729    type Output = T::Output;
2730
2731    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2732        instrument_poll(cx, self, Future::poll)
2733    }
2734}
2735
2736impl<T: Stream> Stream for Instrumented<T> {
2737    type Item = T::Item;
2738
2739    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2740        instrument_poll(cx, self, Stream::poll_next)
2741    }
2742}
2743
2744fn instrument_poll<T, M: AsRef<TaskMonitorCore> + Send + Sync + 'static, Out>(
2745    cx: &mut Context<'_>,
2746    instrumented: Pin<&mut Instrumented<T, M>>,
2747    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
2748) -> Poll<Out> {
2749    let poll_start = Instant::now();
2750    let this = instrumented.project();
2751    let idled_at = this.idled_at;
2752    let state = this.state;
2753    let instrumented_at = state.instrumented_at;
2754    let metrics = &state.monitor.as_ref().metrics;
2755    /* accounting for time-to-first-poll and tasks-count */
2756    // is this the first time this task has been polled?
2757    if !*this.did_poll_once {
2758        // if so, we need to do three things:
2759        /* 1. note that this task *has* been polled */
2760        *this.did_poll_once = true;
2761
2762        /* 2. account for the time-to-first-poll of this task */
2763        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
2764        // round down to `u64::MAX` nanoseconds
2765        let elapsed = poll_start
2766            .saturating_duration_since(instrumented_at)
2767            .as_nanos()
2768            .try_into()
2769            .unwrap_or(u64::MAX);
2770        // add this duration to `time_to_first_poll_ns_total`
2771        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);
2772
2773        /* 3. increment the count of tasks that have been polled at least once */
2774        metrics.first_poll_count.fetch_add(1, SeqCst);
2775    }
2776    /* accounting for time-idled and time-scheduled */
2777    // 1. note (and reset) the instant this task was last awoke
2778    let woke_at = state.woke_at.swap(0, SeqCst);
2779    // The state of a future is *idling* in the interim between the instant
2780    // it completes a `poll`, and the instant it is next awoken.
2781    if *idled_at < woke_at {
2782        // increment the counter of how many idles occurred
2783        metrics.total_idled_count.fetch_add(1, SeqCst);
2784
2785        // compute the duration of the idle
2786        let idle_ns = woke_at.saturating_sub(*idled_at);
2787
2788        // update the max time tasks spent idling, both locally and
2789        // globally.
2790        metrics
2791            .local_max_idle_duration_ns
2792            .fetch_max(idle_ns, SeqCst);
2793        metrics
2794            .global_max_idle_duration_ns
2795            .fetch_max(idle_ns, SeqCst);
2796        // adjust the total elapsed time monitored tasks spent idling
2797        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
2798    }
2799    // if this task spent any time in the scheduled state after instrumentation,
2800    // and after first poll, `woke_at` will be greater than 0.
2801    if woke_at > 0 {
2802        // increment the counter of how many schedules occurred
2803        metrics.total_scheduled_count.fetch_add(1, SeqCst);
2804
2805        // recall that the `woke_at` field is internally represented as
2806        // nanoseconds-since-instrumentation. here, for accounting purposes,
2807        // we need to instead represent it as a proper `Instant`.
2808        let woke_instant = instrumented_at
2809            .checked_add(Duration::from_nanos(woke_at))
2810            .unwrap_or(poll_start);
2811
2812        // the duration this task spent scheduled is time time elapsed between
2813        // when this task was awoke, and when it was polled.
2814        let scheduled_ns = poll_start
2815            .saturating_duration_since(woke_instant)
2816            .as_nanos()
2817            .try_into()
2818            .unwrap_or(u64::MAX);
2819
2820        let scheduled = Duration::from_nanos(scheduled_ns);
2821
2822        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
2823            if scheduled >= metrics.long_delay_threshold {
2824                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
2825            } else {
2826                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
2827            };
2828        // update the appropriate bucket
2829        count_bucket.fetch_add(1, SeqCst);
2830        duration_bucket.fetch_add(scheduled_ns, SeqCst);
2831
2832        // add `scheduled_ns` to the Monitor's total
2833        metrics
2834            .total_scheduled_duration_ns
2835            .fetch_add(scheduled_ns, SeqCst);
2836    }
2837    // Register the waker
2838    state.waker.register(cx.waker());
2839    // Get the instrumented waker
2840    let waker_ref = futures_util::task::waker_ref(state);
2841    let mut cx = Context::from_waker(&waker_ref);
2842    // Poll the task
2843    let inner_poll_start = Instant::now();
2844    let ret = poll_fn(this.task, &mut cx);
2845    let inner_poll_end = Instant::now();
2846    /* idle time starts now */
2847    *idled_at = inner_poll_end
2848        .saturating_duration_since(instrumented_at)
2849        .as_nanos()
2850        .try_into()
2851        .unwrap_or(u64::MAX);
2852    /* accounting for poll time */
2853    let inner_poll_duration = inner_poll_end.saturating_duration_since(inner_poll_start);
2854    let inner_poll_ns: u64 = inner_poll_duration
2855        .as_nanos()
2856        .try_into()
2857        .unwrap_or(u64::MAX);
2858    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
2859            if inner_poll_duration >= metrics.slow_poll_threshold {
2860                (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
2861            } else {
2862                (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
2863            };
2864    // update the appropriate bucket
2865    count_bucket.fetch_add(1, SeqCst);
2866    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
2867    ret
2868}
2869
2870impl<M> State<M> {
2871    fn on_wake(&self) {
2872        let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2873            Ok(woke_at) => woke_at,
2874            // This is highly unlikely as it would mean the task ran for over
2875            // 500 years. If you ran your service for 500 years. If you are
2876            // reading this 500 years in the future, I'm sorry.
2877            Err(_) => return,
2878        };
2879
2880        // We don't actually care about the result
2881        let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2882    }
2883}
2884
2885impl<M: Send + Sync> ArcWake for State<M> {
2886    fn wake_by_ref(arc_self: &Arc<State<M>>) {
2887        arc_self.on_wake();
2888        arc_self.waker.wake();
2889    }
2890
2891    fn wake(self: Arc<State<M>>) {
2892        self.on_wake();
2893        self.waker.wake();
2894    }
2895}
2896
2897/// Iterator returned by [`TaskMonitor::intervals`].
2898///
2899/// See that method's documentation for more details.
2900#[derive(Debug)]
2901pub struct TaskIntervals<M: AsRef<TaskMonitorCore> + Send + Sync + 'static = TaskMonitor> {
2902    monitor: M,
2903    previous: Option<TaskMetrics>,
2904}
2905
2906impl<M: AsRef<TaskMonitorCore> + Send + Sync + 'static> TaskIntervals<M> {
2907    fn probe(&mut self) -> TaskMetrics {
2908        let latest = self.monitor.as_ref().metrics.metrics();
2909        let local_max_idle_duration = self
2910            .monitor
2911            .as_ref()
2912            .metrics
2913            .get_and_reset_local_max_idle_duration();
2914
2915        let next = if let Some(previous) = self.previous {
2916            TaskMetrics {
2917                instrumented_count: latest
2918                    .instrumented_count
2919                    .wrapping_sub(previous.instrumented_count),
2920                dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2921                total_poll_count: latest
2922                    .total_poll_count
2923                    .wrapping_sub(previous.total_poll_count),
2924                total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2925                first_poll_count: latest
2926                    .first_poll_count
2927                    .wrapping_sub(previous.first_poll_count),
2928                total_idled_count: latest
2929                    .total_idled_count
2930                    .wrapping_sub(previous.total_idled_count),
2931                total_scheduled_count: latest
2932                    .total_scheduled_count
2933                    .wrapping_sub(previous.total_scheduled_count),
2934                total_fast_poll_count: latest
2935                    .total_fast_poll_count
2936                    .wrapping_sub(previous.total_fast_poll_count),
2937                total_short_delay_count: latest
2938                    .total_short_delay_count
2939                    .wrapping_sub(previous.total_short_delay_count),
2940                total_slow_poll_count: latest
2941                    .total_slow_poll_count
2942                    .wrapping_sub(previous.total_slow_poll_count),
2943                total_long_delay_count: latest
2944                    .total_long_delay_count
2945                    .wrapping_sub(previous.total_long_delay_count),
2946                total_first_poll_delay: sub(
2947                    latest.total_first_poll_delay,
2948                    previous.total_first_poll_delay,
2949                ),
2950                max_idle_duration: local_max_idle_duration,
2951                total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2952                total_scheduled_duration: sub(
2953                    latest.total_scheduled_duration,
2954                    previous.total_scheduled_duration,
2955                ),
2956                total_fast_poll_duration: sub(
2957                    latest.total_fast_poll_duration,
2958                    previous.total_fast_poll_duration,
2959                ),
2960                total_short_delay_duration: sub(
2961                    latest.total_short_delay_duration,
2962                    previous.total_short_delay_duration,
2963                ),
2964                total_slow_poll_duration: sub(
2965                    latest.total_slow_poll_duration,
2966                    previous.total_slow_poll_duration,
2967                ),
2968                total_long_delay_duration: sub(
2969                    latest.total_long_delay_duration,
2970                    previous.total_long_delay_duration,
2971                ),
2972            }
2973        } else {
2974            latest
2975        };
2976
2977        self.previous = Some(latest);
2978
2979        next
2980    }
2981}
2982
2983impl<M: AsRef<TaskMonitorCore> + Send + Sync + 'static> Iterator for TaskIntervals<M> {
2984    type Item = TaskMetrics;
2985
2986    fn next(&mut self) -> Option<Self::Item> {
2987        Some(self.probe())
2988    }
2989}
2990
/// Converts `d` into a count of nanoseconds.
///
/// The arithmetic wraps on overflow; debug builds assert that `d` does not exceed
/// `u64::MAX` nanoseconds.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    debug_assert!(d <= Duration::from_nanos(u64::MAX));
    let whole_seconds_ns = d.as_secs().wrapping_mul(1_000_000_000);
    let fractional_ns = d.subsec_nanos() as u64;
    whole_seconds_ns.wrapping_add(fractional_ns)
}
2998
2999#[inline(always)]
3000fn sub(a: Duration, b: Duration) -> Duration {
3001    let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
3002    Duration::from_nanos(nanos)
3003}
3004
3005#[inline(always)]
3006fn mean(d: Duration, count: u64) -> Duration {
3007    if let Some(quotient) = to_nanos(d).checked_div(count) {
3008        Duration::from_nanos(quotient)
3009    } else {
3010        Duration::ZERO
3011    }
3012}
3013
// Compile-time checks that `Instrumented` and `TaskIntervals` can be named and
// inferred in common positions without spelling out the monitor type parameter.
#[cfg(test)]
mod inference_tests {
    use super::*;
    use std::{future::Future, pin::Pin};

    // In a type alias, the omitted `M` falls back to its default, `TaskMonitor`.
    type _BoxedInstrumented = Instrumented<Pin<Box<dyn Future<Output = ()>>>>;

    // In a struct field, the omitted `M` falls back to `TaskMonitor` as well.
    struct _Wrapper {
        _fut: Instrumented<Pin<Box<dyn Future<Output = ()>>>>,
    }

    // A binding annotated only with `Instrumented<_>`.
    async fn _partial_annotation(monitor: &TaskMonitor) {
        let instrumented: Instrumented<_> = monitor.instrument(async { 42 });
        instrumented.await;
    }

    // The usual call shape: instrument and await in one expression.
    async fn _common_usage(monitor: &TaskMonitor) {
        monitor.instrument(async { 42 }).await;
    }

    // A binding with no annotation at all; `T` and `M` are both inferred.
    async fn _store_without_annotation(monitor: &TaskMonitor) {
        let instrumented = monitor.instrument(async { 42 });
        instrumented.await;
    }

    // As a function parameter, with `M` omitted from the signature.
    async fn _function_boundary(fut: Instrumented<impl Future<Output = i32>>) -> i32 {
        fut.await
    }

    // As a return type, with `M` omitted from the signature.
    fn _return_position(monitor: &TaskMonitor) -> Instrumented<impl Future<Output = i32> + '_> {
        monitor.instrument(async { 42 })
    }

    // The iterator returned by `intervals()` yields `TaskMetrics`.
    fn _intervals_inference(monitor: &TaskMonitor) {
        let mut interval_iter = monitor.intervals();
        let _: Option<TaskMetrics> = interval_iter.next();
    }

    // Runs every helper above once against a fresh monitor.
    #[tokio::test]
    async fn inference_compiles() {
        let task_monitor = TaskMonitor::new();
        _partial_annotation(&task_monitor).await;
        _common_usage(&task_monitor).await;
        _store_without_annotation(&task_monitor).await;
        _function_boundary(task_monitor.instrument(async { 42 })).await;
        _return_position(&task_monitor).await;
        _intervals_inference(&task_monitor);
    }
}