1use std::collections::{BTreeMap, HashMap};
2
3use serde::{Serialize, Serializer};
4use tailtriage_core::{InFlightSnapshot, Run, RuntimeSnapshot};
5
/// Root-cause categories the analyzer can attribute tail-latency
/// symptoms to.
///
/// Serialized as snake_case strings (see the manual `Serialize` impl
/// and [`DiagnosisKind::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiagnosisKind {
    /// Requests spend a dominant share of their time waiting in
    /// application-level queues.
    ApplicationQueueSaturation,
    /// The blocking pool shows a sustained non-zero queue depth.
    BlockingPoolPressure,
    /// The runtime's global queue depth suggests scheduler contention.
    ExecutorPressureSuspected,
    /// A single named stage dominates cumulative request latency.
    DownstreamStageDominates,
    /// Not enough captured signals to rank any stronger suspect.
    InsufficientEvidence,
}
22
impl DiagnosisKind {
    /// Returns the stable snake_case identifier for this diagnosis.
    ///
    /// Used both by the `Serialize` impl and by [`render_text`], so the
    /// string and JSON forms always agree.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::ApplicationQueueSaturation => "application_queue_saturation",
            Self::BlockingPoolPressure => "blocking_pool_pressure",
            Self::ExecutorPressureSuspected => "executor_pressure_suspected",
            Self::DownstreamStageDominates => "downstream_stage_dominates",
            Self::InsufficientEvidence => "insufficient_evidence",
        }
    }
}
36
/// Manual impl so the enum serializes as its snake_case string form
/// (e.g. "blocking_pool_pressure") rather than a variant structure.
impl Serialize for DiagnosisKind {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(self.as_str())
    }
}
45
/// Coarse confidence band derived from a suspect's numeric score
/// (see [`Confidence::from_score`] for the thresholds).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Confidence {
    /// Score below 65.
    Low,
    /// Score in 65..=84.
    Medium,
    /// Score 85 and above.
    High,
}
59
60impl Confidence {
61 fn from_score(score: u8) -> Self {
62 if score >= 85 {
63 Self::High
64 } else if score >= 65 {
65 Self::Medium
66 } else {
67 Self::Low
68 }
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Serialize)]
76pub struct Suspect {
77 pub kind: DiagnosisKind,
79 pub score: u8,
81 pub confidence: Confidence,
83 pub evidence: Vec<String>,
85 pub next_checks: Vec<String>,
87}
88
89impl Suspect {
90 fn new(
91 kind: DiagnosisKind,
92 score: u8,
93 evidence: Vec<String>,
94 next_checks: Vec<String>,
95 ) -> Self {
96 Self {
97 kind,
98 score,
99 confidence: Confidence::from_score(score),
100 evidence,
101 next_checks,
102 }
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Serialize)]
108pub struct InflightTrend {
109 pub gauge: String,
111 pub sample_count: usize,
113 pub peak_count: u64,
115 pub p95_count: u64,
117 pub growth_delta: i64,
119 pub growth_per_sec_milli: Option<i64>,
121}
122
123#[derive(Debug, Clone, PartialEq, Serialize)]
128pub struct Report {
129 pub request_count: usize,
131 pub p50_latency_us: Option<u64>,
133 pub p95_latency_us: Option<u64>,
135 pub p99_latency_us: Option<u64>,
137 pub p95_queue_share_permille: Option<u64>,
139 pub p95_service_share_permille: Option<u64>,
141 pub inflight_trend: Option<InflightTrend>,
143 pub warnings: Vec<String>,
145 pub primary_suspect: Suspect,
147 pub secondary_suspects: Vec<Suspect>,
149}
150
/// Analyzes a captured [`Run`] and produces a ranked diagnosis [`Report`].
///
/// Latency percentiles, queue/service time shares, and the dominant
/// in-flight gauge trend are computed first; each heuristic then
/// contributes at most one scored suspect. Suspects are ranked by
/// descending score, and because the sort is stable, equal scores keep
/// their insertion order (queue saturation, blocking pool, executor
/// pressure, downstream stage). When no heuristic fires, an
/// `InsufficientEvidence` suspect is emitted so the report always has
/// a primary suspect.
#[must_use]
pub fn analyze_run(run: &Run) -> Report {
    let request_latencies = run
        .requests
        .iter()
        .map(|request| request.latency_us)
        .collect::<Vec<_>>();

    let p50_latency_us = percentile(&request_latencies, 50, 100);
    let p95_latency_us = percentile(&request_latencies, 95, 100);
    let p99_latency_us = percentile(&request_latencies, 99, 100);
    let (queue_shares, service_shares) = request_time_shares(run);
    let p95_queue_share_permille = percentile(&queue_shares, 95, 100);
    let p95_service_share_permille = percentile(&service_shares, 95, 100);
    let inflight_trend = dominant_inflight_trend(&run.inflight);

    let mut suspects = Vec::new();

    // Each heuristic returns None when its signal is absent or too weak.
    if let Some(queue_suspect) = queue_saturation_suspect(run, inflight_trend.as_ref()) {
        suspects.push(queue_suspect);
    }

    if let Some(blocking_suspect) = blocking_pressure_suspect(run) {
        suspects.push(blocking_suspect);
    }

    if let Some(executor_suspect) = executor_pressure_suspect(run, inflight_trend.as_ref()) {
        suspects.push(executor_suspect);
    }

    if let Some(stage_suspect) = downstream_stage_suspect(run) {
        suspects.push(stage_suspect);
    }

    // Guarantee at least one suspect so the report is never empty.
    if suspects.is_empty() {
        suspects.push(Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec![
                "Not enough queue, stage, or runtime signals to rank a stronger suspect."
                    .to_string(),
            ],
            vec![
                "Wrap critical awaits with queue(...).await_on(...), and use stage(...).await_on(...) for Result-returning work or stage(...).await_value(...) for infallible work.".to_string(),
                "Enable RuntimeSampler during the run to capture runtime pressure signals."
                    .to_string(),
            ],
        ));
    }

    // Descending by score; `sort_by_key` is stable, so equal scores
    // keep the heuristic insertion order above.
    suspects.sort_by_key(|suspect| std::cmp::Reverse(suspect.score));

    let mut ranked = suspects.into_iter();
    // `suspects` is non-empty here; this fallback is defensive rather
    // than reachable.
    let primary_suspect = ranked.next().unwrap_or_else(|| {
        Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec!["No diagnosis signals were captured for this run.".to_string()],
            vec!["Verify that request, queue, or stage instrumentation is enabled.".to_string()],
        )
    });

    Report {
        request_count: run.requests.len(),
        p50_latency_us,
        p95_latency_us,
        p99_latency_us,
        p95_queue_share_permille,
        p95_service_share_permille,
        inflight_trend,
        warnings: truncation_warnings(run),
        primary_suspect,
        secondary_suspects: ranked.collect(),
    }
}
275
276fn truncation_warnings(run: &Run) -> Vec<String> {
277 let mut warnings = Vec::new();
278 if run.truncation.limits_hit || run.truncation.is_truncated() {
279 warnings.push(
280 "Capture limits were hit during this run; dropped evidence can reduce diagnosis completeness and confidence."
281 .to_string(),
282 );
283 }
284 if run.truncation.dropped_requests > 0 {
285 warnings.push(format!(
286 "Capture truncated requests: dropped {} request events after reaching the configured max_requests limit. This dropped evidence can reduce diagnosis completeness and confidence.",
287 run.truncation.dropped_requests
288 ));
289 }
290 if run.truncation.dropped_stages > 0 {
291 warnings.push(format!(
292 "Capture truncated stages: dropped {} stage events after reaching the configured max_stages limit. This dropped evidence can reduce diagnosis completeness and confidence.",
293 run.truncation.dropped_stages
294 ));
295 }
296 if run.truncation.dropped_queues > 0 {
297 warnings.push(format!(
298 "Capture truncated queues: dropped {} queue events after reaching the configured max_queues limit. This dropped evidence can reduce diagnosis completeness and confidence.",
299 run.truncation.dropped_queues
300 ));
301 }
302 if run.truncation.dropped_inflight_snapshots > 0 {
303 warnings.push(format!(
304 "Capture truncated in-flight snapshots: dropped {} entries after reaching max_inflight_snapshots. This dropped evidence can reduce diagnosis completeness and confidence.",
305 run.truncation.dropped_inflight_snapshots
306 ));
307 }
308 if run.truncation.dropped_runtime_snapshots > 0 {
309 warnings.push(format!(
310 "Capture truncated runtime snapshots: dropped {} entries after reaching max_runtime_snapshots. This dropped evidence can reduce diagnosis completeness and confidence.",
311 run.truncation.dropped_runtime_snapshots
312 ));
313 }
314 warnings
315}
316
317fn queue_saturation_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
318 let (queue_shares, _) = request_time_shares(run);
319 let p95_queue_share_permille = percentile(&queue_shares, 95, 100)?;
320 let max_depth = run
321 .queues
322 .iter()
323 .filter_map(|queue| queue.depth_at_start)
324 .max();
325
326 if p95_queue_share_permille < 300 {
327 return None;
328 }
329
330 let whole_percent = p95_queue_share_permille / 10;
331 let tenth_percent = p95_queue_share_permille % 10;
332 let mut evidence = vec![format!(
333 "Queue wait at p95 consumes {whole_percent}.{tenth_percent}% of request time."
334 )];
335
336 if let Some(depth) = max_depth {
337 evidence.push(format!("Observed queue depth sample up to {depth}."));
338 }
339 if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
340 evidence.push(format!(
341 "In-flight gauge '{}' grew by {} over the run window (p95={}, peak={}).",
342 trend.gauge, trend.growth_delta, trend.p95_count, trend.peak_count
343 ));
344 }
345
346 Some(Suspect::new(
347 DiagnosisKind::ApplicationQueueSaturation,
348 90,
349 evidence,
350 vec![
351 "Inspect queue admission limits and producer burst patterns.".to_string(),
352 "Compare queue wait distribution before and after increasing worker parallelism."
353 .to_string(),
354 ],
355 ))
356}
357
358fn blocking_pressure_suspect(run: &Run) -> Option<Suspect> {
359 let blocking_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
360 snapshot.blocking_queue_depth
361 });
362 let p95_blocking_depth = percentile(&blocking_depths, 95, 100)?;
363
364 if p95_blocking_depth == 0 {
365 return None;
366 }
367
368 Some(Suspect::new(
369 DiagnosisKind::BlockingPoolPressure,
370 80,
371 vec![format!(
372 "Blocking queue depth p95 is {p95_blocking_depth}, indicating sustained spawn_blocking backlog."
373 )],
374 vec![
375 "Audit blocking sections and move avoidable synchronous work out of hot paths."
376 .to_string(),
377 "Inspect spawn_blocking callsites for long-running CPU or I/O work.".to_string(),
378 ],
379 ))
380}
381
382fn executor_pressure_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
383 let global_queue_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
384 snapshot.global_queue_depth
385 });
386 let p95_global_depth = percentile(&global_queue_depths, 95, 100)?;
387
388 if p95_global_depth == 0 {
389 return None;
390 }
391
392 let mut evidence = vec![format!(
393 "Runtime global queue depth p95 is {p95_global_depth}, suggesting scheduler contention."
394 )];
395 let positive_growth = inflight_trend.is_some_and(|trend| trend.growth_delta > 0);
396 if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
397 evidence.push(format!(
398 "In-flight gauge '{}' growth is positive (delta={}, peak={}), consistent with accumulating executor pressure.",
399 trend.gauge, trend.growth_delta, trend.peak_count
400 ));
401 }
402
403 let depth_bonus = if p95_global_depth >= 300 {
404 20
405 } else if p95_global_depth >= 200 {
406 12
407 } else if p95_global_depth >= 100 {
408 6
409 } else {
410 0
411 };
412 let trend_bonus = if positive_growth { 5 } else { 0 };
413 let score = (65 + depth_bonus + trend_bonus).min(90);
414
415 Some(Suspect::new(
416 DiagnosisKind::ExecutorPressureSuspected,
417 score,
418 evidence,
419 vec![
420 "Check for long polls without yielding and uneven task fan-out.".to_string(),
421 "Compare with per-stage timings to isolate overloaded async stages.".to_string(),
422 ],
423 ))
424}
425
426fn downstream_stage_suspect(run: &Run) -> Option<Suspect> {
427 let mut stage_totals: BTreeMap<&str, u64> = BTreeMap::new();
428 for stage in &run.stages {
429 *stage_totals.entry(stage.stage.as_str()).or_default() = stage_totals
430 .get(stage.stage.as_str())
431 .copied()
432 .unwrap_or_default()
433 .saturating_add(stage.latency_us);
434 }
435
436 let (dominant_stage, total_latency) = stage_totals
437 .iter()
438 .max_by(|left, right| left.1.cmp(right.1).then_with(|| right.0.cmp(left.0)))
439 .map(|(stage, latency)| (*stage, *latency))?;
440
441 let stage_count = run
442 .stages
443 .iter()
444 .filter(|stage| stage.stage == dominant_stage)
445 .count();
446 let stage_latencies = run
447 .stages
448 .iter()
449 .filter(|stage| stage.stage == dominant_stage)
450 .map(|stage| stage.latency_us)
451 .collect::<Vec<_>>();
452 let stage_p95 = percentile(&stage_latencies, 95, 100)?;
453 let total_request_latency = run
454 .requests
455 .iter()
456 .map(|request| request.latency_us)
457 .fold(0_u64, u64::saturating_add);
458 let stage_share_permille = total_latency
459 .saturating_mul(1_000)
460 .checked_div(total_request_latency)
461 .unwrap_or(0);
462 let share_bonus = (stage_share_permille / 40).min(25) as u8;
463 let score = (55 + share_bonus).min(79);
464
465 if stage_count < 3 {
466 return None;
467 }
468
469 Some(Suspect::new(
470 DiagnosisKind::DownstreamStageDominates,
471 score,
472 vec![
473 format!(
474 "Stage '{dominant_stage}' has p95 latency {stage_p95} us across {stage_count} samples."
475 ),
476 format!("Stage '{dominant_stage}' cumulative latency is {total_latency} us."),
477 format!(
478 "Stage '{dominant_stage}' contributes {stage_share_permille} permille of cumulative request latency."
479 ),
480 ],
481 vec![
482 format!("Inspect downstream dependency behind stage '{dominant_stage}'."),
483 "Collect downstream service timings and retry behavior during tail windows.".to_string(),
484 "Review downstream SLO/error budget and align retry budget/backoff with it.".to_string(),
485 ],
486 ))
487}
488
489fn request_time_shares(run: &Run) -> (Vec<u64>, Vec<u64>) {
490 let mut total_queue_wait_by_request = HashMap::<&str, u64>::new();
491 for queue in &run.queues {
492 *total_queue_wait_by_request
493 .entry(queue.request_id.as_str())
494 .or_default() = total_queue_wait_by_request
495 .get(queue.request_id.as_str())
496 .copied()
497 .unwrap_or_default()
498 .saturating_add(queue.wait_us);
499 }
500
501 let mut queue_shares = Vec::new();
502 let mut service_shares = Vec::new();
503
504 for request in &run.requests {
505 if request.latency_us == 0 {
506 continue;
507 }
508
509 let queue_wait = total_queue_wait_by_request
510 .get(request.request_id.as_str())
511 .copied()
512 .unwrap_or_default()
513 .min(request.latency_us);
514 let service_time = request.latency_us.saturating_sub(queue_wait);
515
516 queue_shares.push(queue_wait.saturating_mul(1_000) / request.latency_us);
517 service_shares.push(service_time.saturating_mul(1_000) / request.latency_us);
518 }
519
520 (queue_shares, service_shares)
521}
522
523fn runtime_metric_series(
524 snapshots: &[RuntimeSnapshot],
525 selector: impl Fn(&RuntimeSnapshot) -> Option<u64>,
526) -> Vec<u64> {
527 snapshots.iter().filter_map(selector).collect::<Vec<_>>()
528}
529
/// Picks the single most significant in-flight gauge trend from the
/// captured snapshots.
///
/// Snapshots are grouped per gauge, a trend is computed for each, and
/// the winner is the trend with the highest peak count, then highest
/// p95 count; on a full tie the lexicographically smallest gauge name
/// wins (the reversed name comparison combined with `max_by` makes
/// this deterministic). Returns `None` when there are no snapshots.
fn dominant_inflight_trend(snapshots: &[InFlightSnapshot]) -> Option<InflightTrend> {
    // Group snapshots by gauge; BTreeMap keeps iteration deterministic.
    let mut by_gauge: BTreeMap<&str, Vec<&InFlightSnapshot>> = BTreeMap::new();
    for snapshot in snapshots {
        by_gauge
            .entry(snapshot.gauge.as_str())
            .or_default()
            .push(snapshot);
    }

    by_gauge
        .into_iter()
        .filter_map(|(gauge, samples)| inflight_trend_for_gauge(gauge, samples))
        .max_by(|left, right| {
            // Rank by peak, then p95; the reversed gauge comparison
            // breaks remaining ties toward the smaller gauge name.
            left.peak_count
                .cmp(&right.peak_count)
                .then_with(|| left.p95_count.cmp(&right.p95_count))
                .then_with(|| left.gauge.cmp(&right.gauge).reverse())
        })
}
549
/// Computes the trend summary for a single gauge's snapshots.
///
/// Samples are sorted by timestamp (ties by count) so growth is
/// measured from the earliest to the latest sample. The growth rate is
/// reported in milli-counts per second: `delta * 1_000_000 / window_ms`
/// equals `(delta / window_seconds) * 1000`. Returns `None` for an
/// empty sample set; the rate is `None` when every sample shares the
/// same timestamp.
fn inflight_trend_for_gauge(
    gauge: &str,
    mut samples: Vec<&InFlightSnapshot>,
) -> Option<InflightTrend> {
    if samples.is_empty() {
        return None;
    }

    // Time order makes first/last reflect the run window; the count
    // tie-break keeps ordering deterministic for equal timestamps.
    samples.sort_unstable_by(|left, right| {
        left.at_unix_ms
            .cmp(&right.at_unix_ms)
            .then_with(|| left.count.cmp(&right.count))
    });

    let counts = samples
        .iter()
        .map(|sample| sample.count)
        .collect::<Vec<_>>();
    let first = samples.first()?;
    let last = samples.last()?;
    let growth_delta = signed_u64_delta(first.count, last.count);
    let window_ms = last.at_unix_ms.saturating_sub(first.at_unix_ms);
    let growth_per_sec_milli = if window_ms == 0 {
        // Zero-width window: no rate can be computed.
        None
    } else {
        i64::try_from(window_ms)
            .ok()
            .map(|window_ms_i64| growth_delta.saturating_mul(1_000_000) / window_ms_i64)
    };

    Some(InflightTrend {
        gauge: gauge.to_owned(),
        sample_count: counts.len(),
        peak_count: counts.iter().copied().max().unwrap_or(0),
        p95_count: percentile(&counts, 95, 100).unwrap_or(0),
        growth_delta,
        growth_per_sec_milli,
    })
}
589
/// Computes `end - start` as a signed value, saturating at the `i64`
/// range on overflow in either direction.
///
/// Fix: the previous negative branch negated the `unwrap_or(i64::MAX)`
/// fallback, yielding `-i64::MAX` on underflow instead of the true
/// saturation point `i64::MIN`.
fn signed_u64_delta(start: u64, end: u64) -> i64 {
    if end >= start {
        i64::try_from(end - start).unwrap_or(i64::MAX)
    } else {
        // Any magnitude that fits in i64 (0..=i64::MAX) negates without
        // overflow; larger magnitudes saturate to i64::MIN.
        i64::try_from(start - end).map_or(i64::MIN, |magnitude| -magnitude)
    }
}
597
598fn percentile(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
599 let sorted = sorted_u64(values);
600 percentile_sorted_u64(&sorted, numerator, denominator)
601}
602
/// Returns an ascending-sorted copy of `values`, leaving the input
/// untouched.
fn sorted_u64(values: &[u64]) -> Vec<u64> {
    let mut sorted = Vec::from(values);
    sorted.sort_unstable();
    sorted
}
608
/// Nearest-rank percentile lookup over an already-sorted slice.
///
/// The rank `numerator / denominator` is applied to the last valid
/// index using a ceiling division, then clamped to the slice bounds.
/// Returns `None` for an empty slice or a zero denominator.
fn percentile_sorted_u64(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    if values.is_empty() || denominator == 0 {
        return None;
    }

    // Non-empty is guaranteed by the guard above, so len() - 1 is safe.
    let max_index = values.len() - 1;
    let index = max_index
        .saturating_mul(numerator)
        .div_ceil(denominator)
        .min(max_index);
    values.get(index).copied()
}
624
/// Renders an optional counter as its decimal form, or "n/a" when the
/// value is absent.
fn fmt_opt_u64(value: Option<u64>) -> String {
    value.map_or_else(|| "n/a".to_string(), |value| value.to_string())
}
631
/// Renders a permille value (0-1000) as a percentage with one decimal
/// digit, e.g. `905` -> "90.5%"; an absent value renders as "n/a".
fn fmt_percent_permille(value: Option<u64>) -> String {
    let Some(value) = value else {
        return "n/a".to_string();
    };
    format!("{}.{:01}%", value / 10, value % 10)
}
638
639fn fmt_confidence(confidence: Confidence) -> &'static str {
640 match confidence {
641 Confidence::Low => "low",
642 Confidence::Medium => "medium",
643 Confidence::High => "high",
644 }
645}
646
647#[must_use]
648pub fn render_text(report: &Report) -> String {
652 let mut lines = vec![
653 "tailtriage diagnosis".to_string(),
654 format!("Requests analyzed: {}", report.request_count),
655 format!(
656 "Latency (us): p50 {}, p95 {}, p99 {}",
657 fmt_opt_u64(report.p50_latency_us),
658 fmt_opt_u64(report.p95_latency_us),
659 fmt_opt_u64(report.p99_latency_us),
660 ),
661 format!(
662 "Request time at p95: queue {}, non-queue service {}",
663 fmt_percent_permille(report.p95_queue_share_permille),
664 fmt_percent_permille(report.p95_service_share_permille),
665 ),
666 ];
667
668 match &report.inflight_trend {
669 Some(trend) => {
670 lines.push(format!(
671 "Inflight trend: gauge '{}', samples {}, peak {}, p95 {}, net growth {:+}",
672 trend.gauge,
673 trend.sample_count,
674 trend.peak_count,
675 trend.p95_count,
676 trend.growth_delta,
677 ));
678 }
679 None => {
680 lines.push("Inflight trend: none".to_string());
681 }
682 }
683
684 lines.push(format!(
685 "Primary suspect: {} ({} confidence, score {})",
686 report.primary_suspect.kind.as_str(),
687 fmt_confidence(report.primary_suspect.confidence),
688 report.primary_suspect.score,
689 ));
690
691 if !report.warnings.is_empty() {
692 lines.push("Warnings:".to_string());
693 for warning in &report.warnings {
694 lines.push(format!("- {warning}"));
695 }
696 }
697
698 if !report.primary_suspect.evidence.is_empty() {
699 lines.push("Evidence:".to_string());
700 for evidence in &report.primary_suspect.evidence {
701 lines.push(format!("- {evidence}"));
702 }
703 }
704
705 if !report.primary_suspect.next_checks.is_empty() {
706 lines.push("Next checks:".to_string());
707 for next_check in &report.primary_suspect.next_checks {
708 lines.push(format!("- {next_check}"));
709 }
710 }
711
712 if !report.secondary_suspects.is_empty() {
713 lines.push("Secondary suspects:".to_string());
714 for suspect in &report.secondary_suspects {
715 lines.push(format!(
716 "- {} ({} confidence, score {})",
717 suspect.kind.as_str(),
718 fmt_confidence(suspect.confidence),
719 suspect.score,
720 ));
721 }
722 }
723
724 lines.join("\n")
725}
726
#[cfg(test)]
mod tests {
    use tailtriage_core::{
        CaptureMode, EffectiveCoreConfig, RequestEvent, Run, RunMetadata, StageEvent,
        SCHEMA_VERSION,
    };

    use crate::analyze::{
        analyze_run, render_text, Confidence, DiagnosisKind, InflightTrend, Report, Suspect,
    };

    // Minimal fixture: three 1ms requests and no stage/queue/inflight/
    // runtime signals. Individual tests extend the fields they need.
    fn test_run() -> Run {
        Run {
            schema_version: SCHEMA_VERSION,
            metadata: RunMetadata {
                run_id: "run-1".to_owned(),
                service_name: "svc".to_owned(),
                service_version: None,
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                finalized_at_unix_ms: Some(2),
                mode: CaptureMode::Light,
                effective_core_config: Some(EffectiveCoreConfig {
                    mode: CaptureMode::Light,
                    capture_limits: CaptureMode::Light.core_defaults(),
                    strict_lifecycle: false,
                }),
                effective_tokio_sampler_config: None,
                host: None,
                pid: Some(1),
                lifecycle_warnings: Vec::new(),
                unfinished_requests: tailtriage_core::UnfinishedRequests::default(),
                run_end_reason: None,
            },
            requests: vec![
                RequestEvent {
                    request_id: "req-1".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 1,
                    finished_at_unix_ms: 2,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
                RequestEvent {
                    request_id: "req-2".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 2,
                    finished_at_unix_ms: 3,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
                RequestEvent {
                    request_id: "req-3".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 3,
                    finished_at_unix_ms: 4,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
            ],
            stages: Vec::new(),
            queues: Vec::new(),
            inflight: Vec::new(),
            runtime_snapshots: Vec::new(),
            truncation: tailtriage_core::TruncationSummary::default(),
        }
    }

    // Two stages with identical cumulative latency: the tie-break must
    // deterministically pick the lexicographically smaller name.
    #[test]
    fn downstream_stage_tie_break_is_deterministic() {
        let mut run = test_run();
        run.stages = vec![
            StageEvent {
                request_id: "req-1".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-2".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 2,
                finished_at_unix_ms: 3,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-3".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 3,
                finished_at_unix_ms: 4,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-1".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-2".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 2,
                finished_at_unix_ms: 3,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-3".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 3,
                finished_at_unix_ms: 4,
                latency_us: 300,
                success: true,
            },
        ];

        let report = analyze_run(&run);
        assert_eq!(
            report.primary_suspect.kind,
            DiagnosisKind::DownstreamStageDominates
        );
        assert!(
            report.primary_suspect.evidence[0].contains("stage_a"),
            "expected deterministic stage tie-breaker to choose stage_a, got {:?}",
            report.primary_suspect.evidence
        );
    }

    // No snapshots => no trend.
    #[test]
    fn inflight_trend_is_none_for_empty_series() {
        assert!(super::dominant_inflight_trend(&[]).is_none());
    }

    // A flat series must yield zero growth with peak/p95 at the level.
    #[test]
    fn inflight_trend_handles_constant_series() {
        let trend = super::dominant_inflight_trend(&[
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 10,
                count: 3,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 20,
                count: 3,
            },
        ])
        .expect("trend should exist");

        assert_eq!(trend.peak_count, 3);
        assert_eq!(trend.p95_count, 3);
        assert_eq!(trend.growth_delta, 0);
    }

    // Rising series: delta = 5 over a 20ms window, so the rate is
    // 5 * 1_000_000 / 20 = 250_000 milli-counts per second.
    #[test]
    fn inflight_trend_handles_monotonic_increase() {
        let trend = super::dominant_inflight_trend(&[
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 10,
                count: 1,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 20,
                count: 4,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 30,
                count: 6,
            },
        ])
        .expect("trend should exist");

        assert_eq!(trend.peak_count, 6);
        assert_eq!(trend.p95_count, 6);
        assert_eq!(trend.growth_delta, 5);
        assert_eq!(trend.growth_per_sec_milli, Some(250_000));
    }

    // Every inflight-trend field should appear in the rendered text,
    // and permille shares should render as one-decimal percentages.
    #[test]
    fn render_text_formats_inflight_trend_fields() {
        let report = Report {
            request_count: 2,
            p50_latency_us: Some(10),
            p95_latency_us: Some(20),
            p99_latency_us: Some(20),
            p95_queue_share_permille: Some(100),
            p95_service_share_permille: Some(900),
            inflight_trend: Some(InflightTrend {
                gauge: "queue_inflight".to_owned(),
                sample_count: 4,
                peak_count: 8,
                p95_count: 7,
                growth_delta: 5,
                growth_per_sec_milli: Some(2_500),
            }),
            warnings: Vec::new(),
            primary_suspect: Suspect {
                kind: DiagnosisKind::ApplicationQueueSaturation,
                score: 90,
                confidence: Confidence::High,
                evidence: vec!["queue wait high".to_owned()],
                next_checks: vec!["check queue policy".to_owned()],
            },
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        assert!(text.contains("Inflight trend: gauge 'queue_inflight'"));
        assert!(text.contains("samples 4"));
        assert!(text.contains("peak 8"));
        assert!(text.contains("p95 7"));
        assert!(text.contains("net growth +5"));
        assert!(text.contains("Request time at p95: queue 10.0%, non-queue service 90.0%"));
    }

    // A missing trend renders as "none" and warnings are bulleted.
    #[test]
    fn render_text_marks_missing_inflight_trend() {
        let report = Report {
            request_count: 0,
            p50_latency_us: None,
            p95_latency_us: None,
            p99_latency_us: None,
            p95_queue_share_permille: None,
            p95_service_share_permille: None,
            inflight_trend: None,
            warnings: vec!["Capture truncated requests.".to_owned()],
            primary_suspect: Suspect {
                kind: DiagnosisKind::InsufficientEvidence,
                score: 50,
                confidence: Confidence::Low,
                evidence: vec!["missing signals".to_owned()],
                next_checks: vec!["add instrumentation".to_owned()],
            },
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        assert!(text.contains("Inflight trend: none"));
        assert!(text.contains("Warnings:"));
        assert!(text.contains("- Capture truncated requests."));
    }

    // limits_hit plus two non-zero drop counters should produce exactly
    // three warnings, each carrying the shared confidence caveat.
    #[test]
    fn analyze_run_emits_truncation_warnings() {
        let mut run = test_run();
        run.truncation.dropped_requests = 2;
        run.truncation.dropped_runtime_snapshots = 1;
        run.truncation.limits_hit = true;

        let report = analyze_run(&run);
        assert_eq!(report.warnings.len(), 3);
        assert!(report.warnings.iter().any(|warning| {
            warning.contains("dropped evidence can reduce diagnosis completeness and confidence")
        }));
        assert!(report
            .warnings
            .iter()
            .any(|warning| warning.contains("dropped 2 request events")));
        assert!(report
            .warnings
            .iter()
            .any(|warning| warning.contains("dropped 1 entries")));
    }
}
1002}