Skip to main content

tailtriage_core/
collector.rs

1use std::collections::HashMap;
2use std::ffi::OsString;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::time::{Duration, Instant};
7
8use crate::config::Config;
9use crate::InflightGuard;
10use crate::RunSink;
11use crate::{
12    unix_time_ms, BuildError, InFlightSnapshot, Outcome, QueueEvent, QueueTimer, RequestEvent,
13    RequestOptions, Run, RunEndReason, RunMetadata, RuntimeSnapshot, SinkError, StageEvent,
14    StageTimer, UnfinishedRequestSample,
15};
16
/// Per-run collector that records request events and writes the final artifact.
///
/// Recorded events accumulate in `run`; [`Tailtriage::shutdown`] hands the run
/// to the configured `sink`.
pub struct Tailtriage {
    /// In-memory run state: metadata, recorded events, and truncation summary.
    pub(crate) run: Mutex<Run>,
    /// Current in-flight count per gauge name.
    pub(crate) inflight_counts: Mutex<HashMap<String, u64>>,
    /// Requests that were started but not yet finished, keyed by pending key.
    pending_requests: Mutex<HashMap<u64, PendingRequest>>,
    /// Destination that receives the run on shutdown.
    pub(crate) sink: Arc<dyn RunSink + Send + Sync>,
    /// Capture mode selected for this run.
    pub(crate) mode: crate::CaptureMode,
    /// Resolved core configuration this run was built with.
    pub(crate) effective_core_config: crate::EffectiveCoreConfig,
    /// Per-section capture limits taken from the effective core configuration.
    pub(crate) limits: crate::CaptureLimits,
    /// When `true`, shutdown fails if unfinished requests remain.
    pub(crate) strict_lifecycle: bool,
    /// Atomic saturation flags and post-saturation drop counters per section.
    truncation_state: TruncationState,
    /// Set once a Tokio runtime sampler has been registered for this run.
    runtime_sampler_registered: AtomicBool,
    /// Optional callback fired on the first transition to `limits_hit`.
    limits_hit_listener: Mutex<Option<Arc<dyn Fn() + Send + Sync>>>,
}
31
/// Atomic saturation bookkeeping for a single capture section.
///
/// Remembers whether the section has been marked saturated and how many
/// events were counted as dropped afterwards.
#[derive(Debug, Default)]
struct SectionSaturationState {
    /// `true` once the section has been marked saturated.
    saturated: AtomicBool,
    /// Number of drops counted through [`Self::increment_drop`].
    dropped_after_saturation: AtomicU64,
}

impl SectionSaturationState {
    /// Returns whether [`Self::mark_saturated`] has been called.
    fn is_saturated(&self) -> bool {
        let Self { saturated, .. } = self;
        saturated.load(Ordering::Relaxed)
    }

    /// Flags the section as saturated.
    fn mark_saturated(&self) {
        let Self { saturated, .. } = self;
        saturated.store(true, Ordering::Relaxed);
    }

    /// Counts one more dropped event.
    fn increment_drop(&self) {
        let Self {
            dropped_after_saturation: counter,
            ..
        } = self;
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the number of drops counted so far.
    fn dropped_after_saturation(&self) -> u64 {
        let Self {
            dropped_after_saturation: counter,
            ..
        } = self;
        counter.load(Ordering::Relaxed)
    }
}
56
57#[derive(Debug, Default)]
58struct TruncationState {
59    limits_hit: AtomicBool,
60    requests: SectionSaturationState,
61    stages: SectionSaturationState,
62    queues: SectionSaturationState,
63    inflight: SectionSaturationState,
64    runtime_snapshots: SectionSaturationState,
65}
66
67impl TruncationState {
68    fn mark_limits_hit(&self) -> bool {
69        self.limits_hit
70            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
71            .is_ok()
72    }
73
74    fn merge_into(&self, truncation: &mut crate::TruncationSummary) {
75        truncation.dropped_requests = truncation
76            .dropped_requests
77            .saturating_add(self.requests.dropped_after_saturation());
78        truncation.dropped_stages = truncation
79            .dropped_stages
80            .saturating_add(self.stages.dropped_after_saturation());
81        truncation.dropped_queues = truncation
82            .dropped_queues
83            .saturating_add(self.queues.dropped_after_saturation());
84        truncation.dropped_inflight_snapshots = truncation
85            .dropped_inflight_snapshots
86            .saturating_add(self.inflight.dropped_after_saturation());
87        truncation.dropped_runtime_snapshots = truncation
88            .dropped_runtime_snapshots
89            .saturating_add(self.runtime_snapshots.dropped_after_saturation());
90        truncation.limits_hit |=
91            self.limits_hit.load(Ordering::Relaxed) || truncation.is_truncated();
92    }
93}
94
/// Bookkeeping for a request that has been started but not yet finished.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Correlation ID (caller-provided or autogenerated).
    request_id: String,
    /// Route or operation name the request was started with.
    route: String,
    /// Optional request kind taken from the request options.
    kind: Option<String>,
    /// Wall-clock start time in Unix milliseconds.
    started_at_unix_ms: u64,
    /// Monotonic start instant used to compute latency on finish.
    started: Instant,
}
103
104impl std::fmt::Debug for Tailtriage {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("Tailtriage")
107            .field("mode", &self.mode)
108            .field("limits", &self.limits)
109            .field("strict_lifecycle", &self.strict_lifecycle)
110            .finish_non_exhaustive()
111    }
112}
113
/// Split request lifecycle start result.
///
/// Use `handle` to record queue/stage/inflight evidence, then finish exactly
/// once with `completion`.
///
/// Instrumentation alone does not complete the request lifecycle.
///
/// Both halves borrow the originating [`Tailtriage`] for `'a`; see
/// [`OwnedStartedRequest`] for the `Arc`-backed variant.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct StartedRequest<'a> {
    /// Instrumentation handle for queue/stage/inflight timing.
    pub handle: RequestHandle<'a>,
    /// Single-owner completion token for explicit request finish.
    pub completion: RequestCompletion<'a>,
}
128
/// Split request lifecycle start result backed by `Arc<Tailtriage>`.
///
/// Use `handle` for instrumentation and `completion` to finish exactly once.
///
/// Each half holds its own `Arc<Tailtriage>` clone, so neither borrows from
/// the collector that created it.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct OwnedStartedRequest {
    /// Instrumentation handle for queue/stage/inflight timing.
    pub handle: OwnedRequestHandle,
    /// Single-owner completion token for explicit request finish.
    pub completion: OwnedRequestCompletion,
}
140
/// Instrumentation-facing request handle.
///
/// This handle records queue/stage/inflight signals for one admitted request.
/// It does not complete the request; completion goes through the separate
/// [`RequestCompletion`] token.
///
/// # Example
///
/// ```no_run
/// use tailtriage_core::Tailtriage;
///
/// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
/// let run = Tailtriage::builder("checkout-service").build()?;
/// let started = run.begin_request("/checkout");
///
/// started.handle.queue("checkout_queue").await_on(async {}).await;
/// let _: Result<(), std::io::Error> = started
///     .handle
///     .stage("db")
///     .await_on(async { Ok(()) })
///     .await;
///
/// started.completion.finish_ok();
/// run.shutdown()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct RequestHandle<'a> {
    /// Collector that receives the recorded events.
    tailtriage: &'a Tailtriage,
    /// Correlation ID attached to every event recorded through this handle.
    request_id: String,
    /// Route or operation name the request was started with.
    route: String,
    /// Optional request kind taken from the request options.
    kind: Option<String>,
}
174
/// Instrumentation-facing request handle backed by `Arc<Tailtriage>`.
///
/// This is the owned variant of [`RequestHandle`], useful across spawned tasks
/// and helper layers. It records evidence only and does not complete the
/// request.
#[derive(Debug, Clone)]
pub struct OwnedRequestHandle {
    /// Shared collector that receives the recorded events.
    tailtriage: Arc<Tailtriage>,
    /// Correlation ID attached to every event recorded through this handle.
    request_id: String,
    /// Route or operation name the request was started with.
    route: String,
    /// Optional request kind taken from the request options.
    kind: Option<String>,
}
186
/// Completion-facing request token.
///
/// Each admitted request must be finished exactly once with
/// [`RequestCompletion::finish`], [`RequestCompletion::finish_ok`], or
/// [`RequestCompletion::finish_result`].
///
/// Dropping this token does not auto-finish the request. When debug
/// assertions are enabled, dropping an unfinished token while the thread is
/// not panicking trips a debug assertion.
#[must_use = "request completion tokens must be finished explicitly"]
#[derive(Debug)]
pub struct RequestCompletion<'a> {
    /// Collector that owns the pending-request entry.
    tailtriage: &'a Tailtriage,
    /// Key of this request in the collector's pending-request map.
    pending_key: u64,
    /// Set once a finish call has run; checked on drop.
    finished: bool,
}
201
/// Completion-facing request token backed by `Arc<Tailtriage>`.
///
/// Owned variant of [`RequestCompletion`]. Dropping this token does not
/// auto-finish the request. When debug assertions are enabled, dropping an
/// unfinished token while the thread is not panicking trips a debug assertion.
#[must_use = "request completion tokens must be finished explicitly"]
#[derive(Debug)]
pub struct OwnedRequestCompletion {
    /// Shared collector that owns the pending-request entry.
    tailtriage: Arc<Tailtriage>,
    /// Key of this request in the collector's pending-request map.
    pending_key: u64,
    /// Set once a finish call has run; checked on drop.
    finished: bool,
}
213
/// Error returned when registering Tokio runtime sampler metadata.
///
/// Registration succeeds at most once per run; any later attempt yields
/// [`RuntimeSamplerRegistrationError::DuplicateStart`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeSamplerRegistrationError {
    /// A runtime sampler was already registered for this run.
    DuplicateStart,
}
220
221impl Tailtriage {
    /// Creates a builder-based setup path for one service run.
    ///
    /// `service_name` is forwarded unchanged to the builder's constructor.
    #[must_use]
    pub fn builder(service_name: impl Into<String>) -> crate::TailtriageBuilder {
        crate::TailtriageBuilder::new(service_name)
    }
227
228    pub(crate) fn from_config(config: Config) -> Result<Self, BuildError> {
229        if config.service_name.trim().is_empty() {
230            return Err(BuildError::EmptyServiceName);
231        }
232
233        let now = unix_time_ms();
234        let run = Run::new(RunMetadata {
235            run_id: config.run_id.unwrap_or_else(generate_run_id),
236            service_name: config.service_name,
237            service_version: config.service_version,
238            started_at_unix_ms: now,
239            finished_at_unix_ms: now,
240            finalized_at_unix_ms: None,
241            mode: config.mode,
242            effective_core_config: Some(config.effective_core),
243            effective_tokio_sampler_config: None,
244            host: lookup_host_name(),
245            pid: Some(std::process::id()),
246            lifecycle_warnings: Vec::new(),
247            unfinished_requests: crate::UnfinishedRequests::default(),
248            run_end_reason: None,
249        });
250
251        Ok(Self {
252            run: Mutex::new(run),
253            inflight_counts: Mutex::new(HashMap::new()),
254            pending_requests: Mutex::new(HashMap::new()),
255            sink: config.sink,
256            mode: config.mode,
257            effective_core_config: config.effective_core,
258            limits: config.effective_core.capture_limits,
259            strict_lifecycle: config.strict_lifecycle,
260            truncation_state: TruncationState::default(),
261            runtime_sampler_registered: AtomicBool::new(false),
262            limits_hit_listener: Mutex::new(None),
263        })
264    }
265
    /// Returns the selected capture mode for this run.
    ///
    /// The value is fixed when the collector is built.
    #[must_use]
    pub const fn selected_mode(&self) -> crate::CaptureMode {
        self.mode
    }
271
    /// Returns the effective resolved core configuration for this run.
    ///
    /// The configuration is returned by value.
    #[must_use]
    pub const fn effective_core_config(&self) -> crate::EffectiveCoreConfig {
        self.effective_core_config
    }
277
    /// Starts a request with autogenerated correlation for `route`.
    ///
    /// Equivalent to [`Tailtriage::begin_request_with`] with default
    /// [`RequestOptions`].
    ///
    /// Finish the returned completion token exactly once.
    pub fn begin_request(&self, route: impl Into<String>) -> StartedRequest<'_> {
        self.begin_request_with(route, RequestOptions::new())
    }
284
285    /// Starts a request with optional caller-provided request options.
286    ///
287    /// Queue/stage/inflight instrumentation from the returned handle records
288    /// evidence only; it does not finish the request. Finish must happen
289    /// exactly once through the returned completion token.
290    pub fn begin_request_with(
291        &self,
292        route: impl Into<String>,
293        options: RequestOptions,
294    ) -> StartedRequest<'_> {
295        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);
296
297        StartedRequest {
298            handle: RequestHandle {
299                tailtriage: self,
300                request_id: request_id.clone(),
301                route,
302                kind,
303            },
304            completion: RequestCompletion {
305                tailtriage: self,
306                pending_key,
307                finished: false,
308            },
309        }
310    }
311
    /// Starts a request with autogenerated correlation for `route` using `Arc<Tailtriage>`.
    ///
    /// This is the owned variant for code paths that are split across task
    /// boundaries and need to move handles between them.
    ///
    /// Finish the returned completion token exactly once.
    pub fn begin_request_owned(self: &Arc<Self>, route: impl Into<String>) -> OwnedStartedRequest {
        self.begin_request_with_owned(route, RequestOptions::new())
    }
319
320    /// Starts a request with caller-provided options using `Arc<Tailtriage>`.
321    ///
322    /// Finish the returned completion token exactly once.
323    pub fn begin_request_with_owned(
324        self: &Arc<Self>,
325        route: impl Into<String>,
326        options: RequestOptions,
327    ) -> OwnedStartedRequest {
328        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);
329
330        OwnedStartedRequest {
331            handle: OwnedRequestHandle {
332                tailtriage: Arc::clone(self),
333                request_id: request_id.clone(),
334                route,
335                kind,
336            },
337            completion: OwnedRequestCompletion {
338                tailtriage: Arc::clone(self),
339                pending_key,
340                finished: false,
341            },
342        }
343    }
344
345    /// Returns a clone of the current in-memory run state.
346    ///
347    /// This does not persist anything. Use [`Tailtriage::shutdown`] to write
348    /// the final artifact through the configured sink.
349    ///
350    /// `snapshot()` is useful for diagnostics and tests while capture is still
351    /// running. While capture is active, `metadata.finalized_at_unix_ms` remains
352    /// `None` and `metadata.finished_at_unix_ms` is still provisional.
353    #[must_use]
354    pub fn snapshot(&self) -> Run {
355        let mut run = lock_run(&self.run).clone();
356        self.truncation_state.merge_into(&mut run.truncation);
357        run
358    }
359
360    /// Writes the current run artifact and finishes the run lifecycle.
361    ///
362    /// With default/non-strict lifecycle, unfinished requests are recorded in
363    /// metadata warnings and unfinished-request samples, then the artifact is written.
364    ///
365    /// With `strict_lifecycle(true)`, unfinished requests cause an early
366    /// [`SinkError::Lifecycle`] return and the artifact is not written.
367    ///
368    /// # Errors
369    ///
370    /// Returns [`SinkError`] if lifecycle validation fails in strict mode, or if
371    /// serialization or writing fails.
372    pub fn shutdown(&self) -> Result<(), SinkError> {
373        let mut pending_samples = Vec::new();
374        let pending_count = {
375            let pending = lock_pending(&self.pending_requests);
376            pending_samples.extend(pending.values().take(5).map(|req| UnfinishedRequestSample {
377                request_id: req.request_id.clone(),
378                route: req.route.clone(),
379            }));
380            pending.len()
381        };
382
383        let mut guard = lock_run(&self.run);
384        let finalized_at = unix_time_ms();
385        guard.metadata.finished_at_unix_ms = finalized_at;
386        guard.metadata.finalized_at_unix_ms = Some(finalized_at);
387        if pending_count > 0 {
388            guard.metadata.lifecycle_warnings.push(format!(
389                "{pending_count} unfinished request(s) remained at shutdown; run includes no fabricated completions"
390            ));
391            guard.metadata.unfinished_requests.count = pending_count as u64;
392            guard.metadata.unfinished_requests.sample = pending_samples;
393            if self.strict_lifecycle {
394                return Err(SinkError::Lifecycle {
395                    unfinished_count: pending_count,
396                });
397            }
398        }
399
400        self.truncation_state.merge_into(&mut guard.truncation);
401        self.sink.write(&guard)
402    }
403
404    /// Sets the run-end reason if not already set.
405    pub fn set_run_end_reason_if_absent(&self, reason: RunEndReason) {
406        let mut run = lock_run(&self.run);
407        if run.metadata.run_end_reason.is_none() {
408            run.metadata.run_end_reason = Some(reason);
409        }
410    }
411
412    /// Registers or clears a callback fired on the first transition to `limits_hit`.
413    ///
414    /// The callback is invoked at most once per run, exactly when truncation first
415    /// transitions from `false` to `true`.
416    ///
417    /// # Panics
418    ///
419    /// Panics if the limits-hit listener mutex is poisoned.
420    pub fn set_limits_hit_listener(&self, listener: Option<Arc<dyn Fn() + Send + Sync>>) {
421        let mut guard = self
422            .limits_hit_listener
423            .lock()
424            .expect("limits-hit listener lock poisoned");
425        *guard = listener;
426    }
427
428    /// Creates an in-flight guard for `gauge`.
429    #[must_use]
430    pub(crate) fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
431        let gauge = gauge.into();
432        let count = {
433            let mut counts = lock_map(&self.inflight_counts);
434            let entry = counts.entry(gauge.clone()).or_insert(0);
435            *entry += 1;
436            *entry
437        };
438
439        self.record_inflight_snapshot(InFlightSnapshot {
440            gauge: gauge.clone(),
441            at_unix_ms: unix_time_ms(),
442            count,
443        });
444
445        InflightGuard {
446            tailtriage: self,
447            gauge,
448            enabled: true,
449        }
450    }
451
452    /// Records one runtime metrics sample captured by an integration crate.
453    pub fn record_runtime_snapshot(&self, snapshot: RuntimeSnapshot) {
454        if self.truncation_state.runtime_snapshots.is_saturated() {
455            self.truncation_state.runtime_snapshots.increment_drop();
456            self.notify_limits_hit_transition();
457            return;
458        }
459
460        let mut notify_limits_hit = false;
461        {
462            let mut run = lock_run(&self.run);
463            if run.runtime_snapshots.len() >= self.limits.max_runtime_snapshots {
464                run.truncation.limits_hit = true;
465                run.truncation.dropped_runtime_snapshots =
466                    run.truncation.dropped_runtime_snapshots.saturating_add(1);
467                self.truncation_state.runtime_snapshots.mark_saturated();
468                notify_limits_hit = true;
469            } else {
470                run.runtime_snapshots.push(snapshot);
471            }
472        }
473        if notify_limits_hit {
474            self.notify_limits_hit_transition();
475        }
476    }
477
478    /// Registers one Tokio sampler startup and records effective sampler metadata.
479    ///
480    /// This method succeeds at most once per run.
481    ///
482    /// # Errors
483    ///
484    /// Returns [`RuntimeSamplerRegistrationError::DuplicateStart`] when a sampler
485    /// was already registered for this run.
486    pub(crate) fn register_tokio_runtime_sampler(
487        &self,
488        config: crate::EffectiveTokioSamplerConfig,
489    ) -> Result<(), RuntimeSamplerRegistrationError> {
490        if self
491            .runtime_sampler_registered
492            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
493            .is_err()
494        {
495            return Err(RuntimeSamplerRegistrationError::DuplicateStart);
496        }
497
498        let mut run = lock_run(&self.run);
499        run.metadata.effective_tokio_sampler_config = Some(config);
500        Ok(())
501    }
502
503    pub(crate) fn record_stage_event(&self, event: StageEvent) {
504        if self.truncation_state.stages.is_saturated() {
505            self.truncation_state.stages.increment_drop();
506            self.notify_limits_hit_transition();
507            return;
508        }
509
510        let mut notify_limits_hit = false;
511        {
512            let mut run = lock_run(&self.run);
513            if run.stages.len() >= self.limits.max_stages {
514                run.truncation.limits_hit = true;
515                run.truncation.dropped_stages = run.truncation.dropped_stages.saturating_add(1);
516                self.truncation_state.stages.mark_saturated();
517                notify_limits_hit = true;
518            } else {
519                run.stages.push(event);
520            }
521        }
522        if notify_limits_hit {
523            self.notify_limits_hit_transition();
524        }
525    }
526
527    pub(crate) fn record_queue_event(&self, event: QueueEvent) {
528        if self.truncation_state.queues.is_saturated() {
529            self.truncation_state.queues.increment_drop();
530            self.notify_limits_hit_transition();
531            return;
532        }
533
534        let mut notify_limits_hit = false;
535        {
536            let mut run = lock_run(&self.run);
537            if run.queues.len() >= self.limits.max_queues {
538                run.truncation.limits_hit = true;
539                run.truncation.dropped_queues = run.truncation.dropped_queues.saturating_add(1);
540                self.truncation_state.queues.mark_saturated();
541                notify_limits_hit = true;
542            } else {
543                run.queues.push(event);
544            }
545        }
546        if notify_limits_hit {
547            self.notify_limits_hit_transition();
548        }
549    }
550
551    pub(crate) fn record_inflight_snapshot(&self, snapshot: InFlightSnapshot) {
552        if self.truncation_state.inflight.is_saturated() {
553            self.truncation_state.inflight.increment_drop();
554            self.notify_limits_hit_transition();
555            return;
556        }
557
558        let mut notify_limits_hit = false;
559        {
560            let mut run = lock_run(&self.run);
561            if run.inflight.len() >= self.limits.max_inflight_snapshots {
562                run.truncation.limits_hit = true;
563                run.truncation.dropped_inflight_snapshots =
564                    run.truncation.dropped_inflight_snapshots.saturating_add(1);
565                self.truncation_state.inflight.mark_saturated();
566                notify_limits_hit = true;
567            } else {
568                run.inflight.push(snapshot);
569            }
570        }
571        if notify_limits_hit {
572            self.notify_limits_hit_transition();
573        }
574    }
575
576    fn record_request_event(&self, event: RequestEvent) {
577        if self.truncation_state.requests.is_saturated() {
578            self.truncation_state.requests.increment_drop();
579            self.notify_limits_hit_transition();
580            return;
581        }
582
583        let mut notify_limits_hit = false;
584        {
585            let mut run = lock_run(&self.run);
586            if run.requests.len() >= self.limits.max_requests {
587                run.truncation.limits_hit = true;
588                run.truncation.dropped_requests = run.truncation.dropped_requests.saturating_add(1);
589                self.truncation_state.requests.mark_saturated();
590                notify_limits_hit = true;
591            } else {
592                run.requests.push(event);
593            }
594        }
595        if notify_limits_hit {
596            self.notify_limits_hit_transition();
597        }
598    }
599
600    fn notify_limits_hit_transition(&self) {
601        if !self.truncation_state.mark_limits_hit() {
602            return;
603        }
604        let listener = self
605            .limits_hit_listener
606            .lock()
607            .expect("limits-hit listener lock poisoned")
608            .clone();
609        if let Some(listener) = listener {
610            listener();
611        }
612    }
613
614    fn start_request(
615        &self,
616        route: String,
617        options: RequestOptions,
618    ) -> (String, String, Option<String>, u64) {
619        let request_id = options
620            .request_id
621            .unwrap_or_else(|| generate_request_id(&route));
622        let pending_key = PENDING_SEQUENCE.fetch_add(1, Ordering::Relaxed);
623        let kind = options.kind;
624        let pending = PendingRequest {
625            request_id: request_id.clone(),
626            route: route.clone(),
627            kind: kind.clone(),
628            started_at_unix_ms: unix_time_ms(),
629            started: Instant::now(),
630        };
631        lock_pending(&self.pending_requests).insert(pending_key, pending);
632
633        (request_id, route, kind, pending_key)
634    }
635}
636
637impl RequestHandle<'_> {
638    /// Returns the stable request ID for this request lifecycle.
639    #[must_use]
640    pub fn request_id(&self) -> &str {
641        &self.request_id
642    }
643
644    /// Returns the route or operation name associated with this request.
645    #[must_use]
646    pub fn route(&self) -> &str {
647        &self.route
648    }
649
650    /// Returns the optional semantic request kind.
651    #[must_use]
652    pub fn kind(&self) -> Option<&str> {
653        self.kind.as_deref()
654    }
655
656    /// Starts queue-wait timing instrumentation for `queue`.
657    ///
658    /// Recording queue events does not finish the request.
659    #[must_use]
660    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
661        QueueTimer {
662            tailtriage: self.tailtriage,
663            enabled: true,
664            request_id: self.request_id.clone(),
665            queue: queue.into(),
666            depth_at_start: None,
667        }
668    }
669
670    /// Starts stage timing instrumentation for `stage`.
671    ///
672    /// Recording stage events does not finish the request.
673    #[must_use]
674    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
675        StageTimer {
676            tailtriage: self.tailtriage,
677            enabled: true,
678            request_id: self.request_id.clone(),
679            stage: stage.into(),
680        }
681    }
682
683    /// Increments in-flight gauge tracking for `gauge` until the returned guard drops.
684    ///
685    /// In-flight instrumentation does not finish the request lifecycle.
686    #[must_use]
687    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
688        self.tailtriage.inflight(gauge)
689    }
690}
691
692impl RequestCompletion<'_> {
693    /// Finishes this request with an explicit [`Outcome`].
694    pub fn finish(mut self, outcome: Outcome) {
695        self.finish_internal(outcome);
696    }
697
698    /// Convenience helper for successfully completed requests.
699    pub fn finish_ok(self) {
700        self.finish(Outcome::Ok);
701    }
702
703    /// Finishes this request from `result` and returns `result` unchanged.
704    ///
705    /// # Errors
706    ///
707    /// This method does not create new errors. It returns `result` unchanged,
708    /// including the original `Err(E)` value.
709    pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
710        let outcome = if result.is_ok() {
711            Outcome::Ok
712        } else {
713            Outcome::Error
714        };
715        self.finish(outcome);
716        result
717    }
718
719    fn finish_internal(&mut self, outcome: Outcome) {
720        if self.finished {
721            debug_assert!(
722                !self.finished,
723                "tailtriage request completion was finished more than once; each request must be finished exactly once"
724            );
725            return;
726        }
727
728        let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
729        let Some(pending) = pending else {
730            debug_assert!(
731                false,
732                "tailtriage request completion token had no pending request entry"
733            );
734            self.finished = true;
735            return;
736        };
737        self.finished = true;
738
739        self.tailtriage.record_request_event(RequestEvent {
740            request_id: pending.request_id,
741            route: pending.route,
742            kind: pending.kind,
743            started_at_unix_ms: pending.started_at_unix_ms,
744            finished_at_unix_ms: unix_time_ms(),
745            latency_us: duration_to_us(pending.started.elapsed()),
746            outcome: outcome.into_string(),
747        });
748    }
749}
750
751impl OwnedRequestHandle {
752    /// Correlation ID attached to this request.
753    #[must_use]
754    pub fn request_id(&self) -> &str {
755        &self.request_id
756    }
757
758    /// Route/operation name attached to this request.
759    #[must_use]
760    pub fn route(&self) -> &str {
761        &self.route
762    }
763
764    /// Optional kind metadata attached to this request.
765    #[must_use]
766    pub fn kind(&self) -> Option<&str> {
767        self.kind.as_deref()
768    }
769
770    /// Starts queue-wait timing instrumentation for `queue`.
771    ///
772    /// Recording queue events does not finish the request.
773    #[must_use]
774    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
775        QueueTimer {
776            tailtriage: self.tailtriage.as_ref(),
777            enabled: true,
778            request_id: self.request_id.clone(),
779            queue: queue.into(),
780            depth_at_start: None,
781        }
782    }
783
784    /// Starts stage timing instrumentation for `stage`.
785    ///
786    /// Recording stage events does not finish the request.
787    #[must_use]
788    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
789        StageTimer {
790            tailtriage: self.tailtriage.as_ref(),
791            enabled: true,
792            request_id: self.request_id.clone(),
793            stage: stage.into(),
794        }
795    }
796
797    /// Creates an in-flight guard for `gauge`.
798    ///
799    /// In-flight instrumentation does not finish the request lifecycle.
800    #[must_use]
801    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
802        self.tailtriage.as_ref().inflight(gauge)
803    }
804}
805
806impl OwnedRequestCompletion {
807    /// Finishes the request with explicit outcome.
808    pub fn finish(mut self, outcome: Outcome) {
809        self.finish_internal(outcome);
810    }
811
812    /// Finishes the request as success.
813    pub fn finish_ok(self) {
814        self.finish(Outcome::Ok);
815    }
816
817    /// Maps `result` into request outcome and returns the original result.
818    ///
819    /// # Errors
820    ///
821    /// This method does not create new errors. It returns `result` unchanged,
822    /// including the original `Err(E)` value.
823    pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
824        let outcome = if result.is_ok() {
825            Outcome::Ok
826        } else {
827            Outcome::Error
828        };
829        self.finish(outcome);
830        result
831    }
832
833    fn finish_internal(&mut self, outcome: Outcome) {
834        if self.finished {
835            debug_assert!(
836                !self.finished,
837                "tailtriage request completion was finished more than once; each request must be finished exactly once"
838            );
839            return;
840        }
841
842        let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
843        let Some(pending) = pending else {
844            self.finished = true;
845            return;
846        };
847        self.finished = true;
848
849        self.tailtriage.record_request_event(RequestEvent {
850            request_id: pending.request_id,
851            route: pending.route,
852            kind: pending.kind,
853            started_at_unix_ms: pending.started_at_unix_ms,
854            finished_at_unix_ms: unix_time_ms(),
855            latency_us: duration_to_us(pending.started.elapsed()),
856            outcome: outcome.into_string(),
857        });
858    }
859}
860
impl Drop for RequestCompletion<'_> {
    /// Debug-only check that the token was finished before being dropped.
    ///
    /// The request is not finished here; the check is skipped while the
    /// thread is already panicking.
    fn drop(&mut self) {
        debug_assert!(
            self.finished || std::thread::panicking(),
            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
        );
    }
}
869
impl Drop for OwnedRequestCompletion {
    /// Debug-only check that the token was finished before being dropped.
    ///
    /// The request is not finished here; the check is skipped while the
    /// thread is already panicking.
    fn drop(&mut self) {
        debug_assert!(
            self.finished || std::thread::panicking(),
            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
        );
    }
}
878
879pub(crate) fn lock_run(run: &Mutex<Run>) -> std::sync::MutexGuard<'_, Run> {
880    match run.lock() {
881        Ok(guard) => guard,
882        Err(poisoned) => poisoned.into_inner(),
883    }
884}
885
/// Locks a gauge-count map, recovering the guard if the mutex is poisoned.
pub(crate) fn lock_map(
    map: &Mutex<HashMap<String, u64>>,
) -> std::sync::MutexGuard<'_, HashMap<String, u64>> {
    map.lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
894
895fn lock_pending(
896    map: &Mutex<HashMap<u64, PendingRequest>>,
897) -> std::sync::MutexGuard<'_, HashMap<u64, PendingRequest>> {
898    match map.lock() {
899        Ok(guard) => guard,
900        Err(poisoned) => poisoned.into_inner(),
901    }
902}
903
/// Converts `duration` to whole microseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
pub(crate) fn duration_to_us(duration: Duration) -> u64 {
    let micros = duration.as_micros();
    u64::try_from(micros).unwrap_or(u64::MAX)
}
907
908fn generate_run_id() -> String {
909    format!("run-{}", uuid::Uuid::new_v4())
910}
911
912fn lookup_host_name() -> Option<String> {
913    let os_host = hostname::get().ok()?;
914    normalize_host_name(os_host)
915}
916
/// Trims surrounding whitespace from a raw host name.
///
/// Returns `None` when the value is not valid Unicode or is blank after
/// trimming.
fn normalize_host_name(host: OsString) -> Option<String> {
    host.into_string()
        .ok()
        .map(|name| name.trim().to_owned())
        .filter(|name| !name.is_empty())
}
925
926fn generate_request_id(route: &str) -> String {
927    let route_prefix = route
928        .chars()
929        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
930        .collect::<String>();
931    let sequence = REQUEST_SEQUENCE.fetch_add(1, Ordering::Relaxed);
932    format!("{route_prefix}-{}-{sequence}", unix_time_ms())
933}
934
/// Process-wide counter that supplies the sequence suffix of autogenerated
/// request IDs.
static REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
/// Process-wide counter that supplies keys for pending-request entries.
static PENDING_SEQUENCE: AtomicU64 = AtomicU64::new(0);
937
#[cfg(test)]
mod tests {
    use super::normalize_host_name;
    use std::ffi::OsString;

    /// Empty and whitespace-only host names normalize to `None`.
    #[test]
    fn normalize_host_name_rejects_blank_values() {
        for blank in ["", "   "] {
            assert_eq!(normalize_host_name(OsString::from(blank)), None);
        }
    }

    /// Surrounding whitespace is stripped from a usable host name.
    #[test]
    fn normalize_host_name_trims_non_blank_values() {
        let raw = OsString::from(" checkout-host \n");
        let normalized = normalize_host_name(raw);
        assert_eq!(normalized.as_deref(), Some("checkout-host"));
    }
}