tailtriage_core/
timers.rs1use std::time::Instant;
2
3use crate::collector::{duration_to_us, lock_map};
4use crate::{unix_time_ms, InFlightSnapshot, QueueEvent, StageEvent, Tailtriage};
5
6#[derive(Debug)]
8pub struct InflightGuard<'a> {
9 pub(crate) tailtriage: &'a Tailtriage,
10 pub(crate) gauge: String,
11}
12
13impl Drop for InflightGuard<'_> {
14 fn drop(&mut self) {
15 let count = {
16 let mut counts = lock_map(&self.tailtriage.inflight_counts);
17 let entry = counts.entry(self.gauge.clone()).or_insert(0);
18 if *entry > 0 {
19 *entry -= 1;
20 }
21 *entry
22 };
23
24 self.tailtriage.record_inflight_snapshot(InFlightSnapshot {
25 gauge: self.gauge.clone(),
26 at_unix_ms: unix_time_ms(),
27 count,
28 });
29 }
30}
31
32#[derive(Debug)]
34pub struct StageTimer<'a> {
35 pub(crate) tailtriage: &'a Tailtriage,
36 pub(crate) request_id: String,
37 pub(crate) stage: String,
38}
39
40impl StageTimer<'_> {
41 pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
54 where
55 Fut: std::future::Future<Output = Result<T, E>>,
56 {
57 let started_at_unix_ms = unix_time_ms();
58 let started = Instant::now();
59 let value = fut.await;
60 let finished_at_unix_ms = unix_time_ms();
61 let success = value.is_ok();
62
63 self.tailtriage.record_stage_event(StageEvent {
64 request_id: self.request_id,
65 stage: self.stage,
66 started_at_unix_ms,
67 finished_at_unix_ms,
68 latency_us: duration_to_us(started.elapsed()),
69 success,
70 });
71
72 value
73 }
74
75 pub async fn await_value<Fut, T>(self, fut: Fut) -> T
80 where
81 Fut: std::future::Future<Output = T>,
82 {
83 let started_at_unix_ms = unix_time_ms();
84 let started = Instant::now();
85 let value = fut.await;
86 let finished_at_unix_ms = unix_time_ms();
87
88 self.tailtriage.record_stage_event(StageEvent {
89 request_id: self.request_id,
90 stage: self.stage,
91 started_at_unix_ms,
92 finished_at_unix_ms,
93 latency_us: duration_to_us(started.elapsed()),
94 success: true,
95 });
96
97 value
98 }
99}
100
101#[derive(Debug)]
103pub struct QueueTimer<'a> {
104 pub(crate) tailtriage: &'a Tailtriage,
105 pub(crate) request_id: String,
106 pub(crate) queue: String,
107 pub(crate) depth_at_start: Option<u64>,
108}
109
110impl QueueTimer<'_> {
111 #[must_use]
113 pub fn with_depth_at_start(mut self, depth_at_start: u64) -> Self {
114 self.depth_at_start = Some(depth_at_start);
115 self
116 }
117
118 pub async fn await_on<Fut, T>(self, fut: Fut) -> T
124 where
125 Fut: std::future::Future<Output = T>,
126 {
127 let waited_from_unix_ms = unix_time_ms();
128 let started = Instant::now();
129 let value = fut.await;
130 let waited_until_unix_ms = unix_time_ms();
131
132 self.tailtriage.record_queue_event(QueueEvent {
133 request_id: self.request_id,
134 queue: self.queue,
135 waited_from_unix_ms,
136 waited_until_unix_ms,
137 wait_us: duration_to_us(started.elapsed()),
138 depth_at_start: self.depth_at_start,
139 });
140
141 value
142 }
143}