tokio_metrics/task.rs
1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use tokio_stream::Stream;
10
11#[cfg(feature = "rt")]
12use tokio::time::{Duration, Instant};
13
14use crate::derived_metrics::derived_metrics;
15#[cfg(not(feature = "rt"))]
16use std::time::{Duration, Instant};
17
18#[cfg(feature = "metrics-rs-integration")]
19pub(crate) mod metrics_rs_integration;
20
21/// Monitors key metrics of instrumented tasks.
22///
23/// This struct is preferred for generating a variable number of monitors at runtime.
24/// If you can construct a fixed count of `static` monitors instead, see [`TaskMonitorCore`].
25///
26/// ### Basic Usage
27/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
28/// [instrumented][`TaskMonitor::instrument`] with the monitor.
29///
30/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
31/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
32/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
33/// ```
34/// use std::time::Duration;
35///
36/// #[tokio::main]
37/// async fn main() {
38/// // construct a metrics monitor
39/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
40///
41/// // print task metrics every 500ms
42/// {
43/// let metrics_monitor = metrics_monitor.clone();
44/// tokio::spawn(async move {
45/// for interval in metrics_monitor.intervals() {
46/// // pretty-print the metric interval
47/// println!("{:?}", interval);
48/// // wait 500ms
49/// tokio::time::sleep(Duration::from_millis(500)).await;
50/// }
51/// });
52/// }
53///
54/// // instrument some tasks and await them
55/// // note that the same TaskMonitor can be used for multiple tasks
56/// tokio::join![
57/// metrics_monitor.instrument(do_work()),
58/// metrics_monitor.instrument(do_work()),
59/// metrics_monitor.instrument(do_work())
60/// ];
61/// }
62///
63/// async fn do_work() {
64/// for _ in 0..25 {
65/// tokio::task::yield_now().await;
66/// tokio::time::sleep(Duration::from_millis(100)).await;
67/// }
68/// }
69/// ```
70///
71/// ### What should I instrument?
72/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
73///
74/// #### Instrumenting a web application
75/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
76/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
77/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
78/// debugging scenario for a web service that takes this approach to instrumentation. This
79/// approach is exemplified in the below example:
80/// ```no_run
81/// // The unabridged version of this snippet is in the examples directory of this crate.
82///
83/// #[tokio::main]
84/// async fn main() {
85/// // construct a TaskMonitor for root endpoint
86/// let monitor_root = tokio_metrics::TaskMonitor::new();
87///
88/// // construct TaskMonitors for create_users endpoint
89/// let monitor_create_user = CreateUserMonitors {
90/// // monitor for the entire endpoint
91/// route: tokio_metrics::TaskMonitor::new(),
92/// // monitor for database insertion subtask
93/// insert: tokio_metrics::TaskMonitor::new(),
94/// };
95///
96/// // build our application with two instrumented endpoints
97/// let app = axum::Router::new()
98/// // `GET /` goes to `root`
99/// .route("/", axum::routing::get({
100/// let monitor = monitor_root.clone();
101/// move || monitor.instrument(async { "Hello, World!" })
102/// }))
103/// // `POST /users` goes to `create_user`
104/// .route("/users", axum::routing::post({
105/// let monitors = monitor_create_user.clone();
106/// let route = monitors.route.clone();
107/// move |payload| {
108/// route.instrument(create_user(payload, monitors))
109/// }
110/// }));
111///
112/// // print task metrics for each endpoint every 1s
113/// let metrics_frequency = std::time::Duration::from_secs(1);
114/// tokio::spawn(async move {
115/// let root_intervals = monitor_root.intervals();
116/// let create_user_route_intervals =
117/// monitor_create_user.route.intervals();
118/// let create_user_insert_intervals =
119/// monitor_create_user.insert.intervals();
120/// let create_user_intervals =
121/// create_user_route_intervals.zip(create_user_insert_intervals);
122///
123/// let intervals = root_intervals.zip(create_user_intervals);
124/// for (root_route, (create_user_route, create_user_insert)) in intervals {
125/// println!("root_route = {:#?}", root_route);
126/// println!("create_user_route = {:#?}", create_user_route);
127/// println!("create_user_insert = {:#?}", create_user_insert);
128/// tokio::time::sleep(metrics_frequency).await;
129/// }
130/// });
131///
132/// // run the server
133/// let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
134/// let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
135/// axum::serve(listener, app)
136/// .await
137/// .unwrap();
138/// }
139///
140/// async fn create_user(
141/// axum::Json(payload): axum::Json<CreateUser>,
142/// monitors: CreateUserMonitors,
143/// ) -> impl axum::response::IntoResponse {
144/// let user = User { id: 1337, username: payload.username, };
145/// // instrument inserting the user into the db:
146/// let _ = monitors.insert.instrument(insert_user(user.clone())).await;
147/// (axum::http::StatusCode::CREATED, axum::Json(user))
148/// }
149///
150/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
151///
152/// #
153/// # #[derive(Clone)]
154/// # struct CreateUserMonitors {
155/// # // monitor for the entire endpoint
156/// # route: tokio_metrics::TaskMonitor,
157/// # // monitor for database insertion subtask
158/// # insert: tokio_metrics::TaskMonitor,
159/// # }
160/// #
161/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
162/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
163/// #
164/// // insert the user into the database
165/// async fn insert_user(_: User) {
166/// /* implementation details elided */
167/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
168/// }
169/// ```
170///
171/// ### Why are my tasks slow?
172/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
173/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
174///
175/// #### Identifying the high-level culprits
176/// A set of tasks will appear to execute more slowly if:
177/// - they are taking longer to poll (i.e., they consume too much CPU time)
178/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
179/// queues)
180/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
181///
182/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
183/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
184/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
185/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
186/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
187/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
188///
189/// ##### Are my tasks taking longer to poll?
190/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
191/// This metric reflects the mean poll duration. If it increased, it means that, on average,
192/// individual polls tended to take longer. However, this does not necessarily imply increased
193/// task latency: An increase in poll durations could be offset by fewer polls.
194/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
195/// This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
196/// a greater proportion of polls performed excessive computation before yielding. This does not
197/// necessarily imply increased task latency: An increase in the proportion of slow polls could be
198/// offset by fewer or faster polls.
199/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
200/// This metric reflects the mean duration of slow polls. If it increased, it means that, on
201/// average, slow polls got slower. This does not necessarily imply increased task latency: An
202/// increase in average slow poll duration could be offset by fewer or faster polls.
203///
204/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
205///
206/// ##### Are my tasks spending more time waiting to be polled?
207/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
208/// This metric reflects the mean delay between the instant a task is first instrumented and the
209/// instant it is first polled. If it increases, it means that, on average, tasks spent longer
210/// waiting to be initially run.
211/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
212/// This metric reflects the mean duration that tasks spent in the scheduled state. The
213/// 'scheduled' state of a task is the duration between the instant a task is awoken and the
214/// instant it is subsequently polled. If this metric increases, it means that, on average, tasks
215/// spent longer in tokio's queues before being polled.
216/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
217/// This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
218/// it means that a greater proportion of tasks experienced excessive delays before they could
219/// execute after being woken. This does not necessarily indicate an increase in latency, as this
220/// could be offset by fewer or faster task polls.
221/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
222/// This metric reflects the mean duration of long delays. If it increased, it means that, on
223/// average, long delays got even longer. This does not necessarily imply increased task latency:
224/// an increase in average long delay duration could be offset by fewer or faster polls or more
225/// short schedules.
226///
227/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
228///
229/// ##### Are my tasks spending more time waiting on external events to complete?
230/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
231/// This metric reflects the mean duration that tasks spent in the idle state. The idle state is
232/// the duration spanning the instant a task completes a poll, and the instant that it is next
233/// awoken. Tasks inhabit this state when they are waiting for task-external events to complete
234/// (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
235/// tasks, in aggregate, spent more time waiting for task-external events to complete.
236///
237/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
238///
239/// #### Digging deeper
240/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
241/// search for further explanation...
242///
243/// ##### Why are my tasks taking longer to poll?
244/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
245/// The culprit is likely some combination of:
/// - **Your tasks are accidentally blocking.** Common culprits include:
///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
///        [networking](https://doc.rust-lang.org/std/net/) APIs.
///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.
///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
///     1. TLS/cryptographic routines
///     2. doing a lot of processing on bytes
///     3. calling non-Tokio resources
259///
260/// ##### Why are my tasks spending more time waiting to be polled?
261/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
262/// suggesting some combination of:
263/// - Your application is inflating the time elapsed between instrumentation and first poll.
264/// - Your tasks are being scheduled into tokio's global queue.
265/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
266///
267/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
268///
269/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, you [*try to identify
/// the performance culprits...*](#identifying-the-high-level-culprits)
275///
276/// #### Digging even deeper
277///
278/// ##### Is time-to-first-poll unusually high?
279/// Contrast these two metrics:
280/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
281/// This metric reflects the mean delay between the instant a task is first instrumented and the
282/// instant it is *first* polled.
283/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
284/// This metric reflects the mean delay between the instant when tasks were awoken and the
285/// instant they were subsequently polled.
286///
287/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
288/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
289///
290/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
291///
292/// ##### Is my application delaying the time-to-first-poll?
293/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
294/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
295/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
296/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
297///
/// For instance, in the below example, the application induces a 1 second delay between when
/// `task` is instrumented and when it is awaited:
300/// ```rust
301/// #[tokio::main]
302/// async fn main() {
303/// use tokio::time::Duration;
304/// let monitor = tokio_metrics::TaskMonitor::new();
305///
306/// let task = monitor.instrument(async move {});
307///
308/// let one_sec = Duration::from_secs(1);
309/// tokio::time::sleep(one_sec).await;
310///
311/// let _ = tokio::spawn(task).await;
312///
313/// assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
314/// }
315/// ```
316///
/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokios-global-queue)
319///
320/// ##### Is my application spawning more tasks into tokio's global queue?
321/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
322/// global "injection" queue.
323///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
325/// ```ignore
326/// #[tokio::main]
327/// async fn main() {
328/// for _ in 0..100 {
329/// let (tx, rx) = oneshot::channel();
330/// tokio::spawn(async move {
331/// tx.send(());
332/// })
333///
334/// rx.await;
335/// }
336/// }
337/// ```
338/// One would expect this to run efficiently, however, the main task is run *off* the main runtime
339/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
340/// ```ignore
341/// #[tokio::main]
342/// async fn main() {
343/// tokio::spawn(async {
344/// for _ in 0..100 {
345/// let (tx, rx) = oneshot::channel();
346/// tokio::spawn(async move {
347/// tx.send(());
348/// })
349///
350/// rx.await;
351/// }
352/// }).await;
353/// }
354/// ```
355/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
356/// and the task being polled.
357///
358/// ##### Are other tasks polling too long without yielding?
359/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
360/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
361/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
362///
363/// ### Limitations
364/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
365/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
366/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
367///
368/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
369/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
370/// counters and durations wrap.
371///
372/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
373/// calculated by computing the difference of metrics in successive invocations of
374/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
375/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
376/// that interval will overflow and not be accurate.
377///
378/// ##### Examples at the limits
379/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
380/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
381/// ```
382/// use tokio::time::Duration;
383///
384/// #[tokio::main(flavor = "current_thread", start_paused = true)]
385/// async fn main() {
386/// let monitor = tokio_metrics::TaskMonitor::new();
387/// let mut interval = monitor.intervals();
388/// let mut next_interval = || interval.next().unwrap();
389///
390/// // construct and instrument a task, but do not `await` it
391/// let task = monitor.instrument(async {});
392///
393/// // this is the maximum duration representable by tokio_metrics
394/// let max_duration = Duration::from_nanos(u64::MAX);
395///
396/// // let's advance the clock by this amount and poll `task`
397/// let _ = tokio::time::advance(max_duration).await;
398/// task.await;
399///
400/// // durations ≤ `max_duration` are accurately reflected in this metric
401/// assert_eq!(next_interval().total_first_poll_delay, max_duration);
402/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
403/// }
404/// ```
405/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
406/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
407/// ```
408/// # use tokio::time::Duration;
409/// #
410/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
411/// # async fn main() {
412/// # let monitor = tokio_metrics::TaskMonitor::new();
413/// #
414/// // construct and instrument a task, but do not `await` it
415/// let task_a = monitor.instrument(async {});
416/// let task_b = monitor.instrument(async {});
417///
418/// // this is the maximum duration representable by tokio_metrics
419/// let max_duration = Duration::from_nanos(u64::MAX);
420///
421/// // let's advance the clock by 1.5x this amount and await `task`
422/// let _ = tokio::time::advance(3 * (max_duration / 2)).await;
423/// task_a.await;
424/// task_b.await;
425///
426/// // the `total_first_poll_delay` has overflowed
427/// assert!(monitor.cumulative().total_first_poll_delay < max_duration);
428/// # }
429/// ```
430/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
431/// this metric to the precipice of overflow:
432/// ```
433/// # use tokio::time::Duration;
434/// #
435/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
436/// # async fn main() {
437/// # let monitor = tokio_metrics::TaskMonitor::new();
438/// # let mut interval = monitor.intervals();
439/// # let mut next_interval = || interval.next().unwrap();
440/// #
441/// // construct and instrument u16::MAX tasks, but do not `await` them
442/// let first_poll_count = u16::MAX as u64;
443/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
444/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
445///
446/// // this is the maximum duration representable by tokio_metrics
447/// let max_duration = u64::MAX;
448///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
451/// let iffy_delay = max_duration / (first_poll_count as u64);
452/// let small_remainder = max_duration % first_poll_count;
453/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
454///
455/// // ...then poll all of the instrumented tasks:
456/// for task in tasks { task.await; }
457///
458/// // `total_first_poll_delay` is at the precipice of overflowing!
459/// assert_eq!(
460/// next_interval().total_first_poll_delay.as_nanos(),
461/// (max_duration - small_remainder) as u128
462/// );
463/// assert_eq!(
464/// monitor.cumulative().total_first_poll_delay.as_nanos(),
465/// (max_duration - small_remainder) as u128
466/// );
467/// # }
468/// ```
469/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
470/// metrics counter overflows at most once in the midst of an interval:
471/// ```
472/// # use tokio::time::Duration;
473/// # use tokio_metrics::TaskMonitor;
474/// #
475/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
476/// # async fn main() {
477/// # let monitor = TaskMonitor::new();
478/// # let mut interval = monitor.intervals();
479/// # let mut next_interval = || interval.next().unwrap();
480/// #
481/// let first_poll_count = u16::MAX as u64;
482/// let batch_size = first_poll_count / 3;
483///
484/// let max_duration_ns = u64::MAX;
485/// let iffy_delay_ns = max_duration_ns / first_poll_count;
486///
487/// // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
488/// // then await the instrumented tasks.
489/// async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
490/// let mut tasks = Vec::with_capacity(batch_size);
491/// for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
492/// let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
493/// for task in tasks { task.await; }
494/// }
495///
/// // this is how much `total_first_poll_delay` will
/// // increase with each batch we run
498/// let batch_delay = iffy_delay_ns * batch_size;
499///
500/// // run batches 1, 2, and 3
501/// for i in 1..=3 {
502/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
503/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
504/// assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
505/// }
506///
/// /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
508/// assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
509///
510/// // run batch 4
511/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
512/// // the interval counter remains accurate
513/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
514/// // but the cumulative counter has overflowed
515/// assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
516/// # }
517/// ```
518/// If a cumulative metric overflows *more than once* in the midst of an interval,
519/// its interval-sampled counterpart will also overflow.
#[derive(Clone, Debug)]
pub struct TaskMonitor {
    // Shared handle to the underlying monitor state. Because this is an `Arc`, the derived
    // `Clone` yields monitors that all refer to the same `TaskMonitorCore`.
    base: Arc<TaskMonitorCore>,
}
524
525impl Deref for TaskMonitor {
526 type Target = TaskMonitorCore;
527
528 fn deref(&self) -> &Self::Target {
529 &self.base
530 }
531}
532
533impl AsRef<TaskMonitorCore> for TaskMonitor {
534 fn as_ref(&self) -> &TaskMonitorCore {
535 &self.base
536 }
537}
538
539/// A non-`Clone`, non-allocated, static-friendly version of [`TaskMonitor`].
540/// See full docs on the [`TaskMonitor`] struct.
541///
542/// You should use [`TaskMonitorCore`] if you have a known count of monitors
543/// that you want to initialize as compile-time `static` structs.
544///
545/// You can also use [`TaskMonitorCore`] if you are already passing around an `Arc`-wrapped
546/// struct that you want to store your monitor in. This way, you can avoid double-`Arc`'ing it.
547///
/// For most other non-static usage, [`TaskMonitor`] will be more ergonomic.
549///
550/// ##### Examples
551///
552/// Static usage:
553/// ```
554/// use tokio_metrics::TaskMonitorCore;
555///
556/// static MONITOR: TaskMonitorCore = TaskMonitorCore::new();
557///
558/// #[tokio::main]
559/// async fn main() {
560/// assert_eq!(MONITOR.cumulative().first_poll_count, 0);
561///
562/// MONITOR.instrument(async {}).await;
563/// assert_eq!(MONITOR.cumulative().first_poll_count, 1);
564/// }
565/// ```
566///
567/// Usage with wrapper struct and [`TaskMonitorCore::instrument_with`]:
568/// ```
569/// use std::sync::Arc;
570/// use tokio_metrics::TaskMonitorCore;
571///
572/// #[derive(Clone)]
573/// struct SharedState(Arc<SharedStateInner>);
574/// struct SharedStateInner {
575/// monitor: TaskMonitorCore,
576/// other_state: SomeOtherSharedState,
577/// }
578/// /// Imagine: a type that wasn't `Clone` that you want to pass around
579/// /// in a similar way as the monitor
580/// struct SomeOtherSharedState;
581///
582/// impl AsRef<TaskMonitorCore> for SharedState {
583/// fn as_ref(&self) -> &TaskMonitorCore {
584/// &self.0.monitor
585/// }
586/// }
587///
588/// #[tokio::main]
589/// async fn main() {
590/// let state = SharedState(Arc::new(SharedStateInner {
591/// monitor: TaskMonitorCore::new(),
592/// other_state: SomeOtherSharedState,
593/// }));
594///
595/// assert_eq!(state.0.monitor.cumulative().first_poll_count, 0);
596///
597/// TaskMonitorCore::instrument_with(async {}, state.clone()).await;
598/// assert_eq!(state.0.monitor.cumulative().first_poll_count, 1);
599/// }
600/// ```
#[derive(Debug)]
pub struct TaskMonitorCore {
    // Raw metric state for this monitor; for example, its `dropped_count` counter is
    // incremented whenever an `Instrumented` task is dropped.
    metrics: RawMetrics,
}
605
/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
/// parameters.
///
/// This is a thin wrapper around a [`TaskMonitorCoreBuilder`]; its setters update the wrapped
/// builder in place and return `&mut Self`.
#[derive(Clone, Debug, Default)]
pub struct TaskMonitorBuilder(TaskMonitorCoreBuilder);
610
611impl TaskMonitorBuilder {
612 /// Creates a new [`TaskMonitorBuilder`].
613 pub fn new() -> Self {
614 Self(TaskMonitorCoreBuilder::new())
615 }
616
617 /// Specifies the threshold at which polls are considered 'slow'.
618 pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
619 self.0.slow_poll_threshold = Some(threshold);
620 self
621 }
622
623 /// Specifies the threshold at which schedules are considered 'long'.
624 pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
625 self.0.long_delay_threshold = Some(threshold);
626 self
627 }
628
629 /// Consume the builder, producing a [`TaskMonitor`].
630 pub fn build(self) -> TaskMonitor {
631 TaskMonitor {
632 base: Arc::new(self.0.build()),
633 }
634 }
635}
636
/// Provides an interface for constructing a [`TaskMonitorCore`] with specialized configuration
/// parameters.
///
/// Every method on this builder is a `const fn`, so a configured [`TaskMonitorCore`] can be
/// produced in a `static` initializer:
///
/// ```
/// use std::time::Duration;
/// use tokio_metrics::TaskMonitorCoreBuilder;
///
/// static MONITOR: tokio_metrics::TaskMonitorCore = TaskMonitorCoreBuilder::new()
///     .with_slow_poll_threshold(Duration::from_micros(100))
///     .build();
/// ```
#[derive(Clone, Debug, Default)]
pub struct TaskMonitorCoreBuilder {
    // Threshold at which polls are considered 'slow'; `None` selects
    // `TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD` when the builder is built.
    slow_poll_threshold: Option<Duration>,
    // Threshold at which schedules are considered 'long'; `None` selects
    // `TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD` when the builder is built.
    long_delay_threshold: Option<Duration>,
}
653
654impl TaskMonitorCoreBuilder {
655 /// Creates a new [`TaskMonitorCoreBuilder`].
656 pub const fn new() -> Self {
657 Self {
658 slow_poll_threshold: None,
659 long_delay_threshold: None,
660 }
661 }
662
663 /// Specifies the threshold at which polls are considered 'slow'.
664 pub const fn with_slow_poll_threshold(self, threshold: Duration) -> Self {
665 Self {
666 slow_poll_threshold: Some(threshold),
667 ..self
668 }
669 }
670
671 /// Specifies the threshold at which schedules are considered 'long'.
672 pub const fn with_long_delay_threshold(self, threshold: Duration) -> Self {
673 Self {
674 long_delay_threshold: Some(threshold),
675 ..self
676 }
677 }
678
679 /// Consume the builder, producing a [`TaskMonitorCore`].
680 pub const fn build(self) -> TaskMonitorCore {
681 let slow = match self.slow_poll_threshold {
682 Some(v) => v,
683 None => TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD,
684 };
685 let long = match self.long_delay_threshold {
686 Some(v) => v,
687 None => TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD,
688 };
689 TaskMonitorCore::create(slow, long)
690 }
691}
692
693pin_project! {
694 /// An async task that has been instrumented with [`TaskMonitor::instrument`].
695 #[derive(Debug)]
696 pub struct Instrumented<T, M: AsRef<TaskMonitorCore> = TaskMonitor> {
697 // The task being instrumented
698 #[pin]
699 task: T,
700
701 // True when the task is polled for the first time
702 did_poll_once: bool,
703
704 // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
705 // its last poll.
706 idled_at: u64,
707
708 // State shared between the task and its instrumented waker.
709 state: Arc<State<M>>,
710 }
711
712 impl<T, M: AsRef<TaskMonitorCore>> PinnedDrop for Instrumented<T, M> {
713 fn drop(this: Pin<&mut Self>) {
714 this.state.monitor.as_ref().metrics.dropped_count.fetch_add(1, SeqCst);
715 }
716 }
717}
718
719/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
720#[non_exhaustive]
721#[derive(Debug, Clone, Copy, Default)]
722pub struct TaskMetrics {
723 /// The number of tasks instrumented.
724 ///
725 /// ##### Examples
726 /// ```
727 /// #[tokio::main]
728 /// async fn main() {
729 /// let monitor = tokio_metrics::TaskMonitor::new();
730 /// let mut interval = monitor.intervals();
731 /// let mut next_interval = || interval.next().unwrap();
732 ///
733 /// // 0 tasks have been instrumented
734 /// assert_eq!(next_interval().instrumented_count, 0);
735 ///
736 /// monitor.instrument(async {});
737 ///
738 /// // 1 task has been instrumented
739 /// assert_eq!(next_interval().instrumented_count, 1);
740 ///
741 /// monitor.instrument(async {});
742 /// monitor.instrument(async {});
743 ///
744 /// // 2 tasks have been instrumented
745 /// assert_eq!(next_interval().instrumented_count, 2);
746 ///
747 /// // since the last interval was produced, 0 tasks have been instrumented
748 /// assert_eq!(next_interval().instrumented_count, 0);
749 /// }
750 /// ```
751 pub instrumented_count: u64,
752
753 /// The number of tasks dropped.
754 ///
755 /// ##### Examples
756 /// ```
757 /// #[tokio::main]
758 /// async fn main() {
759 /// let monitor = tokio_metrics::TaskMonitor::new();
760 /// let mut interval = monitor.intervals();
761 /// let mut next_interval = || interval.next().unwrap();
762 ///
763 /// // 0 tasks have been dropped
764 /// assert_eq!(next_interval().dropped_count, 0);
765 ///
766 /// let _task = monitor.instrument(async {});
767 ///
768 /// // 0 tasks have been dropped
769 /// assert_eq!(next_interval().dropped_count, 0);
770 ///
771 /// monitor.instrument(async {}).await;
772 /// drop(monitor.instrument(async {}));
773 ///
774 /// // 2 tasks have been dropped
775 /// assert_eq!(next_interval().dropped_count, 2);
776 ///
777 /// // since the last interval was produced, 0 tasks have been dropped
778 /// assert_eq!(next_interval().dropped_count, 0);
779 /// }
780 /// ```
781 pub dropped_count: u64,
782
783 /// The number of tasks polled for the first time.
784 ///
785 /// ##### Derived metrics
786 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
787 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
788 /// are first polled.
789 ///
790 /// ##### Examples
791 /// In the below example, no tasks are instrumented or polled in the first sampling interval;
792 /// one task is instrumented (but not polled) in the second sampling interval; that task is
793 /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
794 /// additional tasks are polled for the first time within the fourth sampling interval:
795 /// ```
796 /// #[tokio::main]
797 /// async fn main() {
798 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
799 /// let mut interval = metrics_monitor.intervals();
800 /// let mut next_interval = || interval.next().unwrap();
801 ///
802 /// // no tasks have been constructed, instrumented, and polled at least once
803 /// assert_eq!(next_interval().first_poll_count, 0);
804 ///
805 /// let task = metrics_monitor.instrument(async {});
806 ///
807 /// // `task` has been constructed and instrumented, but has not yet been polled
808 /// assert_eq!(next_interval().first_poll_count, 0);
809 ///
810 /// // poll `task` to completion
811 /// task.await;
812 ///
813 /// // `task` has been constructed, instrumented, and polled at least once
814 /// assert_eq!(next_interval().first_poll_count, 1);
815 ///
816 /// // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
817 /// assert_eq!(next_interval().first_poll_count, 0);
818 ///
819 /// }
820 /// ```
821 pub first_poll_count: u64,
822
823 /// The total duration elapsed between the instant tasks are instrumented, and the instant they
824 /// are first polled.
825 ///
826 /// ##### Derived metrics
827 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
828 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
829 /// are first polled.
830 ///
831 /// ##### Examples
832 /// In the below example, 0 tasks have been instrumented or polled within the first sampling
833 /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
834 /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
835 /// polling of tasks within the third sampling interval:
836 /// ```
837 /// use tokio::time::Duration;
838 ///
839 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
840 /// async fn main() {
841 /// let monitor = tokio_metrics::TaskMonitor::new();
842 /// let mut interval = monitor.intervals();
843 /// let mut next_interval = || interval.next().unwrap();
844 ///
845 /// // no tasks have yet been created, instrumented, or polled
846 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
847 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
848 ///
849 /// // constructs and instruments a task, pauses a given duration, then awaits the task
850 /// async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
851 /// let task = monitor.instrument(async move {});
852 /// tokio::time::sleep(pause).await;
853 /// task.await;
854 /// }
855 ///
856 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
857 /// let task_a_pause_time = Duration::from_millis(500);
858 /// instrument_pause_await(&monitor, task_a_pause_time).await;
859 ///
860 /// assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
861 /// assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
862 ///
863 /// // construct and await a task that pauses for 250ms between instrumentation and first poll
864 /// let task_b_pause_time = Duration::from_millis(250);
865 /// instrument_pause_await(&monitor, task_b_pause_time).await;
866 ///
867 /// // construct and await a task that pauses for 100ms between instrumentation and first poll
868 /// let task_c_pause_time = Duration::from_millis(100);
869 /// instrument_pause_await(&monitor, task_c_pause_time).await;
870 ///
871 /// assert_eq!(
872 /// next_interval().total_first_poll_delay,
873 /// task_b_pause_time + task_c_pause_time
874 /// );
875 /// assert_eq!(
876 /// monitor.cumulative().total_first_poll_delay,
877 /// task_a_pause_time + task_b_pause_time + task_c_pause_time
878 /// );
879 /// }
880 /// ```
881 ///
882 /// ##### When is this metric recorded?
883 /// The delay between instrumentation and first poll is not recorded until the first poll
884 /// actually occurs:
885 /// ```
886 /// # use tokio::time::Duration;
887 /// #
888 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
889 /// # async fn main() {
890 /// # let monitor = tokio_metrics::TaskMonitor::new();
891 /// # let mut interval = monitor.intervals();
892 /// # let mut next_interval = || interval.next().unwrap();
893 /// #
894 /// // we construct and instrument a task, but do not `await` it
895 /// let task = monitor.instrument(async {});
896 ///
897 /// // let's sleep for 1s before we poll `task`
898 /// let one_sec = Duration::from_secs(1);
899 /// let _ = tokio::time::sleep(one_sec).await;
900 ///
901 /// // although 1s has now elapsed since the instrumentation of `task`,
902 /// // this is not reflected in `total_first_poll_delay`...
903 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
904 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
905 ///
906 /// // ...and won't be until `task` is actually polled
907 /// task.await;
908 ///
909 /// // now, the 1s delay is reflected in `total_first_poll_delay`:
910 /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
911 /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
912 /// # }
913 /// ```
914 ///
915 /// ##### What if first-poll-delay is very large?
916 /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
917 /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
918 /// metric will wrap around:
919 /// ```
920 /// use tokio::time::Duration;
921 ///
922 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
923 /// async fn main() {
924 /// let monitor = tokio_metrics::TaskMonitor::new();
925 ///
926 /// // construct and instrument a task, but do not `await` it
927 /// let task = monitor.instrument(async {});
928 ///
929 /// // this is the maximum duration representable by tokio_metrics
930 /// let max_duration = Duration::from_nanos(u64::MAX);
931 ///
932 /// // let's advance the clock by double this amount and await `task`
933 /// let _ = tokio::time::advance(max_duration * 2).await;
934 /// task.await;
935 ///
936 /// // the time-to-first-poll of `task` saturates at `max_duration`
937 /// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
938 ///
939 /// // ...but note that the metric *will* wrap around if more tasks are involved
940 /// let task = monitor.instrument(async {});
941 /// let _ = tokio::time::advance(Duration::from_nanos(1)).await;
942 /// task.await;
943 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
944 /// }
945 /// ```
946 pub total_first_poll_delay: Duration,
947
948 /// The total number of times that tasks idled, waiting to be awoken.
949 ///
950 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
951 /// task completes a poll, and the instant that it is next awoken.
952 ///
953 /// ##### Derived metrics
954 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
955 /// The mean duration of idles.
956 ///
957 /// ##### Examples
958 /// ```
959 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
960 /// async fn main() {
961 /// let monitor = tokio_metrics::TaskMonitor::new();
962 /// let mut interval = monitor.intervals();
963 /// let mut next_interval = move || interval.next().unwrap();
964 /// let one_sec = std::time::Duration::from_secs(1);
965 ///
966 /// monitor.instrument(async {}).await;
967 ///
968 /// assert_eq!(next_interval().total_idled_count, 0);
969 /// assert_eq!(monitor.cumulative().total_idled_count, 0);
970 ///
971 /// monitor.instrument(async move {
972 /// tokio::time::sleep(one_sec).await;
973 /// }).await;
974 ///
975 /// assert_eq!(next_interval().total_idled_count, 1);
976 /// assert_eq!(monitor.cumulative().total_idled_count, 1);
977 ///
978 /// monitor.instrument(async {
979 /// tokio::time::sleep(one_sec).await;
980 /// tokio::time::sleep(one_sec).await;
981 /// }).await;
982 ///
983 /// assert_eq!(next_interval().total_idled_count, 2);
984 /// assert_eq!(monitor.cumulative().total_idled_count, 3);
985 /// }
986 /// ```
987 pub total_idled_count: u64,
988
989 /// The total duration that tasks idled.
990 ///
991 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
992 /// task completes a poll, and the instant that it is next awoken.
993 ///
994 /// ##### Derived metrics
995 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
996 /// The mean duration of idles.
997 ///
998 /// ##### Examples
999 /// ```
1000 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1001 /// async fn main() {
1002 /// let monitor = tokio_metrics::TaskMonitor::new();
1003 /// let mut interval = monitor.intervals();
1004 /// let mut next_interval = move || interval.next().unwrap();
1005 /// let one_sec = std::time::Duration::from_secs(1);
1006 /// let two_sec = std::time::Duration::from_secs(2);
1007 ///
1008 /// assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
1009 /// assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
1010 ///
1011 /// monitor.instrument(async move {
1012 /// tokio::time::sleep(one_sec).await;
1013 /// }).await;
1014 ///
1015 /// assert_eq!(next_interval().total_idle_duration, one_sec);
1016 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
1017 ///
1018 /// monitor.instrument(async move {
1019 /// tokio::time::sleep(two_sec).await;
1020 /// }).await;
1021 ///
1022 /// assert_eq!(next_interval().total_idle_duration, two_sec);
1023 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
1024 /// }
1025 /// ```
1026 pub total_idle_duration: Duration,
1027
1028 /// The maximum idle duration that a task took.
1029 ///
1030 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
1031 /// task completes a poll, and the instant that it is next awoken.
1032 ///
1033 /// ##### Examples
1034 /// ```
1035 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1036 /// async fn main() {
1037 /// let monitor = tokio_metrics::TaskMonitor::new();
1038 /// let mut interval = monitor.intervals();
1039 /// let mut next_interval = move || interval.next().unwrap();
1040 /// let one_sec = std::time::Duration::from_secs(1);
1041 /// let two_sec = std::time::Duration::from_secs(2);
1042 ///
1043 /// assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
1044 /// assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
1045 ///
1046 /// monitor.instrument(async move {
1047 /// tokio::time::sleep(one_sec).await;
1048 /// }).await;
1049 ///
1050 /// assert_eq!(next_interval().max_idle_duration, one_sec);
1051 /// assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
1052 ///
1053 /// monitor.instrument(async move {
1054 /// tokio::time::sleep(two_sec).await;
1055 /// }).await;
1056 ///
1057 /// assert_eq!(next_interval().max_idle_duration, two_sec);
1058 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
1059 ///
1060 /// monitor.instrument(async move {
1061 /// tokio::time::sleep(one_sec).await;
1062 /// }).await;
1063 ///
1064 /// assert_eq!(next_interval().max_idle_duration, one_sec);
1065 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
1066 /// }
1067 /// ```
1068 pub max_idle_duration: Duration,
1069
1070 /// The total number of times that tasks were awoken (and then, presumably, scheduled for
1071 /// execution).
1072 ///
1073 /// ##### Definition
1074 /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
1075 /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
1076 ///
1077 /// ##### Derived metrics
1078 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1079 /// The mean duration that tasks spent waiting to be executed after awakening.
1080 ///
1081 /// ##### Examples
1082 /// In the below example, a task yields to the scheduler a varying number of times between
1083 /// sampling intervals; this metric is equal to the number of times the task yielded:
1084 /// ```
1085 /// #[tokio::main]
1086 /// async fn main(){
1087 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1088 ///
1089 /// // [A] no tasks have been created, instrumented, and polled more than once
1090 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
1091 ///
1092 /// // [B] a `task` is created and instrumented
1093 /// let task = {
1094 /// let monitor = metrics_monitor.clone();
1095 /// metrics_monitor.instrument(async move {
1096 /// let mut interval = monitor.intervals();
1097 /// let mut next_interval = move || interval.next().unwrap();
1098 ///
1099 /// // [E] `task` has not yet yielded to the scheduler, and
1100 /// // thus has not yet been scheduled since its first `poll`
1101 /// assert_eq!(next_interval().total_scheduled_count, 0);
1102 ///
1103 /// tokio::task::yield_now().await; // yield to the scheduler
1104 ///
1105 /// // [F] `task` has yielded to the scheduler once (and thus been
1106 /// // scheduled once) since the last sampling interval
1107 /// assert_eq!(next_interval().total_scheduled_count, 1);
1108 ///
1109 /// tokio::task::yield_now().await; // yield to the scheduler
1110 /// tokio::task::yield_now().await; // yield to the scheduler
1111 /// tokio::task::yield_now().await; // yield to the scheduler
1112 ///
1113 /// // [G] `task` has yielded to the scheduler thrice (and thus been
1114 /// // scheduled thrice) since the last sampling interval
1115 /// assert_eq!(next_interval().total_scheduled_count, 3);
1116 ///
1117 /// tokio::task::yield_now().await; // yield to the scheduler
1118 ///
1119 /// next_interval
1120 /// })
1121 /// };
1122 ///
1123 /// // [C] `task` has not yet been polled at all
1124 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1125 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
1126 ///
1127 /// // [D] poll `task` to completion
1128 /// let mut next_interval = task.await;
1129 ///
1130 /// // [H] `task` has been scheduled 1 time since the last sample
1131 /// assert_eq!(next_interval().total_scheduled_count, 1);
1132 ///
1133 /// // [I] `task` has been scheduled 0 times since the last sample
1134 /// assert_eq!(next_interval().total_scheduled_count, 0);
1135 ///
1136 /// // [J] `task` has yielded to the scheduler a total of five times
1137 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1138 /// }
1139 /// ```
1140 #[doc(alias = "total_delay_count")]
1141 pub total_scheduled_count: u64,
1142
1143 /// The total duration that tasks spent waiting to be polled after awakening.
1144 ///
1145 /// ##### Definition
1146 /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1147 /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1148 ///
1149 /// ##### Derived metrics
1150 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1151 /// The mean duration that tasks spent waiting to be executed after awakening.
1152 ///
1153 /// ##### Examples
1154 /// ```
1155 /// use tokio::time::Duration;
1156 ///
1157 /// #[tokio::main(flavor = "current_thread")]
1158 /// async fn main() {
1159 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1160 /// let mut interval = metrics_monitor.intervals();
1161 /// let mut next_interval = || interval.next().unwrap();
1162 ///
1163 /// // construct and instrument and spawn a task that yields endlessly
1164 /// tokio::spawn(metrics_monitor.instrument(async {
1165 /// loop { tokio::task::yield_now().await }
1166 /// }));
1167 ///
1168 /// tokio::task::yield_now().await;
1169 ///
1170 /// // block the executor for 1 second
1171 /// std::thread::sleep(Duration::from_millis(1000));
1172 ///
1173 /// tokio::task::yield_now().await;
1174 ///
1175 /// // `endless_task` will have spent approximately one second waiting
1176 /// let total_scheduled_duration = next_interval().total_scheduled_duration;
1177 /// assert!(total_scheduled_duration >= Duration::from_millis(1000));
1178 /// assert!(total_scheduled_duration <= Duration::from_millis(1100));
1179 /// }
1180 /// ```
1181 #[doc(alias = "total_delay_duration")]
1182 pub total_scheduled_duration: Duration,
1183
1184 /// The total number of times that tasks were polled.
1185 ///
1186 /// ##### Definition
1187 /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1188 /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1189 ///
1190 /// ##### Derived metrics
1191 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1192 /// The mean duration of polls.
1193 ///
1194 /// ##### Examples
1195 /// In the below example, a task with multiple yield points is await'ed to completion; this
1196 /// metric reflects the number of `await`s within each sampling interval:
1197 /// ```
1198 /// #[tokio::main]
1199 /// async fn main() {
1200 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1201 ///
1202 /// // [A] no tasks have been created, instrumented, and polled more than once
1203 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1204 ///
1205 /// // [B] a `task` is created and instrumented
1206 /// let task = {
1207 /// let monitor = metrics_monitor.clone();
1208 /// metrics_monitor.instrument(async move {
1209 /// let mut interval = monitor.intervals();
1210 /// let mut next_interval = move || interval.next().unwrap();
1211 ///
1212 /// // [E] task is in the midst of its first poll
1213 /// assert_eq!(next_interval().total_poll_count, 0);
1214 ///
1215 /// tokio::task::yield_now().await; // poll 1
1216 ///
1217 /// // [F] task has been polled 1 time
1218 /// assert_eq!(next_interval().total_poll_count, 1);
1219 ///
1220 /// tokio::task::yield_now().await; // poll 2
1221 /// tokio::task::yield_now().await; // poll 3
1222 /// tokio::task::yield_now().await; // poll 4
1223 ///
1224 /// // [G] task has been polled 3 times
1225 /// assert_eq!(next_interval().total_poll_count, 3);
1226 ///
1227 /// tokio::task::yield_now().await; // poll 5
1228 ///
1229 /// next_interval // poll 6
1230 /// })
1231 /// };
1232 ///
1233 /// // [C] `task` has not yet been polled at all
1234 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1235 ///
1236 /// // [D] poll `task` to completion
1237 /// let mut next_interval = task.await;
1238 ///
1239 /// // [H] `task` has been polled 2 times since the last sample
1240 /// assert_eq!(next_interval().total_poll_count, 2);
1241 ///
1242 /// // [I] `task` has been polled 0 times since the last sample
1243 /// assert_eq!(next_interval().total_poll_count, 0);
1244 ///
1245 /// // [J] `task` has been polled 6 times
1246 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1247 /// }
1248 /// ```
1249 pub total_poll_count: u64,
1250
1251 /// The total duration elapsed during polls.
1252 ///
1253 /// ##### Definition
1254 /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1255 /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1256 ///
1257 /// ##### Derived metrics
1258 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1259 /// The mean duration of polls.
1260 ///
1261 /// ##### Examples
1262 /// ```
1263 /// use tokio::time::Duration;
1264 ///
1265 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1266 /// async fn main() {
1267 /// let monitor = tokio_metrics::TaskMonitor::new();
1268 /// let mut interval = monitor.intervals();
1269 /// let mut next_interval = move || interval.next().unwrap();
1270 ///
1271 /// assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1272 ///
1273 /// monitor.instrument(async {
1274 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1275 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1276 /// () // poll 3 (0s)
1277 /// }).await;
1278 ///
1279 /// assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1280 /// }
1281 /// ```
1282 pub total_poll_duration: Duration,
1283
1284 /// The total number of times that polling tasks completed swiftly.
1285 ///
1286 /// Here, 'swiftly' is defined as completing in strictly less time than
1287 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1288 ///
1289 /// ##### Derived metrics
1290 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1291 /// The mean duration of fast polls.
1292 ///
1293 /// ##### Examples
1294 /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1295 /// within the second sampling interval, and 2 fast polls occur within the third sampling
1296 /// interval:
1297 /// ```
1298 /// use std::future::Future;
1299 /// use std::time::Duration;
1300 ///
1301 /// #[tokio::main]
1302 /// async fn main() {
1303 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1304 /// let mut interval = metrics_monitor.intervals();
1305 /// let mut next_interval = || interval.next().unwrap();
1306 ///
1307 /// // no tasks have been constructed, instrumented, or polled
1308 /// assert_eq!(next_interval().total_fast_poll_count, 0);
1309 ///
1310 /// let fast = Duration::ZERO;
1311 ///
1312 /// // this task completes in three fast polls
1313 /// let _ = metrics_monitor.instrument(async {
1314 /// spin_for(fast).await; // fast poll 1
1315 /// spin_for(fast).await; // fast poll 2
1316 /// spin_for(fast) // fast poll 3
1317 /// }).await;
1318 ///
1319 /// assert_eq!(next_interval().total_fast_poll_count, 3);
1320 ///
1321 /// // this task completes in two fast polls
1322 /// let _ = metrics_monitor.instrument(async {
1323 /// spin_for(fast).await; // fast poll 1
1324 /// spin_for(fast) // fast poll 2
1325 /// }).await;
1326 ///
1327 /// assert_eq!(next_interval().total_fast_poll_count, 2);
1328 /// }
1329 ///
1330 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1331 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1332 /// let start = tokio::time::Instant::now();
1333 /// while start.elapsed() <= duration {}
1334 /// tokio::task::yield_now()
1335 /// }
1336 /// ```
1337 pub total_fast_poll_count: u64,
1338
1339 /// The total duration of fast polls.
1340 ///
1341 /// Here, 'fast' is defined as completing in strictly less time than
1342 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1343 ///
1344 /// ##### Derived metrics
1345 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1346 /// The mean duration of fast polls.
1347 ///
1348 /// ##### Examples
1349 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1350 /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1351 /// total of 2μs time in the third sampling interval:
1352 /// ```
1353 /// use std::future::Future;
1354 /// use std::time::Duration;
1355 ///
1356 /// #[tokio::main]
1357 /// async fn main() {
1358 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1359 /// let mut interval = metrics_monitor.intervals();
1360 /// let mut next_interval = || interval.next().unwrap();
1361 ///
1362 /// // no tasks have been constructed, instrumented, or polled
1363 /// let interval = next_interval();
1364 /// assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1365 ///
1366 /// let fast = Duration::from_micros(1);
1367 ///
1368 /// // this task completes in three fast polls
1369 /// let task_a_time = time(metrics_monitor.instrument(async {
1370 /// spin_for(fast).await; // fast poll 1
1371 /// spin_for(fast).await; // fast poll 2
1372 /// spin_for(fast) // fast poll 3
1373 /// })).await;
1374 ///
1375 /// let interval = next_interval();
1376 /// assert!(interval.total_fast_poll_duration >= fast * 3);
1377 /// assert!(interval.total_fast_poll_duration <= task_a_time);
1378 ///
1379 /// // this task completes in two fast polls
1380 /// let task_b_time = time(metrics_monitor.instrument(async {
1381 /// spin_for(fast).await; // fast poll 1
1382 /// spin_for(fast) // fast poll 2
1383 /// })).await;
1384 ///
1385 /// let interval = next_interval();
1386 /// assert!(interval.total_fast_poll_duration >= fast * 2);
1387 /// assert!(interval.total_fast_poll_duration <= task_b_time);
1388 /// }
1389 ///
1390 /// /// Produces the amount of time it took to await a given async task.
1391 /// async fn time(task: impl Future) -> Duration {
1392 /// let start = tokio::time::Instant::now();
1393 /// task.await;
1394 /// start.elapsed()
1395 /// }
1396 ///
1397 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1398 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1399 /// let start = tokio::time::Instant::now();
1400 /// while start.elapsed() <= duration {}
1401 /// tokio::task::yield_now()
1402 /// }
1403 /// ```
1404 pub total_fast_poll_duration: Duration,
1405
1406 /// The total number of times that polling tasks completed slowly.
1407 ///
1408 /// Here, 'slowly' is defined as completing in at least as much time as
1409 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1410 ///
1411 /// ##### Derived metrics
1412 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1413 /// The mean duration of slow polls.
1414 ///
1415 /// ##### Examples
1416 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1417 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1418 /// interval:
1419 /// ```
1420 /// use std::future::Future;
1421 /// use std::time::Duration;
1422 ///
1423 /// #[tokio::main]
1424 /// async fn main() {
1425 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1426 /// let mut interval = metrics_monitor.intervals();
1427 /// let mut next_interval = || interval.next().unwrap();
1428 ///
1429 /// // no tasks have been constructed, instrumented, or polled
1430 /// assert_eq!(next_interval().total_slow_poll_count, 0);
1431 ///
1432 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1433 ///
1434 /// // this task completes in three slow polls
1435 /// let _ = metrics_monitor.instrument(async {
1436 /// spin_for(slow).await; // slow poll 1
1437 /// spin_for(slow).await; // slow poll 2
1438 /// spin_for(slow) // slow poll 3
1439 /// }).await;
1440 ///
1441 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1442 ///
1443 /// // this task completes in two slow polls
1444 /// let _ = metrics_monitor.instrument(async {
1445 /// spin_for(slow).await; // slow poll 1
1446 /// spin_for(slow) // slow poll 2
1447 /// }).await;
1448 ///
1449 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1450 /// }
1451 ///
1452 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1453 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1454 /// let start = tokio::time::Instant::now();
1455 /// while start.elapsed() <= duration {}
1456 /// tokio::task::yield_now()
1457 /// }
1458 /// ```
1459 pub total_slow_poll_count: u64,
1460
1461 /// The total duration of slow polls.
1462 ///
1463 /// Here, 'slowly' is defined as completing in at least as much time as
1464 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1465 ///
1466 /// ##### Derived metrics
1467 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1468 /// The mean duration of slow polls.
1469 ///
1470 /// ##### Examples
1471 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1472 /// consume a total of
1473 /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1474 /// time in the second sampling interval; and two slow polls consume a total of
1475 /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1476 /// third sampling interval:
1477 /// ```
1478 /// use std::future::Future;
1479 /// use std::time::Duration;
1480 ///
1481 /// #[tokio::main]
1482 /// async fn main() {
1483 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1484 /// let mut interval = metrics_monitor.intervals();
1485 /// let mut next_interval = || interval.next().unwrap();
1486 ///
1487 /// // no tasks have been constructed, instrumented, or polled
1488 /// let interval = next_interval();
1489 /// assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1490 ///
1491 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1492 ///
1493 /// // this task completes in three slow polls
1494 /// let task_a_time = time(metrics_monitor.instrument(async {
1495 /// spin_for(slow).await; // slow poll 1
1496 /// spin_for(slow).await; // slow poll 2
1497 /// spin_for(slow) // slow poll 3
1498 /// })).await;
1499 ///
1500 /// let interval = next_interval();
1501 /// assert!(interval.total_slow_poll_duration >= slow * 3);
1502 /// assert!(interval.total_slow_poll_duration <= task_a_time);
1503 ///
1504 /// // this task completes in two slow polls
1505 /// let task_b_time = time(metrics_monitor.instrument(async {
1506 /// spin_for(slow).await; // slow poll 1
1507 /// spin_for(slow) // slow poll 2
1508 /// })).await;
1509 ///
1510 /// let interval = next_interval();
1511 /// assert!(interval.total_slow_poll_duration >= slow * 2);
1512 /// assert!(interval.total_slow_poll_duration <= task_b_time);
1513 /// }
1514 ///
1515 /// /// Produces the amount of time it took to await a given async task.
1516 /// async fn time(task: impl Future) -> Duration {
1517 /// let start = tokio::time::Instant::now();
1518 /// task.await;
1519 /// start.elapsed()
1520 /// }
1521 ///
1522 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1523 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1524 /// let start = tokio::time::Instant::now();
1525 /// while start.elapsed() <= duration {}
1526 /// tokio::task::yield_now()
1527 /// }
1528 /// ```
1529 pub total_slow_poll_duration: Duration,
1530
1531 /// The total count of tasks with short scheduling delays.
1532 ///
1533 /// This is defined as tasks taking strictly less than
1534 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1535 /// scheduled.
1536 ///
1537 /// ##### Derived metrics
1538 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1539 /// The mean duration of short scheduling delays.
1540 pub total_short_delay_count: u64,
1541
1542 /// The total duration of tasks with short scheduling delays.
1543 ///
1544 /// This is defined as tasks taking strictly less than
1545 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1546 /// scheduled.
1547 ///
1548 /// ##### Derived metrics
1549 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1550 /// The mean duration of short scheduling delays.
1551 pub total_short_delay_duration: Duration,
1552
1553 /// The total count of tasks with long scheduling delays.
1554 ///
1555 /// This is defined as tasks taking
1556 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1557 /// after being scheduled.
1558 ///
1559 /// ##### Derived metrics
1560 /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1562 pub total_long_delay_count: u64,
1563
1564 /// The total duration of tasks with long scheduling delays.
1565 ///
1566 /// This is defined as tasks taking
1567 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1568 /// after being scheduled.
1569 ///
1570 /// ##### Derived metrics
1571 /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1573 pub total_long_delay_duration: Duration,
1574}
1575
/// Tracks the metrics, shared across the various types.
///
/// Thresholds are plain [`Duration`]s fixed at construction; every other field is an
/// [`AtomicU64`] counter. Fields holding a duration store it as a nanosecond count (they are
/// converted with `Duration::from_nanos` when a snapshot is produced).
#[derive(Debug)]
struct RawMetrics {
    /// Polls whose duration is greater than or equal to this threshold are categorized as
    /// slow polls.
    slow_poll_threshold: Duration,

    /// A scheduling delay of at least this long is considered a long delay.
    long_delay_threshold: Duration,

    /// Total number of instrumented tasks.
    instrumented_count: AtomicU64,

    /// Total number of instrumented tasks polled at least once.
    first_poll_count: AtomicU64,

    /// Total number of times tasks entered the `idle` state.
    total_idled_count: AtomicU64,

    /// Total number of times tasks were scheduled.
    total_scheduled_count: AtomicU64,

    /// Total number of fast polls (polls below the slow cut off).
    total_fast_poll_count: AtomicU64,

    /// Total number of slow polls (polls at or above the slow cut off).
    total_slow_poll_count: AtomicU64,

    /// Total number of times tasks had a long scheduling delay.
    total_long_delay_count: AtomicU64,

    /// Total number of times tasks had a short scheduling delay.
    total_short_delay_count: AtomicU64,

    /// Total number of times tasks were dropped.
    dropped_count: AtomicU64,

    /// Total amount of time, in nanoseconds, until the first poll.
    total_first_poll_delay_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// The longest time, in nanoseconds, tasks spent in the `idle` state locally.
    /// This is used to track the local max between interval metric snapshots; it is reset to
    /// zero each time it is read via `get_and_reset_local_max_idle_duration`.
    local_max_idle_duration_ns: AtomicU64,

    /// The longest time, in nanoseconds, tasks spent in the `idle` state.
    global_max_idle_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent in the waking state.
    total_scheduled_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent being polled below the slow cut off.
    total_fast_poll_duration_ns: AtomicU64,

    /// Total amount of time tasks spent being polled at or above the slow cut off.
    ///
    /// Despite lacking the `_ns` suffix of its siblings, this is also a nanosecond count.
    total_slow_poll_duration: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent waiting to be executed after being
    /// scheduled, for scheduling delays below the long delay cut off.
    total_short_delay_duration_ns: AtomicU64,

    /// Total amount of time, in nanoseconds, tasks spent waiting to be executed after being
    /// scheduled, for scheduling delays at or above the long delay cut off.
    total_long_delay_duration_ns: AtomicU64,
}
1641
/// State attached to a single instrumented task.
///
/// One `State` is created per call to `TaskMonitorCore::instrument_with` and stored behind an
/// `Arc` inside the resulting `Instrumented` wrapper.
#[derive(Debug)]
struct State<M> {
    /// Where metrics should be recorded
    monitor: M,

    /// Instant at which the task was instrumented. This is used to track the time to first poll.
    instrumented_at: Instant,

    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
    /// was last woken. Starts out as `0` when the task is instrumented.
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    waker: AtomicWaker,
}
1657
1658impl TaskMonitor {
    /// The default duration at which polls cross the threshold into being categorized as 'slow' is
    /// 50μs.
    ///
    /// This is the threshold applied by [`TaskMonitor::new`] and [`TaskMonitorCore::new`].
    #[cfg(not(test))]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
    // When this crate is itself compiled with `cfg(test)`, the default is 500ms instead of 50μs.
    // NOTE(review): presumably this keeps timing-sensitive unit tests from being flaky — confirm.
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1666
    /// The default duration at which schedules cross the threshold into being categorized as 'long'
    /// is 50μs.
    ///
    /// This is the threshold applied by the constructors that only take a slow-poll threshold
    /// ([`TaskMonitor::with_slow_poll_threshold`] and
    /// [`TaskMonitorCore::with_slow_poll_threshold`]).
    #[cfg(not(test))]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
    // When this crate is itself compiled with `cfg(test)`, the default is 500ms instead of 50μs.
    // NOTE(review): presumably this keeps timing-sensitive unit tests from being flaky — confirm.
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1674
1675 /// Constructs a new task monitor.
1676 ///
1677 /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1678 /// considered 'slow'.
1679 ///
1680 /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1681 /// considered 'long'.
1682 pub fn new() -> TaskMonitor {
1683 TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1684 }
1685
1686 /// Constructs a builder for a task monitor.
    pub fn builder() -> TaskMonitorBuilder {
        // Hand back a fresh builder; all configuration happens on the returned value.
        TaskMonitorBuilder::new()
    }
1690
1691 /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1692 ///
1693 /// ##### Selecting an appropriate threshold
1694 /// TODO. What advice can we give here?
1695 ///
1696 /// ##### Examples
1697 /// In the below example, low-threshold and high-threshold monitors are constructed and
    /// instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the
1699 /// high-threshold monitor reports only 2 slow polls:
1700 /// ```
1701 /// use std::future::Future;
1702 /// use std::time::Duration;
1703 /// use tokio_metrics::TaskMonitor;
1704 ///
1705 /// #[tokio::main]
1706 /// async fn main() {
1707 /// let lo_threshold = Duration::from_micros(10);
1708 /// let hi_threshold = Duration::from_millis(10);
1709 ///
1710 /// let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1711 /// let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1712 ///
1713 /// let make_task = || async {
1714 /// spin_for(lo_threshold).await; // faster poll 1
1715 /// spin_for(lo_threshold).await; // faster poll 2
1716 /// spin_for(hi_threshold).await; // slower poll 3
1717 /// spin_for(hi_threshold).await // slower poll 4
1718 /// };
1719 ///
1720 /// lo_monitor.instrument(make_task()).await;
1721 /// hi_monitor.instrument(make_task()).await;
1722 ///
1723 /// // the low-threshold monitor reported 4 slow polls:
1724 /// assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1725 /// // the high-threshold monitor reported only 2 slow polls:
1726 /// assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1727 /// }
1728 ///
1729 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1730 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1731 /// let start = tokio::time::Instant::now();
1732 /// while start.elapsed() <= duration {}
1733 /// tokio::task::yield_now()
1734 /// }
1735 /// ```
1736 pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1737 let base = TaskMonitorCore::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD);
1738 TaskMonitor {
1739 base: Arc::new(base),
1740 }
1741 }
1742
1743 /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1744 ///
1745 /// ##### Examples
1746 /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1747 /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1748 /// ```
1749 /// use tokio_metrics::TaskMonitor;
1750 ///
1751 /// #[tokio::main]
1752 /// async fn main() {
1753 /// let metrics_monitor = TaskMonitor::new();
1754 ///
1755 /// assert_eq!(
1756 /// metrics_monitor.slow_poll_threshold(),
1757 /// TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1758 /// );
1759 /// }
1760 /// ```
1761 pub fn slow_poll_threshold(&self) -> Duration {
1762 self.base.metrics.slow_poll_threshold
1763 }
1764
1765 /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1766 /// as long.
1767 pub fn long_delay_threshold(&self) -> Duration {
1768 self.base.metrics.long_delay_threshold
1769 }
1770
1771 /// Produces an instrumented façade around a given async task.
1772 ///
1773 /// ##### Examples
1774 /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1775 /// ```
1776 /// #[tokio::main]
1777 /// async fn main() {
1778 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1779 ///
1780 /// // 0 tasks have been instrumented, much less polled
1781 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1782 ///
1783 /// // instrument a task and poll it to completion
1784 /// metrics_monitor.instrument(async {}).await;
1785 ///
1786 /// // 1 task has been instrumented and polled
1787 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1788 ///
1789 /// // instrument a task and poll it to completion
1790 /// metrics_monitor.instrument(async {}).await;
1791 ///
1792 /// // 2 tasks have been instrumented and polled
1793 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1794 /// }
1795 /// ```
    /// An async task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1797 /// ```
1798 /// #[tokio::main]
1799 /// async fn main() {
1800 /// let monitor_a = tokio_metrics::TaskMonitor::new();
1801 /// let monitor_b = tokio_metrics::TaskMonitor::new();
1802 ///
1803 /// // 0 tasks have been instrumented, much less polled
1804 /// assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1805 /// assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1806 ///
1807 /// // instrument a task and poll it to completion
1808 /// monitor_a.instrument(monitor_b.instrument(async {})).await;
1809 ///
1810 /// // 1 task has been instrumented and polled
1811 /// assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1812 /// assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1813 /// }
1814 /// ```
1815 /// It is also possible (but probably undesirable) to instrument an async task multiple times
1816 /// with the same [`TaskMonitor`]; e.g.:
1817 /// ```
1818 /// #[tokio::main]
1819 /// async fn main() {
1820 /// let monitor = tokio_metrics::TaskMonitor::new();
1821 ///
1822 /// // 0 tasks have been instrumented, much less polled
1823 /// assert_eq!(monitor.cumulative().first_poll_count, 0);
1824 ///
1825 /// // instrument a task and poll it to completion
1826 /// monitor.instrument(monitor.instrument(async {})).await;
1827 ///
1828 /// // 2 tasks have been instrumented and polled, supposedly
1829 /// assert_eq!(monitor.cumulative().first_poll_count, 2);
1830 /// }
1831 /// ```
1832 pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1833 TaskMonitorCore::instrument_with(task, self.clone())
1834 }
1835
1836 /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1837 /// the construction of [`TaskMonitor`].
1838 ///
1839 /// ##### See also
1840 /// - [`TaskMonitor::intervals`]:
1841 /// produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1842 ///
1843 /// ##### Examples
1844 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1845 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1846 /// interval; five slow polls occur across all sampling intervals:
1847 /// ```
1848 /// use std::future::Future;
1849 /// use std::time::Duration;
1850 ///
1851 /// #[tokio::main]
1852 /// async fn main() {
1853 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1854 ///
1855 /// // initialize a stream of sampling intervals
1856 /// let mut intervals = metrics_monitor.intervals();
1857 /// // each call of `next_interval` will produce metrics for the last sampling interval
1858 /// let mut next_interval = || intervals.next().unwrap();
1859 ///
1860 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1861 ///
1862 /// // this task completes in three slow polls
1863 /// let _ = metrics_monitor.instrument(async {
1864 /// spin_for(slow).await; // slow poll 1
1865 /// spin_for(slow).await; // slow poll 2
1866 /// spin_for(slow) // slow poll 3
1867 /// }).await;
1868 ///
1869 /// // in the previous sampling interval, there were 3 slow polls
1870 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1871 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1872 ///
1873 /// // this task completes in two slow polls
1874 /// let _ = metrics_monitor.instrument(async {
1875 /// spin_for(slow).await; // slow poll 1
1876 /// spin_for(slow) // slow poll 2
1877 /// }).await;
1878 ///
1879 /// // in the previous sampling interval, there were 2 slow polls
1880 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1881 ///
    ///     // across all sampling intervals, there were a total of 5 slow polls
1883 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1884 /// }
1885 ///
1886 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1887 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1888 /// let start = tokio::time::Instant::now();
1889 /// while start.elapsed() <= duration {}
1890 /// tokio::task::yield_now()
1891 /// }
1892 /// ```
1893 pub fn cumulative(&self) -> TaskMetrics {
1894 self.base.metrics.metrics()
1895 }
1896
1897 /// Produces an unending iterator of metric sampling intervals.
1898 ///
1899 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1900 /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1901 /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1902 /// interval.
1903 ///
1904 /// ##### Examples
1905 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1906 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1907 /// interval; five slow polls occur across all sampling intervals:
1908 /// ```
1909 /// use std::future::Future;
1910 /// use std::time::Duration;
1911 ///
1912 /// #[tokio::main]
1913 /// async fn main() {
1914 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1915 ///
1916 /// // initialize a stream of sampling intervals
1917 /// let mut intervals = metrics_monitor.intervals();
1918 /// // each call of `next_interval` will produce metrics for the last sampling interval
1919 /// let mut next_interval = || intervals.next().unwrap();
1920 ///
1921 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1922 ///
1923 /// // this task completes in three slow polls
1924 /// let _ = metrics_monitor.instrument(async {
1925 /// spin_for(slow).await; // slow poll 1
1926 /// spin_for(slow).await; // slow poll 2
1927 /// spin_for(slow) // slow poll 3
1928 /// }).await;
1929 ///
1930 /// // in the previous sampling interval, there were 3 slow polls
1931 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1932 ///
1933 /// // this task completes in two slow polls
1934 /// let _ = metrics_monitor.instrument(async {
1935 /// spin_for(slow).await; // slow poll 1
1936 /// spin_for(slow) // slow poll 2
1937 /// }).await;
1938 ///
1939 /// // in the previous sampling interval, there were 2 slow polls
1940 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1941 ///
1942 /// // across all sampling intervals, there were a total of 5 slow polls
1943 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1944 /// }
1945 ///
1946 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1947 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1948 /// let start = tokio::time::Instant::now();
1949 /// while start.elapsed() <= duration {}
1950 /// tokio::task::yield_now()
1951 /// }
1952 /// ```
1953 pub fn intervals(&self) -> TaskIntervals {
1954 TaskIntervals {
1955 monitor: self.clone(),
1956 previous: None,
1957 }
1958 }
1959}
1960
1961impl TaskMonitorCore {
1962 /// Returns a const-friendly [`TaskMonitorCoreBuilder`].
    pub const fn builder() -> TaskMonitorCoreBuilder {
        // `const fn`, so the builder can be obtained in const contexts as well.
        TaskMonitorCoreBuilder::new()
    }
1966
1967 /// Constructs a new [`TaskMonitorCore`]. Refer to the struct documentation for more discussion
1968 /// of benefits compared to [`TaskMonitor`].
1969 ///
1970 /// Uses [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1971 /// considered 'slow'.
1972 ///
1973 /// Uses [`TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1974 /// considered 'long'.
1975 pub const fn new() -> TaskMonitorCore {
1976 TaskMonitorCore::with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
1977 }
1978
1979 /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1980 ///
1981 /// Refer to [`TaskMonitor::with_slow_poll_threshold`] for examples.
1982 pub const fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitorCore {
1983 Self::create(slow_poll_cut_off, TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
1984 }
1985
1986 /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1987 ///
1988 /// Refer to [`TaskMonitor::slow_poll_threshold`] for examples.
1989 pub fn slow_poll_threshold(&self) -> Duration {
1990 self.metrics.slow_poll_threshold
1991 }
1992
1993 /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1994 /// as long.
1995 pub fn long_delay_threshold(&self) -> Duration {
1996 self.metrics.long_delay_threshold
1997 }
1998
1999 /// Produces an instrumented façade around a given async task.
2000 ///
2001 /// ##### Examples
2002 /// ```
2003 /// use tokio_metrics::TaskMonitorCore;
2004 ///
2005 /// static MONITOR: TaskMonitorCore = TaskMonitorCore::new();
2006 ///
2007 /// #[tokio::main]
2008 /// async fn main() {
2009 /// assert_eq!(MONITOR.cumulative().first_poll_count, 0);
2010 ///
2011 /// MONITOR.instrument(async {}).await;
2012 /// assert_eq!(MONITOR.cumulative().first_poll_count, 1);
2013 /// }
2014 /// ```
2015 pub fn instrument<F>(&'static self, task: F) -> Instrumented<F, &'static Self> {
2016 Self::instrument_with(task, self)
2017 }
2018
2019 /// Produces an instrumented façade around a given async task, with an explicit monitor.
2020 ///
2021 /// Use this when you have a non-static monitor reference, such as an `Arc<TaskMonitorCore>`.
2022 ///
2023 /// ##### Examples
2024 /// ```
2025 /// use std::sync::Arc;
2026 /// use tokio_metrics::TaskMonitorCore;
2027 ///
2028 /// #[derive(Clone)]
2029 /// struct SharedState(Arc<SharedStateInner>);
2030 /// struct SharedStateInner {
2031 /// monitor: TaskMonitorCore,
2032 /// other_state: SomeOtherSharedState,
2033 /// }
2034 /// /// Imagine: a type that wasn't `Clone` that you want to pass around
2035 /// /// in a similar way as the monitor
2036 /// struct SomeOtherSharedState;
2037 ///
2038 /// impl AsRef<TaskMonitorCore> for SharedState {
2039 /// fn as_ref(&self) -> &TaskMonitorCore {
2040 /// &self.0.monitor
2041 /// }
2042 /// }
2043 ///
2044 /// #[tokio::main]
2045 /// async fn main() {
2046 /// let state = SharedState(Arc::new(SharedStateInner {
2047 /// monitor: TaskMonitorCore::new(),
2048 /// other_state: SomeOtherSharedState,
2049 /// }));
2050 ///
2051 /// assert_eq!(state.0.monitor.cumulative().first_poll_count, 0);
2052 ///
2053 /// TaskMonitorCore::instrument_with(async {}, state.clone()).await;
2054 /// assert_eq!(state.0.monitor.cumulative().first_poll_count, 1);
2055 /// }
2056 /// ```
2057 pub fn instrument_with<F, M: AsRef<TaskMonitorCore> + Send + Sync + 'static>(
2058 task: F,
2059 monitor: M,
2060 ) -> Instrumented<F, M> {
2061 monitor
2062 .as_ref()
2063 .metrics
2064 .instrumented_count
2065 .fetch_add(1, SeqCst);
2066
2067 let state: State<M> = State {
2068 monitor,
2069 instrumented_at: Instant::now(),
2070 woke_at: AtomicU64::new(0),
2071 waker: AtomicWaker::new(),
2072 };
2073
2074 let instrumented: Instrumented<F, M> = Instrumented {
2075 task,
2076 did_poll_once: false,
2077 idled_at: 0,
2078 state: Arc::new(state),
2079 };
2080
2081 instrumented
2082 }
2083
2084 /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitorCore`], collected since
2085 /// the construction of [`TaskMonitorCore`].
2086 ///
2087 /// ##### See also
2088 /// - [`TaskMonitorCore::intervals`]:
2089 /// produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
2090 ///
2091 /// See [`TaskMonitor::cumulative`] for examples.
2092 pub fn cumulative(&self) -> TaskMetrics {
2093 self.metrics.metrics()
2094 }
2095
2096 /// Produces an unending iterator of metric sampling intervals.
2097 ///
2098 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
2099 /// produced by [`TaskMonitorCore::intervals`]. The item type of this iterator is [`TaskMetrics`],
2100 /// which is a bundle of task metrics that describe *only* events occurring within that sampling
2101 /// interval.
2102 ///
2103 /// ##### Examples
2104 /// The below example demonstrates construction of [`TaskIntervals`] with [`TaskMonitorCore`].
2105 ///
2106 /// See [`TaskMonitor::intervals`] for more usage examples.
2107 ///
2108 /// ```
2109 /// use std::sync::Arc;
2110 ///
2111 /// fn main() {
2112 /// let metrics_monitor = Arc::new(tokio_metrics::TaskMonitorCore::new());
2113 ///
2114 /// let mut _intervals = tokio_metrics::TaskMonitorCore::intervals(metrics_monitor);
2115 /// }
2116 /// ```
2117 pub fn intervals<Monitor: AsRef<TaskMonitorCore> + Send + Sync + 'static>(
2118 monitor: Monitor,
2119 ) -> TaskIntervals<Monitor> {
2120 let intervals: TaskIntervals<Monitor> = TaskIntervals {
2121 monitor,
2122 previous: None,
2123 };
2124
2125 intervals
2126 }
2127}
2128
// Identity conversion: a core can always be viewed as a reference to itself. Combined with the
// standard library's blanket `AsRef` impl for references, this lets `&'static TaskMonitorCore`
// satisfy the `AsRef<TaskMonitorCore>` bound of `instrument_with`, which `instrument` relies on.
impl AsRef<TaskMonitorCore> for TaskMonitorCore {
    fn as_ref(&self) -> &TaskMonitorCore {
        self
    }
}
2134
2135impl TaskMonitorCore {
2136 const fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitorCore {
2137 TaskMonitorCore {
2138 metrics: RawMetrics {
2139 slow_poll_threshold: slow_poll_cut_off,
2140 first_poll_count: AtomicU64::new(0),
2141 total_idled_count: AtomicU64::new(0),
2142 total_scheduled_count: AtomicU64::new(0),
2143 total_fast_poll_count: AtomicU64::new(0),
2144 total_slow_poll_count: AtomicU64::new(0),
2145 total_long_delay_count: AtomicU64::new(0),
2146 instrumented_count: AtomicU64::new(0),
2147 dropped_count: AtomicU64::new(0),
2148 total_first_poll_delay_ns: AtomicU64::new(0),
2149 total_scheduled_duration_ns: AtomicU64::new(0),
2150 local_max_idle_duration_ns: AtomicU64::new(0),
2151 global_max_idle_duration_ns: AtomicU64::new(0),
2152 total_idle_duration_ns: AtomicU64::new(0),
2153 total_fast_poll_duration_ns: AtomicU64::new(0),
2154 total_slow_poll_duration: AtomicU64::new(0),
2155 total_short_delay_duration_ns: AtomicU64::new(0),
2156 long_delay_threshold: long_delay_cut_off,
2157 total_short_delay_count: AtomicU64::new(0),
2158 total_long_delay_duration_ns: AtomicU64::new(0),
2159 },
2160 }
2161 }
2162}
2163
2164impl RawMetrics {
2165 fn get_and_reset_local_max_idle_duration(&self) -> Duration {
2166 Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
2167 }
2168
2169 fn metrics(&self) -> TaskMetrics {
2170 let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
2171 let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
2172
2173 let total_fast_poll_duration =
2174 Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
2175 let total_slow_poll_duration =
2176 Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
2177
2178 let total_poll_count = total_fast_poll_count.saturating_add(total_slow_poll_count);
2179 let total_poll_duration = total_fast_poll_duration.saturating_add(total_slow_poll_duration);
2180
2181 TaskMetrics {
2182 instrumented_count: self.instrumented_count.load(SeqCst),
2183 dropped_count: self.dropped_count.load(SeqCst),
2184
2185 total_poll_count,
2186 total_poll_duration,
2187 first_poll_count: self.first_poll_count.load(SeqCst),
2188 total_idled_count: self.total_idled_count.load(SeqCst),
2189 total_scheduled_count: self.total_scheduled_count.load(SeqCst),
2190 total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
2191 total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
2192 total_short_delay_count: self.total_short_delay_count.load(SeqCst),
2193 total_long_delay_count: self.total_long_delay_count.load(SeqCst),
2194 total_first_poll_delay: Duration::from_nanos(
2195 self.total_first_poll_delay_ns.load(SeqCst),
2196 ),
2197 max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
2198 total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
2199 total_scheduled_duration: Duration::from_nanos(
2200 self.total_scheduled_duration_ns.load(SeqCst),
2201 ),
2202 total_fast_poll_duration: Duration::from_nanos(
2203 self.total_fast_poll_duration_ns.load(SeqCst),
2204 ),
2205 total_slow_poll_duration: Duration::from_nanos(
2206 self.total_slow_poll_duration.load(SeqCst),
2207 ),
2208 total_short_delay_duration: Duration::from_nanos(
2209 self.total_short_delay_duration_ns.load(SeqCst),
2210 ),
2211 total_long_delay_duration: Duration::from_nanos(
2212 self.total_long_delay_duration_ns.load(SeqCst),
2213 ),
2214 }
2215 }
2216}
2217
2218impl Default for TaskMonitor {
2219 fn default() -> TaskMonitor {
2220 TaskMonitor::new()
2221 }
2222}
2223
2224impl Default for TaskMonitorCore {
2225 fn default() -> TaskMonitorCore {
2226 TaskMonitorCore::new()
2227 }
2228}
2229
2230derived_metrics!(
2231 [TaskMetrics] {
2232 stable {
2233 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
2234 /// are first polled.
2235 ///
2236 /// ##### Definition
2237 /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
2238 /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
2239 ///
2240 /// ##### Interpretation
2241 /// If this metric increases, it means that, on average, tasks spent longer waiting to be
2242 /// initially polled.
2243 ///
2244 /// ##### See also
2245 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
2246 /// The mean duration that tasks spent waiting to be executed after awakening.
2247 ///
2248 /// ##### Examples
2249 /// In the below example, no tasks are instrumented or polled within the first sampling
2250 /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
2251 /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
2252 /// instrumentation and first poll of two tasks:
2253 /// ```
2254 /// use std::time::Duration;
2255 ///
2256 /// #[tokio::main]
2257 /// async fn main() {
2258 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2259 /// let mut interval = metrics_monitor.intervals();
2260 /// let mut next_interval = || interval.next().unwrap();
2261 ///
2262 /// // no tasks have yet been created, instrumented, or polled
2263 /// assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
2264 ///
2265 /// // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
2266 /// // produces the total time it took to do all of the aforementioned
2267 /// async fn instrument_pause_await(
2268 /// metrics_monitor: &tokio_metrics::TaskMonitor,
2269 /// pause_time: Duration
2270 /// ) -> Duration
2271 /// {
2272 /// let before_instrumentation = tokio::time::Instant::now();
2273 /// let task = metrics_monitor.instrument(async move {});
2274 /// tokio::time::sleep(pause_time).await;
2275 /// task.await;
2276 /// before_instrumentation.elapsed()
2277 /// }
2278 ///
2279 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
2280 /// let task_a_pause_time = Duration::from_millis(500);
2281 /// let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
2282 ///
2283 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
2284 /// // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
2285 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
2286 /// assert!(mean_first_poll_delay >= task_a_pause_time);
2287 /// assert!(mean_first_poll_delay <= task_a_total_time);
2288 ///
2289 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
2290 /// let task_b_pause_time = Duration::from_millis(500);
2291 /// let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
2292 ///
2293 /// // construct and await a task that pauses for 1000ms between instrumentation and first poll
2294 /// let task_c_pause_time = Duration::from_millis(1000);
2295 /// let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
2296 ///
2297 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
            ///     // average pause time of 750ms, and less-than-or-equal-to the mean total runtime of
2299 /// // `task_b` and `task_c`
2300 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
2301 /// assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
2302 /// assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
2303 /// }
2304 /// ```
            pub fn mean_first_poll_delay(&self) -> Duration {
                // Total first-poll delay averaged over the number of first polls. The doc example
                // above expects `Duration::ZERO` when nothing has been polled, so `mean`
                // presumably handles a zero count — confirm against its definition.
                mean(self.total_first_poll_delay, self.first_poll_count)
            }
2308
2309 /// The mean duration of idles.
2310 ///
2311 /// ##### Definition
2312 /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2313 /// [`total_idled_count`][TaskMetrics::total_idled_count].
2314 ///
2315 /// ##### Interpretation
2316 /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2317 /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2318 /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2319 /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2320 /// task-external events to complete.
2321 ///
2322 /// ##### Examples
2323 /// ```
2324 /// #[tokio::main]
2325 /// async fn main() {
2326 /// let monitor = tokio_metrics::TaskMonitor::new();
2327 /// let one_sec = std::time::Duration::from_secs(1);
2328 ///
2329 /// monitor.instrument(async move {
2330 /// tokio::time::sleep(one_sec).await;
2331 /// }).await;
2332 ///
2333 /// assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2334 /// }
2335 /// ```
2336 pub fn mean_idle_duration(&self) -> Duration {
2337 mean(self.total_idle_duration, self.total_idled_count)
2338 }
2339
2340 /// The mean duration that tasks spent waiting to be executed after awakening.
2341 ///
2342 /// ##### Definition
2343 /// This metric is derived from
2344 /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2345 /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2346 ///
2347 /// ##### Interpretation
2348 /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2349 /// queues before being polled.
2350 ///
2351 /// ##### See also
2352 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
2353 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
2354 /// are first polled.
2355 ///
2356 /// ##### Examples
2357 /// ```
2358 /// use tokio::time::Duration;
2359 ///
2360 /// #[tokio::main(flavor = "current_thread")]
2361 /// async fn main() {
2362 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2363 /// let mut interval = metrics_monitor.intervals();
2364 /// let mut next_interval = || interval.next().unwrap();
2365 ///
2366 /// // construct and instrument and spawn a task that yields endlessly
2367 /// tokio::spawn(metrics_monitor.instrument(async {
2368 /// loop { tokio::task::yield_now().await }
2369 /// }));
2370 ///
2371 /// tokio::task::yield_now().await;
2372 ///
2373 /// // block the executor for 1 second
2374 /// std::thread::sleep(Duration::from_millis(1000));
2375 ///
2376 /// // get the task to run twice
2377 /// // the first will have a 1 sec scheduling delay, the second will have almost none
2378 /// tokio::task::yield_now().await;
2379 /// tokio::task::yield_now().await;
2380 ///
2381 /// // `endless_task` will have spent approximately one second waiting
2382 /// let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2383 /// assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2384 /// assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2385 /// }
2386 /// ```
2387 pub fn mean_scheduled_duration(&self) -> Duration {
2388 mean(self.total_scheduled_duration, self.total_scheduled_count)
2389 }
2390
2391 /// The mean duration of polls.
2392 ///
2393 /// ##### Definition
2394 /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2395 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2396 ///
2397 /// ##### Interpretation
2398 /// If this metric increases, it means that, on average, individual polls are tending to take
2399 /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2400 /// durations could be offset by fewer polls.
2401 ///
2402 /// ##### See also
2403 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio of polls categorized as slow to the total number of polls.
2405 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2406 /// The mean duration of slow polls.
2407 ///
2408 /// ##### Examples
2409 /// ```
2410 /// use std::time::Duration;
2411 ///
2412 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2413 /// async fn main() {
2414 /// let monitor = tokio_metrics::TaskMonitor::new();
2415 /// let mut interval = monitor.intervals();
2416 /// let mut next_interval = move || interval.next().unwrap();
2417 ///
2418 /// assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2419 ///
2420 /// monitor.instrument(async {
2421 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2422 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2423 /// () // poll 3 (0s)
2424 /// }).await;
2425 ///
2426 /// assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2427 /// }
2428 /// ```
2429 pub fn mean_poll_duration(&self) -> Duration {
2430 mean(self.total_poll_duration, self.total_poll_count)
2431 }
2432
            /// The ratio of polls categorized as slow to the total number of polls.
2434 ///
2435 /// ##### Definition
2436 /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2437 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2438 ///
2439 /// ##### Interpretation
2440 /// If this metric increases, it means that a greater proportion of polls took excessively long
2441 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2442 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
            /// However, as a rule, tasks *should* yield to the scheduler frequently.
2444 ///
2445 /// ##### See also
2446 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2447 /// The mean duration of polls.
2448 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2449 /// The mean duration of slow polls.
2450 ///
2451 /// ##### Examples
            /// Changes in this metric may be observed by varying the ratio of slow and fast polls within
            /// sampling intervals; for instance:
2454 /// ```
2455 /// use std::future::Future;
2456 /// use std::time::Duration;
2457 ///
2458 /// #[tokio::main]
2459 /// async fn main() {
2460 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2461 /// let mut interval = metrics_monitor.intervals();
2462 /// let mut next_interval = || interval.next().unwrap();
2463 ///
2464 /// // no tasks have been constructed, instrumented, or polled
2465 /// let interval = next_interval();
2466 /// assert_eq!(interval.total_fast_poll_count, 0);
2467 /// assert_eq!(interval.total_slow_poll_count, 0);
2468 /// assert!(interval.slow_poll_ratio().is_nan());
2469 ///
2470 /// let fast = Duration::ZERO;
2471 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
2472 ///
2473 /// // this task completes in three fast polls
2474 /// metrics_monitor.instrument(async {
2475 /// spin_for(fast).await; // fast poll 1
2476 /// spin_for(fast).await; // fast poll 2
2477 /// spin_for(fast); // fast poll 3
2478 /// }).await;
2479 ///
2480 /// // this task completes in two slow polls
2481 /// metrics_monitor.instrument(async {
2482 /// spin_for(slow).await; // slow poll 1
2483 /// spin_for(slow); // slow poll 2
2484 /// }).await;
2485 ///
2486 /// let interval = next_interval();
2487 /// assert_eq!(interval.total_fast_poll_count, 3);
2488 /// assert_eq!(interval.total_slow_poll_count, 2);
2489 /// assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2490 ///
2491 /// // this task completes in three slow polls
2492 /// metrics_monitor.instrument(async {
2493 /// spin_for(slow).await; // slow poll 1
2494 /// spin_for(slow).await; // slow poll 2
2495 /// spin_for(slow); // slow poll 3
2496 /// }).await;
2497 ///
2498 /// // this task completes in two fast polls
2499 /// metrics_monitor.instrument(async {
2500 /// spin_for(fast).await; // fast poll 1
2501 /// spin_for(fast); // fast poll 2
2502 /// }).await;
2503 ///
2504 /// let interval = next_interval();
2505 /// assert_eq!(interval.total_fast_poll_count, 2);
2506 /// assert_eq!(interval.total_slow_poll_count, 3);
2507 /// assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2508 /// }
2509 ///
2510 /// fn ratio(a: f64, b: f64) -> f64 {
2511 /// a / (a + b)
2512 /// }
2513 ///
2514 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2515 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2516 /// let start = tokio::time::Instant::now();
2517 /// while start.elapsed() <= duration {}
2518 /// tokio::task::yield_now()
2519 /// }
2520 /// ```
2521 pub fn slow_poll_ratio(&self) -> f64 {
2522 self.total_slow_poll_count as f64 / self.total_poll_count as f64
2523 }
2524
2525 /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2526 ///
2527 /// ##### Definition
2528 /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2529 /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2530 pub fn long_delay_ratio(&self) -> f64 {
2531 self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2532 }
2533
2534 /// The mean duration of fast polls.
2535 ///
2536 /// ##### Definition
2537 /// This metric is derived from
2538 /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2539 /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2540 ///
2541 /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval; two fast polls
            /// (polls that stay below the monitor's slow-poll threshold, by default
            /// [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]) consume a mean
            /// of at least 1.5µs in the second sampling interval; and three fast polls consume a mean of
            /// at least 2µs in the third sampling interval:
2548 /// ```
2549 /// use std::future::Future;
2550 /// use std::time::Duration;
2551 ///
2552 /// #[tokio::main]
2553 /// async fn main() {
2554 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2555 /// let mut interval = metrics_monitor.intervals();
2556 /// let mut next_interval = || interval.next().unwrap();
2557 ///
2558 /// // no tasks have been constructed, instrumented, or polled
2559 /// assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2560 ///
2561 /// let threshold = metrics_monitor.slow_poll_threshold();
2562 /// let fast_1 = 1 * Duration::from_micros(1);
2563 /// let fast_2 = 2 * Duration::from_micros(1);
2564 /// let fast_3 = 3 * Duration::from_micros(1);
2565 ///
2566 /// // this task completes in two fast polls
2567 /// let total_time = time(metrics_monitor.instrument(async {
2568 /// spin_for(fast_1).await; // fast poll 1
2569 /// spin_for(fast_2) // fast poll 2
2570 /// })).await;
2571 ///
2572 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2573 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2574 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2575 /// assert!(mean_fast_poll_duration <= total_time / 2);
2576 ///
2577 /// // this task completes in three fast polls
2578 /// let total_time = time(metrics_monitor.instrument(async {
2579 /// spin_for(fast_1).await; // fast poll 1
2580 /// spin_for(fast_2).await; // fast poll 2
2581 /// spin_for(fast_3) // fast poll 3
2582 /// })).await;
2583 ///
2584 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2585 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2586 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2587 /// assert!(mean_fast_poll_duration <= total_time / 3);
2588 /// }
2589 ///
2590 /// /// Produces the amount of time it took to await a given task.
2591 /// async fn time(task: impl Future) -> Duration {
2592 /// let start = tokio::time::Instant::now();
2593 /// task.await;
2594 /// start.elapsed()
2595 /// }
2596 ///
2597 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2598 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2599 /// let start = tokio::time::Instant::now();
2600 /// while start.elapsed() <= duration {}
2601 /// tokio::task::yield_now()
2602 /// }
2603 /// ```
2604 pub fn mean_fast_poll_duration(&self) -> Duration {
2605 mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2606 }
2607
2608 /// The mean duration of slow polls.
2609 ///
2610 /// ##### Definition
2611 /// This metric is derived from
2612 /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2613 /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2614 ///
2615 /// ##### Interpretation
2616 /// If this metric increases, it means that a greater proportion of polls took excessively long
2617 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2618 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2619 ///
2620 /// ##### See also
2621 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2622 /// The mean duration of polls.
2623 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio of polls categorized as slow to the total number of polls.
2625 ///
2626 /// ##### Interpretation
            /// If this metric increases, it means that, on average, slow polls got even slower. This does
            /// not necessarily imply increased task latency: An increase in average slow poll duration
            /// could be offset by fewer or faster polls. However, as a rule, tasks *should* yield to the
            /// scheduler frequently.
2631 ///
2632 /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval; two slow polls
            /// consume a mean of
            /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// second sampling interval; and three slow polls consume a mean of
            /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// third sampling interval:
2639 /// ```
2640 /// use std::future::Future;
2641 /// use std::time::Duration;
2642 ///
2643 /// #[tokio::main]
2644 /// async fn main() {
2645 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2646 /// let mut interval = metrics_monitor.intervals();
2647 /// let mut next_interval = || interval.next().unwrap();
2648 ///
2649 /// // no tasks have been constructed, instrumented, or polled
2650 /// assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2651 ///
2652 /// let threshold = metrics_monitor.slow_poll_threshold();
2653 /// let slow_1 = 1 * threshold;
2654 /// let slow_2 = 2 * threshold;
2655 /// let slow_3 = 3 * threshold;
2656 ///
2657 /// // this task completes in two slow polls
2658 /// let total_time = time(metrics_monitor.instrument(async {
2659 /// spin_for(slow_1).await; // slow poll 1
2660 /// spin_for(slow_2) // slow poll 2
2661 /// })).await;
2662 ///
2663 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2664 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2665 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2666 /// assert!(mean_slow_poll_duration <= total_time / 2);
2667 ///
2668 /// // this task completes in three slow polls
2669 /// let total_time = time(metrics_monitor.instrument(async {
2670 /// spin_for(slow_1).await; // slow poll 1
2671 /// spin_for(slow_2).await; // slow poll 2
2672 /// spin_for(slow_3) // slow poll 3
2673 /// })).await;
2674 ///
2675 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2676 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2677 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2678 /// assert!(mean_slow_poll_duration <= total_time / 3);
2679 /// }
2680 ///
2681 /// /// Produces the amount of time it took to await a given task.
2682 /// async fn time(task: impl Future) -> Duration {
2683 /// let start = tokio::time::Instant::now();
2684 /// task.await;
2685 /// start.elapsed()
2686 /// }
2687 ///
2688 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2689 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2690 /// let start = tokio::time::Instant::now();
2691 /// while start.elapsed() <= duration {}
2692 /// tokio::task::yield_now()
2693 /// }
2694 /// ```
2695 pub fn mean_slow_poll_duration(&self) -> Duration {
2696 mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2697 }
2698
2699 /// The average time taken for a task with a short scheduling delay to be executed after being
2700 /// scheduled.
2701 ///
2702 /// ##### Definition
2703 /// This metric is derived from
2704 /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2705 /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2706 pub fn mean_short_delay_duration(&self) -> Duration {
2707 mean(
2708 self.total_short_delay_duration,
2709 self.total_short_delay_count,
2710 )
2711 }
2712
2713 /// The average scheduling delay for a task which takes a long time to start executing after
2714 /// being scheduled.
2715 ///
2716 /// ##### Definition
2717 /// This metric is derived from
2718 /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2719 /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2720 pub fn mean_long_delay_duration(&self) -> Duration {
2721 mean(self.total_long_delay_duration, self.total_long_delay_count)
2722 }
2723 }
2724 unstable {}
2725 }
2726);
2727
2728impl<T: Future, M: AsRef<TaskMonitorCore> + Send + Sync + 'static> Future for Instrumented<T, M> {
2729 type Output = T::Output;
2730
2731 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2732 instrument_poll(cx, self, Future::poll)
2733 }
2734}
2735
2736impl<T: Stream> Stream for Instrumented<T> {
2737 type Item = T::Item;
2738
2739 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2740 instrument_poll(cx, self, Stream::poll_next)
2741 }
2742}
2743
/// Polls the wrapped task through `poll_fn` and records the metrics for this poll on the monitor.
///
/// Shared by the `Future` and `Stream` implementations of `Instrumented`, which pass
/// `Future::poll` and `Stream::poll_next` respectively as `poll_fn`.
///
/// In order, this function:
/// 1. on the first poll only, records the delay since instrumentation and counts the task as
///    polled;
/// 2. takes (and resets to 0) the wake timestamp stored in `state.woke_at`, and uses it to account
///    for idle time and for scheduling delay (bucketed as long or short by
///    `long_delay_threshold`);
/// 3. registers the caller's waker, then polls the inner task with the instrumented waker;
/// 4. records when the task went idle and how long the inner poll took (bucketed as slow or fast
///    by `slow_poll_threshold`).
///
/// All durations are accumulated as `u64` nanoseconds; conversions that do not fit saturate to
/// `u64::MAX`.
///
/// Returns whatever `poll_fn` returned.
fn instrument_poll<T, M: AsRef<TaskMonitorCore> + Send + Sync + 'static, Out>(
    cx: &mut Context<'_>,
    instrumented: Pin<&mut Instrumented<T, M>>,
    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
) -> Poll<Out> {
    // taken before any bookkeeping so that the accounting below is not counted as scheduling delay
    let poll_start = Instant::now();
    let this = instrumented.project();
    let idled_at = this.idled_at;
    let state = this.state;
    let instrumented_at = state.instrumented_at;
    let metrics = &state.monitor.as_ref().metrics;
    /* accounting for time-to-first-poll and tasks-count */
    // is this the first time this task has been polled?
    if !*this.did_poll_once {
        // if so, we need to do three things:
        /* 1. note that this task *has* been polled */
        *this.did_poll_once = true;

        /* 2. account for the time-to-first-poll of this task */
        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
        // round down to `u64::MAX` nanoseconds
        let elapsed = poll_start
            .saturating_duration_since(instrumented_at)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);
        // add this duration to `total_first_poll_delay_ns`
        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);

        /* 3. increment the count of tasks that have been polled at least once */
        metrics.first_poll_count.fetch_add(1, SeqCst);
    }
    /* accounting for time-idled and time-scheduled */
    // 1. note (and reset) the instant this task was last awoken
    let woke_at = state.woke_at.swap(0, SeqCst);
    // The state of a future is *idling* in the interim between the instant
    // it completes a `poll`, and the instant it is next awoken.
    if *idled_at < woke_at {
        // increment the counter of how many idles occurred
        metrics.total_idled_count.fetch_add(1, SeqCst);

        // compute the duration of the idle
        let idle_ns = woke_at.saturating_sub(*idled_at);

        // update the max time tasks spent idling, both locally and
        // globally.
        metrics
            .local_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        metrics
            .global_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        // adjust the total elapsed time monitored tasks spent idling
        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
    }
    // if this task spent any time in the scheduled state after instrumentation,
    // and after first poll, `woke_at` will be greater than 0.
    if woke_at > 0 {
        // increment the counter of how many schedules occurred
        metrics.total_scheduled_count.fetch_add(1, SeqCst);

        // recall that the `woke_at` field is internally represented as
        // nanoseconds-since-instrumentation. here, for accounting purposes,
        // we need to instead represent it as a proper `Instant`.
        // (if that addition overflows, fall back to `poll_start`, which makes
        // the scheduled duration computed below zero.)
        let woke_instant = instrumented_at
            .checked_add(Duration::from_nanos(woke_at))
            .unwrap_or(poll_start);

        // the duration this task spent scheduled is the time elapsed between
        // when this task was awoken, and when it was polled.
        let scheduled_ns = poll_start
            .saturating_duration_since(woke_instant)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);

        let scheduled = Duration::from_nanos(scheduled_ns);

        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
            if scheduled >= metrics.long_delay_threshold {
                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
            } else {
                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
            };
        // update the appropriate bucket
        count_bucket.fetch_add(1, SeqCst);
        duration_bucket.fetch_add(scheduled_ns, SeqCst);

        // add `scheduled_ns` to the Monitor's total
        metrics
            .total_scheduled_duration_ns
            .fetch_add(scheduled_ns, SeqCst);
    }
    // Register the caller's waker, so that waking the instrumented waker
    // forwards the wake-up to it
    state.waker.register(cx.waker());
    // Get the instrumented waker (backed by `state`, which notes the wake time
    // before forwarding)
    let waker_ref = futures_util::task::waker_ref(state);
    let mut cx = Context::from_waker(&waker_ref);
    // Poll the task
    let inner_poll_start = Instant::now();
    let ret = poll_fn(this.task, &mut cx);
    let inner_poll_end = Instant::now();
    /* idle time starts now */
    // stored as nanoseconds-since-instrumentation, the same representation as `woke_at`
    *idled_at = inner_poll_end
        .saturating_duration_since(instrumented_at)
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    /* accounting for poll time */
    let inner_poll_duration = inner_poll_end.saturating_duration_since(inner_poll_start);
    let inner_poll_ns: u64 = inner_poll_duration
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
        if inner_poll_duration >= metrics.slow_poll_threshold {
            (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
        } else {
            (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
        };
    // update the appropriate bucket
    count_bucket.fetch_add(1, SeqCst);
    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
    ret
}
2869
2870impl<M> State<M> {
2871 fn on_wake(&self) {
2872 let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2873 Ok(woke_at) => woke_at,
2874 // This is highly unlikely as it would mean the task ran for over
2875 // 500 years. If you ran your service for 500 years. If you are
2876 // reading this 500 years in the future, I'm sorry.
2877 Err(_) => return,
2878 };
2879
2880 // We don't actually care about the result
2881 let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2882 }
2883}
2884
2885impl<M: Send + Sync> ArcWake for State<M> {
2886 fn wake_by_ref(arc_self: &Arc<State<M>>) {
2887 arc_self.on_wake();
2888 arc_self.waker.wake();
2889 }
2890
2891 fn wake(self: Arc<State<M>>) {
2892 self.on_wake();
2893 self.waker.wake();
2894 }
2895}
2896
/// Iterator returned by [`TaskMonitor::intervals`].
///
/// See that method's documentation for more details.
///
/// Every call to `next` samples the monitor and yields a [`TaskMetrics`]; the iterator never
/// yields `None`.
#[derive(Debug)]
pub struct TaskIntervals<M: AsRef<TaskMonitorCore> + Send + Sync + 'static = TaskMonitor> {
    /// Handle to the monitor whose metrics are sampled.
    monitor: M,
    /// Cumulative metrics captured by the most recent sample. While this is `None`, the next
    /// sample is reported as-is rather than as a difference from an earlier one.
    previous: Option<TaskMetrics>,
}
2905
2906impl<M: AsRef<TaskMonitorCore> + Send + Sync + 'static> TaskIntervals<M> {
2907 fn probe(&mut self) -> TaskMetrics {
2908 let latest = self.monitor.as_ref().metrics.metrics();
2909 let local_max_idle_duration = self
2910 .monitor
2911 .as_ref()
2912 .metrics
2913 .get_and_reset_local_max_idle_duration();
2914
2915 let next = if let Some(previous) = self.previous {
2916 TaskMetrics {
2917 instrumented_count: latest
2918 .instrumented_count
2919 .wrapping_sub(previous.instrumented_count),
2920 dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2921 total_poll_count: latest
2922 .total_poll_count
2923 .wrapping_sub(previous.total_poll_count),
2924 total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2925 first_poll_count: latest
2926 .first_poll_count
2927 .wrapping_sub(previous.first_poll_count),
2928 total_idled_count: latest
2929 .total_idled_count
2930 .wrapping_sub(previous.total_idled_count),
2931 total_scheduled_count: latest
2932 .total_scheduled_count
2933 .wrapping_sub(previous.total_scheduled_count),
2934 total_fast_poll_count: latest
2935 .total_fast_poll_count
2936 .wrapping_sub(previous.total_fast_poll_count),
2937 total_short_delay_count: latest
2938 .total_short_delay_count
2939 .wrapping_sub(previous.total_short_delay_count),
2940 total_slow_poll_count: latest
2941 .total_slow_poll_count
2942 .wrapping_sub(previous.total_slow_poll_count),
2943 total_long_delay_count: latest
2944 .total_long_delay_count
2945 .wrapping_sub(previous.total_long_delay_count),
2946 total_first_poll_delay: sub(
2947 latest.total_first_poll_delay,
2948 previous.total_first_poll_delay,
2949 ),
2950 max_idle_duration: local_max_idle_duration,
2951 total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2952 total_scheduled_duration: sub(
2953 latest.total_scheduled_duration,
2954 previous.total_scheduled_duration,
2955 ),
2956 total_fast_poll_duration: sub(
2957 latest.total_fast_poll_duration,
2958 previous.total_fast_poll_duration,
2959 ),
2960 total_short_delay_duration: sub(
2961 latest.total_short_delay_duration,
2962 previous.total_short_delay_duration,
2963 ),
2964 total_slow_poll_duration: sub(
2965 latest.total_slow_poll_duration,
2966 previous.total_slow_poll_duration,
2967 ),
2968 total_long_delay_duration: sub(
2969 latest.total_long_delay_duration,
2970 previous.total_long_delay_duration,
2971 ),
2972 }
2973 } else {
2974 latest
2975 };
2976
2977 self.previous = Some(latest);
2978
2979 next
2980 }
2981}
2982
2983impl<M: AsRef<TaskMonitorCore> + Send + Sync + 'static> Iterator for TaskIntervals<M> {
2984 type Item = TaskMetrics;
2985
2986 fn next(&mut self) -> Option<Self::Item> {
2987 Some(self.probe())
2988 }
2989}
2990
/// Converts `d` into a whole number of nanoseconds.
///
/// Durations longer than `u64::MAX` nanoseconds trip a debug assertion; when debug assertions
/// are disabled the arithmetic wraps instead.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    debug_assert!(d <= Duration::from_nanos(u64::MAX));
    let whole_seconds_ns = d.as_secs().wrapping_mul(NANOS_PER_SEC);
    whole_seconds_ns.wrapping_add(u64::from(d.subsec_nanos()))
}
2998
2999#[inline(always)]
3000fn sub(a: Duration, b: Duration) -> Duration {
3001 let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
3002 Duration::from_nanos(nanos)
3003}
3004
3005#[inline(always)]
3006fn mean(d: Duration, count: u64) -> Duration {
3007 if let Some(quotient) = to_nanos(d).checked_div(count) {
3008 Duration::from_nanos(quotient)
3009 } else {
3010 Duration::ZERO
3011 }
3012}
3013
// Compile-time fixtures for type inference around `Instrumented` and `TaskMonitor::intervals`.
//
// Each item below names `Instrumented` in a different position while leaving the monitor
// parameter `M` unspecified or inferred. If any of them stops compiling, the way `M` is
// defaulted or inferred has changed. The single runtime test merely calls the helpers.
#[cfg(test)]
mod inference_tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    // Type alias — M defaults to TaskMonitor
    type _BoxedInstrumented = Instrumented<Pin<Box<dyn Future<Output = ()>>>>;

    // Struct field — M defaults to TaskMonitor
    struct _Wrapper {
        _fut: Instrumented<Pin<Box<dyn Future<Output = ()>>>>,
    }

    // Partial type annotation — M defaults to TaskMonitor
    async fn _partial_annotation(monitor: &TaskMonitor) {
        let fut: Instrumented<_> = monitor.instrument(async { 42 });
        fut.await;
    }

    // Common path — fully inferred from instrument()'s return type
    async fn _common_usage(monitor: &TaskMonitor) {
        monitor.instrument(async { 42 }).await;
    }

    // Storing without annotation — both T and M inferred
    async fn _store_without_annotation(monitor: &TaskMonitor) {
        let fut = monitor.instrument(async { 42 });
        fut.await;
    }

    // Function boundary — M defaults to TaskMonitor in the signature
    async fn _function_boundary(fut: Instrumented<impl Future<Output = i32>>) -> i32 {
        fut.await
    }

    // Return position — M defaults to TaskMonitor
    fn _return_position(monitor: &TaskMonitor) -> Instrumented<impl Future<Output = i32> + '_> {
        monitor.instrument(async { 42 })
    }

    // intervals() inference
    fn _intervals_inference(monitor: &TaskMonitor) {
        let mut intervals = monitor.intervals();
        let _: Option<TaskMetrics> = intervals.next();
    }

    // Exercises every helper above once; the assertion of interest is that this module compiles.
    #[tokio::test]
    async fn inference_compiles() {
        let monitor = TaskMonitor::new();
        _partial_annotation(&monitor).await;
        _common_usage(&monitor).await;
        _store_without_annotation(&monitor).await;
        _function_boundary(monitor.instrument(async { 42 })).await;
        _return_position(&monitor).await;
        _intervals_inference(&monitor);
    }
}