Skip to main content

tailtriage_core/
timers.rs

1use std::time::Instant;
2
3use crate::collector::{duration_to_us, lock_map};
4use crate::{unix_time_ms, InFlightSnapshot, QueueEvent, StageEvent, Tailtriage};
5
6/// RAII guard tracking one in-flight unit for a named gauge.
7#[derive(Debug)]
8pub struct InflightGuard<'a> {
9    pub(crate) tailtriage: &'a Tailtriage,
10    pub(crate) gauge: String,
11    pub(crate) enabled: bool,
12}
13
14impl Drop for InflightGuard<'_> {
15    fn drop(&mut self) {
16        if !self.enabled {
17            return;
18        }
19
20        let count = {
21            let mut counts = lock_map(&self.tailtriage.inflight_counts);
22            let entry = counts.entry(self.gauge.clone()).or_insert(0);
23            if *entry > 0 {
24                *entry -= 1;
25            }
26            *entry
27        };
28
29        self.tailtriage.record_inflight_snapshot(InFlightSnapshot {
30            gauge: self.gauge.clone(),
31            at_unix_ms: unix_time_ms(),
32            count,
33        });
34    }
35}
36
37/// Thin wrapper for recording stage latency around one await point.
38#[derive(Debug)]
39pub struct StageTimer<'a> {
40    pub(crate) tailtriage: &'a Tailtriage,
41    pub(crate) enabled: bool,
42    pub(crate) request_id: String,
43    pub(crate) stage: String,
44}
45
46impl StageTimer<'_> {
47    /// Awaits `fut`, records stage duration, and returns the original output.
48    ///
49    /// This helper is intended for fallible stage work where success can be
50    /// derived from `Result::is_ok`.
51    ///
52    /// Prefer this method when your stage naturally returns `Result<T, E>` and
53    /// you want success/failure evidence in the resulting triage report.
54    ///
55    /// # Errors
56    ///
57    /// Returns the same error value produced by `fut` after recording the
58    /// stage event with `success = false`.
59    pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
60    where
61        Fut: std::future::Future<Output = Result<T, E>>,
62    {
63        if !self.enabled {
64            return fut.await;
65        }
66
67        let started_at_unix_ms = unix_time_ms();
68        let started = Instant::now();
69        let value = fut.await;
70        let finished_at_unix_ms = unix_time_ms();
71        let success = value.is_ok();
72
73        self.tailtriage.record_stage_event(StageEvent {
74            request_id: self.request_id,
75            stage: self.stage,
76            started_at_unix_ms,
77            finished_at_unix_ms,
78            latency_us: duration_to_us(started.elapsed()),
79            success,
80        });
81
82        value
83    }
84
85    /// Awaits an infallible stage future and records a successful stage event.
86    ///
87    /// Use this method when there is no meaningful stage-level error signal
88    /// (for example, internal CPU work or a prevalidated transformation).
89    pub async fn await_value<Fut, T>(self, fut: Fut) -> T
90    where
91        Fut: std::future::Future<Output = T>,
92    {
93        if !self.enabled {
94            return fut.await;
95        }
96
97        let started_at_unix_ms = unix_time_ms();
98        let started = Instant::now();
99        let value = fut.await;
100        let finished_at_unix_ms = unix_time_ms();
101
102        self.tailtriage.record_stage_event(StageEvent {
103            request_id: self.request_id,
104            stage: self.stage,
105            started_at_unix_ms,
106            finished_at_unix_ms,
107            latency_us: duration_to_us(started.elapsed()),
108            success: true,
109        });
110
111        value
112    }
113}
114
115/// Thin wrapper for recording queue-wait latency around one await point.
116#[derive(Debug)]
117pub struct QueueTimer<'a> {
118    pub(crate) tailtriage: &'a Tailtriage,
119    pub(crate) enabled: bool,
120    pub(crate) request_id: String,
121    pub(crate) queue: String,
122    pub(crate) depth_at_start: Option<u64>,
123}
124
125impl QueueTimer<'_> {
126    /// Sets the queue depth sample captured at wait start.
127    #[must_use]
128    pub fn with_depth_at_start(mut self, depth_at_start: u64) -> Self {
129        self.depth_at_start = Some(depth_at_start);
130        self
131    }
132
133    /// Awaits `fut`, records queue wait duration, and returns the original output.
134    ///
135    /// Queue events are interpreted as application-level wait evidence (a lead,
136    /// not proof). Record these around bounded resources to help separate
137    /// queueing pressure from slow downstream stage time.
138    pub async fn await_on<Fut, T>(self, fut: Fut) -> T
139    where
140        Fut: std::future::Future<Output = T>,
141    {
142        if !self.enabled {
143            return fut.await;
144        }
145
146        let waited_from_unix_ms = unix_time_ms();
147        let started = Instant::now();
148        let value = fut.await;
149        let waited_until_unix_ms = unix_time_ms();
150
151        self.tailtriage.record_queue_event(QueueEvent {
152            request_id: self.request_id,
153            queue: self.queue,
154            waited_from_unix_ms,
155            waited_until_unix_ms,
156            wait_us: duration_to_us(started.elapsed()),
157            depth_at_start: self.depth_at_start,
158        });
159
160        value
161    }
162}