tokio_metrics/
task.rs

1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13#[cfg(not(feature = "rt"))]
14use std::time::{Duration, Instant};
15
16#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
17pub(crate) mod metrics_rs_integration;
18
19/// Monitors key metrics of instrumented tasks.
20///
21/// ### Basic Usage
22/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
23/// [instrumented][`TaskMonitor::instrument`] with the monitor.
24///
25/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
26/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
27/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
28/// ```
29/// use std::time::Duration;
30///
31/// #[tokio::main]
32/// async fn main() {
33///     // construct a metrics monitor
34///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
35///
36///     // print task metrics every 500ms
37///     {
38///         let metrics_monitor = metrics_monitor.clone();
39///         tokio::spawn(async move {
40///             for interval in metrics_monitor.intervals() {
41///                 // pretty-print the metric interval
42///                 println!("{:?}", interval);
43///                 // wait 500ms
44///                 tokio::time::sleep(Duration::from_millis(500)).await;
45///             }
46///         });
47///     }
48///
49///     // instrument some tasks and await them
50///     // note that the same TaskMonitor can be used for multiple tasks
51///     tokio::join![
52///         metrics_monitor.instrument(do_work()),
53///         metrics_monitor.instrument(do_work()),
54///         metrics_monitor.instrument(do_work())
55///     ];
56/// }
57///
58/// async fn do_work() {
59///     for _ in 0..25 {
60///         tokio::task::yield_now().await;
61///         tokio::time::sleep(Duration::from_millis(100)).await;
62///     }
63/// }
64/// ```
65///
66/// ### What should I instrument?
67/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
68///
69/// #### Instrumenting a web application
70/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
71/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
72/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
73/// debugging scenario for a web service that takes this approach to instrumentation. This
74/// approach is exemplified in the below example:
75/// ```no_run
76/// // The unabridged version of this snippet is in the examples directory of this crate.
77///
78/// #[tokio::main]
79/// async fn main() {
80///     // construct a TaskMonitor for root endpoint
81///     let monitor_root = tokio_metrics::TaskMonitor::new();
82///
83///     // construct TaskMonitors for create_users endpoint
84///     let monitor_create_user = CreateUserMonitors {
85///         // monitor for the entire endpoint
86///         route: tokio_metrics::TaskMonitor::new(),
87///         // monitor for database insertion subtask
88///         insert: tokio_metrics::TaskMonitor::new(),
89///     };
90///
91///     // build our application with two instrumented endpoints
92///     let app = axum::Router::new()
93///         // `GET /` goes to `root`
94///         .route("/", axum::routing::get({
95///             let monitor = monitor_root.clone();
96///             move || monitor.instrument(async { "Hello, World!" })
97///         }))
98///         // `POST /users` goes to `create_user`
99///         .route("/users", axum::routing::post({
100///             let monitors = monitor_create_user.clone();
101///             let route = monitors.route.clone();
102///             move |payload| {
103///                 route.instrument(create_user(payload, monitors))
104///             }
105///         }));
106///
107///     // print task metrics for each endpoint every 1s
108///     let metrics_frequency = std::time::Duration::from_secs(1);
109///     tokio::spawn(async move {
110///         let root_intervals = monitor_root.intervals();
111///         let create_user_route_intervals =
112///             monitor_create_user.route.intervals();
113///         let create_user_insert_intervals =
114///             monitor_create_user.insert.intervals();
115///         let create_user_intervals =
116///             create_user_route_intervals.zip(create_user_insert_intervals);
117///
118///         let intervals = root_intervals.zip(create_user_intervals);
119///         for (root_route, (create_user_route, create_user_insert)) in intervals {
120///             println!("root_route = {:#?}", root_route);
121///             println!("create_user_route = {:#?}", create_user_route);
122///             println!("create_user_insert = {:#?}", create_user_insert);
123///             tokio::time::sleep(metrics_frequency).await;
124///         }
125///     });
126///
127///     // run the server
128///     let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
129///     let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
130///     axum::serve(listener, app)
131///         .await
132///         .unwrap();
133/// }
134///
135/// async fn create_user(
136///     axum::Json(payload): axum::Json<CreateUser>,
137///     monitors: CreateUserMonitors,
138/// ) -> impl axum::response::IntoResponse {
139///     let user = User { id: 1337, username: payload.username, };
140///     // instrument inserting the user into the db:
141///     let _ = monitors.insert.instrument(insert_user(user.clone())).await;
142///     (axum::http::StatusCode::CREATED, axum::Json(user))
143/// }
144///
145/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
146///
147/// #
148/// # #[derive(Clone)]
149/// # struct CreateUserMonitors {
150/// #     // monitor for the entire endpoint
151/// #     route: tokio_metrics::TaskMonitor,
152/// #     // monitor for database insertion subtask
153/// #     insert: tokio_metrics::TaskMonitor,
154/// # }
155/// #
156/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
157/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
158/// #
159/// // insert the user into the database
160/// async fn insert_user(_: User) {
161///     /* implementation details elided */
162///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
163/// }
164/// ```
165///
166/// ### Why are my tasks slow?
167/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
168/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
169///
170/// #### Identifying the high-level culprits
171/// A set of tasks will appear to execute more slowly if:
172/// - they are taking longer to poll (i.e., they consume too much CPU time)
173/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
174///   queues)
175/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
176///
177/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
178/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
179/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
180/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
181/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
182/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
183///
184/// ##### Are my tasks taking longer to poll?
185/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**   
186///   This metric reflects the mean poll duration. If it increased, it means that, on average,
187///   individual polls tended to take longer. However, this does not necessarily imply increased
188///   task latency: An increase in poll durations could be offset by fewer polls.
189/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**   
190///   This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
191///   a greater proportion of polls performed excessive computation before yielding. This does not
192///   necessarily imply increased task latency: An increase in the proportion of slow polls could be
193///   offset by fewer or faster polls.
194/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**   
195///   This metric reflects the mean duration of slow polls. If it increased, it means that, on
196///   average, slow polls got slower. This does not necessarily imply increased task latency: An
197///   increase in average slow poll duration could be offset by fewer or faster polls.
198///
199/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
200///
201/// ##### Are my tasks spending more time waiting to be polled?
202/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**   
203///   This metric reflects the mean delay between the instant a task is first instrumented and the
204///   instant it is first polled. If it increases, it means that, on average, tasks spent longer
205///   waiting to be initially run.
206/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**   
207///   This metric reflects the mean duration that tasks spent in the scheduled state. The
208///   'scheduled' state of a task is the duration between the instant a task is awoken and the
209///   instant it is subsequently polled. If this metric increases, it means that, on average, tasks
210///   spent longer in tokio's queues before being polled.
211/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
212///   This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
213///   it means that a greater proportion of tasks experienced excessive delays before they could
214///   execute after being woken. This does not necessarily indicate an increase in latency, as this
215///   could be offset by fewer or faster task polls.
216/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
217///   This metric reflects the mean duration of long delays. If it increased, it means that, on
218///   average, long delays got even longer. This does not necessarily imply increased task latency:
219///   an increase in average long delay duration could be offset by fewer or faster polls or more
220///   short schedules.
221///
222/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
223///
224/// ##### Are my tasks spending more time waiting on external events to complete?
225/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**   
226///   This metric reflects the mean duration that tasks spent in the idle state. The idle state is
227///   the duration spanning the instant a task completes a poll, and the instant that it is next
228///   awoken. Tasks inhabit this state when they are waiting for task-external events to complete
229///   (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
230///   tasks, in aggregate, spent more time waiting for task-external events to complete.
231///
232/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
233///
234/// #### Digging deeper
235/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
236/// search for further explanation...
237///
238/// ##### Why are my tasks taking longer to poll?
239/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
240/// The culprit is likely some combination of:
241/// - **Your tasks are accidentally blocking.** Common culprits include:
242///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
243///        [networking](https://doc.rust-lang.org/std/net/) APIs.
244///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
245///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
246///     3. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
247///     4. Invoking `println!` or other synchronous logging routines.   
248///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
249///        synchronous write to stdout.
250/// 2. **Your tasks are computationally expensive.** Common culprits include:
251///     1. TLS/cryptographic routines
252///     2. doing a lot of processing on bytes
253///     3. calling non-Tokio resources
254///
255/// ##### Why are my tasks spending more time waiting to be polled?
256/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
257/// suggesting some combination of:
258/// - Your application is inflating the time elapsed between instrumentation and first poll.
259/// - Your tasks are being scheduled into tokio's global queue.
260/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
261///
262/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
263///
264/// ##### Why are my tasks spending more time waiting on external events to complete?
265/// You observed that [your tasks are spending more time waiting waiting on external events to
266/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
267/// event? Fortunately, within the task experiencing increased idle times, you monitored several
268/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, you [*you try to identify
269/// the performance culprits...*](#identifying-the-high-level-culprits)
270///
271/// #### Digging even deeper
272///
273/// ##### Is time-to-first-poll unusually high?
274/// Contrast these two metrics:
275/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**  
276///   This metric reflects the mean delay between the instant a task is first instrumented and the
277///   instant it is *first* polled.
278/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**  
279///   This metric reflects the mean delay between the instant when tasks were awoken and the
280///   instant they were subsequently polled.
281///
282/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
283/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
284///
285/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
286///
287/// ##### Is my application delaying the time-to-first-poll?
288/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
289/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
290/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
291/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
292///
293/// For instance, in the below example, the application induces 1 second delay between when `task`
294/// is instrumented and when it is awaited:
295/// ```rust
296/// #[tokio::main]
297/// async fn main() {
298///     use tokio::time::Duration;
299///     let monitor = tokio_metrics::TaskMonitor::new();
300///
301///     let task = monitor.instrument(async move {});
302///
303///     let one_sec = Duration::from_secs(1);
304///     tokio::time::sleep(one_sec).await;
305///
306///     let _ = tokio::spawn(task).await;
307///
308///     assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
309/// }
310/// ```
311///
312/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
313/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokio’s-global-queue)
314///
315/// ##### Is my application spawning more tasks into tokio's global queue?
316/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
317/// global "injection" queue.
318///
319/// You may be notifying runtime tasks from off-runtime. For instance, Given the following:
320/// ```ignore
321/// #[tokio::main]
322/// async fn main() {
323///     for _ in 0..100 {
324///         let (tx, rx) = oneshot::channel();
325///         tokio::spawn(async move {
326///             tx.send(());
327///         })
328///         
329///         rx.await;
330///     }
331/// }
332/// ```
333/// One would expect this to run efficiently, however, the main task is run *off* the main runtime
334/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
335/// ```ignore
336/// #[tokio::main]
337/// async fn main() {
338///     tokio::spawn(async {
339///         for _ in 0..100 {
340///             let (tx, rx) = oneshot::channel();
341///             tokio::spawn(async move {
342///                 tx.send(());
343///             })
344///
345///             rx.await;
346///         }
347///     }).await;
348/// }
349/// ```
350/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
351/// and the task being polled.
352///
353/// ##### Are other tasks polling too long without yielding?
354/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
355/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
356/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
357///
358/// ### Limitations
359/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
360/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
361/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
362///
363/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
364/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
365/// counters and durations wrap.
366///
367/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
368/// calculated by computing the difference of metrics in successive invocations of
369/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
370/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
371/// that interval will overflow and not be accurate.
372///
373/// ##### Examples at the limits
374/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
375/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
376/// ```
377/// use tokio::time::Duration;
378///
379/// #[tokio::main(flavor = "current_thread", start_paused = true)]
380/// async fn main() {
381///     let monitor = tokio_metrics::TaskMonitor::new();
382///     let mut interval = monitor.intervals();
383///     let mut next_interval = || interval.next().unwrap();
384///
385///     // construct and instrument a task, but do not `await` it
386///     let task = monitor.instrument(async {});
387///
388///     // this is the maximum duration representable by tokio_metrics
389///     let max_duration = Duration::from_nanos(u64::MAX);
390///
391///     // let's advance the clock by this amount and poll `task`
392///     let _ = tokio::time::advance(max_duration).await;
393///     task.await;
394///
395///     // durations ≤ `max_duration` are accurately reflected in this metric
396///     assert_eq!(next_interval().total_first_poll_delay, max_duration);
397///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
398/// }
399/// ```
400/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
401/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
402/// ```
403/// # use tokio::time::Duration;
404/// #
405/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
406/// # async fn main() {
407/// #    let monitor = tokio_metrics::TaskMonitor::new();
408/// #
409///  // construct and instrument a task, but do not `await` it
410///  let task_a = monitor.instrument(async {});
411///  let task_b = monitor.instrument(async {});
412///
413///  // this is the maximum duration representable by tokio_metrics
414///  let max_duration = Duration::from_nanos(u64::MAX);
415///
416///  // let's advance the clock by 1.5x this amount and await `task`
417///  let _ = tokio::time::advance(3 * (max_duration / 2)).await;
418///  task_a.await;
419///  task_b.await;
420///
421///  // the `total_first_poll_delay` has overflowed
422///  assert!(monitor.cumulative().total_first_poll_delay < max_duration);
423/// # }
424/// ```
425/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
426/// this metric to the precipice of overflow:
427/// ```
428/// # use tokio::time::Duration;
429/// #
430/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
431/// # async fn main() {
432/// #     let monitor = tokio_metrics::TaskMonitor::new();
433/// #     let mut interval = monitor.intervals();
434/// #     let mut next_interval = || interval.next().unwrap();
435/// #
436/// // construct and instrument u16::MAX tasks, but do not `await` them
437/// let first_poll_count = u16::MAX as u64;
438/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
439/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
440///
441/// // this is the maximum duration representable by tokio_metrics
442/// let max_duration = u64::MAX;
443///
444/// // let's advance the clock justenough such that all of the time-to-first-poll
445/// // delays summed nearly equals `max_duration_nanos`, less some remainder...
446/// let iffy_delay = max_duration / (first_poll_count as u64);
447/// let small_remainder = max_duration % first_poll_count;
448/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
449///
450/// // ...then poll all of the instrumented tasks:
451/// for task in tasks { task.await; }
452///
453/// // `total_first_poll_delay` is at the precipice of overflowing!
454/// assert_eq!(
455///     next_interval().total_first_poll_delay.as_nanos(),
456///     (max_duration - small_remainder) as u128
457/// );
458/// assert_eq!(
459///     monitor.cumulative().total_first_poll_delay.as_nanos(),
460///     (max_duration - small_remainder) as u128
461/// );
462/// # }
463/// ```
464/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
465/// metrics counter overflows at most once in the midst of an interval:
466/// ```
467/// # use tokio::time::Duration;
468/// # use tokio_metrics::TaskMonitor;
469/// #
470/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
471/// # async fn main() {
472/// #     let monitor = TaskMonitor::new();
473/// #     let mut interval = monitor.intervals();
474/// #     let mut next_interval = || interval.next().unwrap();
475/// #
476///  let first_poll_count = u16::MAX as u64;
477///  let batch_size = first_poll_count / 3;
478///
479///  let max_duration_ns = u64::MAX;
480///  let iffy_delay_ns = max_duration_ns / first_poll_count;
481///
482///  // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
483///  // then await the instrumented tasks.
484///  async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
485///      let mut tasks = Vec::with_capacity(batch_size);
486///      for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
487///      let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
488///      for task in tasks { task.await; }
489///  }
490///
491///  // this is how much `total_time_to_first_poll_ns` will
492///  // increase with each batch we run
493///  let batch_delay = iffy_delay_ns * batch_size;
494///
495///  // run batches 1, 2, and 3
496///  for i in 1..=3 {
497///      run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
498///      assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
499///      assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
500///  }
501///
502///  /* now, the `total_time_to_first_poll_ns` counter is at the precipice of overflow */
503///  assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
504///
505///  // run batch 4
506///  run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
507///  // the interval counter remains accurate
508///  assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
509///  // but the cumulative counter has overflowed
510///  assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
511/// # }
512/// ```
513/// If a cumulative metric overflows *more than once* in the midst of an interval,
514/// its interval-sampled counterpart will also overflow.
515#[derive(Clone, Debug)]
516pub struct TaskMonitor {
517    metrics: Arc<RawMetrics>,
518}
519
520/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
521/// parameters.
522#[derive(Clone, Debug, Default)]
523pub struct TaskMonitorBuilder {
524    slow_poll_threshold: Option<Duration>,
525    long_delay_threshold: Option<Duration>,
526}
527
528impl TaskMonitorBuilder {
529    /// Creates a new [`TaskMonitorBuilder`].
530    pub fn new() -> Self {
531        Self {
532            slow_poll_threshold: None,
533            long_delay_threshold: None,
534        }
535    }
536
537    /// Specifies the threshold at which polls are considered 'slow'.
538    pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
539        self.slow_poll_threshold = Some(threshold);
540
541        self
542    }
543
544    /// Specifies the threshold at which schedules are considered 'long'.
545    pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
546        self.long_delay_threshold = Some(threshold);
547
548        self
549    }
550
551    /// Consume the builder, producing a [`TaskMonitor`].
552    pub fn build(self) -> TaskMonitor {
553        TaskMonitor::create(
554            self.slow_poll_threshold
555                .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
556            self.long_delay_threshold
557                .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
558        )
559    }
560}
561
562pin_project! {
563    /// An async task that has been instrumented with [`TaskMonitor::instrument`].
564    #[derive(Debug)]
565    pub struct Instrumented<T> {
566        // The task being instrumented
567        #[pin]
568        task: T,
569
570        // True when the task is polled for the first time
571        did_poll_once: bool,
572
573        // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
574        // its last poll.
575        idled_at: u64,
576
577        // State shared between the task and its instrumented waker.
578        state: Arc<State>,
579    }
580
581    impl<T> PinnedDrop for Instrumented<T> {
582        fn drop(this: Pin<&mut Self>) {
583            this.state.metrics.dropped_count.fetch_add(1, SeqCst);
584        }
585    }
586}
587
588/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
589#[non_exhaustive]
590#[derive(Debug, Clone, Copy, Default)]
591pub struct TaskMetrics {
592    /// The number of tasks instrumented.
593    ///
594    /// ##### Examples
595    /// ```
596    /// #[tokio::main]
597    /// async fn main() {
598    ///     let monitor = tokio_metrics::TaskMonitor::new();
599    ///     let mut interval = monitor.intervals();
600    ///     let mut next_interval = || interval.next().unwrap();
601    ///
602    ///     // 0 tasks have been instrumented
603    ///     assert_eq!(next_interval().instrumented_count, 0);
604    ///
605    ///     monitor.instrument(async {});
606    ///
607    ///     // 1 task has been instrumented
608    ///     assert_eq!(next_interval().instrumented_count, 1);
609    ///
610    ///     monitor.instrument(async {});
611    ///     monitor.instrument(async {});
612    ///
613    ///     // 2 tasks have been instrumented
614    ///     assert_eq!(next_interval().instrumented_count, 2);
615    ///
616    ///     // since the last interval was produced, 0 tasks have been instrumented
617    ///     assert_eq!(next_interval().instrumented_count, 0);
618    /// }
619    /// ```
620    pub instrumented_count: u64,
621
622    /// The number of tasks dropped.
623    ///
624    /// ##### Examples
625    /// ```
626    /// #[tokio::main]
627    /// async fn main() {
628    ///     let monitor = tokio_metrics::TaskMonitor::new();
629    ///     let mut interval = monitor.intervals();
630    ///     let mut next_interval = || interval.next().unwrap();
631    ///
632    ///     // 0 tasks have been dropped
633    ///     assert_eq!(next_interval().dropped_count, 0);
634    ///
635    ///     let _task = monitor.instrument(async {});
636    ///
637    ///     // 0 tasks have been dropped
638    ///     assert_eq!(next_interval().dropped_count, 0);
639    ///
640    ///     monitor.instrument(async {}).await;
641    ///     drop(monitor.instrument(async {}));
642    ///
643    ///     // 2 tasks have been dropped
644    ///     assert_eq!(next_interval().dropped_count, 2);
645    ///
646    ///     // since the last interval was produced, 0 tasks have been dropped
647    ///     assert_eq!(next_interval().dropped_count, 0);
648    /// }
649    /// ```
650    pub dropped_count: u64,
651
652    /// The number of tasks polled for the first time.
653    ///
654    /// ##### Derived metrics
655    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**  
656    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
657    ///   are first polled.
658    ///
659    /// ##### Examples
660    /// In the below example, no tasks are instrumented or polled in the first sampling interval;
661    /// one task is instrumented (but not polled) in the second sampling interval; that task is
662    /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
663    /// additional tasks are polled for the first time within the fourth sampling interval:
664    /// ```
665    /// #[tokio::main]
666    /// async fn main() {
667    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
668    ///     let mut interval = metrics_monitor.intervals();
669    ///     let mut next_interval = || interval.next().unwrap();
670    ///
671    ///     // no tasks have been constructed, instrumented, and polled at least once
672    ///     assert_eq!(next_interval().first_poll_count, 0);
673    ///
674    ///     let task = metrics_monitor.instrument(async {});
675    ///
676    ///     // `task` has been constructed and instrumented, but has not yet been polled
677    ///     assert_eq!(next_interval().first_poll_count, 0);
678    ///
679    ///     // poll `task` to completion
680    ///     task.await;
681    ///
682    ///     // `task` has been constructed, instrumented, and polled at least once
683    ///     assert_eq!(next_interval().first_poll_count, 1);
684    ///
685    ///     // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
686    ///     assert_eq!(next_interval().first_poll_count, 0);
687    ///
688    /// }
689    /// ```
690    pub first_poll_count: u64,
691
692    /// The total duration elapsed between the instant tasks are instrumented, and the instant they
693    /// are first polled.
694    ///
695    /// ##### Derived metrics
696    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**   
697    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
698    ///   are first polled.
699    ///
700    /// ##### Examples
701    /// In the below example, 0 tasks have been instrumented or polled within the first sampling
702    /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
703    /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
704    /// polling of tasks within the third sampling interval:
705    /// ```
706    /// use tokio::time::Duration;
707    ///
708    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
709    /// async fn main() {
710    ///     let monitor = tokio_metrics::TaskMonitor::new();
711    ///     let mut interval = monitor.intervals();
712    ///     let mut next_interval = || interval.next().unwrap();
713    ///
714    ///     // no tasks have yet been created, instrumented, or polled
715    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
716    ///     assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
717    ///
718    ///     // constructs and instruments a task, pauses a given duration, then awaits the task
719    ///     async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
720    ///         let task = monitor.instrument(async move {});
721    ///         tokio::time::sleep(pause).await;
722    ///         task.await;
723    ///     }
724    ///
725    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
726    ///     let task_a_pause_time = Duration::from_millis(500);
727    ///     instrument_pause_await(&monitor, task_a_pause_time).await;
728    ///
729    ///     assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
730    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
731    ///
732    ///     // construct and await a task that pauses for 250ms between instrumentation and first poll
733    ///     let task_b_pause_time = Duration::from_millis(250);
734    ///     instrument_pause_await(&monitor, task_b_pause_time).await;
735    ///
736    ///     // construct and await a task that pauses for 100ms between instrumentation and first poll
737    ///     let task_c_pause_time = Duration::from_millis(100);
738    ///     instrument_pause_await(&monitor, task_c_pause_time).await;
739    ///
740    ///     assert_eq!(
741    ///         next_interval().total_first_poll_delay,
742    ///         task_b_pause_time + task_c_pause_time
743    ///     );
744    ///     assert_eq!(
745    ///         monitor.cumulative().total_first_poll_delay,
746    ///         task_a_pause_time + task_b_pause_time + task_c_pause_time
747    ///     );
748    /// }
749    /// ```
750    ///
751    /// ##### When is this metric recorded?
752    /// The delay between instrumentation and first poll is not recorded until the first poll
753    /// actually occurs:
754    /// ```
755    /// # use tokio::time::Duration;
756    /// #
757    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
758    /// # async fn main() {
759    /// #     let monitor = tokio_metrics::TaskMonitor::new();
760    /// #     let mut interval = monitor.intervals();
761    /// #     let mut next_interval = || interval.next().unwrap();
762    /// #
763    /// // we construct and instrument a task, but do not `await` it
764    /// let task = monitor.instrument(async {});
765    ///
766    /// // let's sleep for 1s before we poll `task`
767    /// let one_sec = Duration::from_secs(1);
768    /// let _ = tokio::time::sleep(one_sec).await;
769    ///
770    /// // although 1s has now elapsed since the instrumentation of `task`,
771    /// // this is not reflected in `total_first_poll_delay`...
772    /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
773    /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
774    ///
775    /// // ...and won't be until `task` is actually polled
776    /// task.await;
777    ///
778    /// // now, the 1s delay is reflected in `total_first_poll_delay`:
779    /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
780    /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
781    /// # }
782    /// ```
783    ///
784    /// ##### What if first-poll-delay is very large?
785    /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
786    /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
787    /// metric will wrap around:
788    /// ```
789    /// use tokio::time::Duration;
790    ///
791    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
792    /// async fn main() {
793    ///     let monitor = tokio_metrics::TaskMonitor::new();
794    ///
795    ///     // construct and instrument a task, but do not `await` it
796    ///     let task = monitor.instrument(async {});
797    ///
798    ///     // this is the maximum duration representable by tokio_metrics
799    ///     let max_duration = Duration::from_nanos(u64::MAX);
800    ///
801    ///     // let's advance the clock by double this amount and await `task`
802    ///     let _ = tokio::time::advance(max_duration * 2).await;
803    ///     task.await;
804    ///
805    ///     // the time-to-first-poll of `task` saturates at `max_duration`
806    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
807    ///
808    ///     // ...but note that the metric *will* wrap around if more tasks are involved
809    ///     let task = monitor.instrument(async {});
810    ///     let _ = tokio::time::advance(Duration::from_nanos(1)).await;
811    ///     task.await;
812    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
813    /// }
814    /// ```
815    pub total_first_poll_delay: Duration,
816
817    /// The total number of times that tasks idled, waiting to be awoken.
818    ///
819    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
820    /// task completes a poll, and the instant that it is next awoken.
821    ///
822    /// ##### Derived metrics
823    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**   
824    ///   The mean duration of idles.
825    ///
826    /// ##### Examples
827    /// ```
828    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
829    /// async fn main() {
830    ///     let monitor = tokio_metrics::TaskMonitor::new();
831    ///     let mut interval = monitor.intervals();
832    ///     let mut next_interval = move || interval.next().unwrap();
833    ///     let one_sec = std::time::Duration::from_secs(1);
834    ///
835    ///     monitor.instrument(async {}).await;
836    ///
837    ///     assert_eq!(next_interval().total_idled_count, 0);
838    ///     assert_eq!(monitor.cumulative().total_idled_count, 0);
839    ///
840    ///     monitor.instrument(async move {
841    ///         tokio::time::sleep(one_sec).await;
842    ///     }).await;
843    ///
844    ///     assert_eq!(next_interval().total_idled_count, 1);
845    ///     assert_eq!(monitor.cumulative().total_idled_count, 1);
846    ///
847    ///     monitor.instrument(async {
848    ///         tokio::time::sleep(one_sec).await;
849    ///         tokio::time::sleep(one_sec).await;
850    ///     }).await;
851    ///
852    ///     assert_eq!(next_interval().total_idled_count, 2);
853    ///     assert_eq!(monitor.cumulative().total_idled_count, 3);
854    /// }
855    /// ```
856    pub total_idled_count: u64,
857
858    /// The total duration that tasks idled.
859    ///
860    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
861    /// task completes a poll, and the instant that it is next awoken.
862    ///
863    /// ##### Derived metrics
864    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**   
865    ///   The mean duration of idles.
866    ///
867    /// ##### Examples
868    /// ```
869    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
870    /// async fn main() {
871    ///     let monitor = tokio_metrics::TaskMonitor::new();
872    ///     let mut interval = monitor.intervals();
873    ///     let mut next_interval = move || interval.next().unwrap();
874    ///     let one_sec = std::time::Duration::from_secs(1);
875    ///     let two_sec = std::time::Duration::from_secs(2);
876    ///
877    ///     assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
878    ///     assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
879    ///
880    ///     monitor.instrument(async move {
881    ///         tokio::time::sleep(one_sec).await;
882    ///     }).await;
883    ///
884    ///     assert_eq!(next_interval().total_idle_duration, one_sec);
885    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
886    ///
887    ///     monitor.instrument(async move {
888    ///         tokio::time::sleep(two_sec).await;
889    ///     }).await;
890    ///
891    ///     assert_eq!(next_interval().total_idle_duration, two_sec);
892    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
893    /// }
894    /// ```
895    pub total_idle_duration: Duration,
896
897    /// The maximum idle duration that a task took.
898    ///
899    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
900    /// task completes a poll, and the instant that it is next awoken.
901    ///
902    /// ##### Derived metrics
903    /// - **[`max_idle_duration`][TaskMetrics::max_idle_duration]**   
904    ///   The longest duration a task spent idle.
905    ///
906    /// ##### Examples
907    /// ```
908    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
909    /// async fn main() {
910    ///     let monitor = tokio_metrics::TaskMonitor::new();
911    ///     let mut interval = monitor.intervals();
912    ///     let mut next_interval = move || interval.next().unwrap();
913    ///     let one_sec = std::time::Duration::from_secs(1);
914    ///     let two_sec = std::time::Duration::from_secs(2);
915    ///
916    ///     assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
917    ///     assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
918    ///
919    ///     monitor.instrument(async move {
920    ///         tokio::time::sleep(one_sec).await;
921    ///     }).await;
922    ///
923    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
924    ///     assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
925    ///
926    ///     monitor.instrument(async move {
927    ///         tokio::time::sleep(two_sec).await;
928    ///     }).await;
929    ///
930    ///     assert_eq!(next_interval().max_idle_duration, two_sec);
931    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
932    ///
933    ///     monitor.instrument(async move {
934    ///         tokio::time::sleep(one_sec).await;
935    ///     }).await;
936    ///
937    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
938    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
939    /// }
940    /// ```
941    pub max_idle_duration: Duration,
942
943    /// The total number of times that tasks were awoken (and then, presumably, scheduled for
944    /// execution).
945    ///
946    /// ##### Definition
947    /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
948    /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
949    ///
950    /// ##### Derived metrics
951    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
952    ///   The mean duration that tasks spent waiting to be executed after awakening.
953    ///
954    /// ##### Examples
955    /// In the below example, a task yields to the scheduler a varying number of times between
956    /// sampling intervals; this metric is equal to the number of times the task yielded:
957    /// ```
958    /// #[tokio::main]
959    /// async fn main(){
960    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
961    ///
962    ///     // [A] no tasks have been created, instrumented, and polled more than once
963    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
964    ///
965    ///     // [B] a `task` is created and instrumented
966    ///     let task = {
967    ///         let monitor = metrics_monitor.clone();
968    ///         metrics_monitor.instrument(async move {
969    ///             let mut interval = monitor.intervals();
970    ///             let mut next_interval = move || interval.next().unwrap();
971    ///
972    ///             // [E] `task` has not yet yielded to the scheduler, and
973    ///             // thus has not yet been scheduled since its first `poll`
974    ///             assert_eq!(next_interval().total_scheduled_count, 0);
975    ///
976    ///             tokio::task::yield_now().await; // yield to the scheduler
977    ///
978    ///             // [F] `task` has yielded to the scheduler once (and thus been
979    ///             // scheduled once) since the last sampling interval
980    ///             assert_eq!(next_interval().total_scheduled_count, 1);
981    ///
982    ///             tokio::task::yield_now().await; // yield to the scheduler
983    ///             tokio::task::yield_now().await; // yield to the scheduler
984    ///             tokio::task::yield_now().await; // yield to the scheduler
985    ///
986    ///             // [G] `task` has yielded to the scheduler thrice (and thus been
987    ///             // scheduled thrice) since the last sampling interval
988    ///             assert_eq!(next_interval().total_scheduled_count, 3);
989    ///
990    ///             tokio::task::yield_now().await; // yield to the scheduler
991    ///
992    ///             next_interval
993    ///         })
994    ///     };
995    ///
996    ///     // [C] `task` has not yet been polled at all
997    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
998    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
999    ///
1000    ///     // [D] poll `task` to completion
1001    ///     let mut next_interval = task.await;
1002    ///
1003    ///     // [H] `task` has been polled 1 times since the last sample
1004    ///     assert_eq!(next_interval().total_scheduled_count, 1);
1005    ///
1006    ///     // [I] `task` has been polled 0 times since the last sample
1007    ///     assert_eq!(next_interval().total_scheduled_count, 0);
1008    ///
1009    ///     // [J] `task` has yielded to the scheduler a total of five times
1010    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1011    /// }
1012    /// ```
1013    #[doc(alias = "total_delay_count")]
1014    pub total_scheduled_count: u64,
1015
1016    /// The total duration that tasks spent waiting to be polled after awakening.
1017    ///
1018    /// ##### Definition
1019    /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
1020    /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
1021    ///
1022    /// ##### Derived metrics
1023    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
1024    ///   The mean duration that tasks spent waiting to be executed after awakening.
1025    ///
1026    /// ##### Examples
1027    /// ```
1028    /// use tokio::time::Duration;
1029    ///
1030    /// #[tokio::main(flavor = "current_thread")]
1031    /// async fn main() {
1032    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1033    ///     let mut interval = metrics_monitor.intervals();
1034    ///     let mut next_interval = || interval.next().unwrap();
1035    ///
1036    ///     // construct and instrument and spawn a task that yields endlessly
1037    ///     tokio::spawn(metrics_monitor.instrument(async {
1038    ///         loop { tokio::task::yield_now().await }
1039    ///     }));
1040    ///
1041    ///     tokio::task::yield_now().await;
1042    ///
1043    ///     // block the executor for 1 second
1044    ///     std::thread::sleep(Duration::from_millis(1000));
1045    ///
1046    ///     tokio::task::yield_now().await;
1047    ///
1048    ///     // `endless_task` will have spent approximately one second waiting
1049    ///     let total_scheduled_duration = next_interval().total_scheduled_duration;
1050    ///     assert!(total_scheduled_duration >= Duration::from_millis(1000));
1051    ///     assert!(total_scheduled_duration <= Duration::from_millis(1100));
1052    /// }
1053    /// ```
1054    #[doc(alias = "total_delay_duration")]
1055    pub total_scheduled_duration: Duration,
1056
1057    /// The total number of times that tasks were polled.
1058    ///
1059    /// ##### Definition
1060    /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1061    /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1062    ///
1063    /// ##### Derived metrics
1064    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
1065    ///   The mean duration of polls.
1066    ///
1067    /// ##### Examples
1068    /// In the below example, a task with multiple yield points is await'ed to completion; this
1069    /// metric reflects the number of `await`s within each sampling interval:
1070    /// ```
1071    /// #[tokio::main]
1072    /// async fn main() {
1073    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1074    ///
1075    ///     // [A] no tasks have been created, instrumented, and polled more than once
1076    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1077    ///
1078    ///     // [B] a `task` is created and instrumented
1079    ///     let task = {
1080    ///         let monitor = metrics_monitor.clone();
1081    ///         metrics_monitor.instrument(async move {
1082    ///             let mut interval = monitor.intervals();
1083    ///             let mut next_interval = move || interval.next().unwrap();
1084    ///
1085    ///             // [E] task is in the midst of its first poll
1086    ///             assert_eq!(next_interval().total_poll_count, 0);
1087    ///
1088    ///             tokio::task::yield_now().await; // poll 1
1089    ///
1090    ///             // [F] task has been polled 1 time
1091    ///             assert_eq!(next_interval().total_poll_count, 1);
1092    ///
1093    ///             tokio::task::yield_now().await; // poll 2
1094    ///             tokio::task::yield_now().await; // poll 3
1095    ///             tokio::task::yield_now().await; // poll 4
1096    ///
1097    ///             // [G] task has been polled 3 times
1098    ///             assert_eq!(next_interval().total_poll_count, 3);
1099    ///
1100    ///             tokio::task::yield_now().await; // poll 5
1101    ///
1102    ///             next_interval                   // poll 6
1103    ///         })
1104    ///     };
1105    ///
1106    ///     // [C] `task` has not yet been polled at all
1107    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1108    ///
1109    ///     // [D] poll `task` to completion
1110    ///     let mut next_interval = task.await;
1111    ///
1112    ///     // [H] `task` has been polled 2 times since the last sample
1113    ///     assert_eq!(next_interval().total_poll_count, 2);
1114    ///
1115    ///     // [I] `task` has been polled 0 times since the last sample
1116    ///     assert_eq!(next_interval().total_poll_count, 0);
1117    ///
1118    ///     // [J] `task` has been polled 6 times
1119    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1120    /// }
1121    /// ```
1122    pub total_poll_count: u64,
1123
1124    /// The total duration elapsed during polls.
1125    ///
1126    /// ##### Definition
1127    /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1128    /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1129    ///
1130    /// ##### Derived metrics
1131    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
1132    ///   The mean duration of polls.
1133    ///
1134    /// #### Examples
1135    /// ```
1136    /// use tokio::time::Duration;
1137    ///
1138    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1139    /// async fn main() {
1140    ///     let monitor = tokio_metrics::TaskMonitor::new();
1141    ///     let mut interval = monitor.intervals();
1142    ///     let mut next_interval = move || interval.next().unwrap();
1143    ///
1144    ///     assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1145    ///
1146    ///     monitor.instrument(async {
1147    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1148    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1149    ///         ()                                                  // poll 3 (0s)
1150    ///     }).await;
1151    ///
1152    ///     assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1153    /// }
1154    /// ```
1155    pub total_poll_duration: Duration,
1156
1157    /// The total number of times that polling tasks completed swiftly.
1158    ///
1159    /// Here, 'swiftly' is defined as completing in strictly less time than
1160    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1161    ///
1162    /// ##### Derived metrics
1163    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**   
1164    ///   The mean duration of fast polls.
1165    ///
1166    /// ##### Examples
1167    /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1168    /// within the second sampling interval, and 2 fast polls occur within the third sampling
1169    /// interval:
1170    /// ```
1171    /// use std::future::Future;
1172    /// use std::time::Duration;
1173    ///
1174    /// #[tokio::main]
1175    /// async fn main() {
1176    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1177    ///     let mut interval = metrics_monitor.intervals();
1178    ///     let mut next_interval = || interval.next().unwrap();
1179    ///
1180    ///     // no tasks have been constructed, instrumented, or polled
1181    ///     assert_eq!(next_interval().total_fast_poll_count, 0);
1182    ///
1183    ///     let fast = Duration::ZERO;
1184    ///
1185    ///     // this task completes in three fast polls
1186    ///     let _ = metrics_monitor.instrument(async {
1187    ///         spin_for(fast).await; // fast poll 1
1188    ///         spin_for(fast).await; // fast poll 2
1189    ///         spin_for(fast)        // fast poll 3
1190    ///     }).await;
1191    ///
1192    ///     assert_eq!(next_interval().total_fast_poll_count, 3);
1193    ///
1194    ///     // this task completes in two fast polls
1195    ///     let _ = metrics_monitor.instrument(async {
1196    ///         spin_for(fast).await; // fast poll 1
1197    ///         spin_for(fast)        // fast poll 2
1198    ///     }).await;
1199    ///
1200    ///     assert_eq!(next_interval().total_fast_poll_count, 2);
1201    /// }
1202    ///
1203    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1204    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1205    ///     let start = tokio::time::Instant::now();
1206    ///     while start.elapsed() <= duration {}
1207    ///     tokio::task::yield_now()
1208    /// }
1209    /// ```
1210    pub total_fast_poll_count: u64,
1211
1212    /// The total duration of fast polls.
1213    ///
1214    /// Here, 'fast' is defined as completing in strictly less time than
1215    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1216    ///
1217    /// ##### Derived metrics
1218    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**   
1219    ///   The mean duration of fast polls.
1220    ///
1221    /// ##### Examples
1222    /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1223    /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1224    /// total of 2μs time in the third sampling interval:
1225    /// ```
1226    /// use std::future::Future;
1227    /// use std::time::Duration;
1228    ///
1229    /// #[tokio::main]
1230    /// async fn main() {
1231    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1232    ///     let mut interval = metrics_monitor.intervals();
1233    ///     let mut next_interval = || interval.next().unwrap();
1234    ///
1235    ///     // no tasks have been constructed, instrumented, or polled
1236    ///     let interval = next_interval();
1237    ///     assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1238    ///
1239    ///     let fast = Duration::from_micros(1);
1240    ///
1241    ///     // this task completes in three fast polls
1242    ///     let task_a_time = time(metrics_monitor.instrument(async {
1243    ///         spin_for(fast).await; // fast poll 1
1244    ///         spin_for(fast).await; // fast poll 2
1245    ///         spin_for(fast)        // fast poll 3
1246    ///     })).await;
1247    ///
1248    ///     let interval = next_interval();
1249    ///     assert!(interval.total_fast_poll_duration >= fast * 3);
1250    ///     assert!(interval.total_fast_poll_duration <= task_a_time);
1251    ///
1252    ///     // this task completes in two fast polls
1253    ///     let task_b_time = time(metrics_monitor.instrument(async {
1254    ///         spin_for(fast).await; // fast poll 1
1255    ///         spin_for(fast)        // fast poll 2
1256    ///     })).await;
1257    ///
1258    ///     let interval = next_interval();
1259    ///     assert!(interval.total_fast_poll_duration >= fast * 2);
1260    ///     assert!(interval.total_fast_poll_duration <= task_b_time);
1261    /// }
1262    ///
1263    /// /// Produces the amount of time it took to await a given async task.
1264    /// async fn time(task: impl Future) -> Duration {
1265    ///     let start = tokio::time::Instant::now();
1266    ///     task.await;
1267    ///     start.elapsed()
1268    /// }
1269    ///
1270    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1271    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1272    ///     let start = tokio::time::Instant::now();
1273    ///     while start.elapsed() <= duration {}
1274    ///     tokio::task::yield_now()
1275    /// }
1276    /// ```
1277    pub total_fast_poll_duration: Duration,
1278
1279    /// The total number of times that polling tasks completed slowly.
1280    ///
1281    /// Here, 'slowly' is defined as completing in at least as much time as
1282    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1283    ///
1284    /// ##### Derived metrics
1285    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**   
1286    ///   The mean duration of slow polls.
1287    ///
1288    /// ##### Examples
1289    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1290    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1291    /// interval:
1292    /// ```
1293    /// use std::future::Future;
1294    /// use std::time::Duration;
1295    ///
1296    /// #[tokio::main]
1297    /// async fn main() {
1298    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1299    ///     let mut interval = metrics_monitor.intervals();
1300    ///     let mut next_interval = || interval.next().unwrap();
1301    ///
1302    ///     // no tasks have been constructed, instrumented, or polled
1303    ///     assert_eq!(next_interval().total_slow_poll_count, 0);
1304    ///
1305    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1306    ///
1307    ///     // this task completes in three slow polls
1308    ///     let _ = metrics_monitor.instrument(async {
1309    ///         spin_for(slow).await; // slow poll 1
1310    ///         spin_for(slow).await; // slow poll 2
1311    ///         spin_for(slow)        // slow poll 3
1312    ///     }).await;
1313    ///
1314    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1315    ///
1316    ///     // this task completes in two slow polls
1317    ///     let _ = metrics_monitor.instrument(async {
1318    ///         spin_for(slow).await; // slow poll 1
1319    ///         spin_for(slow)        // slow poll 2
1320    ///     }).await;
1321    ///
1322    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1323    /// }
1324    ///
1325    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1326    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1327    ///     let start = tokio::time::Instant::now();
1328    ///     while start.elapsed() <= duration {}
1329    ///     tokio::task::yield_now()
1330    /// }
1331    /// ```
1332    pub total_slow_poll_count: u64,
1333
1334    /// The total duration of slow polls.
1335    ///
1336    /// Here, 'slowly' is defined as completing in at least as much time as
1337    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1338    ///
1339    /// ##### Derived metrics
1340    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**   
1341    ///   The mean duration of slow polls.
1342    ///
1343    /// ##### Examples
1344    /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1345    /// consume a total of
1346    /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1347    /// time in the second sampling interval; and two slow polls consume a total of
1348    /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1349    /// third sampling interval:
1350    /// ```
1351    /// use std::future::Future;
1352    /// use std::time::Duration;
1353    ///
1354    /// #[tokio::main]
1355    /// async fn main() {
1356    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1357    ///     let mut interval = metrics_monitor.intervals();
1358    ///     let mut next_interval = || interval.next().unwrap();
1359    ///
1360    ///     // no tasks have been constructed, instrumented, or polled
1361    ///     let interval = next_interval();
1362    ///     assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1363    ///
1364    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1365    ///
1366    ///     // this task completes in three slow polls
1367    ///     let task_a_time = time(metrics_monitor.instrument(async {
1368    ///         spin_for(slow).await; // slow poll 1
1369    ///         spin_for(slow).await; // slow poll 2
1370    ///         spin_for(slow)        // slow poll 3
1371    ///     })).await;
1372    ///
1373    ///     let interval = next_interval();
1374    ///     assert!(interval.total_slow_poll_duration >= slow * 3);
1375    ///     assert!(interval.total_slow_poll_duration <= task_a_time);
1376    ///
1377    ///     // this task completes in two slow polls
1378    ///     let task_b_time = time(metrics_monitor.instrument(async {
1379    ///         spin_for(slow).await; // slow poll 1
1380    ///         spin_for(slow)        // slow poll 2
1381    ///     })).await;
1382    ///
1383    ///     let interval = next_interval();
1384    ///     assert!(interval.total_slow_poll_duration >= slow * 2);
1385    ///     assert!(interval.total_slow_poll_duration <= task_b_time);
1386    /// }
1387    ///
1388    /// /// Produces the amount of time it took to await a given async task.
1389    /// async fn time(task: impl Future) -> Duration {
1390    ///     let start = tokio::time::Instant::now();
1391    ///     task.await;
1392    ///     start.elapsed()
1393    /// }
1394    ///
1395    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1396    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1397    ///     let start = tokio::time::Instant::now();
1398    ///     while start.elapsed() <= duration {}
1399    ///     tokio::task::yield_now()
1400    /// }
1401    /// ```
1402    pub total_slow_poll_duration: Duration,
1403
1404    /// The total count of tasks with short scheduling delays.
1405    ///
1406    /// This is defined as tasks taking strictly less than
1407    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1408    /// scheduled.
1409    ///
1410    /// ##### Derived metrics
1411    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**   
1412    ///   The mean duration of short scheduling delays.
1413    pub total_short_delay_count: u64,
1414
1415    /// The total count of tasks with long scheduling delays.
1416    ///
1417    /// This is defined as tasks taking
1418    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1419    /// after being scheduled.
1420    ///
1421    /// ##### Derived metrics
1422    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**   
1423    ///   The mean duration of short scheduling delays.
1424    pub total_long_delay_count: u64,
1425
1426    /// The total duration of tasks with short scheduling delays.
1427    ///
1428    /// This is defined as tasks taking strictly less than
1429    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1430    /// scheduled.
1431    ///
1432    /// ##### Derived metrics
1433    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**   
1434    ///   The mean duration of short scheduling delays.
1435    pub total_short_delay_duration: Duration,
1436
1437    /// The total number of times that a task had a long scheduling duration.
1438    ///
1439    /// Here, a long scheduling duration is defined as taking longer to start execution after
1440    /// scheduling than [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
1441    ///
1442    /// ##### Derived metrics
1443    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**   
1444    ///   The mean duration of short scheduling delays.
1445    pub total_long_delay_duration: Duration,
1446}
1447
1448/// Tracks the metrics, shared across the various types.
1449#[derive(Debug)]
1450struct RawMetrics {
1451    /// A task poll takes longer than this, it is considered a slow poll.
1452    slow_poll_threshold: Duration,
1453
1454    /// A scheduling delay of at least this long will be considered a long delay
1455    long_delay_threshold: Duration,
1456
1457    /// Total number of instrumented tasks.
1458    instrumented_count: AtomicU64,
1459
1460    /// Total number of instrumented tasks polled at least once.
1461    first_poll_count: AtomicU64,
1462
1463    /// Total number of times tasks entered the `idle` state.
1464    total_idled_count: AtomicU64,
1465
1466    /// Total number of times tasks were scheduled.
1467    total_scheduled_count: AtomicU64,
1468
1469    /// Total number of times tasks were polled fast
1470    total_fast_poll_count: AtomicU64,
1471
1472    /// Total number of times tasks were polled slow
1473    total_slow_poll_count: AtomicU64,
1474
1475    /// Total number of times tasks had long delay,
1476    total_long_delay_count: AtomicU64,
1477
1478    /// Total number of times tasks had little delay
1479    total_short_delay_count: AtomicU64,
1480
1481    /// Total number of times tasks were dropped
1482    dropped_count: AtomicU64,
1483
1484    /// Total amount of time until the first poll
1485    total_first_poll_delay_ns: AtomicU64,
1486
1487    /// Total amount of time tasks spent in the `idle` state.
1488    total_idle_duration_ns: AtomicU64,
1489
1490    /// The longest time tasks spent in the `idle` state locally.
1491    /// This will be used to track the local max between interval
1492    /// metric snapshots.
1493    local_max_idle_duration_ns: AtomicU64,
1494
1495    /// The longest time tasks spent in the `idle` state.
1496    global_max_idle_duration_ns: AtomicU64,
1497
1498    /// Total amount of time tasks spent in the waking state.
1499    total_scheduled_duration_ns: AtomicU64,
1500
1501    /// Total amount of time tasks spent being polled below the slow cut off.
1502    total_fast_poll_duration_ns: AtomicU64,
1503
1504    /// Total amount of time tasks spent being polled above the slow cut off.
1505    total_slow_poll_duration: AtomicU64,
1506
1507    /// Total amount of time tasks spent being polled below the long delay cut off.
1508    total_short_delay_duration_ns: AtomicU64,
1509
1510    /// Total amount of time tasks spent being polled at or above the long delay cut off.
1511    total_long_delay_duration_ns: AtomicU64,
1512}
1513
1514#[derive(Debug)]
1515struct State {
1516    /// Where metrics should be recorded
1517    metrics: Arc<RawMetrics>,
1518
1519    /// Instant at which the task was instrumented. This is used to track the time to first poll.
1520    instrumented_at: Instant,
1521
1522    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
1523    /// was last woken.
1524    woke_at: AtomicU64,
1525
1526    /// Waker to forward notifications to.
1527    waker: AtomicWaker,
1528}
1529
1530impl TaskMonitor {
1531    /// The default duration at which polls cross the threshold into being categorized as 'slow' is
1532    /// 50μs.
1533    #[cfg(not(test))]
1534    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
1535    #[cfg(test)]
1536    #[allow(missing_docs)]
1537    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1538
1539    /// The default duration at which schedules cross the threshold into being categorized as 'long'
1540    /// is 50μs.
1541    #[cfg(not(test))]
1542    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
1543    #[cfg(test)]
1544    #[allow(missing_docs)]
1545    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1546
1547    /// Constructs a new task monitor.
1548    ///
1549    /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1550    /// considered 'slow'.
1551    ///
1552    /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1553    /// considered 'long'.
1554    pub fn new() -> TaskMonitor {
1555        TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1556    }
1557
1558    /// Constructs a builder for a task monitor.
1559    pub fn builder() -> TaskMonitorBuilder {
1560        TaskMonitorBuilder::new()
1561    }
1562
1563    /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1564    ///
1565    /// ##### Selecting an appropriate threshold
1566    /// TODO. What advice can we give here?
1567    ///
1568    /// ##### Examples
1569    /// In the below example, low-threshold and high-threshold monitors are constructed and
1570    /// instrument identical tasks; the low-threshold monitor reports4 slow polls, and the
1571    /// high-threshold monitor reports only 2 slow polls:
1572    /// ```
1573    /// use std::future::Future;
1574    /// use std::time::Duration;
1575    /// use tokio_metrics::TaskMonitor;
1576    ///
1577    /// #[tokio::main]
1578    /// async fn main() {
1579    ///     let lo_threshold = Duration::from_micros(10);
1580    ///     let hi_threshold = Duration::from_millis(10);
1581    ///
1582    ///     let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1583    ///     let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1584    ///
1585    ///     let make_task = || async {
1586    ///         spin_for(lo_threshold).await; // faster poll 1
1587    ///         spin_for(lo_threshold).await; // faster poll 2
1588    ///         spin_for(hi_threshold).await; // slower poll 3
1589    ///         spin_for(hi_threshold).await  // slower poll 4
1590    ///     };
1591    ///
1592    ///     lo_monitor.instrument(make_task()).await;
1593    ///     hi_monitor.instrument(make_task()).await;
1594    ///
1595    ///     // the low-threshold monitor reported 4 slow polls:
1596    ///     assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1597    ///     // the high-threshold monitor reported only 2 slow polls:
1598    ///     assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1599    /// }
1600    ///
1601    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1602    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1603    ///     let start = tokio::time::Instant::now();
1604    ///     while start.elapsed() <= duration {}
1605    ///     tokio::task::yield_now()
1606    /// }
1607    /// ```
1608    pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1609        Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1610    }
1611
1612    fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1613        TaskMonitor {
1614            metrics: Arc::new(RawMetrics {
1615                slow_poll_threshold: slow_poll_cut_off,
1616                first_poll_count: AtomicU64::new(0),
1617                total_idled_count: AtomicU64::new(0),
1618                total_scheduled_count: AtomicU64::new(0),
1619                total_fast_poll_count: AtomicU64::new(0),
1620                total_slow_poll_count: AtomicU64::new(0),
1621                total_long_delay_count: AtomicU64::new(0),
1622                instrumented_count: AtomicU64::new(0),
1623                dropped_count: AtomicU64::new(0),
1624                total_first_poll_delay_ns: AtomicU64::new(0),
1625                total_scheduled_duration_ns: AtomicU64::new(0),
1626                local_max_idle_duration_ns: AtomicU64::new(0),
1627                global_max_idle_duration_ns: AtomicU64::new(0),
1628                total_idle_duration_ns: AtomicU64::new(0),
1629                total_fast_poll_duration_ns: AtomicU64::new(0),
1630                total_slow_poll_duration: AtomicU64::new(0),
1631                total_short_delay_duration_ns: AtomicU64::new(0),
1632                long_delay_threshold: long_delay_cut_off,
1633                total_short_delay_count: AtomicU64::new(0),
1634                total_long_delay_duration_ns: AtomicU64::new(0),
1635            }),
1636        }
1637    }
1638
1639    /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1640    ///
1641    /// ##### Examples
1642    /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1643    /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1644    /// ```
1645    /// use tokio_metrics::TaskMonitor;
1646    ///
1647    /// #[tokio::main]
1648    /// async fn main() {
1649    ///     let metrics_monitor = TaskMonitor::new();
1650    ///
1651    ///     assert_eq!(
1652    ///         metrics_monitor.slow_poll_threshold(),
1653    ///         TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1654    ///     );
1655    /// }
1656    /// ```
1657    pub fn slow_poll_threshold(&self) -> Duration {
1658        self.metrics.slow_poll_threshold
1659    }
1660
1661    /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1662    /// as long.
1663    pub fn long_delay_threshold(&self) -> Duration {
1664        self.metrics.long_delay_threshold
1665    }
1666
1667    /// Produces an instrumented façade around a given async task.
1668    ///
1669    /// ##### Examples
1670    /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1671    /// ```
1672    /// #[tokio::main]
1673    /// async fn main() {
1674    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1675    ///
1676    ///     // 0 tasks have been instrumented, much less polled
1677    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1678    ///
1679    ///     // instrument a task and poll it to completion
1680    ///     metrics_monitor.instrument(async {}).await;
1681    ///
1682    ///     // 1 task has been instrumented and polled
1683    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1684    ///
1685    ///     // instrument a task and poll it to completion
1686    ///     metrics_monitor.instrument(async {}).await;
1687    ///
1688    ///     // 2 tasks have been instrumented and polled
1689    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1690    /// }
1691    /// ```
1692    /// An aync task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1693    /// ```
1694    /// #[tokio::main]
1695    /// async fn main() {
1696    ///     let monitor_a = tokio_metrics::TaskMonitor::new();
1697    ///     let monitor_b = tokio_metrics::TaskMonitor::new();
1698    ///
1699    ///     // 0 tasks have been instrumented, much less polled
1700    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1701    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1702    ///
1703    ///     // instrument a task and poll it to completion
1704    ///     monitor_a.instrument(monitor_b.instrument(async {})).await;
1705    ///
1706    ///     // 1 task has been instrumented and polled
1707    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1708    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1709    /// }
1710    /// ```
1711    /// It is also possible (but probably undesirable) to instrument an async task multiple times
1712    /// with the same [`TaskMonitor`]; e.g.:
1713    /// ```
1714    /// #[tokio::main]
1715    /// async fn main() {
1716    ///     let monitor = tokio_metrics::TaskMonitor::new();
1717    ///
1718    ///     // 0 tasks have been instrumented, much less polled
1719    ///     assert_eq!(monitor.cumulative().first_poll_count, 0);
1720    ///
1721    ///     // instrument a task and poll it to completion
1722    ///     monitor.instrument(monitor.instrument(async {})).await;
1723    ///
1724    ///     // 2 tasks have been instrumented and polled, supposedly
1725    ///     assert_eq!(monitor.cumulative().first_poll_count, 2);
1726    /// }
1727    /// ```
1728    pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1729        self.metrics.instrumented_count.fetch_add(1, SeqCst);
1730        Instrumented {
1731            task,
1732            did_poll_once: false,
1733            idled_at: 0,
1734            state: Arc::new(State {
1735                metrics: self.metrics.clone(),
1736                instrumented_at: Instant::now(),
1737                woke_at: AtomicU64::new(0),
1738                waker: AtomicWaker::new(),
1739            }),
1740        }
1741    }
1742
1743    /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1744    /// the construction of [`TaskMonitor`].
1745    ///
1746    /// ##### See also
1747    /// - [`TaskMonitor::intervals`]:
1748    ///   produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1749    ///
1750    /// ##### Examples
1751    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1752    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1753    /// interval; five slow polls occur across all sampling intervals:
1754    /// ```
1755    /// use std::future::Future;
1756    /// use std::time::Duration;
1757    ///
1758    /// #[tokio::main]
1759    /// async fn main() {
1760    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1761    ///
1762    ///     // initialize a stream of sampling intervals
1763    ///     let mut intervals = metrics_monitor.intervals();
1764    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1765    ///     let mut next_interval = || intervals.next().unwrap();
1766    ///
1767    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1768    ///
1769    ///     // this task completes in three slow polls
1770    ///     let _ = metrics_monitor.instrument(async {
1771    ///         spin_for(slow).await; // slow poll 1
1772    ///         spin_for(slow).await; // slow poll 2
1773    ///         spin_for(slow)        // slow poll 3
1774    ///     }).await;
1775    ///
1776    ///     // in the previous sampling interval, there were 3 slow polls
1777    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1778    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1779    ///
1780    ///     // this task completes in two slow polls
1781    ///     let _ = metrics_monitor.instrument(async {
1782    ///         spin_for(slow).await; // slow poll 1
1783    ///         spin_for(slow)        // slow poll 2
1784    ///     }).await;
1785    ///
1786    ///     // in the previous sampling interval, there were 2 slow polls
1787    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1788    ///
1789    ///     // across all sampling interval, there were a total of 5 slow polls
1790    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1791    /// }
1792    ///
1793    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1794    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1795    ///     let start = tokio::time::Instant::now();
1796    ///     while start.elapsed() <= duration {}
1797    ///     tokio::task::yield_now()
1798    /// }
1799    /// ```
1800    pub fn cumulative(&self) -> TaskMetrics {
1801        self.metrics.metrics()
1802    }
1803
1804    /// Produces an unending iterator of metric sampling intervals.
1805    ///
1806    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1807    /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1808    /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1809    /// interval.
1810    ///
1811    /// ##### Examples
1812    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1813    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1814    /// interval; five slow polls occur across all sampling intervals:
1815    /// ```
1816    /// use std::future::Future;
1817    /// use std::time::Duration;
1818    ///
1819    /// #[tokio::main]
1820    /// async fn main() {
1821    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1822    ///
1823    ///     // initialize a stream of sampling intervals
1824    ///     let mut intervals = metrics_monitor.intervals();
1825    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1826    ///     let mut next_interval = || intervals.next().unwrap();
1827    ///
1828    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1829    ///
1830    ///     // this task completes in three slow polls
1831    ///     let _ = metrics_monitor.instrument(async {
1832    ///         spin_for(slow).await; // slow poll 1
1833    ///         spin_for(slow).await; // slow poll 2
1834    ///         spin_for(slow)        // slow poll 3
1835    ///     }).await;
1836    ///
1837    ///     // in the previous sampling interval, there were 3 slow polls
1838    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1839    ///
1840    ///     // this task completes in two slow polls
1841    ///     let _ = metrics_monitor.instrument(async {
1842    ///         spin_for(slow).await; // slow poll 1
1843    ///         spin_for(slow)        // slow poll 2
1844    ///     }).await;
1845    ///
1846    ///     // in the previous sampling interval, there were 2 slow polls
1847    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1848    ///
1849    ///     // across all sampling intervals, there were a total of 5 slow polls
1850    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1851    /// }
1852    ///
1853    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1854    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1855    ///     let start = tokio::time::Instant::now();
1856    ///     while start.elapsed() <= duration {}
1857    ///     tokio::task::yield_now()
1858    /// }
1859    /// ```
1860    pub fn intervals(&self) -> TaskIntervals {
1861        TaskIntervals {
1862            metrics: self.metrics.clone(),
1863            previous: None,
1864        }
1865    }
1866}
1867
1868impl RawMetrics {
1869    fn get_and_reset_local_max_idle_duration(&self) -> Duration {
1870        Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
1871    }
1872
1873    fn metrics(&self) -> TaskMetrics {
1874        let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1875        let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1876
1877        let total_fast_poll_duration =
1878            Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1879        let total_slow_poll_duration =
1880            Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1881
1882        let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1883        let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1884
1885        TaskMetrics {
1886            instrumented_count: self.instrumented_count.load(SeqCst),
1887            dropped_count: self.dropped_count.load(SeqCst),
1888
1889            total_poll_count,
1890            total_poll_duration,
1891            first_poll_count: self.first_poll_count.load(SeqCst),
1892            total_idled_count: self.total_idled_count.load(SeqCst),
1893            total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1894            total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1895            total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1896            total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1897            total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1898            total_first_poll_delay: Duration::from_nanos(
1899                self.total_first_poll_delay_ns.load(SeqCst),
1900            ),
1901            max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
1902            total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1903            total_scheduled_duration: Duration::from_nanos(
1904                self.total_scheduled_duration_ns.load(SeqCst),
1905            ),
1906            total_fast_poll_duration: Duration::from_nanos(
1907                self.total_fast_poll_duration_ns.load(SeqCst),
1908            ),
1909            total_slow_poll_duration: Duration::from_nanos(
1910                self.total_slow_poll_duration.load(SeqCst),
1911            ),
1912            total_short_delay_duration: Duration::from_nanos(
1913                self.total_short_delay_duration_ns.load(SeqCst),
1914            ),
1915            total_long_delay_duration: Duration::from_nanos(
1916                self.total_long_delay_duration_ns.load(SeqCst),
1917            ),
1918        }
1919    }
1920}
1921
1922impl Default for TaskMonitor {
1923    fn default() -> TaskMonitor {
1924        TaskMonitor::new()
1925    }
1926}
1927
1928impl TaskMetrics {
1929    /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1930    /// are first polled.
1931    ///
1932    /// ##### Definition
1933    /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1934    /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1935    ///
1936    /// ##### Interpretation
1937    /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1938    /// initially polled.
1939    ///
1940    /// ##### See also
1941    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
1942    ///   The mean duration that tasks spent waiting to be executed after awakening.
1943    ///
1944    /// ##### Examples
1945    /// In the below example, no tasks are instrumented or polled within the first sampling
1946    /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1947    /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1948    /// instrumentation and first poll of two tasks:
1949    /// ```
1950    /// use std::time::Duration;
1951    ///
1952    /// #[tokio::main]
1953    /// async fn main() {
1954    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1955    ///     let mut interval = metrics_monitor.intervals();
1956    ///     let mut next_interval = || interval.next().unwrap();
1957    ///
1958    ///     // no tasks have yet been created, instrumented, or polled
1959    ///     assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1960    ///
1961    ///     // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1962    ///     // produces the total time it took to do all of the aforementioned
1963    ///     async fn instrument_pause_await(
1964    ///         metrics_monitor: &tokio_metrics::TaskMonitor,
1965    ///         pause_time: Duration
1966    ///     ) -> Duration
1967    ///     {
1968    ///         let before_instrumentation = tokio::time::Instant::now();
1969    ///         let task = metrics_monitor.instrument(async move {});
1970    ///         tokio::time::sleep(pause_time).await;
1971    ///         task.await;
1972    ///         before_instrumentation.elapsed()
1973    ///     }
1974    ///
1975    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1976    ///     let task_a_pause_time = Duration::from_millis(500);
1977    ///     let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1978    ///
1979    ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1980    ///     // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1981    ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1982    ///     assert!(mean_first_poll_delay >= task_a_pause_time);
1983    ///     assert!(mean_first_poll_delay <= task_a_total_time);
1984    ///
1985    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1986    ///     let task_b_pause_time = Duration::from_millis(500);
1987    ///     let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1988    ///
1989    ///     // construct and await a task that pauses for 1000ms between instrumentation and first poll
1990    ///     let task_c_pause_time = Duration::from_millis(1000);
1991    ///     let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
1992    ///
1993    ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1994    ///     // average pause time of 500ms, and less-than-or-equal-to the combined total runtime of
1995    ///     // `task_b` and `task_c`
1996    ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1997    ///     assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
1998    ///     assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
1999    /// }
2000    /// ```
2001    pub fn mean_first_poll_delay(&self) -> Duration {
2002        mean(self.total_first_poll_delay, self.first_poll_count)
2003    }
2004
2005    /// The mean duration of idles.
2006    ///
2007    /// ##### Definition
2008    /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2009    /// [`total_idled_count`][TaskMetrics::total_idled_count].
2010    ///
2011    /// ##### Interpretation
2012    /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2013    /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2014    /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2015    /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2016    /// task-external events to complete.
2017    ///
2018    /// ##### Examples
2019    /// ```
2020    /// #[tokio::main]
2021    /// async fn main() {
2022    ///     let monitor = tokio_metrics::TaskMonitor::new();
2023    ///     let one_sec = std::time::Duration::from_secs(1);
2024    ///
2025    ///     monitor.instrument(async move {
2026    ///         tokio::time::sleep(one_sec).await;
2027    ///     }).await;
2028    ///
2029    ///     assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2030    /// }
2031    /// ```
2032    pub fn mean_idle_duration(&self) -> Duration {
2033        mean(self.total_idle_duration, self.total_idled_count)
2034    }
2035
2036    /// The mean duration that tasks spent waiting to be executed after awakening.
2037    ///
2038    /// ##### Definition
2039    /// This metric is derived from
2040    /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2041    /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2042    ///
2043    /// ##### Interpretation
2044    /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2045    /// queues before being polled.
2046    ///
2047    /// ##### See also
2048    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**   
2049    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
2050    ///   are first polled.
2051    ///
2052    /// ##### Examples
2053    /// ```
2054    /// use tokio::time::Duration;
2055    ///
2056    /// #[tokio::main(flavor = "current_thread")]
2057    /// async fn main() {
2058    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2059    ///     let mut interval = metrics_monitor.intervals();
2060    ///     let mut next_interval = || interval.next().unwrap();
2061    ///
2062    ///     // construct and instrument and spawn a task that yields endlessly
2063    ///     tokio::spawn(metrics_monitor.instrument(async {
2064    ///         loop { tokio::task::yield_now().await }
2065    ///     }));
2066    ///
2067    ///     tokio::task::yield_now().await;
2068    ///
2069    ///     // block the executor for 1 second
2070    ///     std::thread::sleep(Duration::from_millis(1000));
2071    ///
2072    ///     // get the task to run twice
2073    ///     // the first will have a 1 sec scheduling delay, the second will have almost none
2074    ///     tokio::task::yield_now().await;
2075    ///     tokio::task::yield_now().await;
2076    ///
2077    ///     // `endless_task` will have spent approximately one second waiting
2078    ///     let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2079    ///     assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2080    ///     assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2081    /// }
2082    /// ```
2083    pub fn mean_scheduled_duration(&self) -> Duration {
2084        mean(self.total_scheduled_duration, self.total_scheduled_count)
2085    }
2086
2087    /// The mean duration of polls.
2088    ///
2089    /// ##### Definition
2090    /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2091    /// [`total_poll_count`][TaskMetrics::total_poll_count].
2092    ///
2093    /// ##### Interpretation
2094    /// If this metric increases, it means that, on average, individual polls are tending to take
2095    /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2096    /// durations could be offset by fewer polls.
2097    ///
2098    /// ##### See also
2099    /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**   
2100    ///   The ratio between the number polls categorized as slow and fast.
2101    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**   
2102    ///   The mean duration of slow polls.
2103    ///
2104    /// ##### Examples
2105    /// ```
2106    /// use std::time::Duration;
2107    ///
2108    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2109    /// async fn main() {
2110    ///     let monitor = tokio_metrics::TaskMonitor::new();
2111    ///     let mut interval = monitor.intervals();
2112    ///     let mut next_interval = move || interval.next().unwrap();
2113    ///  
2114    ///     assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2115    ///  
2116    ///     monitor.instrument(async {
2117    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2118    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2119    ///         ()                                                  // poll 3 (0s)
2120    ///     }).await;
2121    ///  
2122    ///     assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2123    /// }
2124    /// ```
2125    pub fn mean_poll_duration(&self) -> Duration {
2126        mean(self.total_poll_duration, self.total_poll_count)
2127    }
2128
2129    /// The ratio between the number polls categorized as slow and fast.
2130    ///
2131    /// ##### Definition
2132    /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2133    /// [`total_poll_count`][TaskMetrics::total_poll_count].
2134    ///
2135    /// ##### Interpretation
2136    /// If this metric increases, it means that a greater proportion of polls took excessively long
2137    /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2138    /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2139    /// However, as a rule, *should* yield to the scheduler frequently.
2140    ///
2141    /// ##### See also
2142    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
2143    ///   The mean duration of polls.
2144    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**   
2145    ///   The mean duration of slow polls.
2146    ///
2147    /// ##### Examples
2148    /// Changes in this metric may be observed by varying the ratio of slow and slow fast within
2149    /// sampling intervals; for instance:
2150    /// ```
2151    /// use std::future::Future;
2152    /// use std::time::Duration;
2153    ///
2154    /// #[tokio::main]
2155    /// async fn main() {
2156    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2157    ///     let mut interval = metrics_monitor.intervals();
2158    ///     let mut next_interval = || interval.next().unwrap();
2159    ///
2160    ///     // no tasks have been constructed, instrumented, or polled
2161    ///     let interval = next_interval();
2162    ///     assert_eq!(interval.total_fast_poll_count, 0);
2163    ///     assert_eq!(interval.total_slow_poll_count, 0);
2164    ///     assert!(interval.slow_poll_ratio().is_nan());
2165    ///
2166    ///     let fast = Duration::ZERO;
2167    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
2168    ///
2169    ///     // this task completes in three fast polls
2170    ///     metrics_monitor.instrument(async {
2171    ///         spin_for(fast).await;   // fast poll 1
2172    ///         spin_for(fast).await;   // fast poll 2
2173    ///         spin_for(fast);         // fast poll 3
2174    ///     }).await;
2175    ///
2176    ///     // this task completes in two slow polls
2177    ///     metrics_monitor.instrument(async {
2178    ///         spin_for(slow).await;   // slow poll 1
2179    ///         spin_for(slow);         // slow poll 2
2180    ///     }).await;
2181    ///
2182    ///     let interval = next_interval();
2183    ///     assert_eq!(interval.total_fast_poll_count, 3);
2184    ///     assert_eq!(interval.total_slow_poll_count, 2);
2185    ///     assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2186    ///
2187    ///     // this task completes in three slow polls
2188    ///     metrics_monitor.instrument(async {
2189    ///         spin_for(slow).await;   // slow poll 1
2190    ///         spin_for(slow).await;   // slow poll 2
2191    ///         spin_for(slow);         // slow poll 3
2192    ///     }).await;
2193    ///
2194    ///     // this task completes in two fast polls
2195    ///     metrics_monitor.instrument(async {
2196    ///         spin_for(fast).await; // fast poll 1
2197    ///         spin_for(fast);       // fast poll 2
2198    ///     }).await;
2199    ///
2200    ///     let interval = next_interval();
2201    ///     assert_eq!(interval.total_fast_poll_count, 2);
2202    ///     assert_eq!(interval.total_slow_poll_count, 3);
2203    ///     assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2204    /// }
2205    ///
2206    /// fn ratio(a: f64, b: f64) -> f64 {
2207    ///     a / (a + b)
2208    /// }
2209    ///
2210    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2211    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2212    ///     let start = tokio::time::Instant::now();
2213    ///     while start.elapsed() <= duration {}
2214    ///     tokio::task::yield_now()
2215    /// }
2216    /// ```
2217    pub fn slow_poll_ratio(&self) -> f64 {
2218        self.total_slow_poll_count as f64 / self.total_poll_count as f64
2219    }
2220
2221    /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2222    ///
2223    /// ##### Definition
2224    /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2225    /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2226    pub fn long_delay_ratio(&self) -> f64 {
2227        self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2228    }
2229
2230    /// The mean duration of fast polls.
2231    ///
2232    /// ##### Definition
2233    /// This metric is derived from
2234    /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2235    /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2236    ///
2237    /// ##### Examples
2238    /// In the below example, no tasks are polled in the first sampling interval; three fast polls
2239    /// consume a mean of
2240    /// ⅜ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2241    /// second sampling interval; and two fast polls consume a total of
2242    /// ½ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2243    /// third sampling interval:
2244    /// ```
2245    /// use std::future::Future;
2246    /// use std::time::Duration;
2247    ///
2248    /// #[tokio::main]
2249    /// async fn main() {
2250    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2251    ///     let mut interval = metrics_monitor.intervals();
2252    ///     let mut next_interval = || interval.next().unwrap();
2253    ///
2254    ///     // no tasks have been constructed, instrumented, or polled
2255    ///     assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2256    ///
2257    ///     let threshold = metrics_monitor.slow_poll_threshold();
2258    ///     let fast_1 = 1 * Duration::from_micros(1);
2259    ///     let fast_2 = 2 * Duration::from_micros(1);
2260    ///     let fast_3 = 3 * Duration::from_micros(1);
2261    ///
2262    ///     // this task completes in two fast polls
2263    ///     let total_time = time(metrics_monitor.instrument(async {
2264    ///         spin_for(fast_1).await; // fast poll 1
2265    ///         spin_for(fast_2)        // fast poll 2
2266    ///     })).await;
2267    ///
2268    ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2269    ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2270    ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2271    ///     assert!(mean_fast_poll_duration <= total_time / 2);
2272    ///
2273    ///     // this task completes in three fast polls
2274    ///     let total_time = time(metrics_monitor.instrument(async {
2275    ///         spin_for(fast_1).await; // fast poll 1
2276    ///         spin_for(fast_2).await; // fast poll 2
2277    ///         spin_for(fast_3)        // fast poll 3
2278    ///     })).await;
2279    ///
2280    ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2281    ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2282    ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2283    ///     assert!(mean_fast_poll_duration <= total_time / 3);
2284    /// }
2285    ///
2286    /// /// Produces the amount of time it took to await a given task.
2287    /// async fn time(task: impl Future) -> Duration {
2288    ///     let start = tokio::time::Instant::now();
2289    ///     task.await;
2290    ///     start.elapsed()
2291    /// }
2292    ///
2293    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2294    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2295    ///     let start = tokio::time::Instant::now();
2296    ///     while start.elapsed() <= duration {}
2297    ///     tokio::task::yield_now()
2298    /// }
2299    /// ```
2300    pub fn mean_fast_poll_duration(&self) -> Duration {
2301        mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2302    }
2303
2304    /// The average time taken for a task with a short scheduling delay to be executed after being
2305    /// scheduled.
2306    ///
2307    /// ##### Definition
2308    /// This metric is derived from
2309    /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2310    /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2311    pub fn mean_short_delay_duration(&self) -> Duration {
2312        mean(
2313            self.total_short_delay_duration,
2314            self.total_short_delay_count,
2315        )
2316    }
2317
2318    /// The mean duration of slow polls.
2319    ///
2320    /// ##### Definition
2321    /// This metric is derived from
2322    /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2323    /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2324    ///
2325    /// ##### Interpretation
2326    /// If this metric increases, it means that a greater proportion of polls took excessively long
2327    /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2328    /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2329    ///
2330    /// ##### See also
2331    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
2332    ///   The mean duration of polls.
2333    /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**   
2334    ///   The ratio between the number polls categorized as slow and fast.
2335    ///
2336    /// ##### Interpretation
2337    /// If this metric increases, it means that, on average, slow polls got even slower. This does
2338    /// necessarily imply increased task latency: An increase in average slow poll duration could be
2339    /// offset by fewer or faster polls. However, as a rule, *should* yield to the scheduler
2340    /// frequently.
2341    ///
2342    /// ##### Examples
2343    /// In the below example, no tasks are polled in the first sampling interval; three slow polls
2344    /// consume a mean of
2345    /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2346    /// second sampling interval; and two slow polls consume a total of
2347    /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2348    /// third sampling interval:
2349    /// ```
2350    /// use std::future::Future;
2351    /// use std::time::Duration;
2352    ///
2353    /// #[tokio::main]
2354    /// async fn main() {
2355    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2356    ///     let mut interval = metrics_monitor.intervals();
2357    ///     let mut next_interval = || interval.next().unwrap();
2358    ///
2359    ///     // no tasks have been constructed, instrumented, or polled
2360    ///     assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2361    ///
2362    ///     let threshold = metrics_monitor.slow_poll_threshold();
2363    ///     let slow_1 = 1 * threshold;
2364    ///     let slow_2 = 2 * threshold;
2365    ///     let slow_3 = 3 * threshold;
2366    ///
2367    ///     // this task completes in two slow polls
2368    ///     let total_time = time(metrics_monitor.instrument(async {
2369    ///         spin_for(slow_1).await; // slow poll 1
2370    ///         spin_for(slow_2)        // slow poll 2
2371    ///     })).await;
2372    ///
2373    ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2374    ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2375    ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2376    ///     assert!(mean_slow_poll_duration <= total_time / 2);
2377    ///
2378    ///     // this task completes in three slow polls
2379    ///     let total_time = time(metrics_monitor.instrument(async {
2380    ///         spin_for(slow_1).await; // slow poll 1
2381    ///         spin_for(slow_2).await; // slow poll 2
2382    ///         spin_for(slow_3)        // slow poll 3
2383    ///     })).await;
2384    ///
2385    ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2386    ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2387    ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2388    ///     assert!(mean_slow_poll_duration <= total_time / 3);
2389    /// }
2390    ///
2391    /// /// Produces the amount of time it took to await a given task.
2392    /// async fn time(task: impl Future) -> Duration {
2393    ///     let start = tokio::time::Instant::now();
2394    ///     task.await;
2395    ///     start.elapsed()
2396    /// }
2397    ///
2398    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2399    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2400    ///     let start = tokio::time::Instant::now();
2401    ///     while start.elapsed() <= duration {}
2402    ///     tokio::task::yield_now()
2403    /// }
2404    /// ```
2405    pub fn mean_slow_poll_duration(&self) -> Duration {
2406        mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2407    }
2408
2409    /// The average scheduling delay for a task which takes a long time to start executing after
2410    /// being scheduled.
2411    ///
2412    /// ##### Definition
2413    /// This metric is derived from
2414    /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2415    /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2416    pub fn mean_long_delay_duration(&self) -> Duration {
2417        mean(self.total_long_delay_duration, self.total_long_delay_count)
2418    }
2419}
2420
2421impl<T: Future> Future for Instrumented<T> {
2422    type Output = T::Output;
2423
2424    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2425        instrument_poll(cx, self, Future::poll)
2426    }
2427}
2428
2429impl<T: Stream> Stream for Instrumented<T> {
2430    type Item = T::Item;
2431
2432    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2433        instrument_poll(cx, self, Stream::poll_next)
2434    }
2435}
2436
2437fn instrument_poll<T, Out>(
2438    cx: &mut Context<'_>,
2439    instrumented: Pin<&mut Instrumented<T>>,
2440    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
2441) -> Poll<Out> {
2442    let poll_start = Instant::now();
2443    let this = instrumented.project();
2444    let idled_at = this.idled_at;
2445    let state = this.state;
2446    let instrumented_at = state.instrumented_at;
2447    let metrics = &state.metrics;
2448    /* accounting for time-to-first-poll and tasks-count */
2449    // is this the first time this task has been polled?
2450    if !*this.did_poll_once {
2451        // if so, we need to do three things:
2452        /* 1. note that this task *has* been polled */
2453        *this.did_poll_once = true;
2454
2455        /* 2. account for the time-to-first-poll of this task */
2456        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
2457        // round down to `u64::MAX` nanoseconds
2458        let elapsed = (poll_start - instrumented_at)
2459            .as_nanos()
2460            .try_into()
2461            .unwrap_or(u64::MAX);
2462        // add this duration to `time_to_first_poll_ns_total`
2463        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);
2464
2465        /* 3. increment the count of tasks that have been polled at least once */
2466        state.metrics.first_poll_count.fetch_add(1, SeqCst);
2467    }
2468    /* accounting for time-idled and time-scheduled */
2469    // 1. note (and reset) the instant this task was last awoke
2470    let woke_at = state.woke_at.swap(0, SeqCst);
2471    // The state of a future is *idling* in the interim between the instant
2472    // it completes a `poll`, and the instant it is next awoken.
2473    if *idled_at < woke_at {
2474        // increment the counter of how many idles occurred
2475        metrics.total_idled_count.fetch_add(1, SeqCst);
2476
2477        // compute the duration of the idle
2478        let idle_ns = woke_at - *idled_at;
2479
2480        // update the max time tasks spent idling, both locally and
2481        // globally.
2482        metrics
2483            .local_max_idle_duration_ns
2484            .fetch_max(idle_ns, SeqCst);
2485        metrics
2486            .global_max_idle_duration_ns
2487            .fetch_max(idle_ns, SeqCst);
2488        // adjust the total elapsed time monitored tasks spent idling
2489        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
2490    }
2491    // if this task spent any time in the scheduled state after instrumentation,
2492    // and after first poll, `woke_at` will be greater than 0.
2493    if woke_at > 0 {
2494        // increment the counter of how many schedules occurred
2495        metrics.total_scheduled_count.fetch_add(1, SeqCst);
2496
2497        // recall that the `woke_at` field is internally represented as
2498        // nanoseconds-since-instrumentation. here, for accounting purposes,
2499        // we need to instead represent it as a proper `Instant`.
2500        let woke_instant = instrumented_at + Duration::from_nanos(woke_at);
2501
2502        // the duration this task spent scheduled is time time elapsed between
2503        // when this task was awoke, and when it was polled.
2504        let scheduled_ns = (poll_start - woke_instant)
2505            .as_nanos()
2506            .try_into()
2507            .unwrap_or(u64::MAX);
2508
2509        let scheduled = Duration::from_nanos(scheduled_ns);
2510
2511        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
2512            if scheduled >= metrics.long_delay_threshold {
2513                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
2514            } else {
2515                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
2516            };
2517        // update the appropriate bucket
2518        count_bucket.fetch_add(1, SeqCst);
2519        duration_bucket.fetch_add(scheduled_ns, SeqCst);
2520
2521        // add `scheduled_ns` to the Monitor's total
2522        metrics
2523            .total_scheduled_duration_ns
2524            .fetch_add(scheduled_ns, SeqCst);
2525    }
2526    // Register the waker
2527    state.waker.register(cx.waker());
2528    // Get the instrumented waker
2529    let waker_ref = futures_util::task::waker_ref(state);
2530    let mut cx = Context::from_waker(&waker_ref);
2531    // Poll the task
2532    let inner_poll_start = Instant::now();
2533    let ret = poll_fn(this.task, &mut cx);
2534    let inner_poll_end = Instant::now();
2535    /* idle time starts now */
2536    *idled_at = (inner_poll_end - instrumented_at)
2537        .as_nanos()
2538        .try_into()
2539        .unwrap_or(u64::MAX);
2540    /* accounting for poll time */
2541    let inner_poll_duration = inner_poll_end - inner_poll_start;
2542    let inner_poll_ns: u64 = inner_poll_duration
2543        .as_nanos()
2544        .try_into()
2545        .unwrap_or(u64::MAX);
2546    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
2547            if inner_poll_duration >= metrics.slow_poll_threshold {
2548                (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
2549            } else {
2550                (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
2551            };
2552    // update the appropriate bucket
2553    count_bucket.fetch_add(1, SeqCst);
2554    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
2555    ret
2556}
2557
2558impl State {
2559    fn on_wake(&self) {
2560        let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2561            Ok(woke_at) => woke_at,
2562            // This is highly unlikely as it would mean the task ran for over
2563            // 500 years. If you ran your service for 500 years. If you are
2564            // reading this 500 years in the future, I'm sorry.
2565            Err(_) => return,
2566        };
2567
2568        // We don't actually care about the result
2569        let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2570    }
2571}
2572
2573impl ArcWake for State {
2574    fn wake_by_ref(arc_self: &Arc<State>) {
2575        arc_self.on_wake();
2576        arc_self.waker.wake();
2577    }
2578
2579    fn wake(self: Arc<State>) {
2580        self.on_wake();
2581        self.waker.wake();
2582    }
2583}
2584
2585/// Iterator returned by [`TaskMonitor::intervals`].
2586///
2587/// See that method's documentation for more details.
2588#[derive(Debug)]
2589pub struct TaskIntervals {
2590    metrics: Arc<RawMetrics>,
2591    previous: Option<TaskMetrics>,
2592}
2593
2594impl TaskIntervals {
2595    fn probe(&mut self) -> TaskMetrics {
2596        let latest = self.metrics.metrics();
2597        let local_max_idle_duration = self.metrics.get_and_reset_local_max_idle_duration();
2598
2599        let next = if let Some(previous) = self.previous {
2600            TaskMetrics {
2601                instrumented_count: latest
2602                    .instrumented_count
2603                    .wrapping_sub(previous.instrumented_count),
2604                dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2605                total_poll_count: latest
2606                    .total_poll_count
2607                    .wrapping_sub(previous.total_poll_count),
2608                total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2609                first_poll_count: latest
2610                    .first_poll_count
2611                    .wrapping_sub(previous.first_poll_count),
2612                total_idled_count: latest
2613                    .total_idled_count
2614                    .wrapping_sub(previous.total_idled_count),
2615                total_scheduled_count: latest
2616                    .total_scheduled_count
2617                    .wrapping_sub(previous.total_scheduled_count),
2618                total_fast_poll_count: latest
2619                    .total_fast_poll_count
2620                    .wrapping_sub(previous.total_fast_poll_count),
2621                total_short_delay_count: latest
2622                    .total_short_delay_count
2623                    .wrapping_sub(previous.total_short_delay_count),
2624                total_slow_poll_count: latest
2625                    .total_slow_poll_count
2626                    .wrapping_sub(previous.total_slow_poll_count),
2627                total_long_delay_count: latest
2628                    .total_long_delay_count
2629                    .wrapping_sub(previous.total_long_delay_count),
2630                total_first_poll_delay: sub(
2631                    latest.total_first_poll_delay,
2632                    previous.total_first_poll_delay,
2633                ),
2634                max_idle_duration: local_max_idle_duration,
2635                total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2636                total_scheduled_duration: sub(
2637                    latest.total_scheduled_duration,
2638                    previous.total_scheduled_duration,
2639                ),
2640                total_fast_poll_duration: sub(
2641                    latest.total_fast_poll_duration,
2642                    previous.total_fast_poll_duration,
2643                ),
2644                total_short_delay_duration: sub(
2645                    latest.total_short_delay_duration,
2646                    previous.total_short_delay_duration,
2647                ),
2648                total_slow_poll_duration: sub(
2649                    latest.total_slow_poll_duration,
2650                    previous.total_slow_poll_duration,
2651                ),
2652                total_long_delay_duration: sub(
2653                    latest.total_long_delay_duration,
2654                    previous.total_long_delay_duration,
2655                ),
2656            }
2657        } else {
2658            latest
2659        };
2660
2661        self.previous = Some(latest);
2662
2663        next
2664    }
2665}
2666
2667impl Iterator for TaskIntervals {
2668    type Item = TaskMetrics;
2669
2670    fn next(&mut self) -> Option<Self::Item> {
2671        Some(self.probe())
2672    }
2673}
2674
2675#[inline(always)]
2676fn to_nanos(d: Duration) -> u64 {
2677    debug_assert!(d <= Duration::from_nanos(u64::MAX));
2678    d.as_secs()
2679        .wrapping_mul(1_000_000_000)
2680        .wrapping_add(d.subsec_nanos() as u64)
2681}
2682
2683#[inline(always)]
2684fn sub(a: Duration, b: Duration) -> Duration {
2685    let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2686    Duration::from_nanos(nanos)
2687}
2688
2689#[inline(always)]
2690fn mean(d: Duration, count: u64) -> Duration {
2691    if let Some(quotient) = to_nanos(d).checked_div(count) {
2692        Duration::from_nanos(quotient)
2693    } else {
2694        Duration::ZERO
2695    }
2696}