1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::{Duration, Instant};
6
7use crate::config::Config;
8use crate::InflightGuard;
9use crate::RunSink;
10use crate::{
11 unix_time_ms, BuildError, InFlightSnapshot, Outcome, QueueEvent, QueueTimer, RequestEvent,
12 RequestOptions, Run, RunMetadata, RuntimeSnapshot, SinkError, StageEvent, StageTimer,
13 UnfinishedRequestSample,
14};
15
/// Central recorder for a single run: accumulates request, stage, queue, and
/// in-flight telemetry behind mutexes and writes the finished run to a sink.
pub struct Tailtriage {
    // Accumulated run data; cloned by `snapshot()`, finalized and written in `shutdown()`.
    pub(crate) run: Mutex<Run>,
    // Current in-flight count per gauge name, maintained by `inflight()`.
    pub(crate) inflight_counts: Mutex<HashMap<String, u64>>,
    // Requests started but not yet finished, keyed by a process-wide sequence
    // number that is handed to the matching completion token.
    pending_requests: Mutex<HashMap<u64, PendingRequest>>,
    // Destination the finished run is written to on `shutdown()`.
    pub(crate) sink: Arc<dyn RunSink + Send + Sync>,
    // Caps on retained events of each kind; overflow is counted, not stored.
    pub(crate) limits: crate::CaptureLimits,
    // When true, `shutdown()` fails if any request is still unfinished.
    pub(crate) strict_lifecycle: bool,
}
25
/// Bookkeeping for a request that has been started but not yet finished.
#[derive(Debug, Clone)]
struct PendingRequest {
    // Caller-supplied or generated id, echoed into the final `RequestEvent`.
    request_id: String,
    route: String,
    kind: Option<String>,
    // Wall-clock start time, reported in the final event ...
    started_at_unix_ms: u64,
    // ... and a monotonic start instant, used to compute latency.
    started: Instant,
}
34
35impl std::fmt::Debug for Tailtriage {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("Tailtriage")
38 .field("limits", &self.limits)
39 .field("strict_lifecycle", &self.strict_lifecycle)
40 .finish_non_exhaustive()
41 }
42}
43
/// Pair returned by `Tailtriage::begin_request*`: a cloneable handle for
/// attributing timers/guards to the request, plus a one-shot completion token.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct StartedRequest<'a> {
    pub handle: RequestHandle<'a>,
    pub completion: RequestCompletion<'a>,
}
53
/// `Arc`-owning counterpart of [`StartedRequest`], returned by
/// `Tailtriage::begin_request*_owned`; carries no borrow lifetime.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct OwnedStartedRequest {
    pub handle: OwnedRequestHandle,
    pub completion: OwnedRequestCompletion,
}
63
/// Borrowed handle to an in-progress request; used to start queue/stage
/// timers and in-flight guards attributed to the request's id.
#[derive(Debug, Clone)]
pub struct RequestHandle<'a> {
    tailtriage: &'a Tailtriage,
    request_id: String,
    route: String,
    kind: Option<String>,
}
72
/// `Arc`-owning counterpart of [`RequestHandle`]; no borrow lifetime, so it
/// can be moved into spawned tasks or stored freely.
#[derive(Debug, Clone)]
pub struct OwnedRequestHandle {
    tailtriage: Arc<Tailtriage>,
    request_id: String,
    route: String,
    kind: Option<String>,
}
81
/// One-shot token recording a request's final outcome. `pending_key`
/// identifies the entry in `Tailtriage::pending_requests`; `finished` guards
/// against double-finish and drives the drop-time lifecycle assertion.
#[must_use = "request completion tokens must be finished explicitly"]
#[derive(Debug)]
pub struct RequestCompletion<'a> {
    tailtriage: &'a Tailtriage,
    pending_key: u64,
    finished: bool,
}
90
/// `Arc`-owning counterpart of [`RequestCompletion`]; same single-use
/// finish contract, but without a borrow lifetime.
#[must_use = "request completion tokens must be finished explicitly"]
#[derive(Debug)]
pub struct OwnedRequestCompletion {
    tailtriage: Arc<Tailtriage>,
    pending_key: u64,
    finished: bool,
}
99
impl Tailtriage {
    /// Starts the builder-style constructor for a recorder.
    #[must_use]
    pub fn builder(service_name: impl Into<String>) -> crate::TailtriageBuilder {
        crate::TailtriageBuilder::new(service_name)
    }

    /// Constructs a recorder from an assembled [`Config`].
    ///
    /// # Errors
    /// Returns [`BuildError::EmptyServiceName`] when the configured service
    /// name is empty or whitespace-only.
    pub(crate) fn from_config(config: Config) -> Result<Self, BuildError> {
        if config.service_name.trim().is_empty() {
            return Err(BuildError::EmptyServiceName);
        }

        let now = unix_time_ms();
        let run = Run::new(RunMetadata {
            run_id: config.run_id.unwrap_or_else(generate_run_id),
            service_name: config.service_name,
            service_version: config.service_version,
            started_at_unix_ms: now,
            // Placeholder; overwritten with the real finish time in `shutdown`.
            finished_at_unix_ms: now,
            mode: config.mode,
            host: None,
            pid: Some(std::process::id()),
            lifecycle_warnings: Vec::new(),
            unfinished_requests: crate::UnfinishedRequests::default(),
        });

        Ok(Self {
            run: Mutex::new(run),
            inflight_counts: Mutex::new(HashMap::new()),
            pending_requests: Mutex::new(HashMap::new()),
            sink: config.sink,
            limits: config.capture_limits,
            strict_lifecycle: config.strict_lifecycle,
        })
    }

    /// Begins tracking a request on `route` with default options.
    pub fn begin_request(&self, route: impl Into<String>) -> StartedRequest<'_> {
        self.begin_request_with(route, RequestOptions::new())
    }

    /// Begins tracking a request on `route` with explicit [`RequestOptions`].
    /// The returned completion token must be finished exactly once.
    pub fn begin_request_with(
        &self,
        route: impl Into<String>,
        options: RequestOptions,
    ) -> StartedRequest<'_> {
        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);

        StartedRequest {
            handle: RequestHandle {
                tailtriage: self,
                request_id: request_id.clone(),
                route,
                kind,
            },
            completion: RequestCompletion {
                tailtriage: self,
                pending_key,
                finished: false,
            },
        }
    }

    /// Like [`Self::begin_request`], but the returned pair holds an `Arc` to
    /// the recorder instead of a borrow.
    pub fn begin_request_owned(self: &Arc<Self>, route: impl Into<String>) -> OwnedStartedRequest {
        self.begin_request_with_owned(route, RequestOptions::new())
    }

    /// Like [`Self::begin_request_with`], but returns `Arc`-owning types.
    pub fn begin_request_with_owned(
        self: &Arc<Self>,
        route: impl Into<String>,
        options: RequestOptions,
    ) -> OwnedStartedRequest {
        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);

        OwnedStartedRequest {
            handle: OwnedRequestHandle {
                tailtriage: Arc::clone(self),
                request_id: request_id.clone(),
                route,
                kind,
            },
            completion: OwnedRequestCompletion {
                tailtriage: Arc::clone(self),
                pending_key,
                finished: false,
            },
        }
    }

    /// Returns a point-in-time clone of the accumulated run data.
    #[must_use]
    pub fn snapshot(&self) -> Run {
        lock_run(&self.run).clone()
    }

    /// Finalizes the run and writes it to the configured sink.
    ///
    /// Still-pending requests are reported as lifecycle warnings, with up to
    /// five samples (in arbitrary map order); no completions are fabricated.
    ///
    /// # Errors
    /// In strict lifecycle mode, returns [`SinkError::Lifecycle`] without
    /// writing to the sink when unfinished requests remain; otherwise
    /// propagates any error from the sink write.
    pub fn shutdown(&self) -> Result<(), SinkError> {
        let mut pending_samples = Vec::new();
        // Scope the pending-requests lock so it is released before the run
        // lock is taken below (never hold both at once).
        let pending_count = {
            let pending = lock_pending(&self.pending_requests);
            pending_samples.extend(pending.values().take(5).map(|req| UnfinishedRequestSample {
                request_id: req.request_id.clone(),
                route: req.route.clone(),
            }));
            pending.len()
        };

        let mut guard = lock_run(&self.run);
        guard.metadata.finished_at_unix_ms = unix_time_ms();
        if pending_count > 0 {
            guard.metadata.lifecycle_warnings.push(format!(
                "{pending_count} unfinished request(s) remained at shutdown; run includes no fabricated completions"
            ));
            guard.metadata.unfinished_requests.count = pending_count as u64;
            guard.metadata.unfinished_requests.sample = pending_samples;
            if self.strict_lifecycle {
                // Strict mode: surface the lifecycle bug instead of writing.
                return Err(SinkError::Lifecycle {
                    unfinished_count: pending_count,
                });
            }
        }

        self.sink.write(&guard)
    }

    /// Increments the named in-flight gauge, records a snapshot of the new
    /// count, and returns a guard tied to the gauge name.
    #[must_use]
    pub(crate) fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
        let gauge = gauge.into();
        // Scope the counts lock so it is not held while recording the
        // snapshot below (which takes the run lock).
        let count = {
            let mut counts = lock_map(&self.inflight_counts);
            let entry = counts.entry(gauge.clone()).or_insert(0);
            *entry += 1;
            *entry
        };

        self.record_inflight_snapshot(InFlightSnapshot {
            gauge: gauge.clone(),
            at_unix_ms: unix_time_ms(),
            count,
        });

        InflightGuard {
            tailtriage: self,
            gauge,
        }
    }

    /// Stores a runtime snapshot; once the configured cap is reached, the
    /// snapshot is dropped and counted instead.
    pub fn record_runtime_snapshot(&self, snapshot: RuntimeSnapshot) {
        let mut run = lock_run(&self.run);
        if run.runtime_snapshots.len() >= self.limits.max_runtime_snapshots {
            run.truncation.dropped_runtime_snapshots =
                run.truncation.dropped_runtime_snapshots.saturating_add(1);
        } else {
            run.runtime_snapshots.push(snapshot);
        }
    }

    /// Stores a stage event, counting it as dropped once the cap is reached.
    pub(crate) fn record_stage_event(&self, event: StageEvent) {
        let mut run = lock_run(&self.run);
        if run.stages.len() >= self.limits.max_stages {
            run.truncation.dropped_stages = run.truncation.dropped_stages.saturating_add(1);
        } else {
            run.stages.push(event);
        }
    }

    /// Stores a queue event, counting it as dropped once the cap is reached.
    pub(crate) fn record_queue_event(&self, event: QueueEvent) {
        let mut run = lock_run(&self.run);
        if run.queues.len() >= self.limits.max_queues {
            run.truncation.dropped_queues = run.truncation.dropped_queues.saturating_add(1);
        } else {
            run.queues.push(event);
        }
    }

    /// Stores an in-flight snapshot, counting it as dropped once the cap is
    /// reached.
    pub(crate) fn record_inflight_snapshot(&self, snapshot: InFlightSnapshot) {
        let mut run = lock_run(&self.run);
        if run.inflight.len() >= self.limits.max_inflight_snapshots {
            run.truncation.dropped_inflight_snapshots =
                run.truncation.dropped_inflight_snapshots.saturating_add(1);
        } else {
            run.inflight.push(snapshot);
        }
    }

    /// Stores a finished request event, counting it as dropped once the cap
    /// is reached.
    fn record_request_event(&self, event: RequestEvent) {
        let mut run = lock_run(&self.run);
        if run.requests.len() >= self.limits.max_requests {
            run.truncation.dropped_requests = run.truncation.dropped_requests.saturating_add(1);
        } else {
            run.requests.push(event);
        }
    }

    /// Registers a new pending request and returns the values shared by the
    /// handle/completion constructors: `(request_id, route, kind, pending_key)`.
    fn start_request(
        &self,
        route: String,
        options: RequestOptions,
    ) -> (String, String, Option<String>, u64) {
        let request_id = options
            .request_id
            .unwrap_or_else(|| generate_request_id(&route));
        // Process-wide key tying the completion token to this pending entry.
        let pending_key = PENDING_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        let kind = options.kind;
        let pending = PendingRequest {
            request_id: request_id.clone(),
            route: route.clone(),
            kind: kind.clone(),
            started_at_unix_ms: unix_time_ms(),
            started: Instant::now(),
        };
        lock_pending(&self.pending_requests).insert(pending_key, pending);

        (request_id, route, kind, pending_key)
    }
}
325
impl RequestHandle<'_> {
    /// The id this request is recorded under.
    #[must_use]
    pub fn request_id(&self) -> &str {
        &self.request_id
    }

    /// The route label the request was started with.
    #[must_use]
    pub fn route(&self) -> &str {
        &self.route
    }

    /// Optional request kind, if one was supplied via `RequestOptions`.
    #[must_use]
    pub fn kind(&self) -> Option<&str> {
        self.kind.as_deref()
    }

    /// Starts a queue-wait timer attributed to this request.
    #[must_use]
    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
        QueueTimer {
            tailtriage: self.tailtriage,
            request_id: self.request_id.clone(),
            queue: queue.into(),
            depth_at_start: None,
        }
    }

    /// Starts a stage timer attributed to this request.
    #[must_use]
    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
        StageTimer {
            tailtriage: self.tailtriage,
            request_id: self.request_id.clone(),
            stage: stage.into(),
        }
    }

    /// Delegates to [`Tailtriage::inflight`] for the named gauge.
    #[must_use]
    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
        self.tailtriage.inflight(gauge)
    }
}
372
373impl RequestCompletion<'_> {
374 pub fn finish(mut self, outcome: Outcome) {
376 self.finish_internal(outcome);
377 }
378
379 pub fn finish_ok(self) {
381 self.finish(Outcome::Ok);
382 }
383
384 pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
391 let outcome = if result.is_ok() {
392 Outcome::Ok
393 } else {
394 Outcome::Error
395 };
396 self.finish(outcome);
397 result
398 }
399
400 fn finish_internal(&mut self, outcome: Outcome) {
401 if self.finished {
402 debug_assert!(
403 !self.finished,
404 "tailtriage request completion was finished more than once; each request must be finished exactly once"
405 );
406 return;
407 }
408
409 let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
410 let Some(pending) = pending else {
411 debug_assert!(
412 false,
413 "tailtriage request completion token had no pending request entry"
414 );
415 self.finished = true;
416 return;
417 };
418 self.finished = true;
419
420 self.tailtriage.record_request_event(RequestEvent {
421 request_id: pending.request_id,
422 route: pending.route,
423 kind: pending.kind,
424 started_at_unix_ms: pending.started_at_unix_ms,
425 finished_at_unix_ms: unix_time_ms(),
426 latency_us: duration_to_us(pending.started.elapsed()),
427 outcome: outcome.into_string(),
428 });
429 }
430}
431
impl OwnedRequestHandle {
    /// The id this request is recorded under.
    #[must_use]
    pub fn request_id(&self) -> &str {
        &self.request_id
    }

    /// The route label the request was started with.
    #[must_use]
    pub fn route(&self) -> &str {
        &self.route
    }

    /// Optional request kind, if one was supplied via `RequestOptions`.
    #[must_use]
    pub fn kind(&self) -> Option<&str> {
        self.kind.as_deref()
    }

    /// Starts a queue-wait timer attributed to this request.
    #[must_use]
    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
        QueueTimer {
            tailtriage: self.tailtriage.as_ref(),
            request_id: self.request_id.clone(),
            queue: queue.into(),
            depth_at_start: None,
        }
    }

    /// Starts a stage timer attributed to this request.
    #[must_use]
    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
        StageTimer {
            tailtriage: self.tailtriage.as_ref(),
            request_id: self.request_id.clone(),
            stage: stage.into(),
        }
    }

    /// Delegates to [`Tailtriage::inflight`] for the named gauge.
    #[must_use]
    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
        self.tailtriage.as_ref().inflight(gauge)
    }
}
478
479impl OwnedRequestCompletion {
480 pub fn finish(mut self, outcome: Outcome) {
482 self.finish_internal(outcome);
483 }
484
485 pub fn finish_ok(self) {
487 self.finish(Outcome::Ok);
488 }
489
490 pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
497 let outcome = if result.is_ok() {
498 Outcome::Ok
499 } else {
500 Outcome::Error
501 };
502 self.finish(outcome);
503 result
504 }
505
506 fn finish_internal(&mut self, outcome: Outcome) {
507 if self.finished {
508 debug_assert!(
509 !self.finished,
510 "tailtriage request completion was finished more than once; each request must be finished exactly once"
511 );
512 return;
513 }
514
515 let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
516 let Some(pending) = pending else {
517 self.finished = true;
518 return;
519 };
520 self.finished = true;
521
522 self.tailtriage.record_request_event(RequestEvent {
523 request_id: pending.request_id,
524 route: pending.route,
525 kind: pending.kind,
526 started_at_unix_ms: pending.started_at_unix_ms,
527 finished_at_unix_ms: unix_time_ms(),
528 latency_us: duration_to_us(pending.started.elapsed()),
529 outcome: outcome.into_string(),
530 });
531 }
532}
533
impl Drop for RequestCompletion<'_> {
    fn drop(&mut self) {
        // Debug-build tripwire: dropping an unfinished token is a lifecycle
        // bug, unless the thread is already unwinding from a panic (in which
        // case the request legitimately never completed).
        debug_assert!(
            self.finished || std::thread::panicking(),
            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
        );
    }
}
542
impl Drop for OwnedRequestCompletion {
    fn drop(&mut self) {
        // Debug-build tripwire: dropping an unfinished token is a lifecycle
        // bug, unless the thread is already unwinding from a panic.
        debug_assert!(
            self.finished || std::thread::panicking(),
            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
        );
    }
}
551
552pub(crate) fn lock_run(run: &Mutex<Run>) -> std::sync::MutexGuard<'_, Run> {
553 match run.lock() {
554 Ok(guard) => guard,
555 Err(poisoned) => poisoned.into_inner(),
556 }
557}
558
/// Locks the gauge-count map, recovering the guard on poisoning so in-flight
/// counters stay usable even if a lock holder panicked.
pub(crate) fn lock_map(
    map: &Mutex<HashMap<String, u64>>,
) -> std::sync::MutexGuard<'_, HashMap<String, u64>> {
    map.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}
567
568fn lock_pending(
569 map: &Mutex<HashMap<u64, PendingRequest>>,
570) -> std::sync::MutexGuard<'_, HashMap<u64, PendingRequest>> {
571 match map.lock() {
572 Ok(guard) => guard,
573 Err(poisoned) => poisoned.into_inner(),
574 }
575}
576
/// Converts a duration to whole microseconds, saturating at `u64::MAX` since
/// `as_micros` returns a `u128` that can exceed `u64`.
pub(crate) fn duration_to_us(duration: Duration) -> u64 {
    u64::try_from(duration.as_micros()).unwrap_or(u64::MAX)
}
580
581fn generate_run_id() -> String {
582 format!("run-{}", unix_time_ms())
583}
584
585fn generate_request_id(route: &str) -> String {
586 let route_prefix = route
587 .chars()
588 .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
589 .collect::<String>();
590 let sequence = REQUEST_SEQUENCE.fetch_add(1, Ordering::Relaxed);
591 format!("{route_prefix}-{}-{sequence}", unix_time_ms())
592}
593
/// Process-wide counter making generated request ids unique within a process.
static REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
/// Process-wide counter issuing keys for the `pending_requests` map.
static PENDING_SEQUENCE: AtomicU64 = AtomicU64::new(0);