future_profiler/lib.rs
1//! # Future Profiler
2//!
3//! The `future-profiler` crate provides a utility for profiling asynchronous Rust code.
4//!
5//! The `FutureProfiler` struct wraps a future and collects data before and after each
6//! invocation of the `poll()` function. The ability to track time spent executing and
7//! sleeping is built in. If the user desires additional data, they may implement the
8//! `Profiler` trait. Several implementations are provided by `future-profiler`, and the
9//! user may include one of these within their own `Profiler` implementation.
10//!
11//! ### `Profiler` Trait
12//! - `new`: Creates a new instance of the profiler.
13//! - `prepare`: Called before polling the future.
14//! - `update`: Called after polling the future.
//! - `finish`: Emits the collected metrics; called when the `FutureProfiler` is dropped after the wrapped future has completed.
//! - `error`: Called instead of `finish` when the `FutureProfiler` is dropped before the wrapped future completed, so the early drop can be reported.
17//!
18//! #### Example
19//!
20//! ```rust, ignore
21//! use future_profiler::{FutureProfiler, DefaultProfiler};
22//! use std::time::Duration;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//! let future = async {
//! tokio::time::sleep(Duration::from_millis(100)).await;
28//! 42
29//! };
30//!
31//! let profiler = FutureProfiler::<_, _, DefaultProfiler>::new("example_future", future);
32//! let result = profiler.await;
33//! println!("Future result: {}", result);
34//! }
35//! ```
36//! #### Custom Profiler Example
37//!
38//! ```rust, ignore
39//! use future_profiler::{FutureProfiler, Profiler, CpuProfiler};
40//! use std::time::Duration;
41//!
42//! // the user may compose one profiler out of many.
43//! struct CustomProfiler {
44//! cpu_profiler: CpuProfiler,
45//! }
46//!
47//! impl Profiler for CustomProfiler {
48//! fn new() -> Self {
49//! Self {
50//! cpu_profiler: CpuProfiler::new(),
51//! }
52//! }
53//!
54//! fn prepare(&mut self) {
55//! self.cpu_profiler.prepare();
56//! }
57//!
58//! fn update(&mut self) {
59//! self.cpu_profiler.update();
60//! }
61//!
62//! fn finish(&self, label: &str, wake_time: Duration, sleep_time: Duration) {
//! log::debug!("{label}, wake_time: {}ms, sleep_time: {}ms, cpu_instructions: {}", wake_time.as_millis(), sleep_time.as_millis(), self.cpu_profiler.instructions());
64//! }
65//!
66//! fn error(&self, label: &str) {
67//! log::error!("future didn't finish: {label}");
68//! }
69//! }
70//!
71//! #[tokio::main]
72//! async fn main() {
73//! let future = async {
74//! tokio::time::sleep(Duration::from_millis(100)).await;
75//! (0..1000000).sum::<u64>()
76//! };
77//!
78//! let profiler = FutureProfiler::<_, _, CustomProfiler>::new("custom_profiler", future);
79//! let result = profiler.await;
80//! println!("Future result: {}", result);
81//! }
82//! ```
83//!
84//! ## License
85//!
86//! This crate is licensed under the MIT License.
87
88use std::future::Future;
89use std::pin::Pin;
90use std::task::{Context, Poll};
91use std::time::{Duration, Instant};
92
93mod profiler;
94pub use profiler::*;
95
/// Hooks that [`FutureProfiler`] invokes around every poll of the wrapped future and when
/// the wrapper is dropped.
///
/// Implement this trait to collect additional data alongside the wake/sleep timings that
/// `FutureProfiler` tracks itself.
pub trait Profiler {
    /// Creates a new profiler instance. `FutureProfiler::new` calls this once per wrapper.
    fn new() -> Self;
    /// Called before the wrapped future is polled.
    fn prepare(&mut self);
    /// Called after the wrapped future has been polled.
    fn update(&mut self);
    /// Logs the metrics. Called when the `FutureProfiler` is dropped after the wrapped
    /// future completed.
    ///
    /// Besides whatever the implementation collected itself, it receives the metrics
    /// gathered by the `FutureProfiler`: the `label` given at construction, the accumulated
    /// time spent polling (`wake_time`) and the remaining time since construction
    /// (`sleep_time`).
    fn finish(&self, label: &str, wake_time: Duration, sleep_time: Duration);
    /// Called instead of `finish` when the `FutureProfiler` is dropped before the wrapped
    /// future completed.
    fn error(&self, label: &str);
}
107
/// Wraps a future and records how long it spends being polled versus waiting.
///
/// `T` is the wrapped future, `R` its output and `P` the [`Profiler`] whose hooks run
/// around every poll. The collected metrics are handed to `P` when this value is dropped.
pub struct FutureProfiler<T, R, P>
where
    T: Future<Output = R> + Send,
    P: Profiler,
{
    // identifies this future in the profiler's `finish` / `error` output
    label: String,
    // used to calculate sleep_time; set when the wrapper is constructed
    start: Instant,
    // accumulated time measured across all calls to `poll`
    wake_time: Duration,
    // `None` until the wrapped future completes; then the time since `start` that was
    // not counted as `wake_time`
    sleep_time: Option<Duration>,
    // user-supplied hooks called before and after each poll and on drop
    user_profiler: P,
    // the future of interest. has to be pinned (boxed so it can be polled via `as_mut()`)
    future: Pin<Box<T>>,
}
122
123impl<T, R, P> FutureProfiler<T, R, P>
124where
125 T: Future<Output = R> + Send,
126 P: Profiler,
127{
128 pub fn new<S: Into<String>>(label: S, future: T) -> Self {
129 Self {
130 label: label.into(),
131 user_profiler: P::new(),
132 start: Instant::now(),
133 wake_time: Duration::ZERO,
134 sleep_time: None,
135 future: Box::pin(future),
136 }
137 }
138}
139
140impl<T, R, P> Drop for FutureProfiler<T, R, P>
141where
142 T: Future<Output = R> + Send,
143 P: Profiler,
144{
145 fn drop(&mut self) {
146 // if self.sleep_time is None then the future was not polled to completion.
147 if let Some(sleep_time) = self.sleep_time {
148 self.user_profiler
149 .finish(&self.label, self.wake_time, sleep_time);
150 } else {
151 self.user_profiler.error(&self.label);
152 }
153 }
154}
155
156impl<T, R, P> Future for FutureProfiler<T, R, P>
157where
158 T: Future<Output = R> + Send,
159 P: Profiler,
160{
161 type Output = R;
162
163 /// # Safety
164 /// The `this` variable must not have any data moved out of it.
165 /// It also must not be invalidated.
166 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
167 let poll_start = Instant::now();
168 let this = unsafe { self.get_unchecked_mut() };
169
170 this.user_profiler.prepare();
171 let r = this.future.as_mut().poll(cx);
172 let elapsed = poll_start.elapsed();
173 this.user_profiler.update();
174 this.wake_time += elapsed;
175
176 // update sleep_time when the future is completed. this could be done on drop but
177 // if the caller doesn't drop the future, then sleep_time could be misreported.
178 if !matches!(r, Poll::Pending) {
179 this.sleep_time
180 .replace(this.start.elapsed() - this.wake_time);
181 }
182
183 r
184 }
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Poll;

    /// How far `wake_time` may exceed the time a test deliberately blocks for.
    ///
    /// `std::thread::sleep` only guarantees a minimum duration, so an upper bound of one or
    /// two milliseconds fails spuriously on a loaded machine. The slack is still well below
    /// the async sleep durations used in the tests, so sleep time being miscounted as wake
    /// time is still caught.
    const SCHEDULING_SLACK: Duration = Duration::from_millis(50);

    /// Asserts that `wake_time` is at least `blocked` and at most `blocked + SCHEDULING_SLACK`.
    fn assert_wake_time(wake_time: Duration, blocked: Duration) {
        assert!(
            wake_time >= blocked,
            "wake_time {wake_time:?} is shorter than the {blocked:?} spent blocking"
        );
        assert!(
            wake_time <= blocked + SCHEDULING_SLACK,
            "wake_time {wake_time:?} exceeds {blocked:?} by more than {SCHEDULING_SLACK:?}"
        );
    }

    /// The future first sleeps asynchronously (not counted as wake time) and then blocks
    /// the thread for 101ms (counted as wake time).
    #[tokio::test]
    async fn sleep_then_block() {
        let future = async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            std::thread::sleep(Duration::from_millis(101));
            42
        };

        let mut profiler = FutureProfiler::<_, _, DefaultProfiler>::new("waiter", future);
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        match Pin::new(&mut profiler).poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(_) => panic!("Future should not be ready yet"),
        }

        // the waker is a no-op, so wait out the inner sleep before polling again
        tokio::time::sleep(Duration::from_millis(150)).await;

        match Pin::new(&mut profiler).poll(&mut cx) {
            Poll::Ready(output) => assert_eq!(output, 42),
            Poll::Pending => panic!("Future should be ready now"),
        }

        assert_wake_time(profiler.wake_time, Duration::from_millis(101));
    }

    /// The future alternates between blocking the thread (101ms + 10ms + 30ms, counted as
    /// wake time) and sleeping asynchronously (not counted as wake time).
    #[tokio::test]
    async fn block_then_sleep() {
        let future = async {
            std::thread::sleep(Duration::from_millis(101));
            tokio::time::sleep(Duration::from_millis(100)).await;
            std::thread::sleep(Duration::from_millis(10));
            tokio::time::sleep(Duration::from_millis(20)).await;
            std::thread::sleep(Duration::from_millis(30));
            42
        };

        let mut profiler = FutureProfiler::<_, _, DefaultProfiler>::new("waiter", future);
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        match Pin::new(&mut profiler).poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(_) => panic!("Future should not be ready yet"),
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        // the waker is a no-op, so keep polling by hand until the future completes
        loop {
            match Pin::new(&mut profiler).poll(&mut cx) {
                Poll::Ready(_output) => break,
                Poll::Pending => {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                }
            }
        }

        assert_wake_time(profiler.wake_time, Duration::from_millis(141));
    }
}