metrics_lib/
async_support.rs

1//! Async support for metrics
2//!
3//! Provides async-aware metric recording with zero-cost abstractions
4
5use crate::Timer;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10
11/// Async timer guard that records on drop
12pub struct AsyncTimerGuard<'a> {
13    timer: &'a Timer,
14    start: Instant,
15    recorded: bool,
16}
17
18impl<'a> AsyncTimerGuard<'a> {
19    /// Creates a new `AsyncTimerGuard` for the given timer, starting timing immediately.
20    #[inline]
21    pub fn new(timer: &'a Timer) -> Self {
22        Self {
23            timer,
24            start: Instant::now(),
25            recorded: false,
26        }
27    }
28
29    /// Returns the elapsed time since the guard was created.
30    #[inline]
31    pub fn elapsed(&self) -> std::time::Duration {
32        self.start.elapsed()
33    }
34
35    /// Stops the timer and records the elapsed time if not already recorded.
36    #[inline]
37    pub fn stop(mut self) {
38        if !self.recorded {
39            self.timer.record(self.start.elapsed());
40            self.recorded = true;
41        }
42    }
43}
44
45impl<'a> Drop for AsyncTimerGuard<'a> {
46    #[inline]
47    fn drop(&mut self) {
48        if !self.recorded {
49            self.timer.record(self.start.elapsed());
50        }
51    }
52}
53
54/// Extension trait for async timer operations
55pub trait AsyncTimerExt {
56    /// Start an async-aware timer
57    fn start_async(&self) -> AsyncTimerGuard<'_>;
58
59    /// Time an async function
60    fn time_async<F, Fut, T>(&self, f: F) -> TimedFuture<'_, Fut>
61    where
62        F: FnOnce() -> Fut,
63        Fut: Future<Output = T>;
64}
65
66impl AsyncTimerExt for Timer {
67    #[inline]
68    fn start_async(&self) -> AsyncTimerGuard<'_> {
69        AsyncTimerGuard::new(self)
70    }
71
72    #[inline]
73    fn time_async<F, Fut, T>(&self, f: F) -> TimedFuture<'_, Fut>
74    where
75        F: FnOnce() -> Fut,
76        Fut: Future<Output = T>,
77    {
78        TimedFuture {
79            timer: self,
80            future: f(),
81            start: Some(Instant::now()),
82        }
83    }
84}
85
86/// Future wrapper that times execution
87pub struct TimedFuture<'a, F> {
88    timer: &'a Timer,
89    future: F,
90    start: Option<Instant>,
91}
92
93impl<'a, F, T> Future for TimedFuture<'a, F>
94where
95    F: Future<Output = T>,
96{
97    type Output = T;
98
99    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100        let this = unsafe { self.get_unchecked_mut() };
101        let future = unsafe { Pin::new_unchecked(&mut this.future) };
102
103        match future.poll(cx) {
104            Poll::Ready(result) => {
105                if let Some(start) = this.start.take() {
106                    this.timer.record(start.elapsed());
107                }
108                Poll::Ready(result)
109            }
110            Poll::Pending => Poll::Pending,
111        }
112    }
113}
114
/// Ordered collection of pending metric updates that can be accumulated
/// cheaply and applied to a metrics registry in one pass.
#[derive(Debug)]
pub struct AsyncMetricBatch {
    /// Updates in the order they were added.
    updates: Vec<MetricUpdate>,
}

/// A single deferred metric operation, keyed by a static metric name.
#[derive(Debug, Clone, Copy, PartialEq)]
enum MetricUpdate {
    /// Add `value` to the counter called `name`.
    CounterInc { name: &'static str, value: u64 },
    /// Set the gauge called `name` to `value`.
    GaugeSet { name: &'static str, value: f64 },
    /// Record a sample of `nanos` nanoseconds on the timer called `name`.
    TimerRecord { name: &'static str, nanos: u64 },
    /// Register one tick on the rate meter called `name`.
    RateTick { name: &'static str },
}
126
127impl AsyncMetricBatch {
128    /// Create new batch
129    pub fn new() -> Self {
130        Self {
131            updates: Vec::with_capacity(64),
132        }
133    }
134}
135
136impl Default for AsyncMetricBatch {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl AsyncMetricBatch {
143    /// Add counter increment
144    #[inline]
145    pub fn counter_inc(&mut self, name: &'static str, value: u64) {
146        self.updates.push(MetricUpdate::CounterInc { name, value });
147    }
148
149    /// Add gauge set
150    #[inline]
151    pub fn gauge_set(&mut self, name: &'static str, value: f64) {
152        self.updates.push(MetricUpdate::GaugeSet { name, value });
153    }
154
155    /// Add timer recording
156    #[inline]
157    pub fn timer_record(&mut self, name: &'static str, nanos: u64) {
158        self.updates.push(MetricUpdate::TimerRecord { name, nanos });
159    }
160
161    /// Add rate tick
162    #[inline]
163    pub fn rate_tick(&mut self, name: &'static str) {
164        self.updates.push(MetricUpdate::RateTick { name });
165    }
166
167    /// Flush all updates to metrics
168    pub fn flush(self, metrics: &crate::MetricsCore) {
169        for update in self.updates {
170            match update {
171                MetricUpdate::CounterInc { name, value } => {
172                    metrics.counter(name).add(value);
173                }
174                MetricUpdate::GaugeSet { name, value } => {
175                    metrics.gauge(name).set(value);
176                }
177                MetricUpdate::TimerRecord { name, nanos } => {
178                    metrics.timer(name).record_ns(nanos);
179                }
180                MetricUpdate::RateTick { name } => {
181                    metrics.rate(name).tick();
182                }
183            }
184        }
185    }
186
187    /// Check if batch is empty
188    #[inline]
189    pub fn is_empty(&self) -> bool {
190        self.updates.is_empty()
191    }
192
193    /// Get number of pending updates
194    #[inline]
195    pub fn len(&self) -> usize {
196        self.updates.len()
197    }
198}
199
/// Shared, async-aware accumulator for metric updates.
///
/// Updates are collected in an [`AsyncMetricBatch`] behind a Tokio mutex and
/// flushed to the global metrics (`crate::metrics()`) either when the batch
/// reaches `max_batch_size` or on a fixed interval once
/// [`start_flusher`](Self::start_flusher) has been called.
#[cfg(feature = "async")]
#[allow(dead_code)]
pub struct AsyncMetricsBatcher {
    /// Pending updates, guarded for use across tasks.
    batch: tokio::sync::Mutex<AsyncMetricBatch>,
    /// Period between time-based flushes performed by the background flusher.
    flush_interval: std::time::Duration,
    /// Number of pending updates at which `record` triggers a flush.
    max_batch_size: usize,
}

#[cfg(feature = "async")]
impl AsyncMetricsBatcher {
    /// Creates a batcher with an empty batch.
    ///
    /// `flush_interval` is used by [`start_flusher`](Self::start_flusher);
    /// `max_batch_size` is the pending-update count at which
    /// [`record`](Self::record) flushes.
    #[allow(dead_code)]
    pub fn new(flush_interval: std::time::Duration, max_batch_size: usize) -> Self {
        Self {
            batch: tokio::sync::Mutex::new(AsyncMetricBatch::new()),
            flush_interval,
            max_batch_size,
        }
    }

    /// Applies `update` to the pending batch and, if the batch has reached
    /// `max_batch_size`, hands the accumulated updates to a spawned task that
    /// flushes them to `crate::metrics()`.
    ///
    /// Spawning requires a running Tokio runtime.
    #[allow(dead_code)]
    pub async fn record(&self, update: impl FnOnce(&mut AsyncMetricBatch)) {
        let mut pending = self.batch.lock().await;
        update(&mut pending);

        if pending.len() >= self.max_batch_size {
            // Swap in a fresh batch and release the lock before spawning so
            // other recorders are not blocked on the flush hand-off.
            let full = std::mem::take(&mut *pending);
            drop(pending);

            // Flush in background
            tokio::spawn(async move {
                full.flush(crate::metrics());
            });
        }
    }

    /// Spawns a background task that flushes any pending updates to
    /// `crate::metrics()` once per `flush_interval`.
    ///
    /// The task loops forever and owns the passed `Arc`, so the batcher stays
    /// alive for as long as the task runs. Spawning requires a running Tokio
    /// runtime. A zero `flush_interval` is treated as one millisecond.
    #[allow(dead_code)]
    pub fn start_flusher(self: std::sync::Arc<Self>) {
        // `tokio::time::interval` panics on a zero period. Inside the spawned
        // task that panic would silently kill the flusher, so substitute a
        // small non-zero period instead.
        let period = if self.flush_interval.is_zero() {
            std::time::Duration::from_millis(1)
        } else {
            self.flush_interval
        };

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(period);

            loop {
                interval.tick().await;

                // Hold the lock only long enough to swap the batch out; the
                // flush itself runs without the lock.
                let batch = {
                    let mut guard = self.batch.lock().await;
                    if guard.is_empty() {
                        continue;
                    }
                    std::mem::take(&mut *guard)
                };

                batch.flush(crate::metrics());
            }
        });
    }
}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Waker whose wake-ups do nothing; enough to poll a future by hand
    /// without an async runtime and without any `unsafe`.
    struct NoopWake;

    impl std::task::Wake for NoopWake {
        fn wake(self: std::sync::Arc<Self>) {}
    }

    /// Dropping the guard at scope exit records one sample of at least the
    /// slept duration.
    #[test]
    fn test_async_timer_guard() {
        let timer = Timer::new();

        {
            let _guard = timer.start_async();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        assert_eq!(timer.count(), 1);
        assert!(timer.average() >= std::time::Duration::from_millis(9));
    }

    /// Queued updates are counted and then applied to the registry on flush.
    #[test]
    fn test_metric_batch() {
        let mut batch = AsyncMetricBatch::new();

        batch.counter_inc("test", 5);
        batch.gauge_set("test", 42.5);
        batch.timer_record("test", 1000);
        batch.rate_tick("test");

        assert_eq!(batch.len(), 4);
        assert!(!batch.is_empty());

        let metrics = crate::MetricsCore::new();
        batch.flush(&metrics);

        assert_eq!(metrics.counter("test").get(), 5);
        assert_eq!(metrics.gauge("test").get(), 42.5);
        assert_eq!(metrics.timer("test").count(), 1);
        // The flushed rate tick is not asserted here; this only exercises the
        // rate handle after the flush.
        metrics.rate("test").tick_n(1);
    }

    /// `elapsed` does not record; an explicit `stop` records exactly once.
    #[test]
    fn test_async_timer_guard_elapsed_and_stop() {
        let timer = Timer::new();

        let guard = timer.start_async();
        // Exercise elapsed path
        let _elapsed = guard.elapsed();
        // Exercise explicit stop path
        guard.stop();

        assert_eq!(timer.count(), 1);
    }

    /// Manually polls a `TimedFuture` that is immediately ready to cover the
    /// `Ready` branch without needing an async runtime.
    #[test]
    fn test_timed_future_manual_poll_ready() {
        let timer = Timer::new();

        // Heap-pin the future so it can be polled through a safe `Pin`.
        let mut timed = Box::pin(timer.time_async(|| async { 7 }));

        let waker = std::task::Waker::from(std::sync::Arc::new(NoopWake));
        let mut cx = std::task::Context::from_waker(&waker);

        match std::future::Future::poll(timed.as_mut(), &mut cx) {
            std::task::Poll::Ready(v) => assert_eq!(v, 7),
            std::task::Poll::Pending => panic!("future should be immediately ready"),
        }

        // Ensure the timer recorded the elapsed time on Ready
        assert_eq!(timer.count(), 1);
    }

    /// Awaiting a timed future yields its output and records one sample of at
    /// least the slept duration.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_timed_future() {
        let timer = Timer::new();

        let result = timer
            .time_async(|| async {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                42
            })
            .await;

        assert_eq!(result, 42);
        assert_eq!(timer.count(), 1);
        assert!(timer.average() >= std::time::Duration::from_millis(9));
    }
}