use std::time::Instant;
use crate::collector::{duration_to_us, lock_map};
use crate::{unix_time_ms, InFlightSnapshot, QueueEvent, StageEvent, Tailtriage};
#[derive(Debug)]
pub struct InflightGuard<'a> {
pub(crate) tailtriage: &'a Tailtriage,
pub(crate) gauge: String,
pub(crate) enabled: bool,
}
impl Drop for InflightGuard<'_> {
fn drop(&mut self) {
if !self.enabled {
return;
}
let count = {
let mut counts = lock_map(&self.tailtriage.inflight_counts);
let entry = counts.entry(self.gauge.clone()).or_insert(0);
if *entry > 0 {
*entry -= 1;
}
*entry
};
self.tailtriage.record_inflight_snapshot(InFlightSnapshot {
gauge: self.gauge.clone(),
at_unix_ms: unix_time_ms(),
count,
});
}
}
#[derive(Debug)]
pub struct StageTimer<'a> {
pub(crate) tailtriage: &'a Tailtriage,
pub(crate) enabled: bool,
pub(crate) request_id: String,
pub(crate) stage: String,
}
impl StageTimer<'_> {
pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
where
Fut: std::future::Future<Output = Result<T, E>>,
{
if !self.enabled {
return fut.await;
}
let started_at_unix_ms = unix_time_ms();
let started = Instant::now();
let value = fut.await;
let finished_at_unix_ms = unix_time_ms();
let success = value.is_ok();
self.tailtriage.record_stage_event(StageEvent {
request_id: self.request_id,
stage: self.stage,
started_at_unix_ms,
finished_at_unix_ms,
latency_us: duration_to_us(started.elapsed()),
success,
});
value
}
pub async fn await_value<Fut, T>(self, fut: Fut) -> T
where
Fut: std::future::Future<Output = T>,
{
if !self.enabled {
return fut.await;
}
let started_at_unix_ms = unix_time_ms();
let started = Instant::now();
let value = fut.await;
let finished_at_unix_ms = unix_time_ms();
self.tailtriage.record_stage_event(StageEvent {
request_id: self.request_id,
stage: self.stage,
started_at_unix_ms,
finished_at_unix_ms,
latency_us: duration_to_us(started.elapsed()),
success: true,
});
value
}
}
#[derive(Debug)]
pub struct QueueTimer<'a> {
pub(crate) tailtriage: &'a Tailtriage,
pub(crate) enabled: bool,
pub(crate) request_id: String,
pub(crate) queue: String,
pub(crate) depth_at_start: Option<u64>,
}
impl QueueTimer<'_> {
#[must_use]
pub fn with_depth_at_start(mut self, depth_at_start: u64) -> Self {
self.depth_at_start = Some(depth_at_start);
self
}
pub async fn await_on<Fut, T>(self, fut: Fut) -> T
where
Fut: std::future::Future<Output = T>,
{
if !self.enabled {
return fut.await;
}
let waited_from_unix_ms = unix_time_ms();
let started = Instant::now();
let value = fut.await;
let waited_until_unix_ms = unix_time_ms();
self.tailtriage.record_queue_event(QueueEvent {
request_id: self.request_id,
queue: self.queue,
waited_from_unix_ms,
waited_until_unix_ms,
wait_us: duration_to_us(started.elapsed()),
depth_at_start: self.depth_at_start,
});
value
}
}