async_profiler_agent/pollcatch/mod.rs
1//! To emit `tokio.PollCatchV1` events, you can set up the task hooks when setting up your Tokio runtime:
2//!
3//! Then, you can use the `decoder` (look at the crate README) to find long polls in your program.
4//!
5//! Use it around your `main` like this:
6//! ```
7//! # async fn your_main() {}
8//!
9//! let mut rt: tokio::runtime::Builder = tokio::runtime::Builder::new_multi_thread();
10//! rt.enable_all();
11//!
12//! #[cfg(tokio_unstable)]
13//! {
14//! rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook())
15//! .on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook());
16//! }
17//! let rt = rt.build().unwrap();
18//! rt.block_on(your_main())
19//! ```
20//!
21//! Except on a poll that is involved in a profiling sample, the poll hook overhead
22//! is limited to a few thread-local accesses and should be very very fast.
23//!
24//! When a profiling sample is taken (normally around 1 / second), it adds slightly
25//! more overhead to report the sample, but that is a matter of microseconds
26//! and therefore should not worsen tail latency problems.
27
28use std::{
29 cell::Cell,
30 sync::{
31 LazyLock,
32 atomic::{self, AtomicBool},
33 },
34};
35
36mod tsc;
37
38use crate::asprof::{self, AsProf};
39
40static POLLCATCH_JFR_KEY: LazyLock<Option<asprof::UserJfrKey>> = LazyLock::new(|| {
41 // Pollcatch V1 event contains:
42 // 8 byte little-endian "before" tsc timestamp
43 // 8 byte little-endian "after" tsc timestamp
44 AsProf::create_user_jfr_key(c"tokio.PollcatchV1")
45 .map_err(|e| {
46 tracing::warn!(message="error creating jfr key", error=?e);
47 })
48 .ok()
49});
50
51static EMITTED_JFR_ERROR: AtomicBool = AtomicBool::new(false);
52
53#[cold]
54#[inline(never)]
55fn write_timestamp(before: u64) {
56 if let Some(key) = *POLLCATCH_JFR_KEY {
57 let end = tsc::now();
58 let mut buf = [0u8; 16];
59
60 buf[0..8].copy_from_slice(&before.to_le_bytes()[..]);
61 buf[8..16].copy_from_slice(&end.to_le_bytes()[..]);
62 if let Err(e) = AsProf::emit_user_jfr(key, &buf)
63 && !EMITTED_JFR_ERROR.swap(true, atomic::Ordering::Relaxed)
64 {
65 tracing::warn!(message="error emitting jfr", error=?e);
66 }
67 }
68}
69
70thread_local! {
71 static BEFORE_POLL_TIMESTAMP: Cell<u64> = const { Cell::new(0) };
72 static BEFORE_POLL_SAMPLE_COUNTER: Cell<u64> = const { Cell::new(0) };
73}
74
75/// Call this in the Tokio before task hook
76pub fn before_poll_hook() {
77 let before = tsc::now();
78 BEFORE_POLL_TIMESTAMP.set(before);
79 BEFORE_POLL_SAMPLE_COUNTER.set(AsProf::get_sample_counter().unwrap_or(0));
80}
81
82/// Call this in the Tokio after task hook
83pub fn after_poll_hook() {
84 let sample_counter = AsProf::get_sample_counter().unwrap_or(0);
85 if sample_counter != BEFORE_POLL_SAMPLE_COUNTER.get() {
86 write_timestamp(BEFORE_POLL_TIMESTAMP.get());
87 }
88}