tokio_metrics/task.rs
1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13#[cfg(not(feature = "rt"))]
14use std::time::{Duration, Instant};
15
16#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
17pub(crate) mod metrics_rs_integration;
18
19/// Monitors key metrics of instrumented tasks.
20///
21/// ### Basic Usage
22/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
23/// [instrumented][`TaskMonitor::instrument`] with the monitor.
24///
25/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
26/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
27/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
28/// ```
29/// use std::time::Duration;
30///
31/// #[tokio::main]
32/// async fn main() {
33/// // construct a metrics monitor
34/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
35///
36/// // print task metrics every 500ms
37/// {
38/// let metrics_monitor = metrics_monitor.clone();
39/// tokio::spawn(async move {
40/// for interval in metrics_monitor.intervals() {
41/// // pretty-print the metric interval
42/// println!("{:?}", interval);
43/// // wait 500ms
44/// tokio::time::sleep(Duration::from_millis(500)).await;
45/// }
46/// });
47/// }
48///
49/// // instrument some tasks and await them
50/// // note that the same TaskMonitor can be used for multiple tasks
51/// tokio::join![
52/// metrics_monitor.instrument(do_work()),
53/// metrics_monitor.instrument(do_work()),
54/// metrics_monitor.instrument(do_work())
55/// ];
56/// }
57///
58/// async fn do_work() {
59/// for _ in 0..25 {
60/// tokio::task::yield_now().await;
61/// tokio::time::sleep(Duration::from_millis(100)).await;
62/// }
63/// }
64/// ```
65///
66/// ### What should I instrument?
67/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
68///
69/// #### Instrumenting a web application
70/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
71/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
72/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
73/// debugging scenario for a web service that takes this approach to instrumentation. This
74/// approach is exemplified in the below example:
75/// ```no_run
76/// // The unabridged version of this snippet is in the examples directory of this crate.
77///
78/// #[tokio::main]
79/// async fn main() {
80/// // construct a TaskMonitor for root endpoint
81/// let monitor_root = tokio_metrics::TaskMonitor::new();
82///
83/// // construct TaskMonitors for create_users endpoint
84/// let monitor_create_user = CreateUserMonitors {
85/// // monitor for the entire endpoint
86/// route: tokio_metrics::TaskMonitor::new(),
87/// // monitor for database insertion subtask
88/// insert: tokio_metrics::TaskMonitor::new(),
89/// };
90///
91/// // build our application with two instrumented endpoints
92/// let app = axum::Router::new()
93/// // `GET /` goes to `root`
94/// .route("/", axum::routing::get({
95/// let monitor = monitor_root.clone();
96/// move || monitor.instrument(async { "Hello, World!" })
97/// }))
98/// // `POST /users` goes to `create_user`
99/// .route("/users", axum::routing::post({
100/// let monitors = monitor_create_user.clone();
101/// let route = monitors.route.clone();
102/// move |payload| {
103/// route.instrument(create_user(payload, monitors))
104/// }
105/// }));
106///
107/// // print task metrics for each endpoint every 1s
108/// let metrics_frequency = std::time::Duration::from_secs(1);
109/// tokio::spawn(async move {
110/// let root_intervals = monitor_root.intervals();
111/// let create_user_route_intervals =
112/// monitor_create_user.route.intervals();
113/// let create_user_insert_intervals =
114/// monitor_create_user.insert.intervals();
115/// let create_user_intervals =
116/// create_user_route_intervals.zip(create_user_insert_intervals);
117///
118/// let intervals = root_intervals.zip(create_user_intervals);
119/// for (root_route, (create_user_route, create_user_insert)) in intervals {
120/// println!("root_route = {:#?}", root_route);
121/// println!("create_user_route = {:#?}", create_user_route);
122/// println!("create_user_insert = {:#?}", create_user_insert);
123/// tokio::time::sleep(metrics_frequency).await;
124/// }
125/// });
126///
127/// // run the server
128/// let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
129/// let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
130/// axum::serve(listener, app)
131/// .await
132/// .unwrap();
133/// }
134///
135/// async fn create_user(
136/// axum::Json(payload): axum::Json<CreateUser>,
137/// monitors: CreateUserMonitors,
138/// ) -> impl axum::response::IntoResponse {
139/// let user = User { id: 1337, username: payload.username, };
140/// // instrument inserting the user into the db:
141/// let _ = monitors.insert.instrument(insert_user(user.clone())).await;
142/// (axum::http::StatusCode::CREATED, axum::Json(user))
143/// }
144///
145/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
146///
147/// #
148/// # #[derive(Clone)]
149/// # struct CreateUserMonitors {
150/// # // monitor for the entire endpoint
151/// # route: tokio_metrics::TaskMonitor,
152/// # // monitor for database insertion subtask
153/// # insert: tokio_metrics::TaskMonitor,
154/// # }
155/// #
156/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
157/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
158/// #
159/// // insert the user into the database
160/// async fn insert_user(_: User) {
161/// /* implementation details elided */
162/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
163/// }
164/// ```
165///
166/// ### Why are my tasks slow?
167/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
168/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
169///
170/// #### Identifying the high-level culprits
171/// A set of tasks will appear to execute more slowly if:
172/// - they are taking longer to poll (i.e., they consume too much CPU time)
173/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
174/// queues)
175/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
176///
177/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
178/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
179/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
180/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
181/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
182/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
183///
184/// ##### Are my tasks taking longer to poll?
185/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
186/// This metric reflects the mean poll duration. If it increased, it means that, on average,
187/// individual polls tended to take longer. However, this does not necessarily imply increased
188/// task latency: An increase in poll durations could be offset by fewer polls.
189/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
190/// This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
191/// a greater proportion of polls performed excessive computation before yielding. This does not
192/// necessarily imply increased task latency: An increase in the proportion of slow polls could be
193/// offset by fewer or faster polls.
194/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
195/// This metric reflects the mean duration of slow polls. If it increased, it means that, on
196/// average, slow polls got slower. This does not necessarily imply increased task latency: An
197/// increase in average slow poll duration could be offset by fewer or faster polls.
198///
199/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
200///
201/// ##### Are my tasks spending more time waiting to be polled?
202/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
203/// This metric reflects the mean delay between the instant a task is first instrumented and the
204/// instant it is first polled. If it increases, it means that, on average, tasks spent longer
205/// waiting to be initially run.
206/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
207/// This metric reflects the mean duration that tasks spent in the scheduled state. The
208/// 'scheduled' state of a task is the duration between the instant a task is awoken and the
209/// instant it is subsequently polled. If this metric increases, it means that, on average, tasks
210/// spent longer in tokio's queues before being polled.
211/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
212/// This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
213/// it means that a greater proportion of tasks experienced excessive delays before they could
214/// execute after being woken. This does not necessarily indicate an increase in latency, as this
215/// could be offset by fewer or faster task polls.
216/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
217/// This metric reflects the mean duration of long delays. If it increased, it means that, on
218/// average, long delays got even longer. This does not necessarily imply increased task latency:
219/// an increase in average long delay duration could be offset by fewer or faster polls or more
220/// short schedules.
221///
222/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
223///
224/// ##### Are my tasks spending more time waiting on external events to complete?
225/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
226/// This metric reflects the mean duration that tasks spent in the idle state. The idle state is
227/// the duration spanning the instant a task completes a poll, and the instant that it is next
228/// awoken. Tasks inhabit this state when they are waiting for task-external events to complete
229/// (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
230/// tasks, in aggregate, spent more time waiting for task-external events to complete.
231///
232/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
233///
234/// #### Digging deeper
235/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
236/// search for further explanation...
237///
238/// ##### Why are my tasks taking longer to poll?
239/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
240/// The culprit is likely some combination of:
/// - **Your tasks are accidentally blocking.** Common culprits include:
///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
///        [networking](https://doc.rust-lang.org/std/net/) APIs.
///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.
///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
///     1. TLS/cryptographic routines
///     2. doing a lot of processing on bytes
///     3. calling non-Tokio resources
254///
255/// ##### Why are my tasks spending more time waiting to be polled?
256/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
257/// suggesting some combination of:
258/// - Your application is inflating the time elapsed between instrumentation and first poll.
259/// - Your tasks are being scheduled into tokio's global queue.
260/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
261///
262/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
263///
264/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, [*you try to identify
/// the performance culprits...*](#identifying-the-high-level-culprits)
270///
271/// #### Digging even deeper
272///
273/// ##### Is time-to-first-poll unusually high?
274/// Contrast these two metrics:
275/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
276/// This metric reflects the mean delay between the instant a task is first instrumented and the
277/// instant it is *first* polled.
278/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
279/// This metric reflects the mean delay between the instant when tasks were awoken and the
280/// instant they were subsequently polled.
281///
282/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
283/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
284///
285/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
286///
287/// ##### Is my application delaying the time-to-first-poll?
288/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
289/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
290/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
291/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
292///
/// For instance, in the below example, the application induces a 1 second delay between when
/// `task` is instrumented and when it is awaited:
295/// ```rust
296/// #[tokio::main]
297/// async fn main() {
298/// use tokio::time::Duration;
299/// let monitor = tokio_metrics::TaskMonitor::new();
300///
301/// let task = monitor.instrument(async move {});
302///
303/// let one_sec = Duration::from_secs(1);
304/// tokio::time::sleep(one_sec).await;
305///
306/// let _ = tokio::spawn(task).await;
307///
308/// assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
309/// }
310/// ```
311///
/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokios-global-queue)
314///
315/// ##### Is my application spawning more tasks into tokio's global queue?
316/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
317/// global "injection" queue.
318///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
320/// ```ignore
321/// #[tokio::main]
322/// async fn main() {
323/// for _ in 0..100 {
324/// let (tx, rx) = oneshot::channel();
325/// tokio::spawn(async move {
326/// tx.send(());
327/// })
328///
329/// rx.await;
330/// }
331/// }
332/// ```
333/// One would expect this to run efficiently, however, the main task is run *off* the main runtime
334/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
335/// ```ignore
336/// #[tokio::main]
337/// async fn main() {
338/// tokio::spawn(async {
339/// for _ in 0..100 {
340/// let (tx, rx) = oneshot::channel();
341/// tokio::spawn(async move {
342/// tx.send(());
343/// })
344///
345/// rx.await;
346/// }
347/// }).await;
348/// }
349/// ```
350/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
351/// and the task being polled.
352///
353/// ##### Are other tasks polling too long without yielding?
354/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
355/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
356/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
357///
358/// ### Limitations
359/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
360/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
361/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
362///
363/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
364/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
365/// counters and durations wrap.
366///
367/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
368/// calculated by computing the difference of metrics in successive invocations of
369/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
370/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
371/// that interval will overflow and not be accurate.
372///
373/// ##### Examples at the limits
374/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
375/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
376/// ```
377/// use tokio::time::Duration;
378///
379/// #[tokio::main(flavor = "current_thread", start_paused = true)]
380/// async fn main() {
381/// let monitor = tokio_metrics::TaskMonitor::new();
382/// let mut interval = monitor.intervals();
383/// let mut next_interval = || interval.next().unwrap();
384///
385/// // construct and instrument a task, but do not `await` it
386/// let task = monitor.instrument(async {});
387///
388/// // this is the maximum duration representable by tokio_metrics
389/// let max_duration = Duration::from_nanos(u64::MAX);
390///
391/// // let's advance the clock by this amount and poll `task`
392/// let _ = tokio::time::advance(max_duration).await;
393/// task.await;
394///
395/// // durations ≤ `max_duration` are accurately reflected in this metric
396/// assert_eq!(next_interval().total_first_poll_delay, max_duration);
397/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
398/// }
399/// ```
400/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
401/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
402/// ```
403/// # use tokio::time::Duration;
404/// #
405/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
406/// # async fn main() {
407/// # let monitor = tokio_metrics::TaskMonitor::new();
408/// #
409/// // construct and instrument a task, but do not `await` it
410/// let task_a = monitor.instrument(async {});
411/// let task_b = monitor.instrument(async {});
412///
413/// // this is the maximum duration representable by tokio_metrics
414/// let max_duration = Duration::from_nanos(u64::MAX);
415///
416/// // let's advance the clock by 1.5x this amount and await `task`
417/// let _ = tokio::time::advance(3 * (max_duration / 2)).await;
418/// task_a.await;
419/// task_b.await;
420///
421/// // the `total_first_poll_delay` has overflowed
422/// assert!(monitor.cumulative().total_first_poll_delay < max_duration);
423/// # }
424/// ```
425/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
426/// this metric to the precipice of overflow:
427/// ```
428/// # use tokio::time::Duration;
429/// #
430/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
431/// # async fn main() {
432/// # let monitor = tokio_metrics::TaskMonitor::new();
433/// # let mut interval = monitor.intervals();
434/// # let mut next_interval = || interval.next().unwrap();
435/// #
436/// // construct and instrument u16::MAX tasks, but do not `await` them
437/// let first_poll_count = u16::MAX as u64;
438/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
439/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
440///
441/// // this is the maximum duration representable by tokio_metrics
442/// let max_duration = u64::MAX;
443///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
446/// let iffy_delay = max_duration / (first_poll_count as u64);
447/// let small_remainder = max_duration % first_poll_count;
448/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
449///
450/// // ...then poll all of the instrumented tasks:
451/// for task in tasks { task.await; }
452///
453/// // `total_first_poll_delay` is at the precipice of overflowing!
454/// assert_eq!(
455/// next_interval().total_first_poll_delay.as_nanos(),
456/// (max_duration - small_remainder) as u128
457/// );
458/// assert_eq!(
459/// monitor.cumulative().total_first_poll_delay.as_nanos(),
460/// (max_duration - small_remainder) as u128
461/// );
462/// # }
463/// ```
464/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
465/// metrics counter overflows at most once in the midst of an interval:
466/// ```
467/// # use tokio::time::Duration;
468/// # use tokio_metrics::TaskMonitor;
469/// #
470/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
471/// # async fn main() {
472/// # let monitor = TaskMonitor::new();
473/// # let mut interval = monitor.intervals();
474/// # let mut next_interval = || interval.next().unwrap();
475/// #
476/// let first_poll_count = u16::MAX as u64;
477/// let batch_size = first_poll_count / 3;
478///
479/// let max_duration_ns = u64::MAX;
480/// let iffy_delay_ns = max_duration_ns / first_poll_count;
481///
482/// // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
483/// // then await the instrumented tasks.
484/// async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
485/// let mut tasks = Vec::with_capacity(batch_size);
486/// for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
487/// let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
488/// for task in tasks { task.await; }
489/// }
490///
/// // this is how much `total_first_poll_delay` will
/// // increase with each batch we run
493/// let batch_delay = iffy_delay_ns * batch_size;
494///
495/// // run batches 1, 2, and 3
496/// for i in 1..=3 {
497/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
498/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
499/// assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
500/// }
501///
/// /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
503/// assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
504///
505/// // run batch 4
506/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
507/// // the interval counter remains accurate
508/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
509/// // but the cumulative counter has overflowed
510/// assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
511/// # }
512/// ```
513/// If a cumulative metric overflows *more than once* in the midst of an interval,
514/// its interval-sampled counterpart will also overflow.
515#[derive(Clone, Debug)]
516pub struct TaskMonitor {
517 metrics: Arc<RawMetrics>,
518}
519
520/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
521/// parameters.
522#[derive(Clone, Debug, Default)]
523pub struct TaskMonitorBuilder {
524 slow_poll_threshold: Option<Duration>,
525 long_delay_threshold: Option<Duration>,
526}
527
528impl TaskMonitorBuilder {
529 /// Creates a new [`TaskMonitorBuilder`].
530 pub fn new() -> Self {
531 Self {
532 slow_poll_threshold: None,
533 long_delay_threshold: None,
534 }
535 }
536
537 /// Specifies the threshold at which polls are considered 'slow'.
538 pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
539 self.slow_poll_threshold = Some(threshold);
540
541 self
542 }
543
544 /// Specifies the threshold at which schedules are considered 'long'.
545 pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
546 self.long_delay_threshold = Some(threshold);
547
548 self
549 }
550
551 /// Consume the builder, producing a [`TaskMonitor`].
552 pub fn build(self) -> TaskMonitor {
553 TaskMonitor::create(
554 self.slow_poll_threshold
555 .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
556 self.long_delay_threshold
557 .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
558 )
559 }
560}
561
562pin_project! {
563 /// An async task that has been instrumented with [`TaskMonitor::instrument`].
564 #[derive(Debug)]
565 pub struct Instrumented<T> {
566 // The task being instrumented
567 #[pin]
568 task: T,
569
570 // True when the task is polled for the first time
571 did_poll_once: bool,
572
573 // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
574 // its last poll.
575 idled_at: u64,
576
577 // State shared between the task and its instrumented waker.
578 state: Arc<State>,
579 }
580
581 impl<T> PinnedDrop for Instrumented<T> {
582 fn drop(this: Pin<&mut Self>) {
583 this.state.metrics.dropped_count.fetch_add(1, SeqCst);
584 }
585 }
586}
587
588/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
589#[non_exhaustive]
590#[derive(Debug, Clone, Copy, Default)]
591pub struct TaskMetrics {
592 /// The number of tasks instrumented.
593 ///
594 /// ##### Examples
595 /// ```
596 /// #[tokio::main]
597 /// async fn main() {
598 /// let monitor = tokio_metrics::TaskMonitor::new();
599 /// let mut interval = monitor.intervals();
600 /// let mut next_interval = || interval.next().unwrap();
601 ///
602 /// // 0 tasks have been instrumented
603 /// assert_eq!(next_interval().instrumented_count, 0);
604 ///
605 /// monitor.instrument(async {});
606 ///
607 /// // 1 task has been instrumented
608 /// assert_eq!(next_interval().instrumented_count, 1);
609 ///
610 /// monitor.instrument(async {});
611 /// monitor.instrument(async {});
612 ///
613 /// // 2 tasks have been instrumented
614 /// assert_eq!(next_interval().instrumented_count, 2);
615 ///
616 /// // since the last interval was produced, 0 tasks have been instrumented
617 /// assert_eq!(next_interval().instrumented_count, 0);
618 /// }
619 /// ```
620 pub instrumented_count: u64,
621
622 /// The number of tasks dropped.
623 ///
624 /// ##### Examples
625 /// ```
626 /// #[tokio::main]
627 /// async fn main() {
628 /// let monitor = tokio_metrics::TaskMonitor::new();
629 /// let mut interval = monitor.intervals();
630 /// let mut next_interval = || interval.next().unwrap();
631 ///
632 /// // 0 tasks have been dropped
633 /// assert_eq!(next_interval().dropped_count, 0);
634 ///
635 /// let _task = monitor.instrument(async {});
636 ///
637 /// // 0 tasks have been dropped
638 /// assert_eq!(next_interval().dropped_count, 0);
639 ///
640 /// monitor.instrument(async {}).await;
641 /// drop(monitor.instrument(async {}));
642 ///
643 /// // 2 tasks have been dropped
644 /// assert_eq!(next_interval().dropped_count, 2);
645 ///
646 /// // since the last interval was produced, 0 tasks have been dropped
647 /// assert_eq!(next_interval().dropped_count, 0);
648 /// }
649 /// ```
650 pub dropped_count: u64,
651
652 /// The number of tasks polled for the first time.
653 ///
654 /// ##### Derived metrics
655 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
656 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
657 /// are first polled.
658 ///
659 /// ##### Examples
660 /// In the below example, no tasks are instrumented or polled in the first sampling interval;
661 /// one task is instrumented (but not polled) in the second sampling interval; that task is
662 /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
663 /// additional tasks are polled for the first time within the fourth sampling interval:
664 /// ```
665 /// #[tokio::main]
666 /// async fn main() {
667 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
668 /// let mut interval = metrics_monitor.intervals();
669 /// let mut next_interval = || interval.next().unwrap();
670 ///
671 /// // no tasks have been constructed, instrumented, and polled at least once
672 /// assert_eq!(next_interval().first_poll_count, 0);
673 ///
674 /// let task = metrics_monitor.instrument(async {});
675 ///
676 /// // `task` has been constructed and instrumented, but has not yet been polled
677 /// assert_eq!(next_interval().first_poll_count, 0);
678 ///
679 /// // poll `task` to completion
680 /// task.await;
681 ///
682 /// // `task` has been constructed, instrumented, and polled at least once
683 /// assert_eq!(next_interval().first_poll_count, 1);
684 ///
685 /// // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
686 /// assert_eq!(next_interval().first_poll_count, 0);
687 ///
688 /// }
689 /// ```
690 pub first_poll_count: u64,
691
692 /// The total duration elapsed between the instant tasks are instrumented, and the instant they
693 /// are first polled.
694 ///
695 /// ##### Derived metrics
696 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
697 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
698 /// are first polled.
699 ///
700 /// ##### Examples
701 /// In the below example, 0 tasks have been instrumented or polled within the first sampling
702 /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
703 /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
704 /// polling of tasks within the third sampling interval:
705 /// ```
706 /// use tokio::time::Duration;
707 ///
708 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
709 /// async fn main() {
710 /// let monitor = tokio_metrics::TaskMonitor::new();
711 /// let mut interval = monitor.intervals();
712 /// let mut next_interval = || interval.next().unwrap();
713 ///
714 /// // no tasks have yet been created, instrumented, or polled
715 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
716 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
717 ///
718 /// // constructs and instruments a task, pauses a given duration, then awaits the task
719 /// async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
720 /// let task = monitor.instrument(async move {});
721 /// tokio::time::sleep(pause).await;
722 /// task.await;
723 /// }
724 ///
725 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
726 /// let task_a_pause_time = Duration::from_millis(500);
727 /// instrument_pause_await(&monitor, task_a_pause_time).await;
728 ///
729 /// assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
730 /// assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
731 ///
732 /// // construct and await a task that pauses for 250ms between instrumentation and first poll
733 /// let task_b_pause_time = Duration::from_millis(250);
734 /// instrument_pause_await(&monitor, task_b_pause_time).await;
735 ///
736 /// // construct and await a task that pauses for 100ms between instrumentation and first poll
737 /// let task_c_pause_time = Duration::from_millis(100);
738 /// instrument_pause_await(&monitor, task_c_pause_time).await;
739 ///
740 /// assert_eq!(
741 /// next_interval().total_first_poll_delay,
742 /// task_b_pause_time + task_c_pause_time
743 /// );
744 /// assert_eq!(
745 /// monitor.cumulative().total_first_poll_delay,
746 /// task_a_pause_time + task_b_pause_time + task_c_pause_time
747 /// );
748 /// }
749 /// ```
750 ///
751 /// ##### When is this metric recorded?
752 /// The delay between instrumentation and first poll is not recorded until the first poll
753 /// actually occurs:
754 /// ```
755 /// # use tokio::time::Duration;
756 /// #
757 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
758 /// # async fn main() {
759 /// # let monitor = tokio_metrics::TaskMonitor::new();
760 /// # let mut interval = monitor.intervals();
761 /// # let mut next_interval = || interval.next().unwrap();
762 /// #
763 /// // we construct and instrument a task, but do not `await` it
764 /// let task = monitor.instrument(async {});
765 ///
766 /// // let's sleep for 1s before we poll `task`
767 /// let one_sec = Duration::from_secs(1);
768 /// let _ = tokio::time::sleep(one_sec).await;
769 ///
770 /// // although 1s has now elapsed since the instrumentation of `task`,
771 /// // this is not reflected in `total_first_poll_delay`...
772 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
773 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
774 ///
775 /// // ...and won't be until `task` is actually polled
776 /// task.await;
777 ///
778 /// // now, the 1s delay is reflected in `total_first_poll_delay`:
779 /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
780 /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
781 /// # }
782 /// ```
783 ///
784 /// ##### What if first-poll-delay is very large?
785 /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
786 /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
787 /// metric will wrap around:
788 /// ```
789 /// use tokio::time::Duration;
790 ///
791 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
792 /// async fn main() {
793 /// let monitor = tokio_metrics::TaskMonitor::new();
794 ///
795 /// // construct and instrument a task, but do not `await` it
796 /// let task = monitor.instrument(async {});
797 ///
798 /// // this is the maximum duration representable by tokio_metrics
799 /// let max_duration = Duration::from_nanos(u64::MAX);
800 ///
801 /// // let's advance the clock by double this amount and await `task`
802 /// let _ = tokio::time::advance(max_duration * 2).await;
803 /// task.await;
804 ///
805 /// // the time-to-first-poll of `task` saturates at `max_duration`
806 /// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
807 ///
808 /// // ...but note that the metric *will* wrap around if more tasks are involved
809 /// let task = monitor.instrument(async {});
810 /// let _ = tokio::time::advance(Duration::from_nanos(1)).await;
811 /// task.await;
812 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
813 /// }
814 /// ```
815 pub total_first_poll_delay: Duration,
816
817 /// The total number of times that tasks idled, waiting to be awoken.
818 ///
819 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
820 /// task completes a poll, and the instant that it is next awoken.
821 ///
822 /// ##### Derived metrics
823 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
824 /// The mean duration of idles.
825 ///
826 /// ##### Examples
827 /// ```
828 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
829 /// async fn main() {
830 /// let monitor = tokio_metrics::TaskMonitor::new();
831 /// let mut interval = monitor.intervals();
832 /// let mut next_interval = move || interval.next().unwrap();
833 /// let one_sec = std::time::Duration::from_secs(1);
834 ///
835 /// monitor.instrument(async {}).await;
836 ///
837 /// assert_eq!(next_interval().total_idled_count, 0);
838 /// assert_eq!(monitor.cumulative().total_idled_count, 0);
839 ///
840 /// monitor.instrument(async move {
841 /// tokio::time::sleep(one_sec).await;
842 /// }).await;
843 ///
844 /// assert_eq!(next_interval().total_idled_count, 1);
845 /// assert_eq!(monitor.cumulative().total_idled_count, 1);
846 ///
847 /// monitor.instrument(async {
848 /// tokio::time::sleep(one_sec).await;
849 /// tokio::time::sleep(one_sec).await;
850 /// }).await;
851 ///
852 /// assert_eq!(next_interval().total_idled_count, 2);
853 /// assert_eq!(monitor.cumulative().total_idled_count, 3);
854 /// }
855 /// ```
856 pub total_idled_count: u64,
857
858 /// The total duration that tasks idled.
859 ///
860 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
861 /// task completes a poll, and the instant that it is next awoken.
862 ///
863 /// ##### Derived metrics
864 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
865 /// The mean duration of idles.
866 ///
867 /// ##### Examples
868 /// ```
869 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
870 /// async fn main() {
871 /// let monitor = tokio_metrics::TaskMonitor::new();
872 /// let mut interval = monitor.intervals();
873 /// let mut next_interval = move || interval.next().unwrap();
874 /// let one_sec = std::time::Duration::from_secs(1);
875 /// let two_sec = std::time::Duration::from_secs(2);
876 ///
877 /// assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
878 /// assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
879 ///
880 /// monitor.instrument(async move {
881 /// tokio::time::sleep(one_sec).await;
882 /// }).await;
883 ///
884 /// assert_eq!(next_interval().total_idle_duration, one_sec);
885 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
886 ///
887 /// monitor.instrument(async move {
888 /// tokio::time::sleep(two_sec).await;
889 /// }).await;
890 ///
891 /// assert_eq!(next_interval().total_idle_duration, two_sec);
892 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
893 /// }
894 /// ```
895 pub total_idle_duration: Duration,
896
897 /// The maximum idle duration that a task took.
898 ///
899 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
900 /// task completes a poll, and the instant that it is next awoken.
901 ///
902 /// ##### Derived metrics
903 /// - **[`max_idle_duration`][TaskMetrics::max_idle_duration]**
904 /// The longest duration a task spent idle.
905 ///
906 /// ##### Examples
907 /// ```
908 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
909 /// async fn main() {
910 /// let monitor = tokio_metrics::TaskMonitor::new();
911 /// let mut interval = monitor.intervals();
912 /// let mut next_interval = move || interval.next().unwrap();
913 /// let one_sec = std::time::Duration::from_secs(1);
914 /// let two_sec = std::time::Duration::from_secs(2);
915 ///
916 /// assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
917 /// assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
918 ///
919 /// monitor.instrument(async move {
920 /// tokio::time::sleep(one_sec).await;
921 /// }).await;
922 ///
923 /// assert_eq!(next_interval().max_idle_duration, one_sec);
924 /// assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
925 ///
926 /// monitor.instrument(async move {
927 /// tokio::time::sleep(two_sec).await;
928 /// }).await;
929 ///
930 /// assert_eq!(next_interval().max_idle_duration, two_sec);
931 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
932 ///
933 /// monitor.instrument(async move {
934 /// tokio::time::sleep(one_sec).await;
935 /// }).await;
936 ///
937 /// assert_eq!(next_interval().max_idle_duration, one_sec);
938 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
939 /// }
940 /// ```
941 pub max_idle_duration: Duration,
942
943 /// The total number of times that tasks were awoken (and then, presumably, scheduled for
944 /// execution).
945 ///
946 /// ##### Definition
947 /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
948 /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
949 ///
950 /// ##### Derived metrics
951 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
952 /// The mean duration that tasks spent waiting to be executed after awakening.
953 ///
954 /// ##### Examples
955 /// In the below example, a task yields to the scheduler a varying number of times between
956 /// sampling intervals; this metric is equal to the number of times the task yielded:
957 /// ```
958 /// #[tokio::main]
959 /// async fn main(){
960 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
961 ///
962 /// // [A] no tasks have been created, instrumented, and polled more than once
963 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
964 ///
965 /// // [B] a `task` is created and instrumented
966 /// let task = {
967 /// let monitor = metrics_monitor.clone();
968 /// metrics_monitor.instrument(async move {
969 /// let mut interval = monitor.intervals();
970 /// let mut next_interval = move || interval.next().unwrap();
971 ///
972 /// // [E] `task` has not yet yielded to the scheduler, and
973 /// // thus has not yet been scheduled since its first `poll`
974 /// assert_eq!(next_interval().total_scheduled_count, 0);
975 ///
976 /// tokio::task::yield_now().await; // yield to the scheduler
977 ///
978 /// // [F] `task` has yielded to the scheduler once (and thus been
979 /// // scheduled once) since the last sampling interval
980 /// assert_eq!(next_interval().total_scheduled_count, 1);
981 ///
982 /// tokio::task::yield_now().await; // yield to the scheduler
983 /// tokio::task::yield_now().await; // yield to the scheduler
984 /// tokio::task::yield_now().await; // yield to the scheduler
985 ///
986 /// // [G] `task` has yielded to the scheduler thrice (and thus been
987 /// // scheduled thrice) since the last sampling interval
988 /// assert_eq!(next_interval().total_scheduled_count, 3);
989 ///
990 /// tokio::task::yield_now().await; // yield to the scheduler
991 ///
992 /// next_interval
993 /// })
994 /// };
995 ///
996 /// // [C] `task` has not yet been polled at all
997 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
998 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
999 ///
1000 /// // [D] poll `task` to completion
1001 /// let mut next_interval = task.await;
1002 ///
1003 /// // [H] `task` has been scheduled 1 time since the last sample
1004 /// assert_eq!(next_interval().total_scheduled_count, 1);
1005 ///
1006 /// // [I] `task` has been scheduled 0 times since the last sample
1007 /// assert_eq!(next_interval().total_scheduled_count, 0);
1008 ///
1009 /// // [J] `task` has yielded to the scheduler a total of five times
1010 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1011 /// }
1012 /// ```
1013 #[doc(alias = "total_delay_count")]
1014 pub total_scheduled_count: u64,
1015
1016 /// The total duration that tasks spent waiting to be polled after awakening.
1017 ///
1018 /// ##### Definition
1019 /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1020 /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1021 ///
1022 /// ##### Derived metrics
1023 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1024 /// The mean duration that tasks spent waiting to be executed after awakening.
1025 ///
1026 /// ##### Examples
1027 /// ```
1028 /// use tokio::time::Duration;
1029 ///
1030 /// #[tokio::main(flavor = "current_thread")]
1031 /// async fn main() {
1032 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1033 /// let mut interval = metrics_monitor.intervals();
1034 /// let mut next_interval = || interval.next().unwrap();
1035 ///
1036 /// // construct and instrument and spawn a task that yields endlessly
1037 /// tokio::spawn(metrics_monitor.instrument(async {
1038 /// loop { tokio::task::yield_now().await }
1039 /// }));
1040 ///
1041 /// tokio::task::yield_now().await;
1042 ///
1043 /// // block the executor for 1 second
1044 /// std::thread::sleep(Duration::from_millis(1000));
1045 ///
1046 /// tokio::task::yield_now().await;
1047 ///
1048 /// // the spawned task will have spent approximately one second waiting to be polled
1049 /// let total_scheduled_duration = next_interval().total_scheduled_duration;
1050 /// assert!(total_scheduled_duration >= Duration::from_millis(1000));
1051 /// assert!(total_scheduled_duration <= Duration::from_millis(1100));
1052 /// }
1053 /// ```
1054 #[doc(alias = "total_delay_duration")]
1055 pub total_scheduled_duration: Duration,
1056
1057 /// The total number of times that tasks were polled.
1058 ///
1059 /// ##### Definition
1060 /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1061 /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1062 ///
1063 /// ##### Derived metrics
1064 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1065 /// The mean duration of polls.
1066 ///
1067 /// ##### Examples
1068 /// In the below example, a task with multiple yield points is await'ed to completion; this
1069 /// metric reflects the number of `await`s within each sampling interval:
1070 /// ```
1071 /// #[tokio::main]
1072 /// async fn main() {
1073 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1074 ///
1075 /// // [A] no tasks have been created, instrumented, and polled more than once
1076 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1077 ///
1078 /// // [B] a `task` is created and instrumented
1079 /// let task = {
1080 /// let monitor = metrics_monitor.clone();
1081 /// metrics_monitor.instrument(async move {
1082 /// let mut interval = monitor.intervals();
1083 /// let mut next_interval = move || interval.next().unwrap();
1084 ///
1085 /// // [E] task is in the midst of its first poll
1086 /// assert_eq!(next_interval().total_poll_count, 0);
1087 ///
1088 /// tokio::task::yield_now().await; // poll 1
1089 ///
1090 /// // [F] task has been polled 1 time
1091 /// assert_eq!(next_interval().total_poll_count, 1);
1092 ///
1093 /// tokio::task::yield_now().await; // poll 2
1094 /// tokio::task::yield_now().await; // poll 3
1095 /// tokio::task::yield_now().await; // poll 4
1096 ///
1097 /// // [G] task has been polled 3 times
1098 /// assert_eq!(next_interval().total_poll_count, 3);
1099 ///
1100 /// tokio::task::yield_now().await; // poll 5
1101 ///
1102 /// next_interval // poll 6
1103 /// })
1104 /// };
1105 ///
1106 /// // [C] `task` has not yet been polled at all
1107 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1108 ///
1109 /// // [D] poll `task` to completion
1110 /// let mut next_interval = task.await;
1111 ///
1112 /// // [H] `task` has been polled 2 times since the last sample
1113 /// assert_eq!(next_interval().total_poll_count, 2);
1114 ///
1115 /// // [I] `task` has been polled 0 times since the last sample
1116 /// assert_eq!(next_interval().total_poll_count, 0);
1117 ///
1118 /// // [J] `task` has been polled 6 times
1119 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1120 /// }
1121 /// ```
1122 pub total_poll_count: u64,
1123
1124 /// The total duration elapsed during polls.
1125 ///
1126 /// ##### Definition
1127 /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1128 /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1129 ///
1130 /// ##### Derived metrics
1131 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1132 /// The mean duration of polls.
1133 ///
1134 /// ##### Examples
1135 /// ```
1136 /// use tokio::time::Duration;
1137 ///
1138 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1139 /// async fn main() {
1140 /// let monitor = tokio_metrics::TaskMonitor::new();
1141 /// let mut interval = monitor.intervals();
1142 /// let mut next_interval = move || interval.next().unwrap();
1143 ///
1144 /// assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1145 ///
1146 /// monitor.instrument(async {
1147 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1148 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1149 /// () // poll 3 (0s)
1150 /// }).await;
1151 ///
1152 /// assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1153 /// }
1154 /// ```
1155 pub total_poll_duration: Duration,
1156
1157 /// The total number of times that polling tasks completed swiftly.
1158 ///
1159 /// Here, 'swiftly' is defined as completing in strictly less time than
1160 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1161 ///
1162 /// ##### Derived metrics
1163 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1164 /// The mean duration of fast polls.
1165 ///
1166 /// ##### Examples
1167 /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1168 /// within the second sampling interval, and 2 fast polls occur within the third sampling
1169 /// interval:
1170 /// ```
1171 /// use std::future::Future;
1172 /// use std::time::Duration;
1173 ///
1174 /// #[tokio::main]
1175 /// async fn main() {
1176 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1177 /// let mut interval = metrics_monitor.intervals();
1178 /// let mut next_interval = || interval.next().unwrap();
1179 ///
1180 /// // no tasks have been constructed, instrumented, or polled
1181 /// assert_eq!(next_interval().total_fast_poll_count, 0);
1182 ///
1183 /// let fast = Duration::ZERO;
1184 ///
1185 /// // this task completes in three fast polls
1186 /// let _ = metrics_monitor.instrument(async {
1187 /// spin_for(fast).await; // fast poll 1
1188 /// spin_for(fast).await; // fast poll 2
1189 /// spin_for(fast) // fast poll 3
1190 /// }).await;
1191 ///
1192 /// assert_eq!(next_interval().total_fast_poll_count, 3);
1193 ///
1194 /// // this task completes in two fast polls
1195 /// let _ = metrics_monitor.instrument(async {
1196 /// spin_for(fast).await; // fast poll 1
1197 /// spin_for(fast) // fast poll 2
1198 /// }).await;
1199 ///
1200 /// assert_eq!(next_interval().total_fast_poll_count, 2);
1201 /// }
1202 ///
1203 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1204 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1205 /// let start = tokio::time::Instant::now();
1206 /// while start.elapsed() <= duration {}
1207 /// tokio::task::yield_now()
1208 /// }
1209 /// ```
1210 pub total_fast_poll_count: u64,
1211
1212 /// The total duration of fast polls.
1213 ///
1214 /// Here, 'fast' is defined as completing in strictly less time than
1215 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1216 ///
1217 /// ##### Derived metrics
1218 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1219 /// The mean duration of fast polls.
1220 ///
1221 /// ##### Examples
1222 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1223 /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1224 /// total of 2μs time in the third sampling interval:
1225 /// ```
1226 /// use std::future::Future;
1227 /// use std::time::Duration;
1228 ///
1229 /// #[tokio::main]
1230 /// async fn main() {
1231 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1232 /// let mut interval = metrics_monitor.intervals();
1233 /// let mut next_interval = || interval.next().unwrap();
1234 ///
1235 /// // no tasks have been constructed, instrumented, or polled
1236 /// let interval = next_interval();
1237 /// assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1238 ///
1239 /// let fast = Duration::from_micros(1);
1240 ///
1241 /// // this task completes in three fast polls
1242 /// let task_a_time = time(metrics_monitor.instrument(async {
1243 /// spin_for(fast).await; // fast poll 1
1244 /// spin_for(fast).await; // fast poll 2
1245 /// spin_for(fast) // fast poll 3
1246 /// })).await;
1247 ///
1248 /// let interval = next_interval();
1249 /// assert!(interval.total_fast_poll_duration >= fast * 3);
1250 /// assert!(interval.total_fast_poll_duration <= task_a_time);
1251 ///
1252 /// // this task completes in two fast polls
1253 /// let task_b_time = time(metrics_monitor.instrument(async {
1254 /// spin_for(fast).await; // fast poll 1
1255 /// spin_for(fast) // fast poll 2
1256 /// })).await;
1257 ///
1258 /// let interval = next_interval();
1259 /// assert!(interval.total_fast_poll_duration >= fast * 2);
1260 /// assert!(interval.total_fast_poll_duration <= task_b_time);
1261 /// }
1262 ///
1263 /// /// Produces the amount of time it took to await a given async task.
1264 /// async fn time(task: impl Future) -> Duration {
1265 /// let start = tokio::time::Instant::now();
1266 /// task.await;
1267 /// start.elapsed()
1268 /// }
1269 ///
1270 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1271 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1272 /// let start = tokio::time::Instant::now();
1273 /// while start.elapsed() <= duration {}
1274 /// tokio::task::yield_now()
1275 /// }
1276 /// ```
1277 pub total_fast_poll_duration: Duration,
1278
1279 /// The total number of times that polling tasks completed slowly.
1280 ///
1281 /// Here, 'slowly' is defined as completing in at least as much time as
1282 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1283 ///
1284 /// ##### Derived metrics
1285 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1286 /// The mean duration of slow polls.
1287 ///
1288 /// ##### Examples
1289 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1290 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1291 /// interval:
1292 /// ```
1293 /// use std::future::Future;
1294 /// use std::time::Duration;
1295 ///
1296 /// #[tokio::main]
1297 /// async fn main() {
1298 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1299 /// let mut interval = metrics_monitor.intervals();
1300 /// let mut next_interval = || interval.next().unwrap();
1301 ///
1302 /// // no tasks have been constructed, instrumented, or polled
1303 /// assert_eq!(next_interval().total_slow_poll_count, 0);
1304 ///
1305 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1306 ///
1307 /// // this task completes in three slow polls
1308 /// let _ = metrics_monitor.instrument(async {
1309 /// spin_for(slow).await; // slow poll 1
1310 /// spin_for(slow).await; // slow poll 2
1311 /// spin_for(slow) // slow poll 3
1312 /// }).await;
1313 ///
1314 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1315 ///
1316 /// // this task completes in two slow polls
1317 /// let _ = metrics_monitor.instrument(async {
1318 /// spin_for(slow).await; // slow poll 1
1319 /// spin_for(slow) // slow poll 2
1320 /// }).await;
1321 ///
1322 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1323 /// }
1324 ///
1325 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1326 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1327 /// let start = tokio::time::Instant::now();
1328 /// while start.elapsed() <= duration {}
1329 /// tokio::task::yield_now()
1330 /// }
1331 /// ```
1332 pub total_slow_poll_count: u64,
1333
1334 /// The total duration of slow polls.
1335 ///
1336 /// Here, 'slowly' is defined as completing in at least as much time as
1337 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1338 ///
1339 /// ##### Derived metrics
1340 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1341 /// The mean duration of slow polls.
1342 ///
1343 /// ##### Examples
1344 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1345 /// consume a total of
1346 /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1347 /// time in the second sampling interval; and two slow polls consume a total of
1348 /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1349 /// third sampling interval:
1350 /// ```
1351 /// use std::future::Future;
1352 /// use std::time::Duration;
1353 ///
1354 /// #[tokio::main]
1355 /// async fn main() {
1356 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1357 /// let mut interval = metrics_monitor.intervals();
1358 /// let mut next_interval = || interval.next().unwrap();
1359 ///
1360 /// // no tasks have been constructed, instrumented, or polled
1361 /// let interval = next_interval();
1362 /// assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1363 ///
1364 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1365 ///
1366 /// // this task completes in three slow polls
1367 /// let task_a_time = time(metrics_monitor.instrument(async {
1368 /// spin_for(slow).await; // slow poll 1
1369 /// spin_for(slow).await; // slow poll 2
1370 /// spin_for(slow) // slow poll 3
1371 /// })).await;
1372 ///
1373 /// let interval = next_interval();
1374 /// assert!(interval.total_slow_poll_duration >= slow * 3);
1375 /// assert!(interval.total_slow_poll_duration <= task_a_time);
1376 ///
1377 /// // this task completes in two slow polls
1378 /// let task_b_time = time(metrics_monitor.instrument(async {
1379 /// spin_for(slow).await; // slow poll 1
1380 /// spin_for(slow) // slow poll 2
1381 /// })).await;
1382 ///
1383 /// let interval = next_interval();
1384 /// assert!(interval.total_slow_poll_duration >= slow * 2);
1385 /// assert!(interval.total_slow_poll_duration <= task_b_time);
1386 /// }
1387 ///
1388 /// /// Produces the amount of time it took to await a given async task.
1389 /// async fn time(task: impl Future) -> Duration {
1390 /// let start = tokio::time::Instant::now();
1391 /// task.await;
1392 /// start.elapsed()
1393 /// }
1394 ///
1395 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1396 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1397 /// let start = tokio::time::Instant::now();
1398 /// while start.elapsed() <= duration {}
1399 /// tokio::task::yield_now()
1400 /// }
1401 /// ```
1402 pub total_slow_poll_duration: Duration,
1403
1404 /// The total count of tasks with short scheduling delays.
1405 ///
1406 /// This is defined as tasks taking strictly less than
1407 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1408 /// scheduled.
1409 ///
1410 /// ##### Derived metrics
1411 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1412 /// The mean duration of short scheduling delays.
1413 pub total_short_delay_count: u64,
1414
1415 /// The total count of tasks with long scheduling delays.
1416 ///
1417 /// This is defined as tasks taking
1418 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1419 /// after being scheduled.
1420 ///
1421 /// ##### Derived metrics
1422 /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1423 /// The mean duration of long scheduling delays.
1424 pub total_long_delay_count: u64,
1425
1426 /// The total duration of tasks with short scheduling delays.
1427 ///
1428 /// This is defined as tasks taking strictly less than
1429 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1430 /// scheduled.
1431 ///
1432 /// ##### Derived metrics
1433 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1434 /// The mean duration of short scheduling delays.
1435 pub total_short_delay_duration: Duration,
1436
1437 /// The total duration of tasks with long scheduling delays.
1438 ///
1439 /// This is defined as tasks taking [`long_delay_threshold`][TaskMonitor::long_delay_threshold]
1440 /// or longer to be executed after being scheduled.
1441 ///
1442 /// ##### Derived metrics
1443 /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1444 /// The mean duration of long scheduling delays.
1445 pub total_long_delay_duration: Duration,
1446}
1447
/// Backing store for the tracked metrics; a single instance is shared across the various types
/// that use it.
///
/// Apart from the two thresholds, every field is an [`AtomicU64`]. By naming convention, fields
/// with the `_ns` suffix hold nanosecond totals.
#[derive(Debug)]
struct RawMetrics {
    /// Duration threshold used to classify a poll as slow rather than fast.
    slow_poll_threshold: Duration,

    /// Scheduling delays lasting this long or longer are classified as long delays.
    long_delay_threshold: Duration,

    /// Number of tasks that have been instrumented.
    instrumented_count: AtomicU64,

    /// Number of instrumented tasks that have received at least one poll.
    first_poll_count: AtomicU64,

    /// Number of times tasks transitioned into the `idle` state.
    total_idled_count: AtomicU64,

    /// Number of times tasks were scheduled.
    total_scheduled_count: AtomicU64,

    /// Number of polls classified as fast.
    total_fast_poll_count: AtomicU64,

    /// Number of polls classified as slow.
    total_slow_poll_count: AtomicU64,

    /// Number of times tasks experienced a long delay.
    total_long_delay_count: AtomicU64,

    /// Number of times tasks experienced a short delay.
    total_short_delay_count: AtomicU64,

    /// Number of times tasks were dropped.
    dropped_count: AtomicU64,

    /// Accumulated time tasks waited for their first poll.
    total_first_poll_delay_ns: AtomicU64,

    /// Accumulated time tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// Longest stay in the `idle` state seen locally.
    ///
    /// Used to track the local maximum between interval metric snapshots.
    local_max_idle_duration_ns: AtomicU64,

    /// Longest stay in the `idle` state overall (the global maximum).
    global_max_idle_duration_ns: AtomicU64,

    /// Accumulated time tasks spent in the waking state.
    total_scheduled_duration_ns: AtomicU64,

    /// Accumulated time spent in polls that fell below the slow cut-off.
    total_fast_poll_duration_ns: AtomicU64,

    /// Accumulated time spent in polls that fell above the slow cut-off.
    ///
    // NOTE(review): unlike its siblings this field has no `_ns` suffix; presumably it is still
    // a nanosecond total -- confirm against the code that updates it.
    total_slow_poll_duration: AtomicU64,

    /// Accumulated duration of delays that fell below the long-delay cut-off.
    total_short_delay_duration_ns: AtomicU64,

    /// Accumulated duration of delays at or above the long-delay cut-off.
    total_long_delay_duration_ns: AtomicU64,
}
1513
/// Per-task bookkeeping. Each call to [`TaskMonitor::instrument`] allocates one `State` behind
/// an [`Arc`] and stores it in the returned `Instrumented` wrapper.
#[derive(Debug)]
struct State {
    /// Where metrics should be recorded
    metrics: Arc<RawMetrics>,

    /// Instant at which the task was instrumented. This is used to track the time to first poll.
    instrumented_at: Instant,

    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
    /// was last woken. Starts at 0 when the task is instrumented.
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    waker: AtomicWaker,
}
1530impl TaskMonitor {
1531 /// The default duration at which polls cross the threshold into being categorized as 'slow' is
1532 /// 50μs.
1533 #[cfg(not(test))]
1534 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
1535 #[cfg(test)]
1536 #[allow(missing_docs)]
1537 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1538
1539 /// The default duration at which schedules cross the threshold into being categorized as 'long'
1540 /// is 50μs.
1541 #[cfg(not(test))]
1542 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
1543 #[cfg(test)]
1544 #[allow(missing_docs)]
1545 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1546
1547 /// Constructs a new task monitor.
1548 ///
1549 /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1550 /// considered 'slow'.
1551 ///
1552 /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1553 /// considered 'long'.
1554 pub fn new() -> TaskMonitor {
1555 TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1556 }
1557
1558 /// Constructs a builder for a task monitor.
1559 pub fn builder() -> TaskMonitorBuilder {
1560 TaskMonitorBuilder::new()
1561 }
1562
1563 /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1564 ///
1565 /// ##### Selecting an appropriate threshold
1566 /// TODO. What advice can we give here?
1567 ///
1568 /// ##### Examples
1569 /// In the below example, low-threshold and high-threshold monitors are constructed and
1570 /// instrument identical tasks; the low-threshold monitor reports4 slow polls, and the
1571 /// high-threshold monitor reports only 2 slow polls:
1572 /// ```
1573 /// use std::future::Future;
1574 /// use std::time::Duration;
1575 /// use tokio_metrics::TaskMonitor;
1576 ///
1577 /// #[tokio::main]
1578 /// async fn main() {
1579 /// let lo_threshold = Duration::from_micros(10);
1580 /// let hi_threshold = Duration::from_millis(10);
1581 ///
1582 /// let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1583 /// let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1584 ///
1585 /// let make_task = || async {
1586 /// spin_for(lo_threshold).await; // faster poll 1
1587 /// spin_for(lo_threshold).await; // faster poll 2
1588 /// spin_for(hi_threshold).await; // slower poll 3
1589 /// spin_for(hi_threshold).await // slower poll 4
1590 /// };
1591 ///
1592 /// lo_monitor.instrument(make_task()).await;
1593 /// hi_monitor.instrument(make_task()).await;
1594 ///
1595 /// // the low-threshold monitor reported 4 slow polls:
1596 /// assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1597 /// // the high-threshold monitor reported only 2 slow polls:
1598 /// assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1599 /// }
1600 ///
1601 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1602 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1603 /// let start = tokio::time::Instant::now();
1604 /// while start.elapsed() <= duration {}
1605 /// tokio::task::yield_now()
1606 /// }
1607 /// ```
1608 pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1609 Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1610 }
1611
1612 fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1613 TaskMonitor {
1614 metrics: Arc::new(RawMetrics {
1615 slow_poll_threshold: slow_poll_cut_off,
1616 first_poll_count: AtomicU64::new(0),
1617 total_idled_count: AtomicU64::new(0),
1618 total_scheduled_count: AtomicU64::new(0),
1619 total_fast_poll_count: AtomicU64::new(0),
1620 total_slow_poll_count: AtomicU64::new(0),
1621 total_long_delay_count: AtomicU64::new(0),
1622 instrumented_count: AtomicU64::new(0),
1623 dropped_count: AtomicU64::new(0),
1624 total_first_poll_delay_ns: AtomicU64::new(0),
1625 total_scheduled_duration_ns: AtomicU64::new(0),
1626 local_max_idle_duration_ns: AtomicU64::new(0),
1627 global_max_idle_duration_ns: AtomicU64::new(0),
1628 total_idle_duration_ns: AtomicU64::new(0),
1629 total_fast_poll_duration_ns: AtomicU64::new(0),
1630 total_slow_poll_duration: AtomicU64::new(0),
1631 total_short_delay_duration_ns: AtomicU64::new(0),
1632 long_delay_threshold: long_delay_cut_off,
1633 total_short_delay_count: AtomicU64::new(0),
1634 total_long_delay_duration_ns: AtomicU64::new(0),
1635 }),
1636 }
1637 }
1638
1639 /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1640 ///
1641 /// ##### Examples
1642 /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1643 /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1644 /// ```
1645 /// use tokio_metrics::TaskMonitor;
1646 ///
1647 /// #[tokio::main]
1648 /// async fn main() {
1649 /// let metrics_monitor = TaskMonitor::new();
1650 ///
1651 /// assert_eq!(
1652 /// metrics_monitor.slow_poll_threshold(),
1653 /// TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1654 /// );
1655 /// }
1656 /// ```
1657 pub fn slow_poll_threshold(&self) -> Duration {
1658 self.metrics.slow_poll_threshold
1659 }
1660
1661 /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1662 /// as long.
1663 pub fn long_delay_threshold(&self) -> Duration {
1664 self.metrics.long_delay_threshold
1665 }
1666
1667 /// Produces an instrumented façade around a given async task.
1668 ///
1669 /// ##### Examples
1670 /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1671 /// ```
1672 /// #[tokio::main]
1673 /// async fn main() {
1674 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1675 ///
1676 /// // 0 tasks have been instrumented, much less polled
1677 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1678 ///
1679 /// // instrument a task and poll it to completion
1680 /// metrics_monitor.instrument(async {}).await;
1681 ///
1682 /// // 1 task has been instrumented and polled
1683 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1684 ///
1685 /// // instrument a task and poll it to completion
1686 /// metrics_monitor.instrument(async {}).await;
1687 ///
1688 /// // 2 tasks have been instrumented and polled
1689 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1690 /// }
1691 /// ```
1692 /// An aync task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1693 /// ```
1694 /// #[tokio::main]
1695 /// async fn main() {
1696 /// let monitor_a = tokio_metrics::TaskMonitor::new();
1697 /// let monitor_b = tokio_metrics::TaskMonitor::new();
1698 ///
1699 /// // 0 tasks have been instrumented, much less polled
1700 /// assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1701 /// assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1702 ///
1703 /// // instrument a task and poll it to completion
1704 /// monitor_a.instrument(monitor_b.instrument(async {})).await;
1705 ///
1706 /// // 1 task has been instrumented and polled
1707 /// assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1708 /// assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1709 /// }
1710 /// ```
1711 /// It is also possible (but probably undesirable) to instrument an async task multiple times
1712 /// with the same [`TaskMonitor`]; e.g.:
1713 /// ```
1714 /// #[tokio::main]
1715 /// async fn main() {
1716 /// let monitor = tokio_metrics::TaskMonitor::new();
1717 ///
1718 /// // 0 tasks have been instrumented, much less polled
1719 /// assert_eq!(monitor.cumulative().first_poll_count, 0);
1720 ///
1721 /// // instrument a task and poll it to completion
1722 /// monitor.instrument(monitor.instrument(async {})).await;
1723 ///
1724 /// // 2 tasks have been instrumented and polled, supposedly
1725 /// assert_eq!(monitor.cumulative().first_poll_count, 2);
1726 /// }
1727 /// ```
1728 pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1729 self.metrics.instrumented_count.fetch_add(1, SeqCst);
1730 Instrumented {
1731 task,
1732 did_poll_once: false,
1733 idled_at: 0,
1734 state: Arc::new(State {
1735 metrics: self.metrics.clone(),
1736 instrumented_at: Instant::now(),
1737 woke_at: AtomicU64::new(0),
1738 waker: AtomicWaker::new(),
1739 }),
1740 }
1741 }
1742
1743 /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1744 /// the construction of [`TaskMonitor`].
1745 ///
1746 /// ##### See also
1747 /// - [`TaskMonitor::intervals`]:
1748 /// produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1749 ///
1750 /// ##### Examples
1751 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1752 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1753 /// interval; five slow polls occur across all sampling intervals:
1754 /// ```
1755 /// use std::future::Future;
1756 /// use std::time::Duration;
1757 ///
1758 /// #[tokio::main]
1759 /// async fn main() {
1760 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1761 ///
1762 /// // initialize a stream of sampling intervals
1763 /// let mut intervals = metrics_monitor.intervals();
1764 /// // each call of `next_interval` will produce metrics for the last sampling interval
1765 /// let mut next_interval = || intervals.next().unwrap();
1766 ///
1767 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1768 ///
1769 /// // this task completes in three slow polls
1770 /// let _ = metrics_monitor.instrument(async {
1771 /// spin_for(slow).await; // slow poll 1
1772 /// spin_for(slow).await; // slow poll 2
1773 /// spin_for(slow) // slow poll 3
1774 /// }).await;
1775 ///
1776 /// // in the previous sampling interval, there were 3 slow polls
1777 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1778 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1779 ///
1780 /// // this task completes in two slow polls
1781 /// let _ = metrics_monitor.instrument(async {
1782 /// spin_for(slow).await; // slow poll 1
1783 /// spin_for(slow) // slow poll 2
1784 /// }).await;
1785 ///
1786 /// // in the previous sampling interval, there were 2 slow polls
1787 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1788 ///
1789 /// // across all sampling interval, there were a total of 5 slow polls
1790 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1791 /// }
1792 ///
1793 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1794 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1795 /// let start = tokio::time::Instant::now();
1796 /// while start.elapsed() <= duration {}
1797 /// tokio::task::yield_now()
1798 /// }
1799 /// ```
1800 pub fn cumulative(&self) -> TaskMetrics {
1801 self.metrics.metrics()
1802 }
1803
1804 /// Produces an unending iterator of metric sampling intervals.
1805 ///
1806 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1807 /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1808 /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1809 /// interval.
1810 ///
1811 /// ##### Examples
1812 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1813 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1814 /// interval; five slow polls occur across all sampling intervals:
1815 /// ```
1816 /// use std::future::Future;
1817 /// use std::time::Duration;
1818 ///
1819 /// #[tokio::main]
1820 /// async fn main() {
1821 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1822 ///
1823 /// // initialize a stream of sampling intervals
1824 /// let mut intervals = metrics_monitor.intervals();
1825 /// // each call of `next_interval` will produce metrics for the last sampling interval
1826 /// let mut next_interval = || intervals.next().unwrap();
1827 ///
1828 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1829 ///
1830 /// // this task completes in three slow polls
1831 /// let _ = metrics_monitor.instrument(async {
1832 /// spin_for(slow).await; // slow poll 1
1833 /// spin_for(slow).await; // slow poll 2
1834 /// spin_for(slow) // slow poll 3
1835 /// }).await;
1836 ///
1837 /// // in the previous sampling interval, there were 3 slow polls
1838 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1839 ///
1840 /// // this task completes in two slow polls
1841 /// let _ = metrics_monitor.instrument(async {
1842 /// spin_for(slow).await; // slow poll 1
1843 /// spin_for(slow) // slow poll 2
1844 /// }).await;
1845 ///
1846 /// // in the previous sampling interval, there were 2 slow polls
1847 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1848 ///
1849 /// // across all sampling intervals, there were a total of 5 slow polls
1850 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1851 /// }
1852 ///
1853 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1854 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1855 /// let start = tokio::time::Instant::now();
1856 /// while start.elapsed() <= duration {}
1857 /// tokio::task::yield_now()
1858 /// }
1859 /// ```
1860 pub fn intervals(&self) -> TaskIntervals {
1861 TaskIntervals {
1862 metrics: self.metrics.clone(),
1863 previous: None,
1864 }
1865 }
1866}
1867
1868impl RawMetrics {
1869 fn get_and_reset_local_max_idle_duration(&self) -> Duration {
1870 Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
1871 }
1872
1873 fn metrics(&self) -> TaskMetrics {
1874 let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1875 let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1876
1877 let total_fast_poll_duration =
1878 Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1879 let total_slow_poll_duration =
1880 Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1881
1882 let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1883 let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1884
1885 TaskMetrics {
1886 instrumented_count: self.instrumented_count.load(SeqCst),
1887 dropped_count: self.dropped_count.load(SeqCst),
1888
1889 total_poll_count,
1890 total_poll_duration,
1891 first_poll_count: self.first_poll_count.load(SeqCst),
1892 total_idled_count: self.total_idled_count.load(SeqCst),
1893 total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1894 total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1895 total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1896 total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1897 total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1898 total_first_poll_delay: Duration::from_nanos(
1899 self.total_first_poll_delay_ns.load(SeqCst),
1900 ),
1901 max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
1902 total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1903 total_scheduled_duration: Duration::from_nanos(
1904 self.total_scheduled_duration_ns.load(SeqCst),
1905 ),
1906 total_fast_poll_duration: Duration::from_nanos(
1907 self.total_fast_poll_duration_ns.load(SeqCst),
1908 ),
1909 total_slow_poll_duration: Duration::from_nanos(
1910 self.total_slow_poll_duration.load(SeqCst),
1911 ),
1912 total_short_delay_duration: Duration::from_nanos(
1913 self.total_short_delay_duration_ns.load(SeqCst),
1914 ),
1915 total_long_delay_duration: Duration::from_nanos(
1916 self.total_long_delay_duration_ns.load(SeqCst),
1917 ),
1918 }
1919 }
1920}
1921
1922impl Default for TaskMonitor {
1923 fn default() -> TaskMonitor {
1924 TaskMonitor::new()
1925 }
1926}
1927
1928impl TaskMetrics {
1929 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1930 /// are first polled.
1931 ///
1932 /// ##### Definition
1933 /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1934 /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1935 ///
1936 /// ##### Interpretation
1937 /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1938 /// initially polled.
1939 ///
1940 /// ##### See also
1941 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1942 /// The mean duration that tasks spent waiting to be executed after awakening.
1943 ///
1944 /// ##### Examples
1945 /// In the below example, no tasks are instrumented or polled within the first sampling
1946 /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1947 /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1948 /// instrumentation and first poll of two tasks:
1949 /// ```
1950 /// use std::time::Duration;
1951 ///
1952 /// #[tokio::main]
1953 /// async fn main() {
1954 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1955 /// let mut interval = metrics_monitor.intervals();
1956 /// let mut next_interval = || interval.next().unwrap();
1957 ///
1958 /// // no tasks have yet been created, instrumented, or polled
1959 /// assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1960 ///
1961 /// // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1962 /// // produces the total time it took to do all of the aforementioned
1963 /// async fn instrument_pause_await(
1964 /// metrics_monitor: &tokio_metrics::TaskMonitor,
1965 /// pause_time: Duration
1966 /// ) -> Duration
1967 /// {
1968 /// let before_instrumentation = tokio::time::Instant::now();
1969 /// let task = metrics_monitor.instrument(async move {});
1970 /// tokio::time::sleep(pause_time).await;
1971 /// task.await;
1972 /// before_instrumentation.elapsed()
1973 /// }
1974 ///
1975 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1976 /// let task_a_pause_time = Duration::from_millis(500);
1977 /// let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1978 ///
1979 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1980 /// // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1981 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1982 /// assert!(mean_first_poll_delay >= task_a_pause_time);
1983 /// assert!(mean_first_poll_delay <= task_a_total_time);
1984 ///
1985 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1986 /// let task_b_pause_time = Duration::from_millis(500);
1987 /// let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1988 ///
1989 /// // construct and await a task that pauses for 1000ms between instrumentation and first poll
1990 /// let task_c_pause_time = Duration::from_millis(1000);
1991 /// let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
1992 ///
    ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
    ///     // average pause time of 750ms, and less-than-or-equal-to the average total runtime of
    ///     // `task_b` and `task_c`
1996 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1997 /// assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
1998 /// assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
1999 /// }
2000 /// ```
2001 pub fn mean_first_poll_delay(&self) -> Duration {
2002 mean(self.total_first_poll_delay, self.first_poll_count)
2003 }
2004
2005 /// The mean duration of idles.
2006 ///
2007 /// ##### Definition
2008 /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2009 /// [`total_idled_count`][TaskMetrics::total_idled_count].
2010 ///
2011 /// ##### Interpretation
2012 /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2013 /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2014 /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2015 /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2016 /// task-external events to complete.
2017 ///
2018 /// ##### Examples
2019 /// ```
2020 /// #[tokio::main]
2021 /// async fn main() {
2022 /// let monitor = tokio_metrics::TaskMonitor::new();
2023 /// let one_sec = std::time::Duration::from_secs(1);
2024 ///
2025 /// monitor.instrument(async move {
2026 /// tokio::time::sleep(one_sec).await;
2027 /// }).await;
2028 ///
2029 /// assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2030 /// }
2031 /// ```
2032 pub fn mean_idle_duration(&self) -> Duration {
2033 mean(self.total_idle_duration, self.total_idled_count)
2034 }
2035
2036 /// The mean duration that tasks spent waiting to be executed after awakening.
2037 ///
2038 /// ##### Definition
2039 /// This metric is derived from
2040 /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2041 /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2042 ///
2043 /// ##### Interpretation
2044 /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2045 /// queues before being polled.
2046 ///
2047 /// ##### See also
2048 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
2049 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
2050 /// are first polled.
2051 ///
2052 /// ##### Examples
2053 /// ```
2054 /// use tokio::time::Duration;
2055 ///
2056 /// #[tokio::main(flavor = "current_thread")]
2057 /// async fn main() {
2058 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2059 /// let mut interval = metrics_monitor.intervals();
2060 /// let mut next_interval = || interval.next().unwrap();
2061 ///
2062 /// // construct and instrument and spawn a task that yields endlessly
2063 /// tokio::spawn(metrics_monitor.instrument(async {
2064 /// loop { tokio::task::yield_now().await }
2065 /// }));
2066 ///
2067 /// tokio::task::yield_now().await;
2068 ///
2069 /// // block the executor for 1 second
2070 /// std::thread::sleep(Duration::from_millis(1000));
2071 ///
2072 /// // get the task to run twice
2073 /// // the first will have a 1 sec scheduling delay, the second will have almost none
2074 /// tokio::task::yield_now().await;
2075 /// tokio::task::yield_now().await;
2076 ///
2077 /// // `endless_task` will have spent approximately one second waiting
2078 /// let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2079 /// assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2080 /// assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2081 /// }
2082 /// ```
2083 pub fn mean_scheduled_duration(&self) -> Duration {
2084 mean(self.total_scheduled_duration, self.total_scheduled_count)
2085 }
2086
2087 /// The mean duration of polls.
2088 ///
2089 /// ##### Definition
2090 /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2091 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2092 ///
2093 /// ##### Interpretation
2094 /// If this metric increases, it means that, on average, individual polls are tending to take
2095 /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2096 /// durations could be offset by fewer polls.
2097 ///
2098 /// ##### See also
2099 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
    ///   The ratio between the number of polls categorized as slow and the total number of polls.
2101 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2102 /// The mean duration of slow polls.
2103 ///
2104 /// ##### Examples
2105 /// ```
2106 /// use std::time::Duration;
2107 ///
2108 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2109 /// async fn main() {
2110 /// let monitor = tokio_metrics::TaskMonitor::new();
2111 /// let mut interval = monitor.intervals();
2112 /// let mut next_interval = move || interval.next().unwrap();
2113 ///
2114 /// assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2115 ///
2116 /// monitor.instrument(async {
2117 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2118 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2119 /// () // poll 3 (0s)
2120 /// }).await;
2121 ///
2122 /// assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2123 /// }
2124 /// ```
2125 pub fn mean_poll_duration(&self) -> Duration {
2126 mean(self.total_poll_duration, self.total_poll_count)
2127 }
2128
    /// The ratio between the number of polls categorized as slow and the total number of polls.
2130 ///
2131 /// ##### Definition
2132 /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2133 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2134 ///
2135 /// ##### Interpretation
2136 /// If this metric increases, it means that a greater proportion of polls took excessively long
2137 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2138 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
    /// However, as a rule, tasks *should* yield to the scheduler frequently.
2140 ///
2141 /// ##### See also
2142 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2143 /// The mean duration of polls.
2144 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2145 /// The mean duration of slow polls.
2146 ///
2147 /// ##### Examples
    /// Changes in this metric may be observed by varying the ratio of slow and fast polls within
    /// sampling intervals; for instance:
2150 /// ```
2151 /// use std::future::Future;
2152 /// use std::time::Duration;
2153 ///
2154 /// #[tokio::main]
2155 /// async fn main() {
2156 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2157 /// let mut interval = metrics_monitor.intervals();
2158 /// let mut next_interval = || interval.next().unwrap();
2159 ///
2160 /// // no tasks have been constructed, instrumented, or polled
2161 /// let interval = next_interval();
2162 /// assert_eq!(interval.total_fast_poll_count, 0);
2163 /// assert_eq!(interval.total_slow_poll_count, 0);
2164 /// assert!(interval.slow_poll_ratio().is_nan());
2165 ///
2166 /// let fast = Duration::ZERO;
2167 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
2168 ///
2169 /// // this task completes in three fast polls
2170 /// metrics_monitor.instrument(async {
2171 /// spin_for(fast).await; // fast poll 1
2172 /// spin_for(fast).await; // fast poll 2
2173 /// spin_for(fast); // fast poll 3
2174 /// }).await;
2175 ///
2176 /// // this task completes in two slow polls
2177 /// metrics_monitor.instrument(async {
2178 /// spin_for(slow).await; // slow poll 1
2179 /// spin_for(slow); // slow poll 2
2180 /// }).await;
2181 ///
2182 /// let interval = next_interval();
2183 /// assert_eq!(interval.total_fast_poll_count, 3);
2184 /// assert_eq!(interval.total_slow_poll_count, 2);
2185 /// assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2186 ///
2187 /// // this task completes in three slow polls
2188 /// metrics_monitor.instrument(async {
2189 /// spin_for(slow).await; // slow poll 1
2190 /// spin_for(slow).await; // slow poll 2
2191 /// spin_for(slow); // slow poll 3
2192 /// }).await;
2193 ///
2194 /// // this task completes in two fast polls
2195 /// metrics_monitor.instrument(async {
2196 /// spin_for(fast).await; // fast poll 1
2197 /// spin_for(fast); // fast poll 2
2198 /// }).await;
2199 ///
2200 /// let interval = next_interval();
2201 /// assert_eq!(interval.total_fast_poll_count, 2);
2202 /// assert_eq!(interval.total_slow_poll_count, 3);
2203 /// assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2204 /// }
2205 ///
2206 /// fn ratio(a: f64, b: f64) -> f64 {
2207 /// a / (a + b)
2208 /// }
2209 ///
2210 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2211 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2212 /// let start = tokio::time::Instant::now();
2213 /// while start.elapsed() <= duration {}
2214 /// tokio::task::yield_now()
2215 /// }
2216 /// ```
2217 pub fn slow_poll_ratio(&self) -> f64 {
2218 self.total_slow_poll_count as f64 / self.total_poll_count as f64
2219 }
2220
2221 /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2222 ///
2223 /// ##### Definition
2224 /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2225 /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2226 pub fn long_delay_ratio(&self) -> f64 {
2227 self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2228 }
2229
2230 /// The mean duration of fast polls.
2231 ///
2232 /// ##### Definition
2233 /// This metric is derived from
2234 /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2235 /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2236 ///
2237 /// ##### Examples
    /// In the below example, no tasks are polled in the first sampling interval; two fast polls
    /// occur in the second sampling interval; and three fast polls occur in the third sampling
    /// interval. In each of the latter two intervals, the mean fast poll duration is at least the
    /// mean of the durations spun for, and at most the task's total running time divided by its
    /// number of polls:
2244 /// ```
2245 /// use std::future::Future;
2246 /// use std::time::Duration;
2247 ///
2248 /// #[tokio::main]
2249 /// async fn main() {
2250 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2251 /// let mut interval = metrics_monitor.intervals();
2252 /// let mut next_interval = || interval.next().unwrap();
2253 ///
2254 /// // no tasks have been constructed, instrumented, or polled
2255 /// assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2256 ///
2257 /// let threshold = metrics_monitor.slow_poll_threshold();
2258 /// let fast_1 = 1 * Duration::from_micros(1);
2259 /// let fast_2 = 2 * Duration::from_micros(1);
2260 /// let fast_3 = 3 * Duration::from_micros(1);
2261 ///
2262 /// // this task completes in two fast polls
2263 /// let total_time = time(metrics_monitor.instrument(async {
2264 /// spin_for(fast_1).await; // fast poll 1
2265 /// spin_for(fast_2) // fast poll 2
2266 /// })).await;
2267 ///
2268 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2269 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2270 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2271 /// assert!(mean_fast_poll_duration <= total_time / 2);
2272 ///
2273 /// // this task completes in three fast polls
2274 /// let total_time = time(metrics_monitor.instrument(async {
2275 /// spin_for(fast_1).await; // fast poll 1
2276 /// spin_for(fast_2).await; // fast poll 2
2277 /// spin_for(fast_3) // fast poll 3
2278 /// })).await;
2279 ///
2280 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2281 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2282 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2283 /// assert!(mean_fast_poll_duration <= total_time / 3);
2284 /// }
2285 ///
2286 /// /// Produces the amount of time it took to await a given task.
2287 /// async fn time(task: impl Future) -> Duration {
2288 /// let start = tokio::time::Instant::now();
2289 /// task.await;
2290 /// start.elapsed()
2291 /// }
2292 ///
2293 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2294 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2295 /// let start = tokio::time::Instant::now();
2296 /// while start.elapsed() <= duration {}
2297 /// tokio::task::yield_now()
2298 /// }
2299 /// ```
2300 pub fn mean_fast_poll_duration(&self) -> Duration {
2301 mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2302 }
2303
2304 /// The average time taken for a task with a short scheduling delay to be executed after being
2305 /// scheduled.
2306 ///
2307 /// ##### Definition
2308 /// This metric is derived from
2309 /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2310 /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2311 pub fn mean_short_delay_duration(&self) -> Duration {
2312 mean(
2313 self.total_short_delay_duration,
2314 self.total_short_delay_count,
2315 )
2316 }
2317
2318 /// The mean duration of slow polls.
2319 ///
2320 /// ##### Definition
2321 /// This metric is derived from
2322 /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2323 /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2324 ///
2325 /// ##### Interpretation
2326 /// If this metric increases, it means that a greater proportion of polls took excessively long
2327 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2328 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2329 ///
2330 /// ##### See also
2331 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2332 /// The mean duration of polls.
2333 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
///   The ratio of the number of polls categorized as slow to the total number of polls.
2335 ///
/// ##### Interpretation
/// If this metric increases, it means that, on average, slow polls got even slower. This does
/// not necessarily imply increased task latency: An increase in average slow poll duration could
/// be offset by fewer or faster polls. However, as a rule, tasks *should* yield to the scheduler
/// frequently.
2341 ///
2342 /// ##### Examples
/// In the below example, no tasks are polled in the first sampling interval; two slow polls
/// consume a mean of at least
/// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
/// second sampling interval; and three slow polls consume a mean of at least
/// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
/// third sampling interval:
2349 /// ```
2350 /// use std::future::Future;
2351 /// use std::time::Duration;
2352 ///
2353 /// #[tokio::main]
2354 /// async fn main() {
2355 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2356 /// let mut interval = metrics_monitor.intervals();
2357 /// let mut next_interval = || interval.next().unwrap();
2358 ///
2359 /// // no tasks have been constructed, instrumented, or polled
2360 /// assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2361 ///
2362 /// let threshold = metrics_monitor.slow_poll_threshold();
2363 /// let slow_1 = 1 * threshold;
2364 /// let slow_2 = 2 * threshold;
2365 /// let slow_3 = 3 * threshold;
2366 ///
2367 /// // this task completes in two slow polls
2368 /// let total_time = time(metrics_monitor.instrument(async {
2369 /// spin_for(slow_1).await; // slow poll 1
2370 /// spin_for(slow_2) // slow poll 2
2371 /// })).await;
2372 ///
2373 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2374 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2375 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2376 /// assert!(mean_slow_poll_duration <= total_time / 2);
2377 ///
2378 /// // this task completes in three slow polls
2379 /// let total_time = time(metrics_monitor.instrument(async {
2380 /// spin_for(slow_1).await; // slow poll 1
2381 /// spin_for(slow_2).await; // slow poll 2
2382 /// spin_for(slow_3) // slow poll 3
2383 /// })).await;
2384 ///
2385 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2386 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2387 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2388 /// assert!(mean_slow_poll_duration <= total_time / 3);
2389 /// }
2390 ///
2391 /// /// Produces the amount of time it took to await a given task.
2392 /// async fn time(task: impl Future) -> Duration {
2393 /// let start = tokio::time::Instant::now();
2394 /// task.await;
2395 /// start.elapsed()
2396 /// }
2397 ///
2398 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2399 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2400 /// let start = tokio::time::Instant::now();
2401 /// while start.elapsed() <= duration {}
2402 /// tokio::task::yield_now()
2403 /// }
2404 /// ```
2405 pub fn mean_slow_poll_duration(&self) -> Duration {
2406 mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2407 }
2408
2409 /// The average scheduling delay for a task which takes a long time to start executing after
2410 /// being scheduled.
2411 ///
2412 /// ##### Definition
2413 /// This metric is derived from
2414 /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2415 /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2416 pub fn mean_long_delay_duration(&self) -> Duration {
2417 mean(self.total_long_delay_duration, self.total_long_delay_count)
2418 }
2419}
2420
2421impl<T: Future> Future for Instrumented<T> {
2422 type Output = T::Output;
2423
2424 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2425 instrument_poll(cx, self, Future::poll)
2426 }
2427}
2428
2429impl<T: Stream> Stream for Instrumented<T> {
2430 type Item = T::Item;
2431
2432 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2433 instrument_poll(cx, self, Stream::poll_next)
2434 }
2435}
2436
2437fn instrument_poll<T, Out>(
2438 cx: &mut Context<'_>,
2439 instrumented: Pin<&mut Instrumented<T>>,
2440 poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
2441) -> Poll<Out> {
2442 let poll_start = Instant::now();
2443 let this = instrumented.project();
2444 let idled_at = this.idled_at;
2445 let state = this.state;
2446 let instrumented_at = state.instrumented_at;
2447 let metrics = &state.metrics;
2448 /* accounting for time-to-first-poll and tasks-count */
2449 // is this the first time this task has been polled?
2450 if !*this.did_poll_once {
2451 // if so, we need to do three things:
2452 /* 1. note that this task *has* been polled */
2453 *this.did_poll_once = true;
2454
2455 /* 2. account for the time-to-first-poll of this task */
2456 // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
2457 // round down to `u64::MAX` nanoseconds
2458 let elapsed = (poll_start - instrumented_at)
2459 .as_nanos()
2460 .try_into()
2461 .unwrap_or(u64::MAX);
2462 // add this duration to `time_to_first_poll_ns_total`
2463 metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);
2464
2465 /* 3. increment the count of tasks that have been polled at least once */
2466 state.metrics.first_poll_count.fetch_add(1, SeqCst);
2467 }
2468 /* accounting for time-idled and time-scheduled */
2469 // 1. note (and reset) the instant this task was last awoke
2470 let woke_at = state.woke_at.swap(0, SeqCst);
2471 // The state of a future is *idling* in the interim between the instant
2472 // it completes a `poll`, and the instant it is next awoken.
2473 if *idled_at < woke_at {
2474 // increment the counter of how many idles occurred
2475 metrics.total_idled_count.fetch_add(1, SeqCst);
2476
2477 // compute the duration of the idle
2478 let idle_ns = woke_at - *idled_at;
2479
2480 // update the max time tasks spent idling, both locally and
2481 // globally.
2482 metrics
2483 .local_max_idle_duration_ns
2484 .fetch_max(idle_ns, SeqCst);
2485 metrics
2486 .global_max_idle_duration_ns
2487 .fetch_max(idle_ns, SeqCst);
2488 // adjust the total elapsed time monitored tasks spent idling
2489 metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
2490 }
2491 // if this task spent any time in the scheduled state after instrumentation,
2492 // and after first poll, `woke_at` will be greater than 0.
2493 if woke_at > 0 {
2494 // increment the counter of how many schedules occurred
2495 metrics.total_scheduled_count.fetch_add(1, SeqCst);
2496
2497 // recall that the `woke_at` field is internally represented as
2498 // nanoseconds-since-instrumentation. here, for accounting purposes,
2499 // we need to instead represent it as a proper `Instant`.
2500 let woke_instant = instrumented_at + Duration::from_nanos(woke_at);
2501
2502 // the duration this task spent scheduled is time time elapsed between
2503 // when this task was awoke, and when it was polled.
2504 let scheduled_ns = (poll_start - woke_instant)
2505 .as_nanos()
2506 .try_into()
2507 .unwrap_or(u64::MAX);
2508
2509 let scheduled = Duration::from_nanos(scheduled_ns);
2510
2511 let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
2512 if scheduled >= metrics.long_delay_threshold {
2513 (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
2514 } else {
2515 (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
2516 };
2517 // update the appropriate bucket
2518 count_bucket.fetch_add(1, SeqCst);
2519 duration_bucket.fetch_add(scheduled_ns, SeqCst);
2520
2521 // add `scheduled_ns` to the Monitor's total
2522 metrics
2523 .total_scheduled_duration_ns
2524 .fetch_add(scheduled_ns, SeqCst);
2525 }
2526 // Register the waker
2527 state.waker.register(cx.waker());
2528 // Get the instrumented waker
2529 let waker_ref = futures_util::task::waker_ref(state);
2530 let mut cx = Context::from_waker(&waker_ref);
2531 // Poll the task
2532 let inner_poll_start = Instant::now();
2533 let ret = poll_fn(this.task, &mut cx);
2534 let inner_poll_end = Instant::now();
2535 /* idle time starts now */
2536 *idled_at = (inner_poll_end - instrumented_at)
2537 .as_nanos()
2538 .try_into()
2539 .unwrap_or(u64::MAX);
2540 /* accounting for poll time */
2541 let inner_poll_duration = inner_poll_end - inner_poll_start;
2542 let inner_poll_ns: u64 = inner_poll_duration
2543 .as_nanos()
2544 .try_into()
2545 .unwrap_or(u64::MAX);
2546 let (count_bucket, duration_bucket) = // was this a slow or fast poll?
2547 if inner_poll_duration >= metrics.slow_poll_threshold {
2548 (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
2549 } else {
2550 (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
2551 };
2552 // update the appropriate bucket
2553 count_bucket.fetch_add(1, SeqCst);
2554 duration_bucket.fetch_add(inner_poll_ns, SeqCst);
2555 ret
2556}
2557
2558impl State {
2559 fn on_wake(&self) {
2560 let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2561 Ok(woke_at) => woke_at,
2562 // This is highly unlikely as it would mean the task ran for over
2563 // 500 years. If you ran your service for 500 years. If you are
2564 // reading this 500 years in the future, I'm sorry.
2565 Err(_) => return,
2566 };
2567
2568 // We don't actually care about the result
2569 let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2570 }
2571}
2572
2573impl ArcWake for State {
2574 fn wake_by_ref(arc_self: &Arc<State>) {
2575 arc_self.on_wake();
2576 arc_self.waker.wake();
2577 }
2578
2579 fn wake(self: Arc<State>) {
2580 self.on_wake();
2581 self.waker.wake();
2582 }
2583}
2584
2585/// Iterator returned by [`TaskMonitor::intervals`].
2586///
2587/// See that method's documentation for more details.
2588#[derive(Debug)]
2589pub struct TaskIntervals {
2590 metrics: Arc<RawMetrics>,
2591 previous: Option<TaskMetrics>,
2592}
2593
2594impl TaskIntervals {
2595 fn probe(&mut self) -> TaskMetrics {
2596 let latest = self.metrics.metrics();
2597 let local_max_idle_duration = self.metrics.get_and_reset_local_max_idle_duration();
2598
2599 let next = if let Some(previous) = self.previous {
2600 TaskMetrics {
2601 instrumented_count: latest
2602 .instrumented_count
2603 .wrapping_sub(previous.instrumented_count),
2604 dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2605 total_poll_count: latest
2606 .total_poll_count
2607 .wrapping_sub(previous.total_poll_count),
2608 total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2609 first_poll_count: latest
2610 .first_poll_count
2611 .wrapping_sub(previous.first_poll_count),
2612 total_idled_count: latest
2613 .total_idled_count
2614 .wrapping_sub(previous.total_idled_count),
2615 total_scheduled_count: latest
2616 .total_scheduled_count
2617 .wrapping_sub(previous.total_scheduled_count),
2618 total_fast_poll_count: latest
2619 .total_fast_poll_count
2620 .wrapping_sub(previous.total_fast_poll_count),
2621 total_short_delay_count: latest
2622 .total_short_delay_count
2623 .wrapping_sub(previous.total_short_delay_count),
2624 total_slow_poll_count: latest
2625 .total_slow_poll_count
2626 .wrapping_sub(previous.total_slow_poll_count),
2627 total_long_delay_count: latest
2628 .total_long_delay_count
2629 .wrapping_sub(previous.total_long_delay_count),
2630 total_first_poll_delay: sub(
2631 latest.total_first_poll_delay,
2632 previous.total_first_poll_delay,
2633 ),
2634 max_idle_duration: local_max_idle_duration,
2635 total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2636 total_scheduled_duration: sub(
2637 latest.total_scheduled_duration,
2638 previous.total_scheduled_duration,
2639 ),
2640 total_fast_poll_duration: sub(
2641 latest.total_fast_poll_duration,
2642 previous.total_fast_poll_duration,
2643 ),
2644 total_short_delay_duration: sub(
2645 latest.total_short_delay_duration,
2646 previous.total_short_delay_duration,
2647 ),
2648 total_slow_poll_duration: sub(
2649 latest.total_slow_poll_duration,
2650 previous.total_slow_poll_duration,
2651 ),
2652 total_long_delay_duration: sub(
2653 latest.total_long_delay_duration,
2654 previous.total_long_delay_duration,
2655 ),
2656 }
2657 } else {
2658 latest
2659 };
2660
2661 self.previous = Some(latest);
2662
2663 next
2664 }
2665}
2666
2667impl Iterator for TaskIntervals {
2668 type Item = TaskMetrics;
2669
2670 fn next(&mut self) -> Option<Self::Item> {
2671 Some(self.probe())
2672 }
2673}
2674
/// Converts `d` into a whole number of nanoseconds.
///
/// `d` is expected to be at most `u64::MAX` nanoseconds; this is checked in builds with debug
/// assertions enabled. Without them, larger values wrap around modulo 2^64.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    debug_assert!(d <= Duration::from_nanos(u64::MAX));

    let whole_seconds_ns = d.as_secs().wrapping_mul(NANOS_PER_SEC);
    let subsecond_ns = u64::from(d.subsec_nanos());
    whole_seconds_ns.wrapping_add(subsecond_ns)
}
2682
2683#[inline(always)]
2684fn sub(a: Duration, b: Duration) -> Duration {
2685 let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2686 Duration::from_nanos(nanos)
2687}
2688
2689#[inline(always)]
2690fn mean(d: Duration, count: u64) -> Duration {
2691 if let Some(quotient) = to_nanos(d).checked_div(count) {
2692 Duration::from_nanos(quotient)
2693 } else {
2694 Duration::ZERO
2695 }
2696}