tokio_metrics/
task.rs

1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13#[cfg(not(feature = "rt"))]
14use std::time::{Duration, Instant};
15
16/// Monitors key metrics of instrumented tasks.
17///
18/// ### Basic Usage
19/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
20/// [instrumented][`TaskMonitor::instrument`] with the monitor.
21///
22/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
23/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
24/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
25/// ```
26/// use std::time::Duration;
27///
28/// #[tokio::main]
29/// async fn main() {
30///     // construct a metrics monitor
31///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
32///
33///     // print task metrics every 500ms
34///     {
35///         let metrics_monitor = metrics_monitor.clone();
36///         tokio::spawn(async move {
37///             for interval in metrics_monitor.intervals() {
38///                 // pretty-print the metric interval
39///                 println!("{:?}", interval);
40///                 // wait 500ms
41///                 tokio::time::sleep(Duration::from_millis(500)).await;
42///             }
43///         });
44///     }
45///
46///     // instrument some tasks and await them
47///     // note that the same TaskMonitor can be used for multiple tasks
48///     tokio::join![
49///         metrics_monitor.instrument(do_work()),
50///         metrics_monitor.instrument(do_work()),
51///         metrics_monitor.instrument(do_work())
52///     ];
53/// }
54///
55/// async fn do_work() {
56///     for _ in 0..25 {
57///         tokio::task::yield_now().await;
58///         tokio::time::sleep(Duration::from_millis(100)).await;
59///     }
60/// }
61/// ```
62///
63/// ### What should I instrument?
64/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
65///
66/// #### Instrumenting a web application
67/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
68/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
69/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
70/// debugging scenario for a web service that takes this approach to instrumentation. This
71/// approach is exemplified in the below example:
72/// ```no_run
73/// // The unabridged version of this snippet is in the examples directory of this crate.
74///
75/// #[tokio::main]
76/// async fn main() {
77///     // construct a TaskMonitor for root endpoint
78///     let monitor_root = tokio_metrics::TaskMonitor::new();
79///
80///     // construct TaskMonitors for create_users endpoint
81///     let monitor_create_user = CreateUserMonitors {
82///         // monitor for the entire endpoint
83///         route: tokio_metrics::TaskMonitor::new(),
84///         // monitor for database insertion subtask
85///         insert: tokio_metrics::TaskMonitor::new(),
86///     };
87///
88///     // build our application with two instrumented endpoints
89///     let app = axum::Router::new()
90///         // `GET /` goes to `root`
91///         .route("/", axum::routing::get({
92///             let monitor = monitor_root.clone();
93///             move || monitor.instrument(async { "Hello, World!" })
94///         }))
95///         // `POST /users` goes to `create_user`
96///         .route("/users", axum::routing::post({
97///             let monitors = monitor_create_user.clone();
98///             let route = monitors.route.clone();
99///             move |payload| {
100///                 route.instrument(create_user(payload, monitors))
101///             }
102///         }));
103///
104///     // print task metrics for each endpoint every 1s
105///     let metrics_frequency = std::time::Duration::from_secs(1);
106///     tokio::spawn(async move {
107///         let root_intervals = monitor_root.intervals();
108///         let create_user_route_intervals =
109///             monitor_create_user.route.intervals();
110///         let create_user_insert_intervals =
111///             monitor_create_user.insert.intervals();
112///         let create_user_intervals =
113///             create_user_route_intervals.zip(create_user_insert_intervals);
114///
115///         let intervals = root_intervals.zip(create_user_intervals);
116///         for (root_route, (create_user_route, create_user_insert)) in intervals {
117///             println!("root_route = {:#?}", root_route);
118///             println!("create_user_route = {:#?}", create_user_route);
119///             println!("create_user_insert = {:#?}", create_user_insert);
120///             tokio::time::sleep(metrics_frequency).await;
121///         }
122///     });
123///
124///     // run the server
125///     let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
126///     let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
127///     axum::serve(listener, app)
128///         .await
129///         .unwrap();
130/// }
131///
132/// async fn create_user(
133///     axum::Json(payload): axum::Json<CreateUser>,
134///     monitors: CreateUserMonitors,
135/// ) -> impl axum::response::IntoResponse {
136///     let user = User { id: 1337, username: payload.username, };
137///     // instrument inserting the user into the db:
138///     let _ = monitors.insert.instrument(insert_user(user.clone())).await;
139///     (axum::http::StatusCode::CREATED, axum::Json(user))
140/// }
141///
142/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
143///
144/// #
145/// # #[derive(Clone)]
146/// # struct CreateUserMonitors {
147/// #     // monitor for the entire endpoint
148/// #     route: tokio_metrics::TaskMonitor,
149/// #     // monitor for database insertion subtask
150/// #     insert: tokio_metrics::TaskMonitor,
151/// # }
152/// #
153/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
154/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
155/// #
156/// // insert the user into the database
157/// async fn insert_user(_: User) {
158///     /* implementation details elided */
159///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
160/// }
161/// ```
162///
163/// ### Why are my tasks slow?
164/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
165/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
166///
167/// #### Identifying the high-level culprits
168/// A set of tasks will appear to execute more slowly if:
169/// - they are taking longer to poll (i.e., they consume too much CPU time)
170/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
171///   queues)
172/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
173///
174/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
175/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
176/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
177/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
178/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
179/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
180///
181/// ##### Are my tasks taking longer to poll?
182/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**   
183///   This metric reflects the mean poll duration. If it increased, it means that, on average,
184///   individual polls tended to take longer. However, this does not necessarily imply increased
185///   task latency: An increase in poll durations could be offset by fewer polls.
186/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**   
187///   This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
188///   a greater proportion of polls performed excessive computation before yielding. This does not
189///   necessarily imply increased task latency: An increase in the proportion of slow polls could be
190///   offset by fewer or faster polls.
191/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**   
192///   This metric reflects the mean duration of slow polls. If it increased, it means that, on
193///   average, slow polls got slower. This does not necessarily imply increased task latency: An
194///   increase in average slow poll duration could be offset by fewer or faster polls.
195///
196/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
197///
198/// ##### Are my tasks spending more time waiting to be polled?
199/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**   
200///   This metric reflects the mean delay between the instant a task is first instrumented and the
201///   instant it is first polled. If it increases, it means that, on average, tasks spent longer
202///   waiting to be initially run.
203/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**   
204///   This metric reflects the mean duration that tasks spent in the scheduled state. The
205///   'scheduled' state of a task is the duration between the instant a task is awoken and the
206///   instant it is subsequently polled. If this metric increases, it means that, on average, tasks
207///   spent longer in tokio's queues before being polled.
208/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
209///   This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
210///   it means that a greater proportion of tasks experienced excessive delays before they could
211///   execute after being woken. This does not necessarily indicate an increase in latency, as this
212///   could be offset by fewer or faster task polls.
213/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
214///   This metric reflects the mean duration of long delays. If it increased, it means that, on
215///   average, long delays got even longer. This does not necessarily imply increased task latency:
216///   an increase in average long delay duration could be offset by fewer or faster polls or more
217///   short schedules.
218///
219/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
220///
221/// ##### Are my tasks spending more time waiting on external events to complete?
222/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**   
223///   This metric reflects the mean duration that tasks spent in the idle state. The idle state is
224///   the duration spanning the instant a task completes a poll, and the instant that it is next
225///   awoken. Tasks inhabit this state when they are waiting for task-external events to complete
226///   (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
227///   tasks, in aggregate, spent more time waiting for task-external events to complete.
228///
229/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
230///
231/// #### Digging deeper
232/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
233/// search for further explanation...
234///
235/// ##### Why are my tasks taking longer to poll?
236/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
237/// The culprit is likely some combination of:
238/// - **Your tasks are accidentally blocking.** Common culprits include:
239///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
240///        [networking](https://doc.rust-lang.org/std/net/) APIs.
241///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
242///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.   
245///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
246///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
248///     1. TLS/cryptographic routines
249///     2. doing a lot of processing on bytes
250///     3. calling non-Tokio resources
251///
252/// ##### Why are my tasks spending more time waiting to be polled?
253/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
254/// suggesting some combination of:
255/// - Your application is inflating the time elapsed between instrumentation and first poll.
256/// - Your tasks are being scheduled into tokio's global queue.
257/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
258///
259/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
260///
261/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
263/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
264/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, [*you try to identify
266/// the performance culprits...*](#identifying-the-high-level-culprits)
267///
268/// #### Digging even deeper
269///
270/// ##### Is time-to-first-poll unusually high?
271/// Contrast these two metrics:
272/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**  
273///   This metric reflects the mean delay between the instant a task is first instrumented and the
274///   instant it is *first* polled.
275/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**  
276///   This metric reflects the mean delay between the instant when tasks were awoken and the
277///   instant they were subsequently polled.
278///
279/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
280/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
281///
282/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
283///
284/// ##### Is my application delaying the time-to-first-poll?
285/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
286/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
287/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
288/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
289///
/// For instance, in the below example, the application induces a 1 second delay between when
/// `task` is instrumented and when it is awaited:
292/// ```rust
293/// #[tokio::main]
294/// async fn main() {
295///     use tokio::time::Duration;
296///     let monitor = tokio_metrics::TaskMonitor::new();
297///
298///     let task = monitor.instrument(async move {});
299///
300///     let one_sec = Duration::from_secs(1);
301///     tokio::time::sleep(one_sec).await;
302///
303///     let _ = tokio::spawn(task).await;
304///
305///     assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
306/// }
307/// ```
308///
309/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokios-global-queue)
311///
312/// ##### Is my application spawning more tasks into tokio's global queue?
313/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
314/// global "injection" queue.
315///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
317/// ```ignore
318/// #[tokio::main]
319/// async fn main() {
320///     for _ in 0..100 {
321///         let (tx, rx) = oneshot::channel();
322///         tokio::spawn(async move {
323///             tx.send(());
324///         })
325///         
326///         rx.await;
327///     }
328/// }
329/// ```
/// One would expect this to run efficiently; however, the main task runs *off* the runtime while
/// the spawned tasks run *on* the runtime, which means the snippet will run much slower than:
332/// ```ignore
333/// #[tokio::main]
334/// async fn main() {
335///     tokio::spawn(async {
336///         for _ in 0..100 {
337///             let (tx, rx) = oneshot::channel();
338///             tokio::spawn(async move {
339///                 tx.send(());
340///             })
341///
342///             rx.await;
343///         }
344///     }).await;
345/// }
346/// ```
347/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
348/// and the task being polled.
349///
350/// ##### Are other tasks polling too long without yielding?
351/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
352/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
353/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
354///
355/// ### Limitations
356/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
357/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
358/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
359///
360/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
361/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
362/// counters and durations wrap.
363///
364/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
365/// calculated by computing the difference of metrics in successive invocations of
366/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
367/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
368/// that interval will overflow and not be accurate.
369///
370/// ##### Examples at the limits
371/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
372/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
373/// ```
374/// use tokio::time::Duration;
375///
376/// #[tokio::main(flavor = "current_thread", start_paused = true)]
377/// async fn main() {
378///     let monitor = tokio_metrics::TaskMonitor::new();
379///     let mut interval = monitor.intervals();
380///     let mut next_interval = || interval.next().unwrap();
381///
382///     // construct and instrument a task, but do not `await` it
383///     let task = monitor.instrument(async {});
384///
385///     // this is the maximum duration representable by tokio_metrics
386///     let max_duration = Duration::from_nanos(u64::MAX);
387///
388///     // let's advance the clock by this amount and poll `task`
389///     let _ = tokio::time::advance(max_duration).await;
390///     task.await;
391///
392///     // durations ≤ `max_duration` are accurately reflected in this metric
393///     assert_eq!(next_interval().total_first_poll_delay, max_duration);
394///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
395/// }
396/// ```
397/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
398/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
399/// ```
400/// # use tokio::time::Duration;
401/// #
402/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
403/// # async fn main() {
404/// #    let monitor = tokio_metrics::TaskMonitor::new();
405/// #
///  // construct and instrument two tasks, but do not `await` them
407///  let task_a = monitor.instrument(async {});
408///  let task_b = monitor.instrument(async {});
409///
410///  // this is the maximum duration representable by tokio_metrics
411///  let max_duration = Duration::from_nanos(u64::MAX);
412///
///  // let's advance the clock by 1.5x this amount and await both tasks
414///  let _ = tokio::time::advance(3 * (max_duration / 2)).await;
415///  task_a.await;
416///  task_b.await;
417///
418///  // the `total_first_poll_delay` has overflowed
419///  assert!(monitor.cumulative().total_first_poll_delay < max_duration);
420/// # }
421/// ```
422/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
423/// this metric to the precipice of overflow:
424/// ```
425/// # use tokio::time::Duration;
426/// #
427/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
428/// # async fn main() {
429/// #     let monitor = tokio_metrics::TaskMonitor::new();
430/// #     let mut interval = monitor.intervals();
431/// #     let mut next_interval = || interval.next().unwrap();
432/// #
433/// // construct and instrument u16::MAX tasks, but do not `await` them
434/// let first_poll_count = u16::MAX as u64;
435/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
436/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
437///
438/// // this is the maximum duration representable by tokio_metrics
439/// let max_duration = u64::MAX;
440///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
443/// let iffy_delay = max_duration / (first_poll_count as u64);
444/// let small_remainder = max_duration % first_poll_count;
445/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
446///
447/// // ...then poll all of the instrumented tasks:
448/// for task in tasks { task.await; }
449///
450/// // `total_first_poll_delay` is at the precipice of overflowing!
451/// assert_eq!(
452///     next_interval().total_first_poll_delay.as_nanos(),
453///     (max_duration - small_remainder) as u128
454/// );
455/// assert_eq!(
456///     monitor.cumulative().total_first_poll_delay.as_nanos(),
457///     (max_duration - small_remainder) as u128
458/// );
459/// # }
460/// ```
461/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
462/// metrics counter overflows at most once in the midst of an interval:
463/// ```
464/// # use tokio::time::Duration;
465/// # use tokio_metrics::TaskMonitor;
466/// #
467/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
468/// # async fn main() {
469/// #     let monitor = TaskMonitor::new();
470/// #     let mut interval = monitor.intervals();
471/// #     let mut next_interval = || interval.next().unwrap();
472/// #
473///  let first_poll_count = u16::MAX as u64;
474///  let batch_size = first_poll_count / 3;
475///
476///  let max_duration_ns = u64::MAX;
477///  let iffy_delay_ns = max_duration_ns / first_poll_count;
478///
479///  // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
480///  // then await the instrumented tasks.
481///  async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
482///      let mut tasks = Vec::with_capacity(batch_size);
483///      for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
484///      let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
485///      for task in tasks { task.await; }
486///  }
487///
///  // this is how much `total_first_poll_delay` will
///  // increase with each batch we run
490///  let batch_delay = iffy_delay_ns * batch_size;
491///
492///  // run batches 1, 2, and 3
493///  for i in 1..=3 {
494///      run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
495///      assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
496///      assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
497///  }
498///
///  /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
500///  assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
501///
502///  // run batch 4
503///  run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
504///  // the interval counter remains accurate
505///  assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
506///  // but the cumulative counter has overflowed
507///  assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
508/// # }
509/// ```
510/// If a cumulative metric overflows *more than once* in the midst of an interval,
511/// its interval-sampled counterpart will also overflow.
512#[derive(Clone, Debug)]
513pub struct TaskMonitor {
514    metrics: Arc<RawMetrics>,
515}
516
/// A builder for constructing a [`TaskMonitor`] with specialized configuration parameters.
///
/// Any threshold that is left unset is replaced by the corresponding [`TaskMonitor`] default
/// when [`build`][TaskMonitorBuilder::build] is called.
#[derive(Debug, Default, Clone)]
pub struct TaskMonitorBuilder {
    // Threshold at which polls are considered 'slow'; `None` means "use the default".
    slow_poll_threshold: Option<Duration>,
    // Threshold at which schedules are considered 'long'; `None` means "use the default".
    long_delay_threshold: Option<Duration>,
}
524
525impl TaskMonitorBuilder {
526    /// Creates a new [`TaskMonitorBuilder`].
527    pub fn new() -> Self {
528        Self {
529            slow_poll_threshold: None,
530            long_delay_threshold: None,
531        }
532    }
533
534    /// Specifies the threshold at which polls are considered 'slow'.
535    pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
536        self.slow_poll_threshold = Some(threshold);
537
538        self
539    }
540
541    /// Specifies the threshold at which schedules are considered 'long'.
542    pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
543        self.long_delay_threshold = Some(threshold);
544
545        self
546    }
547
548    /// Consume the builder, producing a [`TaskMonitor`].
549    pub fn build(self) -> TaskMonitor {
550        TaskMonitor::create(
551            self.slow_poll_threshold
552                .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
553            self.long_delay_threshold
554                .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
555        )
556    }
557}
558
559pin_project! {
560    /// An async task that has been instrumented with [`TaskMonitor::instrument`].
561    #[derive(Debug)]
562    pub struct Instrumented<T> {
563        // The task being instrumented
564        #[pin]
565        task: T,
566
567        // True when the task is polled for the first time
568        did_poll_once: bool,
569
570        // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
571        // its last poll.
572        idled_at: u64,
573
574        // State shared between the task and its instrumented waker.
575        state: Arc<State>,
576    }
577
578    impl<T> PinnedDrop for Instrumented<T> {
579        fn drop(this: Pin<&mut Self>) {
580            this.state.metrics.dropped_count.fetch_add(1, SeqCst);
581        }
582    }
583}
584
585/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
586#[non_exhaustive]
587#[derive(Debug, Clone, Copy, Default)]
588pub struct TaskMetrics {
589    /// The number of tasks instrumented.
590    ///
591    /// ##### Examples
592    /// ```
593    /// #[tokio::main]
594    /// async fn main() {
595    ///     let monitor = tokio_metrics::TaskMonitor::new();
596    ///     let mut interval = monitor.intervals();
597    ///     let mut next_interval = || interval.next().unwrap();
598    ///
599    ///     // 0 tasks have been instrumented
600    ///     assert_eq!(next_interval().instrumented_count, 0);
601    ///
602    ///     monitor.instrument(async {});
603    ///
604    ///     // 1 task has been instrumented
605    ///     assert_eq!(next_interval().instrumented_count, 1);
606    ///
607    ///     monitor.instrument(async {});
608    ///     monitor.instrument(async {});
609    ///
610    ///     // 2 tasks have been instrumented
611    ///     assert_eq!(next_interval().instrumented_count, 2);
612    ///
613    ///     // since the last interval was produced, 0 tasks have been instrumented
614    ///     assert_eq!(next_interval().instrumented_count, 0);
615    /// }
616    /// ```
617    pub instrumented_count: u64,
618
619    /// The number of tasks dropped.
620    ///
621    /// ##### Examples
622    /// ```
623    /// #[tokio::main]
624    /// async fn main() {
625    ///     let monitor = tokio_metrics::TaskMonitor::new();
626    ///     let mut interval = monitor.intervals();
627    ///     let mut next_interval = || interval.next().unwrap();
628    ///
629    ///     // 0 tasks have been dropped
630    ///     assert_eq!(next_interval().dropped_count, 0);
631    ///
632    ///     let _task = monitor.instrument(async {});
633    ///
634    ///     // 0 tasks have been dropped
635    ///     assert_eq!(next_interval().dropped_count, 0);
636    ///
637    ///     monitor.instrument(async {}).await;
638    ///     drop(monitor.instrument(async {}));
639    ///
640    ///     // 2 tasks have been dropped
641    ///     assert_eq!(next_interval().dropped_count, 2);
642    ///
643    ///     // since the last interval was produced, 0 tasks have been dropped
644    ///     assert_eq!(next_interval().dropped_count, 0);
645    /// }
646    /// ```
647    pub dropped_count: u64,
648
649    /// The number of tasks polled for the first time.
650    ///
651    /// ##### Derived metrics
652    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**  
653    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
654    ///   are first polled.
655    ///
656    /// ##### Examples
657    /// In the below example, no tasks are instrumented or polled in the first sampling interval;
658    /// one task is instrumented (but not polled) in the second sampling interval; that task is
659    /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
660    /// additional tasks are polled for the first time within the fourth sampling interval:
661    /// ```
662    /// #[tokio::main]
663    /// async fn main() {
664    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
665    ///     let mut interval = metrics_monitor.intervals();
666    ///     let mut next_interval = || interval.next().unwrap();
667    ///
668    ///     // no tasks have been constructed, instrumented, and polled at least once
669    ///     assert_eq!(next_interval().first_poll_count, 0);
670    ///
671    ///     let task = metrics_monitor.instrument(async {});
672    ///
673    ///     // `task` has been constructed and instrumented, but has not yet been polled
674    ///     assert_eq!(next_interval().first_poll_count, 0);
675    ///
676    ///     // poll `task` to completion
677    ///     task.await;
678    ///
679    ///     // `task` has been constructed, instrumented, and polled at least once
680    ///     assert_eq!(next_interval().first_poll_count, 1);
681    ///
682    ///     // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
683    ///     assert_eq!(next_interval().first_poll_count, 0);
684    ///
685    /// }
686    /// ```
687    pub first_poll_count: u64,
688
689    /// The total duration elapsed between the instant tasks are instrumented, and the instant they
690    /// are first polled.
691    ///
692    /// ##### Derived metrics
693    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**   
694    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
695    ///   are first polled.
696    ///
697    /// ##### Examples
698    /// In the below example, 0 tasks have been instrumented or polled within the first sampling
699    /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
700    /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
701    /// polling of tasks within the third sampling interval:
702    /// ```
703    /// use tokio::time::Duration;
704    ///
705    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
706    /// async fn main() {
707    ///     let monitor = tokio_metrics::TaskMonitor::new();
708    ///     let mut interval = monitor.intervals();
709    ///     let mut next_interval = || interval.next().unwrap();
710    ///
711    ///     // no tasks have yet been created, instrumented, or polled
712    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
713    ///     assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
714    ///
715    ///     // constructs and instruments a task, pauses a given duration, then awaits the task
716    ///     async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
717    ///         let task = monitor.instrument(async move {});
718    ///         tokio::time::sleep(pause).await;
719    ///         task.await;
720    ///     }
721    ///
722    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
723    ///     let task_a_pause_time = Duration::from_millis(500);
724    ///     instrument_pause_await(&monitor, task_a_pause_time).await;
725    ///
726    ///     assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
727    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
728    ///
729    ///     // construct and await a task that pauses for 250ms between instrumentation and first poll
730    ///     let task_b_pause_time = Duration::from_millis(250);
731    ///     instrument_pause_await(&monitor, task_b_pause_time).await;
732    ///
733    ///     // construct and await a task that pauses for 100ms between instrumentation and first poll
734    ///     let task_c_pause_time = Duration::from_millis(100);
735    ///     instrument_pause_await(&monitor, task_c_pause_time).await;
736    ///
737    ///     assert_eq!(
738    ///         next_interval().total_first_poll_delay,
739    ///         task_b_pause_time + task_c_pause_time
740    ///     );
741    ///     assert_eq!(
742    ///         monitor.cumulative().total_first_poll_delay,
743    ///         task_a_pause_time + task_b_pause_time + task_c_pause_time
744    ///     );
745    /// }
746    /// ```
747    ///
748    /// ##### When is this metric recorded?
749    /// The delay between instrumentation and first poll is not recorded until the first poll
750    /// actually occurs:
751    /// ```
752    /// # use tokio::time::Duration;
753    /// #
754    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
755    /// # async fn main() {
756    /// #     let monitor = tokio_metrics::TaskMonitor::new();
757    /// #     let mut interval = monitor.intervals();
758    /// #     let mut next_interval = || interval.next().unwrap();
759    /// #
760    /// // we construct and instrument a task, but do not `await` it
761    /// let task = monitor.instrument(async {});
762    ///
763    /// // let's sleep for 1s before we poll `task`
764    /// let one_sec = Duration::from_secs(1);
765    /// let _ = tokio::time::sleep(one_sec).await;
766    ///
767    /// // although 1s has now elapsed since the instrumentation of `task`,
768    /// // this is not reflected in `total_first_poll_delay`...
769    /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
770    /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
771    ///
772    /// // ...and won't be until `task` is actually polled
773    /// task.await;
774    ///
775    /// // now, the 1s delay is reflected in `total_first_poll_delay`:
776    /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
777    /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
778    /// # }
779    /// ```
780    ///
781    /// ##### What if first-poll-delay is very large?
782    /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
783    /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
784    /// metric will wrap around:
785    /// ```
786    /// use tokio::time::Duration;
787    ///
788    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
789    /// async fn main() {
790    ///     let monitor = tokio_metrics::TaskMonitor::new();
791    ///
792    ///     // construct and instrument a task, but do not `await` it
793    ///     let task = monitor.instrument(async {});
794    ///
795    ///     // this is the maximum duration representable by tokio_metrics
796    ///     let max_duration = Duration::from_nanos(u64::MAX);
797    ///
798    ///     // let's advance the clock by double this amount and await `task`
799    ///     let _ = tokio::time::advance(max_duration * 2).await;
800    ///     task.await;
801    ///
802    ///     // the time-to-first-poll of `task` saturates at `max_duration`
803    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
804    ///
805    ///     // ...but note that the metric *will* wrap around if more tasks are involved
806    ///     let task = monitor.instrument(async {});
807    ///     let _ = tokio::time::advance(Duration::from_nanos(1)).await;
808    ///     task.await;
809    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
810    /// }
811    /// ```
812    pub total_first_poll_delay: Duration,
813
814    /// The total number of times that tasks idled, waiting to be awoken.
815    ///
816    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
817    /// task completes a poll, and the instant that it is next awoken.
818    ///
819    /// ##### Derived metrics
820    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**   
821    ///   The mean duration of idles.
822    ///
823    /// ##### Examples
824    /// ```
825    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
826    /// async fn main() {
827    ///     let monitor = tokio_metrics::TaskMonitor::new();
828    ///     let mut interval = monitor.intervals();
829    ///     let mut next_interval = move || interval.next().unwrap();
830    ///     let one_sec = std::time::Duration::from_secs(1);
831    ///
832    ///     monitor.instrument(async {}).await;
833    ///
834    ///     assert_eq!(next_interval().total_idled_count, 0);
835    ///     assert_eq!(monitor.cumulative().total_idled_count, 0);
836    ///
837    ///     monitor.instrument(async move {
838    ///         tokio::time::sleep(one_sec).await;
839    ///     }).await;
840    ///
841    ///     assert_eq!(next_interval().total_idled_count, 1);
842    ///     assert_eq!(monitor.cumulative().total_idled_count, 1);
843    ///
844    ///     monitor.instrument(async {
845    ///         tokio::time::sleep(one_sec).await;
846    ///         tokio::time::sleep(one_sec).await;
847    ///     }).await;
848    ///
849    ///     assert_eq!(next_interval().total_idled_count, 2);
850    ///     assert_eq!(monitor.cumulative().total_idled_count, 3);
851    /// }
852    /// ```
853    pub total_idled_count: u64,
854
855    /// The total duration that tasks idled.
856    ///
857    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
858    /// task completes a poll, and the instant that it is next awoken.
859    ///
860    /// ##### Derived metrics
861    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**   
862    ///   The mean duration of idles.
863    ///
864    /// ##### Examples
865    /// ```
866    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
867    /// async fn main() {
868    ///     let monitor = tokio_metrics::TaskMonitor::new();
869    ///     let mut interval = monitor.intervals();
870    ///     let mut next_interval = move || interval.next().unwrap();
871    ///     let one_sec = std::time::Duration::from_secs(1);
872    ///     let two_sec = std::time::Duration::from_secs(2);
873    ///
874    ///     assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
875    ///     assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
876    ///
877    ///     monitor.instrument(async move {
878    ///         tokio::time::sleep(one_sec).await;
879    ///     }).await;
880    ///
881    ///     assert_eq!(next_interval().total_idle_duration, one_sec);
882    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
883    ///
884    ///     monitor.instrument(async move {
885    ///         tokio::time::sleep(two_sec).await;
886    ///     }).await;
887    ///
888    ///     assert_eq!(next_interval().total_idle_duration, two_sec);
889    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
890    /// }
891    /// ```
892    pub total_idle_duration: Duration,
893
894    /// The maximum idle duration that a task took.
895    ///
896    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
897    /// task completes a poll, and the instant that it is next awoken.
898    ///
899    /// ##### Derived metrics
900    /// - **[`max_idle_duration`][TaskMetrics::max_idle_duration]**   
901    ///   The longest duration a task spent idle.
902    ///
903    /// ##### Examples
904    /// ```
905    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
906    /// async fn main() {
907    ///     let monitor = tokio_metrics::TaskMonitor::new();
908    ///     let mut interval = monitor.intervals();
909    ///     let mut next_interval = move || interval.next().unwrap();
910    ///     let one_sec = std::time::Duration::from_secs(1);
911    ///     let two_sec = std::time::Duration::from_secs(2);
912    ///
913    ///     assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
914    ///     assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
915    ///
916    ///     monitor.instrument(async move {
917    ///         tokio::time::sleep(one_sec).await;
918    ///     }).await;
919    ///
920    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
921    ///     assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
922    ///
923    ///     monitor.instrument(async move {
924    ///         tokio::time::sleep(two_sec).await;
925    ///     }).await;
926    ///
927    ///     assert_eq!(next_interval().max_idle_duration, two_sec);
928    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
929    ///
930    ///     monitor.instrument(async move {
931    ///         tokio::time::sleep(one_sec).await;
932    ///     }).await;
933    ///
934    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
935    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
936    /// }
937    /// ```
938    pub max_idle_duration: Duration,
939
940    /// The total number of times that tasks were awoken (and then, presumably, scheduled for
941    /// execution).
942    ///
943    /// ##### Definition
944    /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
945    /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
946    ///
947    /// ##### Derived metrics
948    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
949    ///   The mean duration that tasks spent waiting to be executed after awakening.
950    ///
951    /// ##### Examples
952    /// In the below example, a task yields to the scheduler a varying number of times between
953    /// sampling intervals; this metric is equal to the number of times the task yielded:
954    /// ```
955    /// #[tokio::main]
956    /// async fn main(){
957    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
958    ///
959    ///     // [A] no tasks have been created, instrumented, and polled more than once
960    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
961    ///
962    ///     // [B] a `task` is created and instrumented
963    ///     let task = {
964    ///         let monitor = metrics_monitor.clone();
965    ///         metrics_monitor.instrument(async move {
966    ///             let mut interval = monitor.intervals();
967    ///             let mut next_interval = move || interval.next().unwrap();
968    ///
969    ///             // [E] `task` has not yet yielded to the scheduler, and
970    ///             // thus has not yet been scheduled since its first `poll`
971    ///             assert_eq!(next_interval().total_scheduled_count, 0);
972    ///
973    ///             tokio::task::yield_now().await; // yield to the scheduler
974    ///
975    ///             // [F] `task` has yielded to the scheduler once (and thus been
976    ///             // scheduled once) since the last sampling interval
977    ///             assert_eq!(next_interval().total_scheduled_count, 1);
978    ///
979    ///             tokio::task::yield_now().await; // yield to the scheduler
980    ///             tokio::task::yield_now().await; // yield to the scheduler
981    ///             tokio::task::yield_now().await; // yield to the scheduler
982    ///
983    ///             // [G] `task` has yielded to the scheduler thrice (and thus been
984    ///             // scheduled thrice) since the last sampling interval
985    ///             assert_eq!(next_interval().total_scheduled_count, 3);
986    ///
987    ///             tokio::task::yield_now().await; // yield to the scheduler
988    ///
989    ///             next_interval
990    ///         })
991    ///     };
992    ///
993    ///     // [C] `task` has not yet been polled at all
994    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
995    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
996    ///
997    ///     // [D] poll `task` to completion
998    ///     let mut next_interval = task.await;
999    ///
1000    ///     // [H] `task` has been scheduled 1 time since the last sample
1001    ///     assert_eq!(next_interval().total_scheduled_count, 1);
1002    ///
1003    ///     // [I] `task` has been scheduled 0 times since the last sample
1004    ///     assert_eq!(next_interval().total_scheduled_count, 0);
1005    ///
1006    ///     // [J] `task` has yielded to the scheduler a total of five times
1007    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1008    /// }
1009    /// ```
1010    #[doc(alias = "total_delay_count")]
1011    pub total_scheduled_count: u64,
1012
1013    /// The total duration that tasks spent waiting to be polled after awakening.
1014    ///
1015    /// ##### Definition
1016    /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1017    /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1018    ///
1019    /// ##### Derived metrics
1020    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
1021    ///   The mean duration that tasks spent waiting to be executed after awakening.
1022    ///
1023    /// ##### Examples
1024    /// ```
1025    /// use tokio::time::Duration;
1026    ///
1027    /// #[tokio::main(flavor = "current_thread")]
1028    /// async fn main() {
1029    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1030    ///     let mut interval = metrics_monitor.intervals();
1031    ///     let mut next_interval = || interval.next().unwrap();
1032    ///
1033    ///     // construct and instrument and spawn a task that yields endlessly
1034    ///     tokio::spawn(metrics_monitor.instrument(async {
1035    ///         loop { tokio::task::yield_now().await }
1036    ///     }));
1037    ///
1038    ///     tokio::task::yield_now().await;
1039    ///
1040    ///     // block the executor for 1 second
1041    ///     std::thread::sleep(Duration::from_millis(1000));
1042    ///
1043    ///     tokio::task::yield_now().await;
1044    ///
1045    ///     // the endlessly-yielding task will have spent approximately one second waiting
1046    ///     let total_scheduled_duration = next_interval().total_scheduled_duration;
1047    ///     assert!(total_scheduled_duration >= Duration::from_millis(1000));
1048    ///     assert!(total_scheduled_duration <= Duration::from_millis(1100));
1049    /// }
1050    /// ```
1051    #[doc(alias = "total_delay_duration")]
1052    pub total_scheduled_duration: Duration,
1053
1054    /// The total number of times that tasks were polled.
1055    ///
1056    /// ##### Definition
1057    /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1058    /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1059    ///
1060    /// ##### Derived metrics
1061    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
1062    ///   The mean duration of polls.
1063    ///
1064    /// ##### Examples
1065    /// In the below example, a task with multiple yield points is await'ed to completion; this
1066    /// metric reflects the number of `await`s within each sampling interval:
1067    /// ```
1068    /// #[tokio::main]
1069    /// async fn main() {
1070    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1071    ///
1072    ///     // [A] no tasks have been created, instrumented, or polled yet
1073    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1074    ///
1075    ///     // [B] a `task` is created and instrumented
1076    ///     let task = {
1077    ///         let monitor = metrics_monitor.clone();
1078    ///         metrics_monitor.instrument(async move {
1079    ///             let mut interval = monitor.intervals();
1080    ///             let mut next_interval = move || interval.next().unwrap();
1081    ///
1082    ///             // [E] task is in the midst of its first poll
1083    ///             assert_eq!(next_interval().total_poll_count, 0);
1084    ///
1085    ///             tokio::task::yield_now().await; // poll 1
1086    ///
1087    ///             // [F] task has been polled 1 time
1088    ///             assert_eq!(next_interval().total_poll_count, 1);
1089    ///
1090    ///             tokio::task::yield_now().await; // poll 2
1091    ///             tokio::task::yield_now().await; // poll 3
1092    ///             tokio::task::yield_now().await; // poll 4
1093    ///
1094    ///             // [G] task has been polled 3 times
1095    ///             assert_eq!(next_interval().total_poll_count, 3);
1096    ///
1097    ///             tokio::task::yield_now().await; // poll 5
1098    ///
1099    ///             next_interval                   // poll 6
1100    ///         })
1101    ///     };
1102    ///
1103    ///     // [C] `task` has not yet been polled at all
1104    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1105    ///
1106    ///     // [D] poll `task` to completion
1107    ///     let mut next_interval = task.await;
1108    ///
1109    ///     // [H] `task` has been polled 2 times since the last sample
1110    ///     assert_eq!(next_interval().total_poll_count, 2);
1111    ///
1112    ///     // [I] `task` has been polled 0 times since the last sample
1113    ///     assert_eq!(next_interval().total_poll_count, 0);
1114    ///
1115    ///     // [J] `task` has been polled 6 times
1116    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1117    /// }
1118    /// ```
1119    pub total_poll_count: u64,
1120
1121    /// The total duration elapsed during polls.
1122    ///
1123    /// ##### Definition
1124    /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1125    /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1126    ///
1127    /// ##### Derived metrics
1128    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
1129    ///   The mean duration of polls.
1130    ///
1131    /// ##### Examples
1132    /// ```
1133    /// use tokio::time::Duration;
1134    ///
1135    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1136    /// async fn main() {
1137    ///     let monitor = tokio_metrics::TaskMonitor::new();
1138    ///     let mut interval = monitor.intervals();
1139    ///     let mut next_interval = move || interval.next().unwrap();
1140    ///
1141    ///     assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1142    ///
1143    ///     monitor.instrument(async {
1144    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1145    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1146    ///         ()                                                  // poll 3 (0s)
1147    ///     }).await;
1148    ///
1149    ///     assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1150    /// }
1151    /// ```
1152    pub total_poll_duration: Duration,
1153
1154    /// The total number of times that polling tasks completed swiftly.
1155    ///
1156    /// Here, 'swiftly' is defined as completing in strictly less time than
1157    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1158    ///
1159    /// ##### Derived metrics
1160    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**   
1161    ///   The mean duration of fast polls.
1162    ///
1163    /// ##### Examples
1164    /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1165    /// within the second sampling interval, and 2 fast polls occur within the third sampling
1166    /// interval:
1167    /// ```
1168    /// use std::future::Future;
1169    /// use std::time::Duration;
1170    ///
1171    /// #[tokio::main]
1172    /// async fn main() {
1173    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1174    ///     let mut interval = metrics_monitor.intervals();
1175    ///     let mut next_interval = || interval.next().unwrap();
1176    ///
1177    ///     // no tasks have been constructed, instrumented, or polled
1178    ///     assert_eq!(next_interval().total_fast_poll_count, 0);
1179    ///
1180    ///     let fast = Duration::ZERO;
1181    ///
1182    ///     // this task completes in three fast polls
1183    ///     let _ = metrics_monitor.instrument(async {
1184    ///         spin_for(fast).await; // fast poll 1
1185    ///         spin_for(fast).await; // fast poll 2
1186    ///         spin_for(fast)        // fast poll 3
1187    ///     }).await;
1188    ///
1189    ///     assert_eq!(next_interval().total_fast_poll_count, 3);
1190    ///
1191    ///     // this task completes in two fast polls
1192    ///     let _ = metrics_monitor.instrument(async {
1193    ///         spin_for(fast).await; // fast poll 1
1194    ///         spin_for(fast)        // fast poll 2
1195    ///     }).await;
1196    ///
1197    ///     assert_eq!(next_interval().total_fast_poll_count, 2);
1198    /// }
1199    ///
1200    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1201    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1202    ///     let start = tokio::time::Instant::now();
1203    ///     while start.elapsed() <= duration {}
1204    ///     tokio::task::yield_now()
1205    /// }
1206    /// ```
1207    pub total_fast_poll_count: u64,
1208
1209    /// The total duration of fast polls.
1210    ///
1211    /// Here, 'fast' is defined as completing in strictly less time than
1212    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1213    ///
1214    /// ##### Derived metrics
1215    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**   
1216    ///   The mean duration of fast polls.
1217    ///
1218    /// ##### Examples
1219    /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1220    /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1221    /// total of 2μs time in the third sampling interval:
1222    /// ```
1223    /// use std::future::Future;
1224    /// use std::time::Duration;
1225    ///
1226    /// #[tokio::main]
1227    /// async fn main() {
1228    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1229    ///     let mut interval = metrics_monitor.intervals();
1230    ///     let mut next_interval = || interval.next().unwrap();
1231    ///
1232    ///     // no tasks have been constructed, instrumented, or polled
1233    ///     let interval = next_interval();
1234    ///     assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1235    ///
1236    ///     let fast = Duration::from_micros(1);
1237    ///
1238    ///     // this task completes in three fast polls
1239    ///     let task_a_time = time(metrics_monitor.instrument(async {
1240    ///         spin_for(fast).await; // fast poll 1
1241    ///         spin_for(fast).await; // fast poll 2
1242    ///         spin_for(fast)        // fast poll 3
1243    ///     })).await;
1244    ///
1245    ///     let interval = next_interval();
1246    ///     assert!(interval.total_fast_poll_duration >= fast * 3);
1247    ///     assert!(interval.total_fast_poll_duration <= task_a_time);
1248    ///
1249    ///     // this task completes in two fast polls
1250    ///     let task_b_time = time(metrics_monitor.instrument(async {
1251    ///         spin_for(fast).await; // fast poll 1
1252    ///         spin_for(fast)        // fast poll 2
1253    ///     })).await;
1254    ///
1255    ///     let interval = next_interval();
1256    ///     assert!(interval.total_fast_poll_duration >= fast * 2);
1257    ///     assert!(interval.total_fast_poll_duration <= task_b_time);
1258    /// }
1259    ///
1260    /// /// Produces the amount of time it took to await a given async task.
1261    /// async fn time(task: impl Future) -> Duration {
1262    ///     let start = tokio::time::Instant::now();
1263    ///     task.await;
1264    ///     start.elapsed()
1265    /// }
1266    ///
1267    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1268    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1269    ///     let start = tokio::time::Instant::now();
1270    ///     while start.elapsed() <= duration {}
1271    ///     tokio::task::yield_now()
1272    /// }
1273    /// ```
1274    pub total_fast_poll_duration: Duration,
1275
1276    /// The total number of times that polling tasks completed slowly.
1277    ///
1278    /// Here, 'slowly' is defined as completing in at least as much time as
1279    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1280    ///
1281    /// ##### Derived metrics
1282    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**   
1283    ///   The mean duration of slow polls.
1284    ///
1285    /// ##### Examples
1286    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1287    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1288    /// interval:
1289    /// ```
1290    /// use std::future::Future;
1291    /// use std::time::Duration;
1292    ///
1293    /// #[tokio::main]
1294    /// async fn main() {
1295    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1296    ///     let mut interval = metrics_monitor.intervals();
1297    ///     let mut next_interval = || interval.next().unwrap();
1298    ///
1299    ///     // no tasks have been constructed, instrumented, or polled
1300    ///     assert_eq!(next_interval().total_slow_poll_count, 0);
1301    ///
1302    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1303    ///
1304    ///     // this task completes in three slow polls
1305    ///     let _ = metrics_monitor.instrument(async {
1306    ///         spin_for(slow).await; // slow poll 1
1307    ///         spin_for(slow).await; // slow poll 2
1308    ///         spin_for(slow)        // slow poll 3
1309    ///     }).await;
1310    ///
1311    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1312    ///
1313    ///     // this task completes in two slow polls
1314    ///     let _ = metrics_monitor.instrument(async {
1315    ///         spin_for(slow).await; // slow poll 1
1316    ///         spin_for(slow)        // slow poll 2
1317    ///     }).await;
1318    ///
1319    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1320    /// }
1321    ///
1322    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1323    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1324    ///     let start = tokio::time::Instant::now();
1325    ///     while start.elapsed() <= duration {}
1326    ///     tokio::task::yield_now()
1327    /// }
1328    /// ```
1329    pub total_slow_poll_count: u64,
1330
1331    /// The total duration of slow polls.
1332    ///
1333    /// Here, 'slow' is defined as completing in at least as much time as
1334    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1335    ///
1336    /// ##### Derived metrics
1337    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**   
1338    ///   The mean duration of slow polls.
1339    ///
1340    /// ##### Examples
1341    /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1342    /// consume a total of
1343    /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1344    /// time in the second sampling interval; and two slow polls consume a total of
1345    /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1346    /// third sampling interval:
1347    /// ```
1348    /// use std::future::Future;
1349    /// use std::time::Duration;
1350    ///
1351    /// #[tokio::main]
1352    /// async fn main() {
1353    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1354    ///     let mut interval = metrics_monitor.intervals();
1355    ///     let mut next_interval = || interval.next().unwrap();
1356    ///
1357    ///     // no tasks have been constructed, instrumented, or polled
1358    ///     let interval = next_interval();
1359    ///     assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1360    ///
1361    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1362    ///
1363    ///     // this task completes in three slow polls
1364    ///     let task_a_time = time(metrics_monitor.instrument(async {
1365    ///         spin_for(slow).await; // slow poll 1
1366    ///         spin_for(slow).await; // slow poll 2
1367    ///         spin_for(slow)        // slow poll 3
1368    ///     })).await;
1369    ///
1370    ///     let interval = next_interval();
1371    ///     assert!(interval.total_slow_poll_duration >= slow * 3);
1372    ///     assert!(interval.total_slow_poll_duration <= task_a_time);
1373    ///
1374    ///     // this task completes in two slow polls
1375    ///     let task_b_time = time(metrics_monitor.instrument(async {
1376    ///         spin_for(slow).await; // slow poll 1
1377    ///         spin_for(slow)        // slow poll 2
1378    ///     })).await;
1379    ///
1380    ///     let interval = next_interval();
1381    ///     assert!(interval.total_slow_poll_duration >= slow * 2);
1382    ///     assert!(interval.total_slow_poll_duration <= task_b_time);
1383    /// }
1384    ///
1385    /// /// Produces the amount of time it took to await a given async task.
1386    /// async fn time(task: impl Future) -> Duration {
1387    ///     let start = tokio::time::Instant::now();
1388    ///     task.await;
1389    ///     start.elapsed()
1390    /// }
1391    ///
1392    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1393    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1394    ///     let start = tokio::time::Instant::now();
1395    ///     while start.elapsed() <= duration {}
1396    ///     tokio::task::yield_now()
1397    /// }
1398    /// ```
1399    pub total_slow_poll_duration: Duration,
1400
1401    /// The total count of tasks with short scheduling delays.
1402    ///
1403    /// This is defined as tasks taking strictly less than
1404    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1405    /// scheduled.
1406    ///
1407    /// ##### Derived metrics
1408    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**   
1409    ///   The mean duration of short scheduling delays.
1410    pub total_short_delay_count: u64,
1411
1412    /// The total count of tasks with long scheduling delays.
1413    ///
1414    /// This is defined as tasks taking
1415    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1416    /// after being scheduled.
1417    ///
1418    /// ##### Derived metrics
1419    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**   
    ///   The mean duration of long scheduling delays.
1421    pub total_long_delay_count: u64,
1422
1423    /// The total duration of tasks with short scheduling delays.
1424    ///
1425    /// This is defined as tasks taking strictly less than
1426    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1427    /// scheduled.
1428    ///
1429    /// ##### Derived metrics
1430    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**   
1431    ///   The mean duration of short scheduling delays.
1432    pub total_short_delay_duration: Duration,
1433
    /// The total duration of tasks with long scheduling delays.
    ///
    /// This is defined as tasks taking
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
    /// after being scheduled.
    ///
    /// ##### Derived metrics
    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**   
    ///   The mean duration of long scheduling delays.
1442    pub total_long_delay_duration: Duration,
1443}
1444
/// Raw, atomically updated counters backing a `TaskMonitor`.
///
/// One instance is created per monitor and shared behind an `Arc` with every task the monitor
/// instruments and with the interval iterators it hands out. Durations are accumulated as whole
/// nanoseconds in `AtomicU64`s and converted to `Duration`s by `RawMetrics::metrics`.
#[derive(Debug)]
struct RawMetrics {
    /// A poll that takes at least this long is considered a slow poll.
    slow_poll_threshold: Duration,

    /// A scheduling delay of at least this long is considered a long delay.
    long_delay_threshold: Duration,

    /// Total number of instrumented tasks.
    instrumented_count: AtomicU64,

    /// Total number of instrumented tasks polled at least once.
    first_poll_count: AtomicU64,

    /// Total number of times tasks entered the `idle` state.
    total_idled_count: AtomicU64,

    /// Total number of times tasks were scheduled.
    total_scheduled_count: AtomicU64,

    /// Total number of fast polls (polls below the slow-poll threshold).
    total_fast_poll_count: AtomicU64,

    /// Total number of slow polls (polls at or above the slow-poll threshold).
    total_slow_poll_count: AtomicU64,

    /// Total number of scheduling delays at or above the long-delay threshold.
    total_long_delay_count: AtomicU64,

    /// Total number of scheduling delays strictly below the long-delay threshold.
    total_short_delay_count: AtomicU64,

    /// Total number of instrumented tasks that were dropped.
    dropped_count: AtomicU64,

    /// Total time, in nanoseconds, elapsed between instrumenting tasks and first polling them.
    total_first_poll_delay_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// The longest time, in nanoseconds, a task spent in the `idle` state since this value was
    /// last reset. It tracks the local maximum between interval metric snapshots and is swapped
    /// back to zero by `get_and_reset_local_max_idle_duration`.
    local_max_idle_duration_ns: AtomicU64,

    /// The longest time, in nanoseconds, a task spent in the `idle` state. Reported as
    /// `max_idle_duration` by `metrics`.
    global_max_idle_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent waiting to be polled after being woken.
    total_scheduled_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, spent in fast polls.
    total_fast_poll_duration_ns: AtomicU64,

    /// Total amount of time spent in slow polls. Stored in nanoseconds like its siblings,
    /// despite the missing `_ns` suffix.
    total_slow_poll_duration: AtomicU64,

    /// Total amount of time, in nanoseconds, of scheduling delays strictly below the long-delay
    /// threshold.
    total_short_delay_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, of scheduling delays at or above the long-delay
    /// threshold.
    total_long_delay_duration_ns: AtomicU64,
}
1510
/// Per-task bookkeeping created by `TaskMonitor::instrument` and held behind an `Arc`.
#[derive(Debug)]
struct State {
    /// Where metrics should be recorded; a clone of the originating monitor's `Arc<RawMetrics>`.
    metrics: Arc<RawMetrics>,

    /// Instant at which the task was instrumented. This is used to track the time to first poll,
    /// and is the reference point for `woke_at`.
    instrumented_at: Instant,

    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
    /// was last woken. Starts at `0` when the task is instrumented.
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    waker: AtomicWaker,
}
1526
1527impl TaskMonitor {
    /// The default duration at which polls cross the threshold into being categorized as 'slow' is
    /// 50μs.
    #[cfg(not(test))]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
    // When this crate is compiled with `cfg(test)`, the default is 500ms instead of 50μs
    // (presumably to keep timing-sensitive unit tests robust against jitter; TODO confirm).
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1535
    /// The default duration at which schedules cross the threshold into being categorized as 'long'
    /// is 50μs.
    #[cfg(not(test))]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
    // When this crate is compiled with `cfg(test)`, the default is 500ms instead of 50μs
    // (presumably to keep timing-sensitive unit tests robust against jitter; TODO confirm).
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1543
1544    /// Constructs a new task monitor.
1545    ///
1546    /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1547    /// considered 'slow'.
1548    ///
1549    /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1550    /// considered 'long'.
1551    pub fn new() -> TaskMonitor {
1552        TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1553    }
1554
    /// Constructs a builder for a task monitor.
    ///
    /// This forwards to [`TaskMonitorBuilder::new`]; see [`TaskMonitorBuilder`] for the settings
    /// that can be customized before the monitor is built.
    pub fn builder() -> TaskMonitorBuilder {
        TaskMonitorBuilder::new()
    }
1559
1560    /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1561    ///
1562    /// ##### Selecting an appropriate threshold
1563    /// TODO. What advice can we give here?
1564    ///
1565    /// ##### Examples
1566    /// In the below example, low-threshold and high-threshold monitors are constructed and
    /// instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the
1568    /// high-threshold monitor reports only 2 slow polls:
1569    /// ```
1570    /// use std::future::Future;
1571    /// use std::time::Duration;
1572    /// use tokio_metrics::TaskMonitor;
1573    ///
1574    /// #[tokio::main]
1575    /// async fn main() {
1576    ///     let lo_threshold = Duration::from_micros(10);
1577    ///     let hi_threshold = Duration::from_millis(10);
1578    ///
1579    ///     let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1580    ///     let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1581    ///
1582    ///     let make_task = || async {
1583    ///         spin_for(lo_threshold).await; // faster poll 1
1584    ///         spin_for(lo_threshold).await; // faster poll 2
1585    ///         spin_for(hi_threshold).await; // slower poll 3
1586    ///         spin_for(hi_threshold).await  // slower poll 4
1587    ///     };
1588    ///
1589    ///     lo_monitor.instrument(make_task()).await;
1590    ///     hi_monitor.instrument(make_task()).await;
1591    ///
1592    ///     // the low-threshold monitor reported 4 slow polls:
1593    ///     assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1594    ///     // the high-threshold monitor reported only 2 slow polls:
1595    ///     assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1596    /// }
1597    ///
1598    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1599    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1600    ///     let start = tokio::time::Instant::now();
1601    ///     while start.elapsed() <= duration {}
1602    ///     tokio::task::yield_now()
1603    /// }
1604    /// ```
1605    pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1606        Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1607    }
1608
1609    fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1610        TaskMonitor {
1611            metrics: Arc::new(RawMetrics {
1612                slow_poll_threshold: slow_poll_cut_off,
1613                first_poll_count: AtomicU64::new(0),
1614                total_idled_count: AtomicU64::new(0),
1615                total_scheduled_count: AtomicU64::new(0),
1616                total_fast_poll_count: AtomicU64::new(0),
1617                total_slow_poll_count: AtomicU64::new(0),
1618                total_long_delay_count: AtomicU64::new(0),
1619                instrumented_count: AtomicU64::new(0),
1620                dropped_count: AtomicU64::new(0),
1621                total_first_poll_delay_ns: AtomicU64::new(0),
1622                total_scheduled_duration_ns: AtomicU64::new(0),
1623                local_max_idle_duration_ns: AtomicU64::new(0),
1624                global_max_idle_duration_ns: AtomicU64::new(0),
1625                total_idle_duration_ns: AtomicU64::new(0),
1626                total_fast_poll_duration_ns: AtomicU64::new(0),
1627                total_slow_poll_duration: AtomicU64::new(0),
1628                total_short_delay_duration_ns: AtomicU64::new(0),
1629                long_delay_threshold: long_delay_cut_off,
1630                total_short_delay_count: AtomicU64::new(0),
1631                total_long_delay_duration_ns: AtomicU64::new(0),
1632            }),
1633        }
1634    }
1635
    /// Returns the slow-poll threshold: polls that take at least this long are categorized as
    /// slow.
    ///
    /// ##### Examples
    /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
    /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
    /// ```
    /// use tokio_metrics::TaskMonitor;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let metrics_monitor = TaskMonitor::new();
    ///
    ///     assert_eq!(
    ///         metrics_monitor.slow_poll_threshold(),
    ///         TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
    ///     );
    /// }
    /// ```
    pub fn slow_poll_threshold(&self) -> Duration {
        self.metrics.slow_poll_threshold
    }
1657
    /// Returns the long-delay threshold: scheduling delays of at least this duration are
    /// categorized as long.
    ///
    /// Monitors constructed with [`TaskMonitor::new`] or
    /// [`TaskMonitor::with_slow_poll_threshold`] use
    /// [`TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD`].
    pub fn long_delay_threshold(&self) -> Duration {
        self.metrics.long_delay_threshold
    }
1663
1664    /// Produces an instrumented façade around a given async task.
1665    ///
1666    /// ##### Examples
1667    /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1668    /// ```
1669    /// #[tokio::main]
1670    /// async fn main() {
1671    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1672    ///
1673    ///     // 0 tasks have been instrumented, much less polled
1674    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1675    ///
1676    ///     // instrument a task and poll it to completion
1677    ///     metrics_monitor.instrument(async {}).await;
1678    ///
1679    ///     // 1 task has been instrumented and polled
1680    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1681    ///
1682    ///     // instrument a task and poll it to completion
1683    ///     metrics_monitor.instrument(async {}).await;
1684    ///
1685    ///     // 2 tasks have been instrumented and polled
1686    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1687    /// }
1688    /// ```
    /// An async task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1690    /// ```
1691    /// #[tokio::main]
1692    /// async fn main() {
1693    ///     let monitor_a = tokio_metrics::TaskMonitor::new();
1694    ///     let monitor_b = tokio_metrics::TaskMonitor::new();
1695    ///
1696    ///     // 0 tasks have been instrumented, much less polled
1697    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1698    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1699    ///
1700    ///     // instrument a task and poll it to completion
1701    ///     monitor_a.instrument(monitor_b.instrument(async {})).await;
1702    ///
1703    ///     // 1 task has been instrumented and polled
1704    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1705    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1706    /// }
1707    /// ```
1708    /// It is also possible (but probably undesirable) to instrument an async task multiple times
1709    /// with the same [`TaskMonitor`]; e.g.:
1710    /// ```
1711    /// #[tokio::main]
1712    /// async fn main() {
1713    ///     let monitor = tokio_metrics::TaskMonitor::new();
1714    ///
1715    ///     // 0 tasks have been instrumented, much less polled
1716    ///     assert_eq!(monitor.cumulative().first_poll_count, 0);
1717    ///
1718    ///     // instrument a task and poll it to completion
1719    ///     monitor.instrument(monitor.instrument(async {})).await;
1720    ///
1721    ///     // 2 tasks have been instrumented and polled, supposedly
1722    ///     assert_eq!(monitor.cumulative().first_poll_count, 2);
1723    /// }
1724    /// ```
1725    pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1726        self.metrics.instrumented_count.fetch_add(1, SeqCst);
1727        Instrumented {
1728            task,
1729            did_poll_once: false,
1730            idled_at: 0,
1731            state: Arc::new(State {
1732                metrics: self.metrics.clone(),
1733                instrumented_at: Instant::now(),
1734                woke_at: AtomicU64::new(0),
1735                waker: AtomicWaker::new(),
1736            }),
1737        }
1738    }
1739
1740    /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1741    /// the construction of [`TaskMonitor`].
1742    ///
1743    /// ##### See also
1744    /// - [`TaskMonitor::intervals`]:
1745    ///   produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1746    ///
1747    /// ##### Examples
1748    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1749    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1750    /// interval; five slow polls occur across all sampling intervals:
1751    /// ```
1752    /// use std::future::Future;
1753    /// use std::time::Duration;
1754    ///
1755    /// #[tokio::main]
1756    /// async fn main() {
1757    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1758    ///
1759    ///     // initialize a stream of sampling intervals
1760    ///     let mut intervals = metrics_monitor.intervals();
1761    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1762    ///     let mut next_interval = || intervals.next().unwrap();
1763    ///
1764    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1765    ///
1766    ///     // this task completes in three slow polls
1767    ///     let _ = metrics_monitor.instrument(async {
1768    ///         spin_for(slow).await; // slow poll 1
1769    ///         spin_for(slow).await; // slow poll 2
1770    ///         spin_for(slow)        // slow poll 3
1771    ///     }).await;
1772    ///
1773    ///     // in the previous sampling interval, there were 3 slow polls
1774    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1775    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1776    ///
1777    ///     // this task completes in two slow polls
1778    ///     let _ = metrics_monitor.instrument(async {
1779    ///         spin_for(slow).await; // slow poll 1
1780    ///         spin_for(slow)        // slow poll 2
1781    ///     }).await;
1782    ///
1783    ///     // in the previous sampling interval, there were 2 slow polls
1784    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1785    ///
    ///     // across all sampling intervals, there were a total of 5 slow polls
1787    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1788    /// }
1789    ///
1790    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1791    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1792    ///     let start = tokio::time::Instant::now();
1793    ///     while start.elapsed() <= duration {}
1794    ///     tokio::task::yield_now()
1795    /// }
1796    /// ```
    pub fn cumulative(&self) -> TaskMetrics {
        // Snapshot the shared counters as they currently stand; this only loads values and
        // resets nothing.
        self.metrics.metrics()
    }
1800
1801    /// Produces an unending iterator of metric sampling intervals.
1802    ///
1803    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1804    /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1805    /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1806    /// interval.
1807    ///
1808    /// ##### Examples
1809    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1810    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1811    /// interval; five slow polls occur across all sampling intervals:
1812    /// ```
1813    /// use std::future::Future;
1814    /// use std::time::Duration;
1815    ///
1816    /// #[tokio::main]
1817    /// async fn main() {
1818    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1819    ///
1820    ///     // initialize a stream of sampling intervals
1821    ///     let mut intervals = metrics_monitor.intervals();
1822    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1823    ///     let mut next_interval = || intervals.next().unwrap();
1824    ///
1825    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1826    ///
1827    ///     // this task completes in three slow polls
1828    ///     let _ = metrics_monitor.instrument(async {
1829    ///         spin_for(slow).await; // slow poll 1
1830    ///         spin_for(slow).await; // slow poll 2
1831    ///         spin_for(slow)        // slow poll 3
1832    ///     }).await;
1833    ///
1834    ///     // in the previous sampling interval, there were 3 slow polls
1835    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1836    ///
1837    ///     // this task completes in two slow polls
1838    ///     let _ = metrics_monitor.instrument(async {
1839    ///         spin_for(slow).await; // slow poll 1
1840    ///         spin_for(slow)        // slow poll 2
1841    ///     }).await;
1842    ///
1843    ///     // in the previous sampling interval, there were 2 slow polls
1844    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1845    ///
1846    ///     // across all sampling intervals, there were a total of 5 slow polls
1847    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1848    /// }
1849    ///
1850    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1851    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1852    ///     let start = tokio::time::Instant::now();
1853    ///     while start.elapsed() <= duration {}
1854    ///     tokio::task::yield_now()
1855    /// }
1856    /// ```
1857    pub fn intervals(&self) -> TaskIntervals {
1858        TaskIntervals {
1859            metrics: self.metrics.clone(),
1860            previous: None,
1861        }
1862    }
1863}
1864
1865impl RawMetrics {
1866    fn get_and_reset_local_max_idle_duration(&self) -> Duration {
1867        Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
1868    }
1869
1870    fn metrics(&self) -> TaskMetrics {
1871        let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1872        let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1873
1874        let total_fast_poll_duration =
1875            Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1876        let total_slow_poll_duration =
1877            Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1878
1879        let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1880        let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1881
1882        TaskMetrics {
1883            instrumented_count: self.instrumented_count.load(SeqCst),
1884            dropped_count: self.dropped_count.load(SeqCst),
1885
1886            total_poll_count,
1887            total_poll_duration,
1888            first_poll_count: self.first_poll_count.load(SeqCst),
1889            total_idled_count: self.total_idled_count.load(SeqCst),
1890            total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1891            total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1892            total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1893            total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1894            total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1895            total_first_poll_delay: Duration::from_nanos(
1896                self.total_first_poll_delay_ns.load(SeqCst),
1897            ),
1898            max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
1899            total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1900            total_scheduled_duration: Duration::from_nanos(
1901                self.total_scheduled_duration_ns.load(SeqCst),
1902            ),
1903            total_fast_poll_duration: Duration::from_nanos(
1904                self.total_fast_poll_duration_ns.load(SeqCst),
1905            ),
1906            total_slow_poll_duration: Duration::from_nanos(
1907                self.total_slow_poll_duration.load(SeqCst),
1908            ),
1909            total_short_delay_duration: Duration::from_nanos(
1910                self.total_short_delay_duration_ns.load(SeqCst),
1911            ),
1912            total_long_delay_duration: Duration::from_nanos(
1913                self.total_long_delay_duration_ns.load(SeqCst),
1914            ),
1915        }
1916    }
1917}
1918
1919impl Default for TaskMonitor {
1920    fn default() -> TaskMonitor {
1921        TaskMonitor::new()
1922    }
1923}
1924
1925impl TaskMetrics {
1926    /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1927    /// are first polled.
1928    ///
1929    /// ##### Definition
1930    /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1931    /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1932    ///
1933    /// ##### Interpretation
1934    /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1935    /// initially polled.
1936    ///
1937    /// ##### See also
1938    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**   
1939    ///   The mean duration that tasks spent waiting to be executed after awakening.
1940    ///
1941    /// ##### Examples
1942    /// In the below example, no tasks are instrumented or polled within the first sampling
1943    /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1944    /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1945    /// instrumentation and first poll of two tasks:
1946    /// ```
1947    /// use std::time::Duration;
1948    ///
1949    /// #[tokio::main]
1950    /// async fn main() {
1951    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1952    ///     let mut interval = metrics_monitor.intervals();
1953    ///     let mut next_interval = || interval.next().unwrap();
1954    ///
1955    ///     // no tasks have yet been created, instrumented, or polled
1956    ///     assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1957    ///
1958    ///     // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1959    ///     // produces the total time it took to do all of the aforementioned
1960    ///     async fn instrument_pause_await(
1961    ///         metrics_monitor: &tokio_metrics::TaskMonitor,
1962    ///         pause_time: Duration
1963    ///     ) -> Duration
1964    ///     {
1965    ///         let before_instrumentation = tokio::time::Instant::now();
1966    ///         let task = metrics_monitor.instrument(async move {});
1967    ///         tokio::time::sleep(pause_time).await;
1968    ///         task.await;
1969    ///         before_instrumentation.elapsed()
1970    ///     }
1971    ///
1972    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1973    ///     let task_a_pause_time = Duration::from_millis(500);
1974    ///     let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1975    ///
1976    ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1977    ///     // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1978    ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1979    ///     assert!(mean_first_poll_delay >= task_a_pause_time);
1980    ///     assert!(mean_first_poll_delay <= task_a_total_time);
1981    ///
1982    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1983    ///     let task_b_pause_time = Duration::from_millis(500);
1984    ///     let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1985    ///
1986    ///     // construct and await a task that pauses for 1000ms between instrumentation and first poll
1987    ///     let task_c_pause_time = Duration::from_millis(1000);
1988    ///     let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
1989    ///
    ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
    ///     // average pause time of 750ms, and less-than-or-equal-to the average total runtime of
    ///     // `task_b` and `task_c`
1993    ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1994    ///     assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
1995    ///     assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
1996    /// }
1997    /// ```
1998    pub fn mean_first_poll_delay(&self) -> Duration {
1999        mean(self.total_first_poll_delay, self.first_poll_count)
2000    }
2001
2002    /// The mean duration of idles.
2003    ///
2004    /// ##### Definition
2005    /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2006    /// [`total_idled_count`][TaskMetrics::total_idled_count].
2007    ///
2008    /// ##### Interpretation
2009    /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2010    /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2011    /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2012    /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2013    /// task-external events to complete.
2014    ///
2015    /// ##### Examples
2016    /// ```
2017    /// #[tokio::main]
2018    /// async fn main() {
2019    ///     let monitor = tokio_metrics::TaskMonitor::new();
2020    ///     let one_sec = std::time::Duration::from_secs(1);
2021    ///
2022    ///     monitor.instrument(async move {
2023    ///         tokio::time::sleep(one_sec).await;
2024    ///     }).await;
2025    ///
2026    ///     assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2027    /// }
2028    /// ```
2029    pub fn mean_idle_duration(&self) -> Duration {
2030        mean(self.total_idle_duration, self.total_idled_count)
2031    }
2032
2033    /// The mean duration that tasks spent waiting to be executed after awakening.
2034    ///
2035    /// ##### Definition
2036    /// This metric is derived from
2037    /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2038    /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2039    ///
2040    /// ##### Interpretation
2041    /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2042    /// queues before being polled.
2043    ///
2044    /// ##### See also
2045    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**   
2046    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
2047    ///   are first polled.
2048    ///
2049    /// ##### Examples
2050    /// ```
2051    /// use tokio::time::Duration;
2052    ///
2053    /// #[tokio::main(flavor = "current_thread")]
2054    /// async fn main() {
2055    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2056    ///     let mut interval = metrics_monitor.intervals();
2057    ///     let mut next_interval = || interval.next().unwrap();
2058    ///
2059    ///     // construct and instrument and spawn a task that yields endlessly
2060    ///     tokio::spawn(metrics_monitor.instrument(async {
2061    ///         loop { tokio::task::yield_now().await }
2062    ///     }));
2063    ///
2064    ///     tokio::task::yield_now().await;
2065    ///
2066    ///     // block the executor for 1 second
2067    ///     std::thread::sleep(Duration::from_millis(1000));
2068    ///
2069    ///     // get the task to run twice
2070    ///     // the first will have a 1 sec scheduling delay, the second will have almost none
2071    ///     tokio::task::yield_now().await;
2072    ///     tokio::task::yield_now().await;
2073    ///
2074    ///     // `endless_task` will have spent approximately one second waiting
2075    ///     let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2076    ///     assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2077    ///     assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2078    /// }
2079    /// ```
2080    pub fn mean_scheduled_duration(&self) -> Duration {
2081        mean(self.total_scheduled_duration, self.total_scheduled_count)
2082    }
2083
2084    /// The mean duration of polls.
2085    ///
2086    /// ##### Definition
2087    /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2088    /// [`total_poll_count`][TaskMetrics::total_poll_count].
2089    ///
2090    /// ##### Interpretation
2091    /// If this metric increases, it means that, on average, individual polls are tending to take
2092    /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2093    /// durations could be offset by fewer polls.
2094    ///
2095    /// ##### See also
2096    /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**   
    ///   The ratio between the number of polls categorized as slow and fast.
2098    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**   
2099    ///   The mean duration of slow polls.
2100    ///
2101    /// ##### Examples
2102    /// ```
2103    /// use std::time::Duration;
2104    ///
2105    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2106    /// async fn main() {
2107    ///     let monitor = tokio_metrics::TaskMonitor::new();
2108    ///     let mut interval = monitor.intervals();
2109    ///     let mut next_interval = move || interval.next().unwrap();
2110    ///  
2111    ///     assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2112    ///  
2113    ///     monitor.instrument(async {
2114    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2115    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2116    ///         ()                                                  // poll 3 (0s)
2117    ///     }).await;
2118    ///  
2119    ///     assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2120    /// }
2121    /// ```
2122    pub fn mean_poll_duration(&self) -> Duration {
2123        mean(self.total_poll_duration, self.total_poll_count)
2124    }
2125
    /// The ratio between the number of polls categorized as slow and fast.
2127    ///
2128    /// ##### Definition
2129    /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2130    /// [`total_poll_count`][TaskMetrics::total_poll_count].
2131    ///
2132    /// ##### Interpretation
2133    /// If this metric increases, it means that a greater proportion of polls took excessively long
2134    /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2135    /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
    /// However, as a rule, tasks *should* yield to the scheduler frequently.
2137    ///
2138    /// ##### See also
2139    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
2140    ///   The mean duration of polls.
2141    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**   
2142    ///   The mean duration of slow polls.
2143    ///
2144    /// ##### Examples
    /// Changes in this metric may be observed by varying the ratio of slow and fast polls within
2146    /// sampling intervals; for instance:
2147    /// ```
2148    /// use std::future::Future;
2149    /// use std::time::Duration;
2150    ///
2151    /// #[tokio::main]
2152    /// async fn main() {
2153    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2154    ///     let mut interval = metrics_monitor.intervals();
2155    ///     let mut next_interval = || interval.next().unwrap();
2156    ///
2157    ///     // no tasks have been constructed, instrumented, or polled
2158    ///     let interval = next_interval();
2159    ///     assert_eq!(interval.total_fast_poll_count, 0);
2160    ///     assert_eq!(interval.total_slow_poll_count, 0);
2161    ///     assert!(interval.slow_poll_ratio().is_nan());
2162    ///
2163    ///     let fast = Duration::ZERO;
2164    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
2165    ///
2166    ///     // this task completes in three fast polls
2167    ///     metrics_monitor.instrument(async {
2168    ///         spin_for(fast).await;   // fast poll 1
2169    ///         spin_for(fast).await;   // fast poll 2
2170    ///         spin_for(fast);         // fast poll 3
2171    ///     }).await;
2172    ///
2173    ///     // this task completes in two slow polls
2174    ///     metrics_monitor.instrument(async {
2175    ///         spin_for(slow).await;   // slow poll 1
2176    ///         spin_for(slow);         // slow poll 2
2177    ///     }).await;
2178    ///
2179    ///     let interval = next_interval();
2180    ///     assert_eq!(interval.total_fast_poll_count, 3);
2181    ///     assert_eq!(interval.total_slow_poll_count, 2);
2182    ///     assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2183    ///
2184    ///     // this task completes in three slow polls
2185    ///     metrics_monitor.instrument(async {
2186    ///         spin_for(slow).await;   // slow poll 1
2187    ///         spin_for(slow).await;   // slow poll 2
2188    ///         spin_for(slow);         // slow poll 3
2189    ///     }).await;
2190    ///
2191    ///     // this task completes in two fast polls
2192    ///     metrics_monitor.instrument(async {
2193    ///         spin_for(fast).await; // fast poll 1
2194    ///         spin_for(fast);       // fast poll 2
2195    ///     }).await;
2196    ///
2197    ///     let interval = next_interval();
2198    ///     assert_eq!(interval.total_fast_poll_count, 2);
2199    ///     assert_eq!(interval.total_slow_poll_count, 3);
2200    ///     assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2201    /// }
2202    ///
2203    /// fn ratio(a: f64, b: f64) -> f64 {
2204    ///     a / (a + b)
2205    /// }
2206    ///
2207    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2208    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2209    ///     let start = tokio::time::Instant::now();
2210    ///     while start.elapsed() <= duration {}
2211    ///     tokio::task::yield_now()
2212    /// }
2213    /// ```
2214    pub fn slow_poll_ratio(&self) -> f64 {
2215        self.total_slow_poll_count as f64 / self.total_poll_count as f64
2216    }
2217
2218    /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2219    ///
2220    /// ##### Definition
2221    /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2222    /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2223    pub fn long_delay_ratio(&self) -> f64 {
2224        self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2225    }
2226
2227    /// The mean duration of fast polls.
2228    ///
2229    /// ##### Definition
2230    /// This metric is derived from
2231    /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2232    /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2233    ///
2234    /// ##### Examples
    /// In the below example, no tasks are polled in the first sampling interval; two fast polls
    /// consume a mean of at least 1.5µs in the second sampling interval; and three fast polls
    /// consume a mean of at least 2µs in the third sampling interval:
2241    /// ```
2242    /// use std::future::Future;
2243    /// use std::time::Duration;
2244    ///
2245    /// #[tokio::main]
2246    /// async fn main() {
2247    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2248    ///     let mut interval = metrics_monitor.intervals();
2249    ///     let mut next_interval = || interval.next().unwrap();
2250    ///
2251    ///     // no tasks have been constructed, instrumented, or polled
2252    ///     assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2253    ///
2254    ///     let threshold = metrics_monitor.slow_poll_threshold();
2255    ///     let fast_1 = 1 * Duration::from_micros(1);
2256    ///     let fast_2 = 2 * Duration::from_micros(1);
2257    ///     let fast_3 = 3 * Duration::from_micros(1);
2258    ///
2259    ///     // this task completes in two fast polls
2260    ///     let total_time = time(metrics_monitor.instrument(async {
2261    ///         spin_for(fast_1).await; // fast poll 1
2262    ///         spin_for(fast_2)        // fast poll 2
2263    ///     })).await;
2264    ///
2265    ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2266    ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2267    ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2268    ///     assert!(mean_fast_poll_duration <= total_time / 2);
2269    ///
2270    ///     // this task completes in three fast polls
2271    ///     let total_time = time(metrics_monitor.instrument(async {
2272    ///         spin_for(fast_1).await; // fast poll 1
2273    ///         spin_for(fast_2).await; // fast poll 2
2274    ///         spin_for(fast_3)        // fast poll 3
2275    ///     })).await;
2276    ///
2277    ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2278    ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2279    ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2280    ///     assert!(mean_fast_poll_duration <= total_time / 3);
2281    /// }
2282    ///
2283    /// /// Produces the amount of time it took to await a given task.
2284    /// async fn time(task: impl Future) -> Duration {
2285    ///     let start = tokio::time::Instant::now();
2286    ///     task.await;
2287    ///     start.elapsed()
2288    /// }
2289    ///
2290    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2291    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2292    ///     let start = tokio::time::Instant::now();
2293    ///     while start.elapsed() <= duration {}
2294    ///     tokio::task::yield_now()
2295    /// }
2296    /// ```
2297    pub fn mean_fast_poll_duration(&self) -> Duration {
2298        mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2299    }
2300
2301    /// The average time taken for a task with a short scheduling delay to be executed after being
2302    /// scheduled.
2303    ///
2304    /// ##### Definition
2305    /// This metric is derived from
2306    /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2307    /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2308    pub fn mean_short_delay_duration(&self) -> Duration {
2309        mean(
2310            self.total_short_delay_duration,
2311            self.total_short_delay_count,
2312        )
2313    }
2314
2315    /// The mean duration of slow polls.
2316    ///
2317    /// ##### Definition
2318    /// This metric is derived from
2319    /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2320    /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2321    ///
2322    /// ##### Interpretation
2323    /// If this metric increases, it means that a greater proportion of polls took excessively long
2324    /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2325    /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2326    ///
2327    /// ##### See also
2328    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**   
2329    ///   The mean duration of polls.
2330    /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**   
    ///   The ratio between the number of polls categorized as slow and fast.
2332    ///
2333    /// ##### Interpretation
    /// If this metric increases, it means that, on average, slow polls got even slower. This does
    /// not necessarily imply increased task latency: An increase in average slow poll duration
    /// could be offset by fewer or faster polls. However, as a rule, tasks *should* yield to the
    /// scheduler frequently.
2338    ///
2339    /// ##### Examples
    /// In the below example, no tasks are polled in the first sampling interval; two slow polls
    /// consume a mean of at least
    /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
    /// second sampling interval; and three slow polls consume a mean of at least
    /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
    /// third sampling interval:
2346    /// ```
2347    /// use std::future::Future;
2348    /// use std::time::Duration;
2349    ///
2350    /// #[tokio::main]
2351    /// async fn main() {
2352    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2353    ///     let mut interval = metrics_monitor.intervals();
2354    ///     let mut next_interval = || interval.next().unwrap();
2355    ///
2356    ///     // no tasks have been constructed, instrumented, or polled
2357    ///     assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2358    ///
2359    ///     let threshold = metrics_monitor.slow_poll_threshold();
2360    ///     let slow_1 = 1 * threshold;
2361    ///     let slow_2 = 2 * threshold;
2362    ///     let slow_3 = 3 * threshold;
2363    ///
2364    ///     // this task completes in two slow polls
2365    ///     let total_time = time(metrics_monitor.instrument(async {
2366    ///         spin_for(slow_1).await; // slow poll 1
2367    ///         spin_for(slow_2)        // slow poll 2
2368    ///     })).await;
2369    ///
2370    ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2371    ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2372    ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2373    ///     assert!(mean_slow_poll_duration <= total_time / 2);
2374    ///
2375    ///     // this task completes in three slow polls
2376    ///     let total_time = time(metrics_monitor.instrument(async {
2377    ///         spin_for(slow_1).await; // slow poll 1
2378    ///         spin_for(slow_2).await; // slow poll 2
2379    ///         spin_for(slow_3)        // slow poll 3
2380    ///     })).await;
2381    ///
2382    ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2383    ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2384    ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2385    ///     assert!(mean_slow_poll_duration <= total_time / 3);
2386    /// }
2387    ///
2388    /// /// Produces the amount of time it took to await a given task.
2389    /// async fn time(task: impl Future) -> Duration {
2390    ///     let start = tokio::time::Instant::now();
2391    ///     task.await;
2392    ///     start.elapsed()
2393    /// }
2394    ///
2395    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2396    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2397    ///     let start = tokio::time::Instant::now();
2398    ///     while start.elapsed() <= duration {}
2399    ///     tokio::task::yield_now()
2400    /// }
2401    /// ```
2402    pub fn mean_slow_poll_duration(&self) -> Duration {
2403        mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2404    }
2405
2406    /// The average scheduling delay for a task which takes a long time to start executing after
2407    /// being scheduled.
2408    ///
2409    /// ##### Definition
2410    /// This metric is derived from
2411    /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2412    /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2413    pub fn mean_long_delay_duration(&self) -> Duration {
2414        mean(self.total_long_delay_duration, self.total_long_delay_count)
2415    }
2416}
2417
2418impl<T: Future> Future for Instrumented<T> {
2419    type Output = T::Output;
2420
2421    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2422        instrument_poll(cx, self, Future::poll)
2423    }
2424}
2425
2426impl<T: Stream> Stream for Instrumented<T> {
2427    type Item = T::Item;
2428
2429    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2430        instrument_poll(cx, self, Stream::poll_next)
2431    }
2432}
2433
/// Polls the task wrapped by `instrumented` through `poll_fn`, recording metrics for this poll.
///
/// This is the shared implementation behind the `Future` and `Stream` impls of `Instrumented`.
/// For each call it:
/// - on the first poll only, records the delay between instrumentation and this poll;
/// - records how long the task idled, if it was woken after its previous poll finished;
/// - records how long the task waited between being woken and being polled, bucketed as a long
///   or short delay by `long_delay_threshold`;
/// - polls the inner task with a waker derived from the instrumentation state, so that wake-ups
///   pass through `State`'s `ArcWake` impl before reaching the caller's waker;
/// - records the duration of the inner poll, bucketed as slow or fast by `slow_poll_threshold`.
///
/// Durations that do not fit in `u64` nanoseconds are clamped to `u64::MAX`.
///
/// Returns whatever `poll_fn` returns.
fn instrument_poll<T, Out>(
    cx: &mut Context<'_>,
    instrumented: Pin<&mut Instrumented<T>>,
    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
) -> Poll<Out> {
    // taken before any accounting so that the accounting itself is not counted as
    // scheduling delay
    let poll_start = Instant::now();
    let this = instrumented.project();
    let idled_at = this.idled_at;
    let state = this.state;
    let instrumented_at = state.instrumented_at;
    let metrics = &state.metrics;
    /* accounting for time-to-first-poll and tasks-count */
    // is this the first time this task has been polled?
    if !*this.did_poll_once {
        // if so, we need to do three things:
        /* 1. note that this task *has* been polled */
        *this.did_poll_once = true;

        /* 2. account for the time-to-first-poll of this task */
        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
        // round down to `u64::MAX` nanoseconds
        let elapsed = (poll_start - instrumented_at)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);
        // add this duration to `total_first_poll_delay_ns`
        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);

        /* 3. increment the count of tasks that have been polled at least once */
        state.metrics.first_poll_count.fetch_add(1, SeqCst);
    }
    /* accounting for time-idled and time-scheduled */
    // 1. note (and reset) the instant this task was last awoken; resetting to 0 lets
    // `State::on_wake` record the next wake-up
    let woke_at = state.woke_at.swap(0, SeqCst);
    // The state of a future is *idling* in the interim between the instant
    // it completes a `poll`, and the instant it is next awoken.
    // Both `*idled_at` and `woke_at` are nanoseconds since instrumentation.
    if *idled_at < woke_at {
        // increment the counter of how many idles occurred
        metrics.total_idled_count.fetch_add(1, SeqCst);

        // compute the duration of the idle
        let idle_ns = woke_at - *idled_at;

        // update the max time tasks spent idling, both locally and
        // globally.
        metrics
            .local_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        metrics
            .global_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        // adjust the total elapsed time monitored tasks spent idling
        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
    }
    // if this task spent any time in the scheduled state after instrumentation,
    // and after first poll, `woke_at` will be greater than 0.
    if woke_at > 0 {
        // increment the counter of how many schedules occurred
        metrics.total_scheduled_count.fetch_add(1, SeqCst);

        // recall that the `woke_at` field is internally represented as
        // nanoseconds-since-instrumentation. here, for accounting purposes,
        // we need to instead represent it as a proper `Instant`.
        let woke_instant = instrumented_at + Duration::from_nanos(woke_at);

        // the duration this task spent scheduled is the time elapsed between
        // when this task was awoken, and when it was polled.
        let scheduled_ns = (poll_start - woke_instant)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);

        let scheduled = Duration::from_nanos(scheduled_ns);

        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
            if scheduled >= metrics.long_delay_threshold {
                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
            } else {
                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
            };
        // update the appropriate bucket
        count_bucket.fetch_add(1, SeqCst);
        duration_bucket.fetch_add(scheduled_ns, SeqCst);

        // add `scheduled_ns` to the Monitor's total
        metrics
            .total_scheduled_duration_ns
            .fetch_add(scheduled_ns, SeqCst);
    }
    // Register the caller's waker so that the instrumented waker can forward wake-ups to it
    state.waker.register(cx.waker());
    // Get the instrumented waker, which is backed by `state`
    let waker_ref = futures_util::task::waker_ref(state);
    let mut cx = Context::from_waker(&waker_ref);
    // Poll the task, timing only the inner poll itself
    let inner_poll_start = Instant::now();
    let ret = poll_fn(this.task, &mut cx);
    let inner_poll_end = Instant::now();
    /* idle time starts now */
    // stored as nanoseconds since instrumentation, the same representation as `woke_at`
    *idled_at = (inner_poll_end - instrumented_at)
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    /* accounting for poll time */
    let inner_poll_duration = inner_poll_end - inner_poll_start;
    let inner_poll_ns: u64 = inner_poll_duration
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
            if inner_poll_duration >= metrics.slow_poll_threshold {
                (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
            } else {
                (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
            };
    // update the appropriate bucket
    count_bucket.fetch_add(1, SeqCst);
    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
    ret
}
2554
2555impl State {
2556    fn on_wake(&self) {
2557        let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2558            Ok(woke_at) => woke_at,
2559            // This is highly unlikely as it would mean the task ran for over
2560            // 500 years. If you ran your service for 500 years. If you are
2561            // reading this 500 years in the future, I'm sorry.
2562            Err(_) => return,
2563        };
2564
2565        // We don't actually care about the result
2566        let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2567    }
2568}
2569
2570impl ArcWake for State {
2571    fn wake_by_ref(arc_self: &Arc<State>) {
2572        arc_self.on_wake();
2573        arc_self.waker.wake();
2574    }
2575
2576    fn wake(self: Arc<State>) {
2577        self.on_wake();
2578        self.waker.wake();
2579    }
2580}
2581
/// Iterator returned by [`TaskMonitor::intervals`].
///
/// See that method's documentation for more details.
#[derive(Debug)]
pub struct TaskIntervals {
    /// Shared raw metrics that are sampled on every probe.
    metrics: Arc<RawMetrics>,
    /// Cumulative snapshot taken by the most recent probe, used as the baseline for the next
    /// one; `None` when no earlier snapshot exists (presumably until the first probe).
    previous: Option<TaskMetrics>,
}
2590
2591impl TaskIntervals {
2592    fn probe(&mut self) -> TaskMetrics {
2593        let latest = self.metrics.metrics();
2594        let local_max_idle_duration = self.metrics.get_and_reset_local_max_idle_duration();
2595
2596        let next = if let Some(previous) = self.previous {
2597            TaskMetrics {
2598                instrumented_count: latest
2599                    .instrumented_count
2600                    .wrapping_sub(previous.instrumented_count),
2601                dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2602                total_poll_count: latest
2603                    .total_poll_count
2604                    .wrapping_sub(previous.total_poll_count),
2605                total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2606                first_poll_count: latest
2607                    .first_poll_count
2608                    .wrapping_sub(previous.first_poll_count),
2609                total_idled_count: latest
2610                    .total_idled_count
2611                    .wrapping_sub(previous.total_idled_count),
2612                total_scheduled_count: latest
2613                    .total_scheduled_count
2614                    .wrapping_sub(previous.total_scheduled_count),
2615                total_fast_poll_count: latest
2616                    .total_fast_poll_count
2617                    .wrapping_sub(previous.total_fast_poll_count),
2618                total_short_delay_count: latest
2619                    .total_short_delay_count
2620                    .wrapping_sub(previous.total_short_delay_count),
2621                total_slow_poll_count: latest
2622                    .total_slow_poll_count
2623                    .wrapping_sub(previous.total_slow_poll_count),
2624                total_long_delay_count: latest
2625                    .total_long_delay_count
2626                    .wrapping_sub(previous.total_long_delay_count),
2627                total_first_poll_delay: sub(
2628                    latest.total_first_poll_delay,
2629                    previous.total_first_poll_delay,
2630                ),
2631                max_idle_duration: local_max_idle_duration,
2632                total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2633                total_scheduled_duration: sub(
2634                    latest.total_scheduled_duration,
2635                    previous.total_scheduled_duration,
2636                ),
2637                total_fast_poll_duration: sub(
2638                    latest.total_fast_poll_duration,
2639                    previous.total_fast_poll_duration,
2640                ),
2641                total_short_delay_duration: sub(
2642                    latest.total_short_delay_duration,
2643                    previous.total_short_delay_duration,
2644                ),
2645                total_slow_poll_duration: sub(
2646                    latest.total_slow_poll_duration,
2647                    previous.total_slow_poll_duration,
2648                ),
2649                total_long_delay_duration: sub(
2650                    latest.total_long_delay_duration,
2651                    previous.total_long_delay_duration,
2652                ),
2653            }
2654        } else {
2655            latest
2656        };
2657
2658        self.previous = Some(latest);
2659
2660        next
2661    }
2662}
2663
2664impl Iterator for TaskIntervals {
2665    type Item = TaskMetrics;
2666
2667    fn next(&mut self) -> Option<Self::Item> {
2668        Some(self.probe())
2669    }
2670}
2671
/// Converts `d` to a whole number of nanoseconds.
///
/// Debug builds assert that `d` fits in `u64` nanoseconds; in release builds a larger value
/// wraps around instead of panicking.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    debug_assert!(d <= Duration::from_nanos(u64::MAX));

    let whole_seconds_ns = d.as_secs().wrapping_mul(NANOS_PER_SEC);
    let subsecond_ns = u64::from(d.subsec_nanos());
    whole_seconds_ns.wrapping_add(subsecond_ns)
}
2679
2680#[inline(always)]
2681fn sub(a: Duration, b: Duration) -> Duration {
2682    let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2683    Duration::from_nanos(nanos)
2684}
2685
2686#[inline(always)]
2687fn mean(d: Duration, count: u64) -> Duration {
2688    if let Some(quotient) = to_nanos(d).checked_div(count) {
2689        Duration::from_nanos(quotient)
2690    } else {
2691        Duration::ZERO
2692    }
2693}