metrics_lib/
async_support.rs1use crate::Timer;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10
11pub struct AsyncTimerGuard<'a> {
13 timer: &'a Timer,
14 start: Instant,
15 recorded: bool,
16}
17
18impl<'a> AsyncTimerGuard<'a> {
19 #[inline]
21 pub fn new(timer: &'a Timer) -> Self {
22 Self {
23 timer,
24 start: Instant::now(),
25 recorded: false,
26 }
27 }
28
29 #[inline]
31 pub fn elapsed(&self) -> std::time::Duration {
32 self.start.elapsed()
33 }
34
35 #[inline]
37 pub fn stop(mut self) {
38 if !self.recorded {
39 self.timer.record(self.start.elapsed());
40 self.recorded = true;
41 }
42 }
43}
44
45impl<'a> Drop for AsyncTimerGuard<'a> {
46 #[inline]
47 fn drop(&mut self) {
48 if !self.recorded {
49 self.timer.record(self.start.elapsed());
50 }
51 }
52}
53
54pub trait AsyncTimerExt {
56 fn start_async(&self) -> AsyncTimerGuard<'_>;
58
59 fn time_async<F, Fut, T>(&self, f: F) -> TimedFuture<'_, Fut>
61 where
62 F: FnOnce() -> Fut,
63 Fut: Future<Output = T>;
64}
65
66impl AsyncTimerExt for Timer {
67 #[inline]
68 fn start_async(&self) -> AsyncTimerGuard<'_> {
69 AsyncTimerGuard::new(self)
70 }
71
72 #[inline]
73 fn time_async<F, Fut, T>(&self, f: F) -> TimedFuture<'_, Fut>
74 where
75 F: FnOnce() -> Fut,
76 Fut: Future<Output = T>,
77 {
78 TimedFuture {
79 timer: self,
80 future: f(),
81 start: Some(Instant::now()),
82 }
83 }
84}
85
86pub struct TimedFuture<'a, F> {
88 timer: &'a Timer,
89 future: F,
90 start: Option<Instant>,
91}
92
93impl<'a, F, T> Future for TimedFuture<'a, F>
94where
95 F: Future<Output = T>,
96{
97 type Output = T;
98
99 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100 let this = unsafe { self.get_unchecked_mut() };
101 let future = unsafe { Pin::new_unchecked(&mut this.future) };
102
103 match future.poll(cx) {
104 Poll::Ready(result) => {
105 if let Some(start) = this.start.take() {
106 this.timer.record(start.elapsed());
107 }
108 Poll::Ready(result)
109 }
110 Poll::Pending => Poll::Pending,
111 }
112 }
113}
114
115pub struct AsyncMetricBatch {
117 updates: Vec<MetricUpdate>,
118}
119
120enum MetricUpdate {
121 CounterInc { name: &'static str, value: u64 },
122 GaugeSet { name: &'static str, value: f64 },
123 TimerRecord { name: &'static str, nanos: u64 },
124 RateTick { name: &'static str },
125}
126
127impl AsyncMetricBatch {
128 pub fn new() -> Self {
130 Self {
131 updates: Vec::with_capacity(64),
132 }
133 }
134}
135
136impl Default for AsyncMetricBatch {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl AsyncMetricBatch {
143 #[inline]
145 pub fn counter_inc(&mut self, name: &'static str, value: u64) {
146 self.updates.push(MetricUpdate::CounterInc { name, value });
147 }
148
149 #[inline]
151 pub fn gauge_set(&mut self, name: &'static str, value: f64) {
152 self.updates.push(MetricUpdate::GaugeSet { name, value });
153 }
154
155 #[inline]
157 pub fn timer_record(&mut self, name: &'static str, nanos: u64) {
158 self.updates.push(MetricUpdate::TimerRecord { name, nanos });
159 }
160
161 #[inline]
163 pub fn rate_tick(&mut self, name: &'static str) {
164 self.updates.push(MetricUpdate::RateTick { name });
165 }
166
167 pub fn flush(self, metrics: &crate::MetricsCore) {
169 for update in self.updates {
170 match update {
171 MetricUpdate::CounterInc { name, value } => {
172 metrics.counter(name).add(value);
173 }
174 MetricUpdate::GaugeSet { name, value } => {
175 metrics.gauge(name).set(value);
176 }
177 MetricUpdate::TimerRecord { name, nanos } => {
178 metrics.timer(name).record_ns(nanos);
179 }
180 MetricUpdate::RateTick { name } => {
181 metrics.rate(name).tick();
182 }
183 }
184 }
185 }
186
187 #[inline]
189 pub fn is_empty(&self) -> bool {
190 self.updates.is_empty()
191 }
192
193 #[inline]
195 pub fn len(&self) -> usize {
196 self.updates.len()
197 }
198}
199
200#[cfg(feature = "async")]
202#[allow(dead_code)]
203pub struct AsyncMetricsBatcher {
204 batch: tokio::sync::Mutex<AsyncMetricBatch>,
205 flush_interval: std::time::Duration,
206 max_batch_size: usize,
207}
208
209#[cfg(feature = "async")]
210impl AsyncMetricsBatcher {
211 #[allow(dead_code)]
213 pub fn new(flush_interval: std::time::Duration, max_batch_size: usize) -> Self {
214 Self {
215 batch: tokio::sync::Mutex::new(AsyncMetricBatch::new()),
216 flush_interval,
217 max_batch_size,
218 }
219 }
220
221 #[allow(dead_code)]
223 pub async fn record(&self, update: impl FnOnce(&mut AsyncMetricBatch)) {
224 let mut batch = self.batch.lock().await;
225 update(&mut batch);
226
227 if batch.len() >= self.max_batch_size {
228 let batch = std::mem::take(&mut *batch);
229
230 tokio::spawn(async move {
232 batch.flush(crate::metrics());
233 });
234 }
235 }
236
237 #[allow(dead_code)]
239 pub fn start_flusher(self: std::sync::Arc<Self>) {
240 tokio::spawn(async move {
241 let mut interval = tokio::time::interval(self.flush_interval);
242
243 loop {
244 interval.tick().await;
245
246 let batch = {
247 let mut guard = self.batch.lock().await;
248 if guard.is_empty() {
249 continue;
250 }
251 std::mem::take(&mut *guard)
252 };
253
254 batch.flush(crate::metrics());
255 }
256 });
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263
264 #[test]
265 fn test_async_timer_guard() {
266 let timer = Timer::new();
267
268 {
269 let _guard = timer.start_async();
270 std::thread::sleep(std::time::Duration::from_millis(10));
271 }
272
273 assert_eq!(timer.count(), 1);
274 assert!(timer.average() >= std::time::Duration::from_millis(9));
275 }
276
277 #[test]
278 fn test_metric_batch() {
279 let mut batch = AsyncMetricBatch::new();
280
281 batch.counter_inc("test", 5);
282 batch.gauge_set("test", 42.5);
283 batch.timer_record("test", 1000);
284 batch.rate_tick("test");
285
286 assert_eq!(batch.len(), 4);
287 assert!(!batch.is_empty());
288
289 let metrics = crate::MetricsCore::new();
290 batch.flush(&metrics);
291
292 assert_eq!(metrics.counter("test").get(), 5);
293 assert_eq!(metrics.gauge("test").get(), 42.5);
294 assert_eq!(metrics.timer("test").count(), 1);
295 metrics.rate("test").tick_n(1); }
297
298 #[test]
299 fn test_async_timer_guard_elapsed_and_stop() {
300 let timer = Timer::new();
301
302 let guard = timer.start_async();
303 let _elapsed = guard.elapsed();
305 guard.stop();
307
308 assert_eq!(timer.count(), 1);
309 }
310
311 #[test]
313 fn test_timed_future_manual_poll_ready() {
314 let timer = Timer::new();
315
316 let mut timed = timer.time_async(|| async { 7 });
318
319 fn dummy_raw_waker() -> std::task::RawWaker {
321 fn clone(_: *const ()) -> std::task::RawWaker {
322 dummy_raw_waker()
323 }
324 fn wake(_: *const ()) {}
325 fn wake_by_ref(_: *const ()) {}
326 fn drop(_: *const ()) {}
327 const VTABLE: std::task::RawWakerVTable =
328 std::task::RawWakerVTable::new(clone, wake, wake_by_ref, drop);
329 std::task::RawWaker::new(std::ptr::null(), &VTABLE)
330 }
331
332 let waker = unsafe { std::task::Waker::from_raw(dummy_raw_waker()) };
333 let mut cx = std::task::Context::from_waker(&waker);
334
335 let mut pinned = unsafe { std::pin::Pin::new_unchecked(&mut timed) };
336 match std::future::Future::poll(pinned.as_mut(), &mut cx) {
337 std::task::Poll::Ready(v) => assert_eq!(v, 7),
338 std::task::Poll::Pending => panic!("future should be immediately ready"),
339 }
340
341 assert_eq!(timer.count(), 1);
343 }
344
345 #[cfg(feature = "async")]
346 #[tokio::test]
347 async fn test_timed_future() {
348 let timer = Timer::new();
349
350 let result = timer
351 .time_async(|| async {
352 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
353 42
354 })
355 .await;
356
357 assert_eq!(result, 42);
358 assert_eq!(timer.count(), 1);
359 assert!(timer.average() >= std::time::Duration::from_millis(9));
360 }
361}