Skip to main content

tailtriage_core/
timers.rs

1use std::time::Instant;
2
3use crate::collector::{duration_to_us, lock_map};
4use crate::{unix_time_ms, InFlightSnapshot, QueueEvent, StageEvent, Tailtriage};
5
6/// RAII guard tracking one in-flight unit for a named gauge.
7#[derive(Debug)]
8pub struct InflightGuard<'a> {
9    pub(crate) tailtriage: &'a Tailtriage,
10    pub(crate) gauge: String,
11}
12
13impl Drop for InflightGuard<'_> {
14    fn drop(&mut self) {
15        let count = {
16            let mut counts = lock_map(&self.tailtriage.inflight_counts);
17            let entry = counts.entry(self.gauge.clone()).or_insert(0);
18            if *entry > 0 {
19                *entry -= 1;
20            }
21            *entry
22        };
23
24        self.tailtriage.record_inflight_snapshot(InFlightSnapshot {
25            gauge: self.gauge.clone(),
26            at_unix_ms: unix_time_ms(),
27            count,
28        });
29    }
30}
31
32/// Thin wrapper for recording stage latency around one await point.
33#[derive(Debug)]
34pub struct StageTimer<'a> {
35    pub(crate) tailtriage: &'a Tailtriage,
36    pub(crate) request_id: String,
37    pub(crate) stage: String,
38}
39
40impl StageTimer<'_> {
41    /// Awaits `fut`, records stage duration, and returns the original output.
42    ///
43    /// This helper is intended for fallible stage work where success can be
44    /// derived from `Result::is_ok`.
45    ///
46    /// Prefer this method when your stage naturally returns `Result<T, E>` and
47    /// you want success/failure evidence in the resulting triage report.
48    ///
49    /// # Errors
50    ///
51    /// Returns the same error value produced by `fut` after recording the
52    /// stage event with `success = false`.
53    pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
54    where
55        Fut: std::future::Future<Output = Result<T, E>>,
56    {
57        let started_at_unix_ms = unix_time_ms();
58        let started = Instant::now();
59        let value = fut.await;
60        let finished_at_unix_ms = unix_time_ms();
61        let success = value.is_ok();
62
63        self.tailtriage.record_stage_event(StageEvent {
64            request_id: self.request_id,
65            stage: self.stage,
66            started_at_unix_ms,
67            finished_at_unix_ms,
68            latency_us: duration_to_us(started.elapsed()),
69            success,
70        });
71
72        value
73    }
74
75    /// Awaits an infallible stage future and records a successful stage event.
76    ///
77    /// Use this method when there is no meaningful stage-level error signal
78    /// (for example, internal CPU work or a prevalidated transformation).
79    pub async fn await_value<Fut, T>(self, fut: Fut) -> T
80    where
81        Fut: std::future::Future<Output = T>,
82    {
83        let started_at_unix_ms = unix_time_ms();
84        let started = Instant::now();
85        let value = fut.await;
86        let finished_at_unix_ms = unix_time_ms();
87
88        self.tailtriage.record_stage_event(StageEvent {
89            request_id: self.request_id,
90            stage: self.stage,
91            started_at_unix_ms,
92            finished_at_unix_ms,
93            latency_us: duration_to_us(started.elapsed()),
94            success: true,
95        });
96
97        value
98    }
99}
100
101/// Thin wrapper for recording queue-wait latency around one await point.
102#[derive(Debug)]
103pub struct QueueTimer<'a> {
104    pub(crate) tailtriage: &'a Tailtriage,
105    pub(crate) request_id: String,
106    pub(crate) queue: String,
107    pub(crate) depth_at_start: Option<u64>,
108}
109
110impl QueueTimer<'_> {
111    /// Sets the queue depth sample captured at wait start.
112    #[must_use]
113    pub fn with_depth_at_start(mut self, depth_at_start: u64) -> Self {
114        self.depth_at_start = Some(depth_at_start);
115        self
116    }
117
118    /// Awaits `fut`, records queue wait duration, and returns the original output.
119    ///
120    /// Queue events are interpreted as application-level wait evidence (a lead,
121    /// not proof). Record these around bounded resources to help separate
122    /// queueing pressure from slow downstream stage time.
123    pub async fn await_on<Fut, T>(self, fut: Fut) -> T
124    where
125        Fut: std::future::Future<Output = T>,
126    {
127        let waited_from_unix_ms = unix_time_ms();
128        let started = Instant::now();
129        let value = fut.await;
130        let waited_until_unix_ms = unix_time_ms();
131
132        self.tailtriage.record_queue_event(QueueEvent {
133            request_id: self.request_id,
134            queue: self.queue,
135            waited_from_unix_ms,
136            waited_until_unix_ms,
137            wait_us: duration_to_us(started.elapsed()),
138            depth_at_start: self.depth_at_start,
139        });
140
141        value
142    }
143}