Skip to main content

tailtriage_cli/
analyze.rs

1use std::collections::{BTreeMap, HashMap};
2
3use serde::{Serialize, Serializer};
4use tailtriage_core::{InFlightSnapshot, Run, RuntimeSnapshot};
5
/// Categories that heuristic triage can assign to a suspect.
///
/// A category is an evidence-ranked lead for investigation; it is never proof of
/// root cause.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DiagnosisKind {
    /// Request latency is dominated by queue wait, pointing at application-level
    /// queue pressure.
    ApplicationQueueSaturation,
    /// Backlog in the blocking pool points at pressure on `spawn_blocking`-backed work.
    BlockingPoolPressure,
    /// Queueing in the runtime scheduler hints at possible executor pressure.
    ExecutorPressureSuspected,
    /// A single stage dominates aggregate latency, hinting at a downstream slowdown.
    DownstreamStageDominates,
    /// The captured signals are too sparse to rank any stronger suspect.
    InsufficientEvidence,
}
22
23impl DiagnosisKind {
24    /// Returns the stable machine-readable diagnosis kind label.
25    #[must_use]
26    pub const fn as_str(&self) -> &'static str {
27        match self {
28            Self::ApplicationQueueSaturation => "application_queue_saturation",
29            Self::BlockingPoolPressure => "blocking_pool_pressure",
30            Self::ExecutorPressureSuspected => "executor_pressure_suspected",
31            Self::DownstreamStageDominates => "downstream_stage_dominates",
32            Self::InsufficientEvidence => "insufficient_evidence",
33        }
34    }
35}
36
37impl Serialize for DiagnosisKind {
38    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
39    where
40        S: Serializer,
41    {
42        serializer.serialize_str(self.as_str())
43    }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
47#[serde(rename_all = "snake_case")]
48/// Confidence bucket derived from suspect score thresholds.
49///
50/// This is score-derived ranking confidence, not causal certainty.
51pub enum Confidence {
52    /// Weak signal quality relative to stronger suspects in the same report.
53    Low,
54    /// Moderate signal quality for triage follow-up.
55    Medium,
56    /// Strong signal quality for triage follow-up.
57    High,
58}
59
60impl Confidence {
61    fn from_score(score: u8) -> Self {
62        if score >= 85 {
63            Self::High
64        } else if score >= 65 {
65            Self::Medium
66        } else {
67            Self::Low
68        }
69    }
70}
71
72/// Evidence-ranked suspect produced by heuristic analysis.
73///
74/// Suspects are triage leads and should be validated with follow-up checks.
75#[derive(Debug, Clone, PartialEq, Serialize)]
76pub struct Suspect {
77    /// Ranked suspect category.
78    pub kind: DiagnosisKind,
79    /// Relative ranking score in range `0..=100` (higher means stronger evidence).
80    pub score: u8,
81    /// Score-derived confidence bucket for triage prioritization.
82    pub confidence: Confidence,
83    /// Supporting evidence strings used to justify this suspect ranking.
84    pub evidence: Vec<String>,
85    /// Recommended next checks to validate or falsify this suspect.
86    pub next_checks: Vec<String>,
87}
88
89impl Suspect {
90    fn new(
91        kind: DiagnosisKind,
92        score: u8,
93        evidence: Vec<String>,
94        next_checks: Vec<String>,
95    ) -> Self {
96        Self {
97            kind,
98            score,
99            confidence: Confidence::from_score(score),
100            evidence,
101            next_checks,
102        }
103    }
104}
105
106/// Summary of one dominant in-flight gauge trend over the run window.
107#[derive(Debug, Clone, PartialEq, Serialize)]
108pub struct InflightTrend {
109    /// Gauge name chosen as the dominant trend candidate.
110    pub gauge: String,
111    /// Number of snapshots seen for this gauge.
112    pub sample_count: usize,
113    /// Peak in-flight count observed for this gauge.
114    pub peak_count: u64,
115    /// p95 in-flight count for this gauge.
116    pub p95_count: u64,
117    /// Net growth (`last - first`) across the sampled run window.
118    pub growth_delta: i64,
119    /// Growth rate in milli-counts/sec, if timestamps permit calculation.
120    pub growth_per_sec_milli: Option<i64>,
121}
122
123/// Rule-based triage report for one run artifact.
124///
125/// The report ranks evidence-backed suspects and suggests next checks.
126/// It does not prove root cause and should be used as triage guidance.
127#[derive(Debug, Clone, PartialEq, Serialize)]
128pub struct Report {
129    /// Number of request events considered in analysis.
130    pub request_count: usize,
131    /// p50 request latency in microseconds.
132    pub p50_latency_us: Option<u64>,
133    /// p95 request latency in microseconds.
134    pub p95_latency_us: Option<u64>,
135    /// p99 request latency in microseconds.
136    pub p99_latency_us: Option<u64>,
137    /// p95 queue-time share per request in permille (`0..=1000`).
138    pub p95_queue_share_permille: Option<u64>,
139    /// p95 non-queue service-time share per request in permille (`0..=1000`).
140    pub p95_service_share_permille: Option<u64>,
141    /// Dominant in-flight trend signal, if enough samples exist.
142    pub inflight_trend: Option<InflightTrend>,
143    /// Non-fatal analysis warnings (for example, capture truncation notices).
144    pub warnings: Vec<String>,
145    /// Highest-ranked suspect from this run.
146    pub primary_suspect: Suspect,
147    /// Lower-ranked suspects retained for follow-up triage.
148    pub secondary_suspects: Vec<Suspect>,
149}
150
151/// Analyzes one run artifact with rule-based heuristics and returns a triage report.
152///
153/// The analysis ranks evidence-backed suspects and next checks; it does not
154/// claim causal certainty or proven root cause.
155///
156/// # Examples
157///
158/// ```
159/// use tailtriage_cli::analyze::analyze_run;
160/// use tailtriage_core::{
161///     CaptureMode, Run, RunMetadata, UnfinishedRequests, SCHEMA_VERSION,
162/// };
163///
164/// let run = Run {
165///     schema_version: SCHEMA_VERSION,
166///     metadata: RunMetadata {
167///         run_id: "run-1".to_string(),
168///         service_name: "svc".to_string(),
169///         service_version: None,
170///         started_at_unix_ms: 1,
171///         finished_at_unix_ms: 2,
172///         mode: CaptureMode::Light,
173///         host: None,
174///         pid: None,
175///         lifecycle_warnings: Vec::new(),
176///         unfinished_requests: UnfinishedRequests::default(),
177///     },
178///     requests: vec![],
179///     stages: vec![],
180///     queues: vec![],
181///     inflight: vec![],
182///     runtime_snapshots: vec![],
183///     truncation: Default::default(),
184/// };
185///
186/// let report = analyze_run(&run);
187/// assert_eq!(report.request_count, 0);
188/// ```
189#[must_use]
190pub fn analyze_run(run: &Run) -> Report {
191    let request_latencies = run
192        .requests
193        .iter()
194        .map(|request| request.latency_us)
195        .collect::<Vec<_>>();
196
197    let p50_latency_us = percentile(&request_latencies, 50, 100);
198    let p95_latency_us = percentile(&request_latencies, 95, 100);
199    let p99_latency_us = percentile(&request_latencies, 99, 100);
200    let (queue_shares, service_shares) = request_time_shares(run);
201    let p95_queue_share_permille = percentile(&queue_shares, 95, 100);
202    let p95_service_share_permille = percentile(&service_shares, 95, 100);
203    let inflight_trend = dominant_inflight_trend(&run.inflight);
204
205    let mut suspects = Vec::new();
206
207    if let Some(queue_suspect) = queue_saturation_suspect(run, inflight_trend.as_ref()) {
208        suspects.push(queue_suspect);
209    }
210
211    if let Some(blocking_suspect) = blocking_pressure_suspect(run) {
212        suspects.push(blocking_suspect);
213    }
214
215    if let Some(executor_suspect) = executor_pressure_suspect(run, inflight_trend.as_ref()) {
216        suspects.push(executor_suspect);
217    }
218
219    if let Some(stage_suspect) = downstream_stage_suspect(run) {
220        suspects.push(stage_suspect);
221    }
222
223    if suspects.is_empty() {
224        suspects.push(Suspect::new(
225            DiagnosisKind::InsufficientEvidence,
226            50,
227            vec![
228                "Not enough queue, stage, or runtime signals to rank a stronger suspect."
229                    .to_string(),
230            ],
231            vec![
232                "Wrap critical awaits with queue(...).await_on(...), and use stage(...).await_on(...) for Result-returning work or stage(...).await_value(...) for infallible work.".to_string(),
233                "Enable RuntimeSampler during the run to capture runtime pressure signals."
234                    .to_string(),
235            ],
236        ));
237    }
238
239    suspects.sort_by(|left, right| right.score.cmp(&left.score));
240
241    let mut ranked = suspects.into_iter();
242    let primary_suspect = ranked.next().unwrap_or_else(|| {
243        Suspect::new(
244            DiagnosisKind::InsufficientEvidence,
245            50,
246            vec!["No diagnosis signals were captured for this run.".to_string()],
247            vec!["Verify that request, queue, or stage instrumentation is enabled.".to_string()],
248        )
249    });
250
251    Report {
252        request_count: run.requests.len(),
253        p50_latency_us,
254        p95_latency_us,
255        p99_latency_us,
256        p95_queue_share_permille,
257        p95_service_share_permille,
258        inflight_trend,
259        warnings: truncation_warnings(run),
260        primary_suspect,
261        secondary_suspects: ranked.collect(),
262    }
263}
264
265fn truncation_warnings(run: &Run) -> Vec<String> {
266    let mut warnings = Vec::new();
267    if run.truncation.dropped_requests > 0 {
268        warnings.push(format!(
269            "Capture truncated requests: dropped {} request events after reaching the configured max_requests limit.",
270            run.truncation.dropped_requests
271        ));
272    }
273    if run.truncation.dropped_stages > 0 {
274        warnings.push(format!(
275            "Capture truncated stages: dropped {} stage events after reaching the configured max_stages limit.",
276            run.truncation.dropped_stages
277        ));
278    }
279    if run.truncation.dropped_queues > 0 {
280        warnings.push(format!(
281            "Capture truncated queues: dropped {} queue events after reaching the configured max_queues limit.",
282            run.truncation.dropped_queues
283        ));
284    }
285    if run.truncation.dropped_inflight_snapshots > 0 {
286        warnings.push(format!(
287            "Capture truncated in-flight snapshots: dropped {} entries after reaching max_inflight_snapshots.",
288            run.truncation.dropped_inflight_snapshots
289        ));
290    }
291    if run.truncation.dropped_runtime_snapshots > 0 {
292        warnings.push(format!(
293            "Capture truncated runtime snapshots: dropped {} entries after reaching max_runtime_snapshots.",
294            run.truncation.dropped_runtime_snapshots
295        ));
296    }
297    warnings
298}
299
300fn queue_saturation_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
301    let (queue_shares, _) = request_time_shares(run);
302    let p95_queue_share_permille = percentile(&queue_shares, 95, 100)?;
303    let max_depth = run
304        .queues
305        .iter()
306        .filter_map(|queue| queue.depth_at_start)
307        .max();
308
309    if p95_queue_share_permille < 300 {
310        return None;
311    }
312
313    let whole_percent = p95_queue_share_permille / 10;
314    let tenth_percent = p95_queue_share_permille % 10;
315    let mut evidence = vec![format!(
316        "Queue wait at p95 consumes {whole_percent}.{tenth_percent}% of request time."
317    )];
318
319    if let Some(depth) = max_depth {
320        evidence.push(format!("Observed queue depth sample up to {depth}."));
321    }
322    if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
323        evidence.push(format!(
324            "In-flight gauge '{}' grew by {} over the run window (p95={}, peak={}).",
325            trend.gauge, trend.growth_delta, trend.p95_count, trend.peak_count
326        ));
327    }
328
329    Some(Suspect::new(
330        DiagnosisKind::ApplicationQueueSaturation,
331        90,
332        evidence,
333        vec![
334            "Inspect queue admission limits and producer burst patterns.".to_string(),
335            "Compare queue wait distribution before and after increasing worker parallelism."
336                .to_string(),
337        ],
338    ))
339}
340
341fn blocking_pressure_suspect(run: &Run) -> Option<Suspect> {
342    let blocking_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
343        snapshot.blocking_queue_depth
344    });
345    let p95_blocking_depth = percentile(&blocking_depths, 95, 100)?;
346
347    if p95_blocking_depth == 0 {
348        return None;
349    }
350
351    Some(Suspect::new(
352        DiagnosisKind::BlockingPoolPressure,
353        80,
354        vec![format!(
355            "Blocking queue depth p95 is {p95_blocking_depth}, indicating sustained spawn_blocking backlog."
356        )],
357        vec![
358            "Audit blocking sections and move avoidable synchronous work out of hot paths."
359                .to_string(),
360            "Inspect spawn_blocking callsites for long-running CPU or I/O work.".to_string(),
361        ],
362    ))
363}
364
365fn executor_pressure_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
366    let global_queue_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
367        snapshot.global_queue_depth
368    });
369    let p95_global_depth = percentile(&global_queue_depths, 95, 100)?;
370
371    if p95_global_depth == 0 {
372        return None;
373    }
374
375    let mut evidence = vec![format!(
376        "Runtime global queue depth p95 is {p95_global_depth}, suggesting scheduler contention."
377    )];
378    let positive_growth = inflight_trend.is_some_and(|trend| trend.growth_delta > 0);
379    if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
380        evidence.push(format!(
381            "In-flight gauge '{}' growth is positive (delta={}, peak={}), consistent with accumulating executor pressure.",
382            trend.gauge, trend.growth_delta, trend.peak_count
383        ));
384    }
385
386    let depth_bonus = if p95_global_depth >= 300 {
387        20
388    } else if p95_global_depth >= 200 {
389        12
390    } else if p95_global_depth >= 100 {
391        6
392    } else {
393        0
394    };
395    let trend_bonus = if positive_growth { 5 } else { 0 };
396    let score = (65 + depth_bonus + trend_bonus).min(90);
397
398    Some(Suspect::new(
399        DiagnosisKind::ExecutorPressureSuspected,
400        score,
401        evidence,
402        vec![
403            "Check for long polls without yielding and uneven task fan-out.".to_string(),
404            "Compare with per-stage timings to isolate overloaded async stages.".to_string(),
405        ],
406    ))
407}
408
409fn downstream_stage_suspect(run: &Run) -> Option<Suspect> {
410    let mut stage_totals: BTreeMap<&str, u64> = BTreeMap::new();
411    for stage in &run.stages {
412        *stage_totals.entry(stage.stage.as_str()).or_default() = stage_totals
413            .get(stage.stage.as_str())
414            .copied()
415            .unwrap_or_default()
416            .saturating_add(stage.latency_us);
417    }
418
419    let (dominant_stage, total_latency) = stage_totals
420        .iter()
421        .max_by(|left, right| left.1.cmp(right.1).then_with(|| right.0.cmp(left.0)))
422        .map(|(stage, latency)| (*stage, *latency))?;
423
424    let stage_count = run
425        .stages
426        .iter()
427        .filter(|stage| stage.stage == dominant_stage)
428        .count();
429    let stage_latencies = run
430        .stages
431        .iter()
432        .filter(|stage| stage.stage == dominant_stage)
433        .map(|stage| stage.latency_us)
434        .collect::<Vec<_>>();
435    let stage_p95 = percentile(&stage_latencies, 95, 100)?;
436    let total_request_latency = run
437        .requests
438        .iter()
439        .map(|request| request.latency_us)
440        .fold(0_u64, u64::saturating_add);
441    let stage_share_permille = if total_request_latency == 0 {
442        0
443    } else {
444        total_latency.saturating_mul(1_000) / total_request_latency
445    };
446    let share_bonus = (stage_share_permille / 40).min(25) as u8;
447    let score = (55 + share_bonus).min(79);
448
449    if stage_count < 3 {
450        return None;
451    }
452
453    Some(Suspect::new(
454        DiagnosisKind::DownstreamStageDominates,
455        score,
456        vec![
457            format!(
458                "Stage '{dominant_stage}' has p95 latency {stage_p95} us across {stage_count} samples."
459            ),
460            format!("Stage '{dominant_stage}' cumulative latency is {total_latency} us."),
461            format!(
462                "Stage '{dominant_stage}' contributes {stage_share_permille} permille of cumulative request latency."
463            ),
464        ],
465        vec![
466            format!("Inspect downstream dependency behind stage '{dominant_stage}'."),
467            "Collect downstream service timings and retry behavior during tail windows.".to_string(),
468            "Review downstream SLO/error budget and align retry budget/backoff with it.".to_string(),
469        ],
470    ))
471}
472
473fn request_time_shares(run: &Run) -> (Vec<u64>, Vec<u64>) {
474    let mut total_queue_wait_by_request = HashMap::<&str, u64>::new();
475    for queue in &run.queues {
476        *total_queue_wait_by_request
477            .entry(queue.request_id.as_str())
478            .or_default() = total_queue_wait_by_request
479            .get(queue.request_id.as_str())
480            .copied()
481            .unwrap_or_default()
482            .saturating_add(queue.wait_us);
483    }
484
485    let mut queue_shares = Vec::new();
486    let mut service_shares = Vec::new();
487
488    for request in &run.requests {
489        if request.latency_us == 0 {
490            continue;
491        }
492
493        let queue_wait = total_queue_wait_by_request
494            .get(request.request_id.as_str())
495            .copied()
496            .unwrap_or_default()
497            .min(request.latency_us);
498        let service_time = request.latency_us.saturating_sub(queue_wait);
499
500        queue_shares.push(queue_wait.saturating_mul(1_000) / request.latency_us);
501        service_shares.push(service_time.saturating_mul(1_000) / request.latency_us);
502    }
503
504    (queue_shares, service_shares)
505}
506
507fn runtime_metric_series(
508    snapshots: &[RuntimeSnapshot],
509    selector: impl Fn(&RuntimeSnapshot) -> Option<u64>,
510) -> Vec<u64> {
511    snapshots.iter().filter_map(selector).collect::<Vec<_>>()
512}
513
514fn dominant_inflight_trend(snapshots: &[InFlightSnapshot]) -> Option<InflightTrend> {
515    let mut by_gauge: BTreeMap<&str, Vec<&InFlightSnapshot>> = BTreeMap::new();
516    for snapshot in snapshots {
517        by_gauge
518            .entry(snapshot.gauge.as_str())
519            .or_default()
520            .push(snapshot);
521    }
522
523    by_gauge
524        .into_iter()
525        .filter_map(|(gauge, samples)| inflight_trend_for_gauge(gauge, samples))
526        .max_by(|left, right| {
527            left.peak_count
528                .cmp(&right.peak_count)
529                .then_with(|| left.p95_count.cmp(&right.p95_count))
530                .then_with(|| left.gauge.cmp(&right.gauge).reverse())
531        })
532}
533
534fn inflight_trend_for_gauge(
535    gauge: &str,
536    mut samples: Vec<&InFlightSnapshot>,
537) -> Option<InflightTrend> {
538    if samples.is_empty() {
539        return None;
540    }
541
542    samples.sort_unstable_by(|left, right| {
543        left.at_unix_ms
544            .cmp(&right.at_unix_ms)
545            .then_with(|| left.count.cmp(&right.count))
546    });
547
548    let counts = samples
549        .iter()
550        .map(|sample| sample.count)
551        .collect::<Vec<_>>();
552    let first = samples.first()?;
553    let last = samples.last()?;
554    let growth_delta = signed_u64_delta(first.count, last.count);
555    let window_ms = last.at_unix_ms.saturating_sub(first.at_unix_ms);
556    let growth_per_sec_milli = if window_ms == 0 {
557        None
558    } else {
559        i64::try_from(window_ms)
560            .ok()
561            .map(|window_ms_i64| growth_delta.saturating_mul(1_000_000) / window_ms_i64)
562    };
563
564    Some(InflightTrend {
565        gauge: gauge.to_owned(),
566        sample_count: counts.len(),
567        peak_count: counts.iter().copied().max().unwrap_or(0),
568        p95_count: percentile(&counts, 95, 100).unwrap_or(0),
569        growth_delta,
570        growth_per_sec_milli,
571    })
572}
573
/// Returns `end - start` as a signed value, saturating at `±i64::MAX`.
fn signed_u64_delta(start: u64, end: u64) -> i64 {
    // Magnitudes beyond `i64::MAX` are clamped, so the negation below cannot overflow.
    let magnitude = i64::try_from(start.abs_diff(end)).unwrap_or(i64::MAX);
    if end >= start {
        magnitude
    } else {
        -magnitude
    }
}
581
582fn percentile(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
583    let sorted = sorted_u64(values);
584    percentile_sorted_u64(&sorted, numerator, denominator)
585}
586
/// Returns an ascending-sorted copy of `values`, leaving the input untouched.
fn sorted_u64(values: &[u64]) -> Vec<u64> {
    let mut ascending: Vec<u64> = values.iter().copied().collect();
    ascending.sort_unstable();
    ascending
}
592
/// Returns the `numerator / denominator` percentile of an already-sorted slice.
///
/// The rank is `ceil(last_index * numerator / denominator)`, clamped to the last
/// index. The result is `None` for an empty slice or a zero denominator.
fn percentile_sorted_u64(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    // `checked_sub` doubles as the empty-slice guard.
    let last_index = values.len().checked_sub(1)?;
    if denominator == 0 {
        return None;
    }

    let rank = last_index.saturating_mul(numerator).div_ceil(denominator);
    values.get(rank.min(last_index)).copied()
}
608
609#[must_use]
610/// Renders a compact text triage summary from a [`Report`].
611///
612/// The rendered output is guidance for follow-up checks, not proof of root cause.
613pub fn render_text(report: &Report) -> String {
614    let inflight_line = match &report.inflight_trend {
615        Some(trend) => format!(
616            "inflight_trend gauge={} samples={} peak={} p95={} growth_delta={} growth_per_sec_milli={:?}",
617            trend.gauge,
618            trend.sample_count,
619            trend.peak_count,
620            trend.p95_count,
621            trend.growth_delta,
622            trend.growth_per_sec_milli
623        ),
624        None => "inflight_trend none".to_string(),
625    };
626
627    let mut lines = vec![
628        "tailtriage diagnosis".to_string(),
629        format!("requests: {}", report.request_count),
630        format!(
631            "latency_us p50={:?} p95={:?} p99={:?}",
632            report.p50_latency_us, report.p95_latency_us, report.p99_latency_us
633        ),
634        format!(
635            "request_time_share_permille p95 queue={:?} service={:?} (independent percentiles; not expected to sum to 1000)",
636            report.p95_queue_share_permille, report.p95_service_share_permille
637        ),
638        inflight_line,
639        format!(
640            "primary: {} (confidence={:?}, score={})",
641            report.primary_suspect.kind.as_str(),
642            report.primary_suspect.confidence,
643            report.primary_suspect.score
644        ),
645    ];
646    for warning in &report.warnings {
647        lines.push(format!("warning {warning}"));
648    }
649
650    for evidence in &report.primary_suspect.evidence {
651        lines.push(format!("  evidence: {evidence}"));
652    }
653
654    for next_check in &report.primary_suspect.next_checks {
655        lines.push(format!("  next: {next_check}"));
656    }
657
658    if !report.secondary_suspects.is_empty() {
659        lines.push("secondary suspects:".to_string());
660        for suspect in &report.secondary_suspects {
661            lines.push(format!(
662                "  - {} (confidence={:?}, score={})",
663                suspect.kind.as_str(),
664                suspect.confidence,
665                suspect.score
666            ));
667        }
668    }
669
670    lines.join("\n")
671}
672
#[cfg(test)]
mod tests {
    use tailtriage_core::{
        CaptureMode, InFlightSnapshot, RequestEvent, Run, RunMetadata, StageEvent, SCHEMA_VERSION,
    };

    use crate::analyze::{
        analyze_run, render_text, Confidence, DiagnosisKind, InflightTrend, Report, Suspect,
    };

    /// Baseline run: three 1 ms `/test` requests (`req-1..=req-3`) and no other signals.
    fn test_run() -> Run {
        let requests = (1..=3)
            .map(|n| RequestEvent {
                request_id: format!("req-{n}"),
                route: "/test".to_owned(),
                kind: None,
                started_at_unix_ms: n,
                finished_at_unix_ms: n + 1,
                latency_us: 1_000,
                outcome: "ok".to_owned(),
            })
            .collect();

        Run {
            schema_version: SCHEMA_VERSION,
            metadata: RunMetadata {
                run_id: "run-1".to_owned(),
                service_name: "svc".to_owned(),
                service_version: None,
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                mode: CaptureMode::Light,
                host: None,
                pid: Some(1),
                lifecycle_warnings: Vec::new(),
                unfinished_requests: tailtriage_core::UnfinishedRequests::default(),
            },
            requests,
            stages: Vec::new(),
            queues: Vec::new(),
            inflight: Vec::new(),
            runtime_snapshots: Vec::new(),
            truncation: tailtriage_core::TruncationSummary::default(),
        }
    }

    #[test]
    fn downstream_stage_tie_break_is_deterministic() {
        let mut run = test_run();
        // Two stages with identical totals: three 300 us samples each, stage_a first.
        run.stages = ["stage_a", "stage_b"]
            .iter()
            .flat_map(|stage| {
                (1..=3).map(move |n| StageEvent {
                    request_id: format!("req-{n}"),
                    stage: (*stage).to_owned(),
                    started_at_unix_ms: n,
                    finished_at_unix_ms: n + 1,
                    latency_us: 300,
                    success: true,
                })
            })
            .collect();

        let report = analyze_run(&run);
        assert_eq!(
            report.primary_suspect.kind,
            DiagnosisKind::DownstreamStageDominates
        );
        assert!(
            report.primary_suspect.evidence[0].contains("stage_a"),
            "expected deterministic stage tie-breaker to choose stage_a, got {:?}",
            report.primary_suspect.evidence
        );
    }

    #[test]
    fn inflight_trend_is_none_for_empty_series() {
        assert!(super::dominant_inflight_trend(&[]).is_none());
    }

    #[test]
    fn inflight_trend_handles_constant_series() {
        let http = |at_unix_ms, count| InFlightSnapshot {
            gauge: "http".to_owned(),
            at_unix_ms,
            count,
        };

        let trend = super::dominant_inflight_trend(&[http(10, 3), http(20, 3)])
            .expect("trend should exist");

        assert_eq!(trend.peak_count, 3);
        assert_eq!(trend.p95_count, 3);
        assert_eq!(trend.growth_delta, 0);
    }

    #[test]
    fn inflight_trend_handles_monotonic_increase() {
        let http = |at_unix_ms, count| InFlightSnapshot {
            gauge: "http".to_owned(),
            at_unix_ms,
            count,
        };

        let trend = super::dominant_inflight_trend(&[http(10, 1), http(20, 4), http(30, 6)])
            .expect("trend should exist");

        assert_eq!(trend.peak_count, 6);
        assert_eq!(trend.p95_count, 6);
        assert_eq!(trend.growth_delta, 5);
        assert_eq!(trend.growth_per_sec_milli, Some(250_000));
    }

    #[test]
    fn render_text_formats_inflight_trend_fields() {
        let inflight_trend = InflightTrend {
            gauge: "queue_inflight".to_owned(),
            sample_count: 4,
            peak_count: 8,
            p95_count: 7,
            growth_delta: 5,
            growth_per_sec_milli: Some(2_500),
        };
        let primary_suspect = Suspect {
            kind: DiagnosisKind::ApplicationQueueSaturation,
            score: 90,
            confidence: Confidence::High,
            evidence: vec!["queue wait high".to_owned()],
            next_checks: vec!["check queue policy".to_owned()],
        };
        let report = Report {
            request_count: 2,
            p50_latency_us: Some(10),
            p95_latency_us: Some(20),
            p99_latency_us: Some(20),
            p95_queue_share_permille: Some(100),
            p95_service_share_permille: Some(900),
            inflight_trend: Some(inflight_trend),
            warnings: Vec::new(),
            primary_suspect,
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        for expected in [
            "inflight_trend gauge=queue_inflight",
            "samples=4",
            "growth_per_sec_milli=Some(2500)",
            "independent percentiles; not expected to sum to 1000",
        ]
        .iter()
        {
            assert!(text.contains(expected));
        }
    }

    #[test]
    fn render_text_marks_missing_inflight_trend() {
        let primary_suspect = Suspect {
            kind: DiagnosisKind::InsufficientEvidence,
            score: 50,
            confidence: Confidence::Low,
            evidence: vec!["missing signals".to_owned()],
            next_checks: vec!["add instrumentation".to_owned()],
        };
        let report = Report {
            request_count: 0,
            p50_latency_us: None,
            p95_latency_us: None,
            p99_latency_us: None,
            p95_queue_share_permille: None,
            p95_service_share_permille: None,
            inflight_trend: None,
            warnings: vec!["Capture truncated requests.".to_owned()],
            primary_suspect,
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        assert!(text.contains("inflight_trend none"));
        assert!(text.contains("warning Capture truncated requests."));
    }

    #[test]
    fn analyze_run_emits_truncation_warnings() {
        let mut run = test_run();
        run.truncation.dropped_requests = 2;
        run.truncation.dropped_runtime_snapshots = 1;

        let warnings = analyze_run(&run).warnings;
        assert_eq!(warnings.len(), 2);
        assert!(warnings
            .iter()
            .any(|warning| warning.contains("dropped 2 request events")));
        assert!(warnings
            .iter()
            .any(|warning| warning.contains("dropped 1 entries")));
    }
}