1use std::collections::{BTreeMap, HashMap};
2
3use serde::{Serialize, Serializer};
4use tailtriage_core::{InFlightSnapshot, Run, RuntimeSnapshot};
5
/// Root-cause categories the analyzer can attribute tail latency to.
///
/// Serialized as `snake_case` strings through the manual [`Serialize`] impl,
/// which reuses [`DiagnosisKind::as_str`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiagnosisKind {
    /// Requests spend a large share of their time waiting in application queues.
    ApplicationQueueSaturation,
    /// The blocking pool shows a sustained `spawn_blocking` backlog.
    BlockingPoolPressure,
    /// The runtime's global queue depth suggests scheduler contention.
    ExecutorPressureSuspected,
    /// A single downstream stage dominates cumulative request latency.
    DownstreamStageDominates,
    /// Too few signals were captured to rank a stronger suspect.
    InsufficientEvidence,
}
22
23impl DiagnosisKind {
24 #[must_use]
26 pub const fn as_str(&self) -> &'static str {
27 match self {
28 Self::ApplicationQueueSaturation => "application_queue_saturation",
29 Self::BlockingPoolPressure => "blocking_pool_pressure",
30 Self::ExecutorPressureSuspected => "executor_pressure_suspected",
31 Self::DownstreamStageDominates => "downstream_stage_dominates",
32 Self::InsufficientEvidence => "insufficient_evidence",
33 }
34 }
35}
36
/// Serializes the diagnosis kind as its stable `snake_case` string form
/// (see [`DiagnosisKind::as_str`]) instead of a variant index, keeping the
/// JSON output readable and forward-compatible.
impl Serialize for DiagnosisKind {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(self.as_str())
    }
}
45
/// Coarse confidence bucket derived from a suspect's numeric score
/// (see [`Confidence::from_score`] for the thresholds).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Confidence {
    /// Score below 65.
    Low,
    /// Score in 65..=84.
    Medium,
    /// Score 85 and above.
    High,
}
59
60impl Confidence {
61 fn from_score(score: u8) -> Self {
62 if score >= 85 {
63 Self::High
64 } else if score >= 65 {
65 Self::Medium
66 } else {
67 Self::Low
68 }
69 }
70}
71
/// A ranked diagnosis candidate with its supporting evidence.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Suspect {
    /// The diagnosed root-cause category.
    pub kind: DiagnosisKind,
    /// Heuristic strength of this diagnosis (0-100); used for ranking.
    pub score: u8,
    /// Confidence bucket derived from `score`.
    pub confidence: Confidence,
    /// Human-readable observations supporting the diagnosis.
    pub evidence: Vec<String>,
    /// Suggested follow-up investigations for the operator.
    pub next_checks: Vec<String>,
}
88
89impl Suspect {
90 fn new(
91 kind: DiagnosisKind,
92 score: u8,
93 evidence: Vec<String>,
94 next_checks: Vec<String>,
95 ) -> Self {
96 Self {
97 kind,
98 score,
99 confidence: Confidence::from_score(score),
100 evidence,
101 next_checks,
102 }
103 }
104}
105
/// Summary of how a single in-flight gauge evolved over the run window.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct InflightTrend {
    /// Name of the in-flight gauge this trend describes.
    pub gauge: String,
    /// Number of snapshots observed for this gauge.
    pub sample_count: usize,
    /// Maximum observed count.
    pub peak_count: u64,
    /// p95 of the observed counts.
    pub p95_count: u64,
    /// Last-minus-first count, saturating at the `i64` range.
    pub growth_delta: i64,
    /// Growth rate in milli-counts per second; `None` when the sample
    /// window has zero duration (rate would be undefined).
    pub growth_per_sec_milli: Option<i64>,
}
122
/// Full diagnosis output for a run: latency percentiles, queue/service time
/// shares, the dominant in-flight trend, and the ranked suspect list.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Report {
    /// Number of request events captured in the run.
    pub request_count: usize,
    /// Request latency percentiles in microseconds; `None` when no requests
    /// were captured.
    pub p50_latency_us: Option<u64>,
    pub p95_latency_us: Option<u64>,
    pub p99_latency_us: Option<u64>,
    /// p95 share of request time spent waiting in queues, in permille.
    /// Queue and service shares are independent percentiles and are not
    /// expected to sum to 1000.
    pub p95_queue_share_permille: Option<u64>,
    /// p95 share of request time spent in service, in permille.
    pub p95_service_share_permille: Option<u64>,
    /// Trend of the dominant in-flight gauge, if any snapshots were captured.
    pub inflight_trend: Option<InflightTrend>,
    /// Human-readable capture-truncation warnings.
    pub warnings: Vec<String>,
    /// Highest-scoring suspect.
    pub primary_suspect: Suspect,
    /// Remaining suspects, ordered by descending score.
    pub secondary_suspects: Vec<Suspect>,
}
150
/// Analyzes a captured [`Run`] and produces a ranked diagnosis [`Report`].
///
/// Computes request latency percentiles, queue/service time shares, and the
/// dominant in-flight gauge trend, then runs each heuristic suspect detector.
/// Suspects are sorted by descending score; because `sort_by` is stable,
/// score ties keep detector evaluation order (queue saturation, blocking
/// pool, executor pressure, downstream stage).
#[must_use]
pub fn analyze_run(run: &Run) -> Report {
    let request_latencies = run
        .requests
        .iter()
        .map(|request| request.latency_us)
        .collect::<Vec<_>>();

    let p50_latency_us = percentile(&request_latencies, 50, 100);
    let p95_latency_us = percentile(&request_latencies, 95, 100);
    let p99_latency_us = percentile(&request_latencies, 99, 100);
    let (queue_shares, service_shares) = request_time_shares(run);
    let p95_queue_share_permille = percentile(&queue_shares, 95, 100);
    let p95_service_share_permille = percentile(&service_shares, 95, 100);
    let inflight_trend = dominant_inflight_trend(&run.inflight);

    let mut suspects = Vec::new();

    if let Some(queue_suspect) = queue_saturation_suspect(run, inflight_trend.as_ref()) {
        suspects.push(queue_suspect);
    }

    if let Some(blocking_suspect) = blocking_pressure_suspect(run) {
        suspects.push(blocking_suspect);
    }

    if let Some(executor_suspect) = executor_pressure_suspect(run, inflight_trend.as_ref()) {
        suspects.push(executor_suspect);
    }

    if let Some(stage_suspect) = downstream_stage_suspect(run) {
        suspects.push(stage_suspect);
    }

    // No detector fired: report the lack of evidence rather than guessing.
    if suspects.is_empty() {
        suspects.push(Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec![
                "Not enough queue, stage, or runtime signals to rank a stronger suspect."
                    .to_string(),
            ],
            vec![
                "Wrap critical awaits with queue(...).await_on(...), and use stage(...).await_on(...) for Result-returning work or stage(...).await_value(...) for infallible work.".to_string(),
                "Enable RuntimeSampler during the run to capture runtime pressure signals."
                    .to_string(),
            ],
        ));
    }

    // Stable descending sort by score; insertion order breaks ties.
    suspects.sort_by(|left, right| right.score.cmp(&left.score));

    let mut ranked = suspects.into_iter();
    // `suspects` is guaranteed non-empty above; this fallback is defensive.
    let primary_suspect = ranked.next().unwrap_or_else(|| {
        Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec!["No diagnosis signals were captured for this run.".to_string()],
            vec!["Verify that request, queue, or stage instrumentation is enabled.".to_string()],
        )
    });

    Report {
        request_count: run.requests.len(),
        p50_latency_us,
        p95_latency_us,
        p99_latency_us,
        p95_queue_share_permille,
        p95_service_share_permille,
        inflight_trend,
        warnings: truncation_warnings(run),
        primary_suspect,
        secondary_suspects: ranked.collect(),
    }
}
264
265fn truncation_warnings(run: &Run) -> Vec<String> {
266 let mut warnings = Vec::new();
267 if run.truncation.dropped_requests > 0 {
268 warnings.push(format!(
269 "Capture truncated requests: dropped {} request events after reaching the configured max_requests limit.",
270 run.truncation.dropped_requests
271 ));
272 }
273 if run.truncation.dropped_stages > 0 {
274 warnings.push(format!(
275 "Capture truncated stages: dropped {} stage events after reaching the configured max_stages limit.",
276 run.truncation.dropped_stages
277 ));
278 }
279 if run.truncation.dropped_queues > 0 {
280 warnings.push(format!(
281 "Capture truncated queues: dropped {} queue events after reaching the configured max_queues limit.",
282 run.truncation.dropped_queues
283 ));
284 }
285 if run.truncation.dropped_inflight_snapshots > 0 {
286 warnings.push(format!(
287 "Capture truncated in-flight snapshots: dropped {} entries after reaching max_inflight_snapshots.",
288 run.truncation.dropped_inflight_snapshots
289 ));
290 }
291 if run.truncation.dropped_runtime_snapshots > 0 {
292 warnings.push(format!(
293 "Capture truncated runtime snapshots: dropped {} entries after reaching max_runtime_snapshots.",
294 run.truncation.dropped_runtime_snapshots
295 ));
296 }
297 warnings
298}
299
300fn queue_saturation_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
301 let (queue_shares, _) = request_time_shares(run);
302 let p95_queue_share_permille = percentile(&queue_shares, 95, 100)?;
303 let max_depth = run
304 .queues
305 .iter()
306 .filter_map(|queue| queue.depth_at_start)
307 .max();
308
309 if p95_queue_share_permille < 300 {
310 return None;
311 }
312
313 let whole_percent = p95_queue_share_permille / 10;
314 let tenth_percent = p95_queue_share_permille % 10;
315 let mut evidence = vec![format!(
316 "Queue wait at p95 consumes {whole_percent}.{tenth_percent}% of request time."
317 )];
318
319 if let Some(depth) = max_depth {
320 evidence.push(format!("Observed queue depth sample up to {depth}."));
321 }
322 if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
323 evidence.push(format!(
324 "In-flight gauge '{}' grew by {} over the run window (p95={}, peak={}).",
325 trend.gauge, trend.growth_delta, trend.p95_count, trend.peak_count
326 ));
327 }
328
329 Some(Suspect::new(
330 DiagnosisKind::ApplicationQueueSaturation,
331 90,
332 evidence,
333 vec![
334 "Inspect queue admission limits and producer burst patterns.".to_string(),
335 "Compare queue wait distribution before and after increasing worker parallelism."
336 .to_string(),
337 ],
338 ))
339}
340
341fn blocking_pressure_suspect(run: &Run) -> Option<Suspect> {
342 let blocking_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
343 snapshot.blocking_queue_depth
344 });
345 let p95_blocking_depth = percentile(&blocking_depths, 95, 100)?;
346
347 if p95_blocking_depth == 0 {
348 return None;
349 }
350
351 Some(Suspect::new(
352 DiagnosisKind::BlockingPoolPressure,
353 80,
354 vec![format!(
355 "Blocking queue depth p95 is {p95_blocking_depth}, indicating sustained spawn_blocking backlog."
356 )],
357 vec![
358 "Audit blocking sections and move avoidable synchronous work out of hot paths."
359 .to_string(),
360 "Inspect spawn_blocking callsites for long-running CPU or I/O work.".to_string(),
361 ],
362 ))
363}
364
365fn executor_pressure_suspect(run: &Run, inflight_trend: Option<&InflightTrend>) -> Option<Suspect> {
366 let global_queue_depths = runtime_metric_series(&run.runtime_snapshots, |snapshot| {
367 snapshot.global_queue_depth
368 });
369 let p95_global_depth = percentile(&global_queue_depths, 95, 100)?;
370
371 if p95_global_depth == 0 {
372 return None;
373 }
374
375 let mut evidence = vec![format!(
376 "Runtime global queue depth p95 is {p95_global_depth}, suggesting scheduler contention."
377 )];
378 let positive_growth = inflight_trend.is_some_and(|trend| trend.growth_delta > 0);
379 if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
380 evidence.push(format!(
381 "In-flight gauge '{}' growth is positive (delta={}, peak={}), consistent with accumulating executor pressure.",
382 trend.gauge, trend.growth_delta, trend.peak_count
383 ));
384 }
385
386 let depth_bonus = if p95_global_depth >= 300 {
387 20
388 } else if p95_global_depth >= 200 {
389 12
390 } else if p95_global_depth >= 100 {
391 6
392 } else {
393 0
394 };
395 let trend_bonus = if positive_growth { 5 } else { 0 };
396 let score = (65 + depth_bonus + trend_bonus).min(90);
397
398 Some(Suspect::new(
399 DiagnosisKind::ExecutorPressureSuspected,
400 score,
401 evidence,
402 vec![
403 "Check for long polls without yielding and uneven task fan-out.".to_string(),
404 "Compare with per-stage timings to isolate overloaded async stages.".to_string(),
405 ],
406 ))
407}
408
409fn downstream_stage_suspect(run: &Run) -> Option<Suspect> {
410 let mut stage_totals: BTreeMap<&str, u64> = BTreeMap::new();
411 for stage in &run.stages {
412 *stage_totals.entry(stage.stage.as_str()).or_default() = stage_totals
413 .get(stage.stage.as_str())
414 .copied()
415 .unwrap_or_default()
416 .saturating_add(stage.latency_us);
417 }
418
419 let (dominant_stage, total_latency) = stage_totals
420 .iter()
421 .max_by(|left, right| left.1.cmp(right.1).then_with(|| right.0.cmp(left.0)))
422 .map(|(stage, latency)| (*stage, *latency))?;
423
424 let stage_count = run
425 .stages
426 .iter()
427 .filter(|stage| stage.stage == dominant_stage)
428 .count();
429 let stage_latencies = run
430 .stages
431 .iter()
432 .filter(|stage| stage.stage == dominant_stage)
433 .map(|stage| stage.latency_us)
434 .collect::<Vec<_>>();
435 let stage_p95 = percentile(&stage_latencies, 95, 100)?;
436 let total_request_latency = run
437 .requests
438 .iter()
439 .map(|request| request.latency_us)
440 .fold(0_u64, u64::saturating_add);
441 let stage_share_permille = if total_request_latency == 0 {
442 0
443 } else {
444 total_latency.saturating_mul(1_000) / total_request_latency
445 };
446 let share_bonus = (stage_share_permille / 40).min(25) as u8;
447 let score = (55 + share_bonus).min(79);
448
449 if stage_count < 3 {
450 return None;
451 }
452
453 Some(Suspect::new(
454 DiagnosisKind::DownstreamStageDominates,
455 score,
456 vec![
457 format!(
458 "Stage '{dominant_stage}' has p95 latency {stage_p95} us across {stage_count} samples."
459 ),
460 format!("Stage '{dominant_stage}' cumulative latency is {total_latency} us."),
461 format!(
462 "Stage '{dominant_stage}' contributes {stage_share_permille} permille of cumulative request latency."
463 ),
464 ],
465 vec![
466 format!("Inspect downstream dependency behind stage '{dominant_stage}'."),
467 "Collect downstream service timings and retry behavior during tail windows.".to_string(),
468 "Review downstream SLO/error budget and align retry budget/backoff with it.".to_string(),
469 ],
470 ))
471}
472
473fn request_time_shares(run: &Run) -> (Vec<u64>, Vec<u64>) {
474 let mut total_queue_wait_by_request = HashMap::<&str, u64>::new();
475 for queue in &run.queues {
476 *total_queue_wait_by_request
477 .entry(queue.request_id.as_str())
478 .or_default() = total_queue_wait_by_request
479 .get(queue.request_id.as_str())
480 .copied()
481 .unwrap_or_default()
482 .saturating_add(queue.wait_us);
483 }
484
485 let mut queue_shares = Vec::new();
486 let mut service_shares = Vec::new();
487
488 for request in &run.requests {
489 if request.latency_us == 0 {
490 continue;
491 }
492
493 let queue_wait = total_queue_wait_by_request
494 .get(request.request_id.as_str())
495 .copied()
496 .unwrap_or_default()
497 .min(request.latency_us);
498 let service_time = request.latency_us.saturating_sub(queue_wait);
499
500 queue_shares.push(queue_wait.saturating_mul(1_000) / request.latency_us);
501 service_shares.push(service_time.saturating_mul(1_000) / request.latency_us);
502 }
503
504 (queue_shares, service_shares)
505}
506
507fn runtime_metric_series(
508 snapshots: &[RuntimeSnapshot],
509 selector: impl Fn(&RuntimeSnapshot) -> Option<u64>,
510) -> Vec<u64> {
511 snapshots.iter().filter_map(selector).collect::<Vec<_>>()
512}
513
/// Groups in-flight snapshots by gauge and returns the trend of the
/// "dominant" gauge, or `None` when there are no snapshots.
///
/// Dominance is decided by highest peak count, then highest p95 count; the
/// reversed gauge-name comparison makes any remaining tie resolve to the
/// lexicographically smallest gauge name, keeping the result deterministic.
fn dominant_inflight_trend(snapshots: &[InFlightSnapshot]) -> Option<InflightTrend> {
    // BTreeMap keeps per-gauge iteration order deterministic.
    let mut by_gauge: BTreeMap<&str, Vec<&InFlightSnapshot>> = BTreeMap::new();
    for snapshot in snapshots {
        by_gauge
            .entry(snapshot.gauge.as_str())
            .or_default()
            .push(snapshot);
    }

    by_gauge
        .into_iter()
        .filter_map(|(gauge, samples)| inflight_trend_for_gauge(gauge, samples))
        .max_by(|left, right| {
            left.peak_count
                .cmp(&right.peak_count)
                .then_with(|| left.p95_count.cmp(&right.p95_count))
                // Reversed so the smaller gauge name compares as greater.
                .then_with(|| left.gauge.cmp(&right.gauge).reverse())
        })
}
533
534fn inflight_trend_for_gauge(
535 gauge: &str,
536 mut samples: Vec<&InFlightSnapshot>,
537) -> Option<InflightTrend> {
538 if samples.is_empty() {
539 return None;
540 }
541
542 samples.sort_unstable_by(|left, right| {
543 left.at_unix_ms
544 .cmp(&right.at_unix_ms)
545 .then_with(|| left.count.cmp(&right.count))
546 });
547
548 let counts = samples
549 .iter()
550 .map(|sample| sample.count)
551 .collect::<Vec<_>>();
552 let first = samples.first()?;
553 let last = samples.last()?;
554 let growth_delta = signed_u64_delta(first.count, last.count);
555 let window_ms = last.at_unix_ms.saturating_sub(first.at_unix_ms);
556 let growth_per_sec_milli = if window_ms == 0 {
557 None
558 } else {
559 i64::try_from(window_ms)
560 .ok()
561 .map(|window_ms_i64| growth_delta.saturating_mul(1_000_000) / window_ms_i64)
562 };
563
564 Some(InflightTrend {
565 gauge: gauge.to_owned(),
566 sample_count: counts.len(),
567 peak_count: counts.iter().copied().max().unwrap_or(0),
568 p95_count: percentile(&counts, 95, 100).unwrap_or(0),
569 growth_delta,
570 growth_per_sec_milli,
571 })
572}
573
/// Computes `end - start` as a signed value, saturating at the `i64` range
/// (`i64::MAX` for positive overflow, `i64::MIN` for negative overflow).
fn signed_u64_delta(start: u64, end: u64) -> i64 {
    if end >= start {
        i64::try_from(end - start).unwrap_or(i64::MAX)
    } else {
        // Negating after `try_from` would mis-handle a magnitude of exactly
        // 2^63: that delta IS representable (`i64::MIN`), and larger ones
        // should saturate to `i64::MIN`, not `-i64::MAX`. Saturate directly
        // when the magnitude does not fit in `i64`.
        i64::try_from(start - end).map_or(i64::MIN, |delta| -delta)
    }
}
581
/// Returns the `numerator/denominator` percentile of `values` (e.g. 95/100
/// for p95), or `None` for empty input or a zero denominator.
fn percentile(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    percentile_sorted_u64(&sorted_u64(values), numerator, denominator)
}

/// Returns an ascending-sorted copy of `values`.
fn sorted_u64(values: &[u64]) -> Vec<u64> {
    let mut sorted = Vec::from(values);
    sorted.sort_unstable();
    sorted
}

/// Percentile over an already-sorted slice, using a ceiling nearest-rank
/// index clamped to the slice bounds.
fn percentile_sorted_u64(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    if values.is_empty() || denominator == 0 {
        return None;
    }

    let last = values.len() - 1;
    let index = last
        .saturating_mul(numerator)
        .div_ceil(denominator)
        .min(last);
    values.get(index).copied()
}
608
609#[must_use]
610pub fn render_text(report: &Report) -> String {
614 let inflight_line = match &report.inflight_trend {
615 Some(trend) => format!(
616 "inflight_trend gauge={} samples={} peak={} p95={} growth_delta={} growth_per_sec_milli={:?}",
617 trend.gauge,
618 trend.sample_count,
619 trend.peak_count,
620 trend.p95_count,
621 trend.growth_delta,
622 trend.growth_per_sec_milli
623 ),
624 None => "inflight_trend none".to_string(),
625 };
626
627 let mut lines = vec![
628 "tailtriage diagnosis".to_string(),
629 format!("requests: {}", report.request_count),
630 format!(
631 "latency_us p50={:?} p95={:?} p99={:?}",
632 report.p50_latency_us, report.p95_latency_us, report.p99_latency_us
633 ),
634 format!(
635 "request_time_share_permille p95 queue={:?} service={:?} (independent percentiles; not expected to sum to 1000)",
636 report.p95_queue_share_permille, report.p95_service_share_permille
637 ),
638 inflight_line,
639 format!(
640 "primary: {} (confidence={:?}, score={})",
641 report.primary_suspect.kind.as_str(),
642 report.primary_suspect.confidence,
643 report.primary_suspect.score
644 ),
645 ];
646 for warning in &report.warnings {
647 lines.push(format!("warning {warning}"));
648 }
649
650 for evidence in &report.primary_suspect.evidence {
651 lines.push(format!(" evidence: {evidence}"));
652 }
653
654 for next_check in &report.primary_suspect.next_checks {
655 lines.push(format!(" next: {next_check}"));
656 }
657
658 if !report.secondary_suspects.is_empty() {
659 lines.push("secondary suspects:".to_string());
660 for suspect in &report.secondary_suspects {
661 lines.push(format!(
662 " - {} (confidence={:?}, score={})",
663 suspect.kind.as_str(),
664 suspect.confidence,
665 suspect.score
666 ));
667 }
668 }
669
670 lines.join("\n")
671}
672
#[cfg(test)]
mod tests {
    use tailtriage_core::{
        CaptureMode, RequestEvent, Run, RunMetadata, StageEvent, SCHEMA_VERSION,
    };

    use crate::analyze::{
        analyze_run, render_text, Confidence, DiagnosisKind, InflightTrend, Report, Suspect,
    };

    // Baseline fixture: three 1_000 us requests and no stage/queue/in-flight/
    // runtime data. Individual tests mutate only the fields they exercise.
    fn test_run() -> Run {
        Run {
            schema_version: SCHEMA_VERSION,
            metadata: RunMetadata {
                run_id: "run-1".to_owned(),
                service_name: "svc".to_owned(),
                service_version: None,
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                mode: CaptureMode::Light,
                host: None,
                pid: Some(1),
                lifecycle_warnings: Vec::new(),
                unfinished_requests: tailtriage_core::UnfinishedRequests::default(),
            },
            requests: vec![
                RequestEvent {
                    request_id: "req-1".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 1,
                    finished_at_unix_ms: 2,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
                RequestEvent {
                    request_id: "req-2".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 2,
                    finished_at_unix_ms: 3,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
                RequestEvent {
                    request_id: "req-3".to_owned(),
                    route: "/test".to_owned(),
                    kind: None,
                    started_at_unix_ms: 3,
                    finished_at_unix_ms: 4,
                    latency_us: 1_000,
                    outcome: "ok".to_owned(),
                },
            ],
            stages: Vec::new(),
            queues: Vec::new(),
            inflight: Vec::new(),
            runtime_snapshots: Vec::new(),
            truncation: tailtriage_core::TruncationSummary::default(),
        }
    }

    // Two stages with identical cumulative latency must deterministically
    // resolve to the lexicographically smaller name (stage_a).
    #[test]
    fn downstream_stage_tie_break_is_deterministic() {
        let mut run = test_run();
        run.stages = vec![
            StageEvent {
                request_id: "req-1".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-2".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 2,
                finished_at_unix_ms: 3,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-3".to_owned(),
                stage: "stage_a".to_owned(),
                started_at_unix_ms: 3,
                finished_at_unix_ms: 4,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-1".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 1,
                finished_at_unix_ms: 2,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-2".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 2,
                finished_at_unix_ms: 3,
                latency_us: 300,
                success: true,
            },
            StageEvent {
                request_id: "req-3".to_owned(),
                stage: "stage_b".to_owned(),
                started_at_unix_ms: 3,
                finished_at_unix_ms: 4,
                latency_us: 300,
                success: true,
            },
        ];

        let report = analyze_run(&run);
        assert_eq!(
            report.primary_suspect.kind,
            DiagnosisKind::DownstreamStageDominates
        );
        assert!(
            report.primary_suspect.evidence[0].contains("stage_a"),
            "expected deterministic stage tie-breaker to choose stage_a, got {:?}",
            report.primary_suspect.evidence
        );
    }

    // No snapshots at all must yield no trend rather than a degenerate one.
    #[test]
    fn inflight_trend_is_none_for_empty_series() {
        assert!(super::dominant_inflight_trend(&[]).is_none());
    }

    // A flat gauge series must report zero growth but a real peak/p95.
    #[test]
    fn inflight_trend_handles_constant_series() {
        let trend = super::dominant_inflight_trend(&[
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 10,
                count: 3,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 20,
                count: 3,
            },
        ])
        .expect("trend should exist");

        assert_eq!(trend.peak_count, 3);
        assert_eq!(trend.p95_count, 3);
        assert_eq!(trend.growth_delta, 0);
    }

    // Growing series: delta 5 over a 20 ms window = 250_000 milli/sec.
    #[test]
    fn inflight_trend_handles_monotonic_increase() {
        let trend = super::dominant_inflight_trend(&[
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 10,
                count: 1,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 20,
                count: 4,
            },
            tailtriage_core::InFlightSnapshot {
                gauge: "http".to_owned(),
                at_unix_ms: 30,
                count: 6,
            },
        ])
        .expect("trend should exist");

        assert_eq!(trend.peak_count, 6);
        assert_eq!(trend.p95_count, 6);
        assert_eq!(trend.growth_delta, 5);
        assert_eq!(trend.growth_per_sec_milli, Some(250_000));
    }

    // Text renderer must surface every in-flight trend field and the
    // percentile-independence caveat.
    #[test]
    fn render_text_formats_inflight_trend_fields() {
        let report = Report {
            request_count: 2,
            p50_latency_us: Some(10),
            p95_latency_us: Some(20),
            p99_latency_us: Some(20),
            p95_queue_share_permille: Some(100),
            p95_service_share_permille: Some(900),
            inflight_trend: Some(InflightTrend {
                gauge: "queue_inflight".to_owned(),
                sample_count: 4,
                peak_count: 8,
                p95_count: 7,
                growth_delta: 5,
                growth_per_sec_milli: Some(2_500),
            }),
            warnings: Vec::new(),
            primary_suspect: Suspect {
                kind: DiagnosisKind::ApplicationQueueSaturation,
                score: 90,
                confidence: Confidence::High,
                evidence: vec!["queue wait high".to_owned()],
                next_checks: vec!["check queue policy".to_owned()],
            },
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        assert!(text.contains("inflight_trend gauge=queue_inflight"));
        assert!(text.contains("samples=4"));
        assert!(text.contains("growth_per_sec_milli=Some(2500)"));
        assert!(text.contains("independent percentiles; not expected to sum to 1000"));
    }

    // A missing trend renders an explicit "none" marker, and warnings are
    // prefixed with "warning ".
    #[test]
    fn render_text_marks_missing_inflight_trend() {
        let report = Report {
            request_count: 0,
            p50_latency_us: None,
            p95_latency_us: None,
            p99_latency_us: None,
            p95_queue_share_permille: None,
            p95_service_share_permille: None,
            inflight_trend: None,
            warnings: vec!["Capture truncated requests.".to_owned()],
            primary_suspect: Suspect {
                kind: DiagnosisKind::InsufficientEvidence,
                score: 50,
                confidence: Confidence::Low,
                evidence: vec!["missing signals".to_owned()],
                next_checks: vec!["add instrumentation".to_owned()],
            },
            secondary_suspects: Vec::new(),
        };

        let text = render_text(&report);
        assert!(text.contains("inflight_trend none"));
        assert!(text.contains("warning Capture truncated requests."));
    }

    // Each non-zero truncation counter produces exactly one warning line.
    #[test]
    fn analyze_run_emits_truncation_warnings() {
        let mut run = test_run();
        run.truncation.dropped_requests = 2;
        run.truncation.dropped_runtime_snapshots = 1;

        let report = analyze_run(&run);
        assert_eq!(report.warnings.len(), 2);
        assert!(report
            .warnings
            .iter()
            .any(|warning| warning.contains("dropped 2 request events")));
        assert!(report
            .warnings
            .iter()
            .any(|warning| warning.contains("dropped 1 entries")));
    }
}