tailtriage_core/
timers.rs1use std::time::Instant;
2
3use crate::collector::{duration_to_us, lock_map};
4use crate::{unix_time_ms, InFlightSnapshot, QueueEvent, StageEvent, Tailtriage};
5
6#[derive(Debug)]
8pub struct InflightGuard<'a> {
9 pub(crate) tailtriage: &'a Tailtriage,
10 pub(crate) gauge: String,
11 pub(crate) enabled: bool,
12}
13
14impl Drop for InflightGuard<'_> {
15 fn drop(&mut self) {
16 if !self.enabled {
17 return;
18 }
19
20 let count = {
21 let mut counts = lock_map(&self.tailtriage.inflight_counts);
22 let entry = counts.entry(self.gauge.clone()).or_insert(0);
23 if *entry > 0 {
24 *entry -= 1;
25 }
26 *entry
27 };
28
29 self.tailtriage.record_inflight_snapshot(InFlightSnapshot {
30 gauge: self.gauge.clone(),
31 at_unix_ms: unix_time_ms(),
32 count,
33 });
34 }
35}
36
37#[derive(Debug)]
39pub struct StageTimer<'a> {
40 pub(crate) tailtriage: &'a Tailtriage,
41 pub(crate) enabled: bool,
42 pub(crate) request_id: String,
43 pub(crate) stage: String,
44}
45
46impl StageTimer<'_> {
47 pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
60 where
61 Fut: std::future::Future<Output = Result<T, E>>,
62 {
63 if !self.enabled {
64 return fut.await;
65 }
66
67 let started_at_unix_ms = unix_time_ms();
68 let started = Instant::now();
69 let value = fut.await;
70 let finished_at_unix_ms = unix_time_ms();
71 let success = value.is_ok();
72
73 self.tailtriage.record_stage_event(StageEvent {
74 request_id: self.request_id,
75 stage: self.stage,
76 started_at_unix_ms,
77 finished_at_unix_ms,
78 latency_us: duration_to_us(started.elapsed()),
79 success,
80 });
81
82 value
83 }
84
85 pub async fn await_value<Fut, T>(self, fut: Fut) -> T
90 where
91 Fut: std::future::Future<Output = T>,
92 {
93 if !self.enabled {
94 return fut.await;
95 }
96
97 let started_at_unix_ms = unix_time_ms();
98 let started = Instant::now();
99 let value = fut.await;
100 let finished_at_unix_ms = unix_time_ms();
101
102 self.tailtriage.record_stage_event(StageEvent {
103 request_id: self.request_id,
104 stage: self.stage,
105 started_at_unix_ms,
106 finished_at_unix_ms,
107 latency_us: duration_to_us(started.elapsed()),
108 success: true,
109 });
110
111 value
112 }
113}
114
115#[derive(Debug)]
117pub struct QueueTimer<'a> {
118 pub(crate) tailtriage: &'a Tailtriage,
119 pub(crate) enabled: bool,
120 pub(crate) request_id: String,
121 pub(crate) queue: String,
122 pub(crate) depth_at_start: Option<u64>,
123}
124
125impl QueueTimer<'_> {
126 #[must_use]
128 pub fn with_depth_at_start(mut self, depth_at_start: u64) -> Self {
129 self.depth_at_start = Some(depth_at_start);
130 self
131 }
132
133 pub async fn await_on<Fut, T>(self, fut: Fut) -> T
139 where
140 Fut: std::future::Future<Output = T>,
141 {
142 if !self.enabled {
143 return fut.await;
144 }
145
146 let waited_from_unix_ms = unix_time_ms();
147 let started = Instant::now();
148 let value = fut.await;
149 let waited_until_unix_ms = unix_time_ms();
150
151 self.tailtriage.record_queue_event(QueueEvent {
152 request_id: self.request_id,
153 queue: self.queue,
154 waited_from_unix_ms,
155 waited_until_unix_ms,
156 wait_us: duration_to_us(started.elapsed()),
157 depth_at_start: self.depth_at_start,
158 });
159
160 value
161 }
162}