Skip to main content

tailtriage_core/
collector.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::{Duration, Instant};
6
7use crate::config::Config;
8use crate::InflightGuard;
9use crate::RunSink;
10use crate::{
11    unix_time_ms, BuildError, InFlightSnapshot, Outcome, QueueEvent, QueueTimer, RequestEvent,
12    RequestOptions, Run, RunMetadata, RuntimeSnapshot, SinkError, StageEvent, StageTimer,
13    UnfinishedRequestSample,
14};
15
16/// Per-run collector that records request events and writes the final artifact.
17pub struct Tailtriage {
18    pub(crate) run: Mutex<Run>,
19    pub(crate) inflight_counts: Mutex<HashMap<String, u64>>,
20    pending_requests: Mutex<HashMap<u64, PendingRequest>>,
21    pub(crate) sink: Arc<dyn RunSink + Send + Sync>,
22    pub(crate) limits: crate::CaptureLimits,
23    pub(crate) strict_lifecycle: bool,
24}
25
/// Book-keeping for a request that has started but has not been finished yet.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Correlation ID reported for the request.
    request_id: String,
    /// Route or operation name the request was started with.
    route: String,
    /// Optional semantic kind taken from the request options.
    kind: Option<String>,
    /// Wall-clock start time in Unix milliseconds.
    started_at_unix_ms: u64,
    /// Monotonic start instant used to measure latency.
    started: Instant,
}
34
35impl std::fmt::Debug for Tailtriage {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("Tailtriage")
38            .field("limits", &self.limits)
39            .field("strict_lifecycle", &self.strict_lifecycle)
40            .finish_non_exhaustive()
41    }
42}
43
44/// Split request lifecycle start result.
45#[must_use = "request completion must be finished explicitly"]
46#[derive(Debug)]
47pub struct StartedRequest<'a> {
48    /// Instrumentation handle for queue/stage/inflight timing.
49    pub handle: RequestHandle<'a>,
50    /// Single-owner completion token for explicit request finish.
51    pub completion: RequestCompletion<'a>,
52}
53
54/// Split request lifecycle start result backed by `Arc<Tailtriage>`.
55#[must_use = "request completion must be finished explicitly"]
56#[derive(Debug)]
57pub struct OwnedStartedRequest {
58    /// Instrumentation handle for queue/stage/inflight timing.
59    pub handle: OwnedRequestHandle,
60    /// Single-owner completion token for explicit request finish.
61    pub completion: OwnedRequestCompletion,
62}
63
64/// Instrumentation-facing request handle.
65#[derive(Debug, Clone)]
66pub struct RequestHandle<'a> {
67    tailtriage: &'a Tailtriage,
68    request_id: String,
69    route: String,
70    kind: Option<String>,
71}
72
73/// Instrumentation-facing request handle backed by `Arc<Tailtriage>`.
74#[derive(Debug, Clone)]
75pub struct OwnedRequestHandle {
76    tailtriage: Arc<Tailtriage>,
77    request_id: String,
78    route: String,
79    kind: Option<String>,
80}
81
82/// Completion-facing request token.
83#[must_use = "request completion tokens must be finished explicitly"]
84#[derive(Debug)]
85pub struct RequestCompletion<'a> {
86    tailtriage: &'a Tailtriage,
87    pending_key: u64,
88    finished: bool,
89}
90
91/// Completion-facing request token backed by `Arc<Tailtriage>`.
92#[must_use = "request completion tokens must be finished explicitly"]
93#[derive(Debug)]
94pub struct OwnedRequestCompletion {
95    tailtriage: Arc<Tailtriage>,
96    pending_key: u64,
97    finished: bool,
98}
99
100impl Tailtriage {
101    /// Creates a builder-based setup path for one service run.
102    #[must_use]
103    pub fn builder(service_name: impl Into<String>) -> crate::TailtriageBuilder {
104        crate::TailtriageBuilder::new(service_name)
105    }
106
107    pub(crate) fn from_config(config: Config) -> Result<Self, BuildError> {
108        if config.service_name.trim().is_empty() {
109            return Err(BuildError::EmptyServiceName);
110        }
111
112        let now = unix_time_ms();
113        let run = Run::new(RunMetadata {
114            run_id: config.run_id.unwrap_or_else(generate_run_id),
115            service_name: config.service_name,
116            service_version: config.service_version,
117            started_at_unix_ms: now,
118            finished_at_unix_ms: now,
119            mode: config.mode,
120            host: None,
121            pid: Some(std::process::id()),
122            lifecycle_warnings: Vec::new(),
123            unfinished_requests: crate::UnfinishedRequests::default(),
124        });
125
126        Ok(Self {
127            run: Mutex::new(run),
128            inflight_counts: Mutex::new(HashMap::new()),
129            pending_requests: Mutex::new(HashMap::new()),
130            sink: config.sink,
131            limits: config.capture_limits,
132            strict_lifecycle: config.strict_lifecycle,
133        })
134    }
135
136    /// Starts a request with autogenerated correlation for `route`.
137    pub fn begin_request(&self, route: impl Into<String>) -> StartedRequest<'_> {
138        self.begin_request_with(route, RequestOptions::new())
139    }
140
141    /// Starts a request with optional caller-provided request options.
142    pub fn begin_request_with(
143        &self,
144        route: impl Into<String>,
145        options: RequestOptions,
146    ) -> StartedRequest<'_> {
147        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);
148
149        StartedRequest {
150            handle: RequestHandle {
151                tailtriage: self,
152                request_id: request_id.clone(),
153                route,
154                kind,
155            },
156            completion: RequestCompletion {
157                tailtriage: self,
158                pending_key,
159                finished: false,
160            },
161        }
162    }
163
164    /// Starts a request with autogenerated correlation for `route` using `Arc<Tailtriage>`.
165    pub fn begin_request_owned(self: &Arc<Self>, route: impl Into<String>) -> OwnedStartedRequest {
166        self.begin_request_with_owned(route, RequestOptions::new())
167    }
168
169    /// Starts a request with caller-provided options using `Arc<Tailtriage>`.
170    pub fn begin_request_with_owned(
171        self: &Arc<Self>,
172        route: impl Into<String>,
173        options: RequestOptions,
174    ) -> OwnedStartedRequest {
175        let (request_id, route, kind, pending_key) = self.start_request(route.into(), options);
176
177        OwnedStartedRequest {
178            handle: OwnedRequestHandle {
179                tailtriage: Arc::clone(self),
180                request_id: request_id.clone(),
181                route,
182                kind,
183            },
184            completion: OwnedRequestCompletion {
185                tailtriage: Arc::clone(self),
186                pending_key,
187                finished: false,
188            },
189        }
190    }
191
192    /// Returns a clone of the current in-memory run state.
193    #[must_use]
194    pub fn snapshot(&self) -> Run {
195        lock_run(&self.run).clone()
196    }
197
198    /// Writes the current run artifact and finishes the run lifecycle.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`SinkError`] if serialization or writing fails.
203    pub fn shutdown(&self) -> Result<(), SinkError> {
204        let mut pending_samples = Vec::new();
205        let pending_count = {
206            let pending = lock_pending(&self.pending_requests);
207            pending_samples.extend(pending.values().take(5).map(|req| UnfinishedRequestSample {
208                request_id: req.request_id.clone(),
209                route: req.route.clone(),
210            }));
211            pending.len()
212        };
213
214        let mut guard = lock_run(&self.run);
215        guard.metadata.finished_at_unix_ms = unix_time_ms();
216        if pending_count > 0 {
217            guard.metadata.lifecycle_warnings.push(format!(
218                "{pending_count} unfinished request(s) remained at shutdown; run includes no fabricated completions"
219            ));
220            guard.metadata.unfinished_requests.count = pending_count as u64;
221            guard.metadata.unfinished_requests.sample = pending_samples;
222            if self.strict_lifecycle {
223                return Err(SinkError::Lifecycle {
224                    unfinished_count: pending_count,
225                });
226            }
227        }
228
229        self.sink.write(&guard)
230    }
231
232    /// Creates an in-flight guard for `gauge`.
233    #[must_use]
234    pub(crate) fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
235        let gauge = gauge.into();
236        let count = {
237            let mut counts = lock_map(&self.inflight_counts);
238            let entry = counts.entry(gauge.clone()).or_insert(0);
239            *entry += 1;
240            *entry
241        };
242
243        self.record_inflight_snapshot(InFlightSnapshot {
244            gauge: gauge.clone(),
245            at_unix_ms: unix_time_ms(),
246            count,
247        });
248
249        InflightGuard {
250            tailtriage: self,
251            gauge,
252        }
253    }
254
255    /// Records one runtime metrics sample captured by an integration crate.
256    pub fn record_runtime_snapshot(&self, snapshot: RuntimeSnapshot) {
257        let mut run = lock_run(&self.run);
258        if run.runtime_snapshots.len() >= self.limits.max_runtime_snapshots {
259            run.truncation.dropped_runtime_snapshots =
260                run.truncation.dropped_runtime_snapshots.saturating_add(1);
261        } else {
262            run.runtime_snapshots.push(snapshot);
263        }
264    }
265
266    pub(crate) fn record_stage_event(&self, event: StageEvent) {
267        let mut run = lock_run(&self.run);
268        if run.stages.len() >= self.limits.max_stages {
269            run.truncation.dropped_stages = run.truncation.dropped_stages.saturating_add(1);
270        } else {
271            run.stages.push(event);
272        }
273    }
274
275    pub(crate) fn record_queue_event(&self, event: QueueEvent) {
276        let mut run = lock_run(&self.run);
277        if run.queues.len() >= self.limits.max_queues {
278            run.truncation.dropped_queues = run.truncation.dropped_queues.saturating_add(1);
279        } else {
280            run.queues.push(event);
281        }
282    }
283
284    pub(crate) fn record_inflight_snapshot(&self, snapshot: InFlightSnapshot) {
285        let mut run = lock_run(&self.run);
286        if run.inflight.len() >= self.limits.max_inflight_snapshots {
287            run.truncation.dropped_inflight_snapshots =
288                run.truncation.dropped_inflight_snapshots.saturating_add(1);
289        } else {
290            run.inflight.push(snapshot);
291        }
292    }
293
294    fn record_request_event(&self, event: RequestEvent) {
295        let mut run = lock_run(&self.run);
296        if run.requests.len() >= self.limits.max_requests {
297            run.truncation.dropped_requests = run.truncation.dropped_requests.saturating_add(1);
298        } else {
299            run.requests.push(event);
300        }
301    }
302
303    fn start_request(
304        &self,
305        route: String,
306        options: RequestOptions,
307    ) -> (String, String, Option<String>, u64) {
308        let request_id = options
309            .request_id
310            .unwrap_or_else(|| generate_request_id(&route));
311        let pending_key = PENDING_SEQUENCE.fetch_add(1, Ordering::Relaxed);
312        let kind = options.kind;
313        let pending = PendingRequest {
314            request_id: request_id.clone(),
315            route: route.clone(),
316            kind: kind.clone(),
317            started_at_unix_ms: unix_time_ms(),
318            started: Instant::now(),
319        };
320        lock_pending(&self.pending_requests).insert(pending_key, pending);
321
322        (request_id, route, kind, pending_key)
323    }
324}
325
326impl RequestHandle<'_> {
327    /// Returns the stable request ID for this request lifecycle.
328    #[must_use]
329    pub fn request_id(&self) -> &str {
330        &self.request_id
331    }
332
333    /// Returns the route or operation name associated with this request.
334    #[must_use]
335    pub fn route(&self) -> &str {
336        &self.route
337    }
338
339    /// Returns the optional semantic request kind.
340    #[must_use]
341    pub fn kind(&self) -> Option<&str> {
342        self.kind.as_deref()
343    }
344
345    /// Starts queue-wait timing instrumentation for `queue`.
346    #[must_use]
347    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
348        QueueTimer {
349            tailtriage: self.tailtriage,
350            request_id: self.request_id.clone(),
351            queue: queue.into(),
352            depth_at_start: None,
353        }
354    }
355
356    /// Starts stage timing instrumentation for `stage`.
357    #[must_use]
358    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
359        StageTimer {
360            tailtriage: self.tailtriage,
361            request_id: self.request_id.clone(),
362            stage: stage.into(),
363        }
364    }
365
366    /// Increments in-flight gauge tracking for `gauge` until the returned guard drops.
367    #[must_use]
368    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
369        self.tailtriage.inflight(gauge)
370    }
371}
372
373impl RequestCompletion<'_> {
374    /// Finishes this request with an explicit [`Outcome`].
375    pub fn finish(mut self, outcome: Outcome) {
376        self.finish_internal(outcome);
377    }
378
379    /// Convenience helper for successfully completed requests.
380    pub fn finish_ok(self) {
381        self.finish(Outcome::Ok);
382    }
383
384    /// Finishes this request from `result` and returns `result` unchanged.
385    ///
386    /// # Errors
387    ///
388    /// This method does not create new errors. It returns `result` unchanged,
389    /// including the original `Err(E)` value.
390    pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
391        let outcome = if result.is_ok() {
392            Outcome::Ok
393        } else {
394            Outcome::Error
395        };
396        self.finish(outcome);
397        result
398    }
399
400    fn finish_internal(&mut self, outcome: Outcome) {
401        if self.finished {
402            debug_assert!(
403                !self.finished,
404                "tailtriage request completion was finished more than once; each request must be finished exactly once"
405            );
406            return;
407        }
408
409        let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
410        let Some(pending) = pending else {
411            debug_assert!(
412                false,
413                "tailtriage request completion token had no pending request entry"
414            );
415            self.finished = true;
416            return;
417        };
418        self.finished = true;
419
420        self.tailtriage.record_request_event(RequestEvent {
421            request_id: pending.request_id,
422            route: pending.route,
423            kind: pending.kind,
424            started_at_unix_ms: pending.started_at_unix_ms,
425            finished_at_unix_ms: unix_time_ms(),
426            latency_us: duration_to_us(pending.started.elapsed()),
427            outcome: outcome.into_string(),
428        });
429    }
430}
431
432impl OwnedRequestHandle {
433    /// Correlation ID attached to this request.
434    #[must_use]
435    pub fn request_id(&self) -> &str {
436        &self.request_id
437    }
438
439    /// Route/operation name attached to this request.
440    #[must_use]
441    pub fn route(&self) -> &str {
442        &self.route
443    }
444
445    /// Optional kind metadata attached to this request.
446    #[must_use]
447    pub fn kind(&self) -> Option<&str> {
448        self.kind.as_deref()
449    }
450
451    /// Starts queue-wait timing instrumentation for `queue`.
452    #[must_use]
453    pub fn queue(&self, queue: impl Into<String>) -> QueueTimer<'_> {
454        QueueTimer {
455            tailtriage: self.tailtriage.as_ref(),
456            request_id: self.request_id.clone(),
457            queue: queue.into(),
458            depth_at_start: None,
459        }
460    }
461
462    /// Starts stage timing instrumentation for `stage`.
463    #[must_use]
464    pub fn stage(&self, stage: impl Into<String>) -> StageTimer<'_> {
465        StageTimer {
466            tailtriage: self.tailtriage.as_ref(),
467            request_id: self.request_id.clone(),
468            stage: stage.into(),
469        }
470    }
471
472    /// Creates an in-flight guard for `gauge`.
473    #[must_use]
474    pub fn inflight(&self, gauge: impl Into<String>) -> InflightGuard<'_> {
475        self.tailtriage.as_ref().inflight(gauge)
476    }
477}
478
479impl OwnedRequestCompletion {
480    /// Finishes the request with explicit outcome.
481    pub fn finish(mut self, outcome: Outcome) {
482        self.finish_internal(outcome);
483    }
484
485    /// Finishes the request as success.
486    pub fn finish_ok(self) {
487        self.finish(Outcome::Ok);
488    }
489
490    /// Maps `result` into request outcome and returns the original result.
491    ///
492    /// # Errors
493    ///
494    /// This method does not create new errors. It returns `result` unchanged,
495    /// including the original `Err(E)` value.
496    pub fn finish_result<T, E>(self, result: Result<T, E>) -> Result<T, E> {
497        let outcome = if result.is_ok() {
498            Outcome::Ok
499        } else {
500            Outcome::Error
501        };
502        self.finish(outcome);
503        result
504    }
505
506    fn finish_internal(&mut self, outcome: Outcome) {
507        if self.finished {
508            debug_assert!(
509                !self.finished,
510                "tailtriage request completion was finished more than once; each request must be finished exactly once"
511            );
512            return;
513        }
514
515        let pending = lock_pending(&self.tailtriage.pending_requests).remove(&self.pending_key);
516        let Some(pending) = pending else {
517            self.finished = true;
518            return;
519        };
520        self.finished = true;
521
522        self.tailtriage.record_request_event(RequestEvent {
523            request_id: pending.request_id,
524            route: pending.route,
525            kind: pending.kind,
526            started_at_unix_ms: pending.started_at_unix_ms,
527            finished_at_unix_ms: unix_time_ms(),
528            latency_us: duration_to_us(pending.started.elapsed()),
529            outcome: outcome.into_string(),
530        });
531    }
532}
533
534impl Drop for RequestCompletion<'_> {
535    fn drop(&mut self) {
536        debug_assert!(
537            self.finished || std::thread::panicking(),
538            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
539        );
540    }
541}
542
543impl Drop for OwnedRequestCompletion {
544    fn drop(&mut self) {
545        debug_assert!(
546            self.finished || std::thread::panicking(),
547            "tailtriage request completion dropped without finish(...), finish_ok(), or finish_result(...)"
548        );
549    }
550}
551
552pub(crate) fn lock_run(run: &Mutex<Run>) -> std::sync::MutexGuard<'_, Run> {
553    match run.lock() {
554        Ok(guard) => guard,
555        Err(poisoned) => poisoned.into_inner(),
556    }
557}
558
/// Locks a string-keyed counter map, recovering the guard if the mutex was poisoned.
pub(crate) fn lock_map(
    map: &Mutex<HashMap<String, u64>>,
) -> std::sync::MutexGuard<'_, HashMap<String, u64>> {
    map.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}
567
568fn lock_pending(
569    map: &Mutex<HashMap<u64, PendingRequest>>,
570) -> std::sync::MutexGuard<'_, HashMap<u64, PendingRequest>> {
571    match map.lock() {
572        Ok(guard) => guard,
573        Err(poisoned) => poisoned.into_inner(),
574    }
575}
576
/// Converts `duration` to whole microseconds, saturating at `u64::MAX`.
pub(crate) fn duration_to_us(duration: Duration) -> u64 {
    let micros: u128 = duration.as_micros();
    u64::try_from(micros).unwrap_or(u64::MAX)
}
580
581fn generate_run_id() -> String {
582    format!("run-{}", unix_time_ms())
583}
584
585fn generate_request_id(route: &str) -> String {
586    let route_prefix = route
587        .chars()
588        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
589        .collect::<String>();
590    let sequence = REQUEST_SEQUENCE.fetch_add(1, Ordering::Relaxed);
591    format!("{route_prefix}-{}-{sequence}", unix_time_ms())
592}
593
/// Process-wide counter appended to generated request IDs so they stay distinct.
static REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0_u64);
/// Process-wide counter that allocates keys for pending-request map entries.
static PENDING_SEQUENCE: AtomicU64 = AtomicU64::new(0_u64);