1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4use std::collections::{BTreeMap, HashMap};
5
6use serde::{Serialize, Serializer};
7
8mod confidence;
9mod evidence;
10mod route;
11mod scoring;
12mod temporal;
13
14pub use evidence::{EvidenceQuality, EvidenceQualityLevel, SignalCoverageStatus};
15use tailtriage_core::{InFlightSnapshot, Run, RuntimeSnapshot};
16
/// Below this many completed requests, ranking stability is flagged in warnings.
const LOW_COMPLETED_REQUEST_THRESHOLD: usize = 20;
/// Queue-wait share (permille of total latency) treated as a saturation trigger;
/// presumably consumed by the `scoring` module (use site not visible here).
const QUEUE_SHARE_TRIGGER_PERMILLE: u64 = 300;
/// Minimum suspect score mapped to `Confidence::Medium`.
const MEDIUM_CONFIDENCE_SCORE_THRESHOLD: u8 = 65;
/// Minimum suspect score mapped to `Confidence::High`.
const HIGH_CONFIDENCE_SCORE_THRESHOLD: u8 = 85;
/// Both top suspects must score at least this before ranking counts as ambiguous.
const AMBIGUITY_MIN_SCORE_THRESHOLD: u8 = 60;
/// Maximum score gap between the top two suspects for an ambiguity warning.
const AMBIGUITY_SCORE_GAP_THRESHOLD: u8 = 4;
/// Minimum per-route request count for a breakdown; used by the `route` module
/// (use site not visible here).
const ROUTE_MIN_REQUEST_COUNT: usize = 3;
/// Cap on the number of route breakdowns reported; used by the `route` module.
const ROUTE_BREAKDOWN_LIMIT: usize = 10;
/// Minimum run-wide request count before temporal segmentation applies; used by
/// the `temporal` module (use site not visible here).
const TEMPORAL_MIN_REQUEST_COUNT: usize = 20;
/// Minimum request count for an individual temporal segment; used by `temporal`.
const TEMPORAL_MIN_SEGMENT_REQUEST_COUNT: usize = 8;
/// Share shift (permille) treated as a meaningful change between segments; used
/// by the `temporal` module.
const TEMPORAL_SHARE_SHIFT_PERMILLE: u64 = 200;
/// Run-level warning pushed when routes disagree on the primary suspect.
const ROUTE_DIVERGENCE_WARNING: &str =
    "Different routes show different primary suspects; inspect route_breakdowns before acting on the global suspect.";
/// Per-route caveat that runtime/in-flight signals are global, not route-scoped;
/// presumably emitted by the `route` module (use site not visible here).
const ROUTE_RUNTIME_ATTRIBUTION_WARNING: &str =
    "Runtime and in-flight signals are global and are not attributed to this route.";
32
/// The category of tail-latency cause that a [`Suspect`] points at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiagnosisKind {
    /// Requests appear to spend a dominant share of latency waiting in
    /// application-level queues.
    ApplicationQueueSaturation,
    /// The blocking thread pool shows pressure signals.
    BlockingPoolPressure,
    /// The async executor itself appears to be under pressure.
    ExecutorPressureSuspected,
    /// A downstream stage dominates request latency.
    DownstreamStageDominates,
    /// Not enough queue, stage, or runtime signals to rank a stronger suspect.
    InsufficientEvidence,
}
49
impl DiagnosisKind {
    /// Returns the stable snake_case identifier for this kind.
    ///
    /// These strings are the crate's JSON output contract (the `Serialize`
    /// impl delegates here), so they must not be changed casually.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::ApplicationQueueSaturation => "application_queue_saturation",
            Self::BlockingPoolPressure => "blocking_pool_pressure",
            Self::ExecutorPressureSuspected => "executor_pressure_suspected",
            Self::DownstreamStageDominates => "downstream_stage_dominates",
            Self::InsufficientEvidence => "insufficient_evidence",
        }
    }
}
63
// Hand-written so the serialized form is the stable string from `as_str`
// rather than a derive-generated representation tied to Rust identifiers.
impl Serialize for DiagnosisKind {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(self.as_str())
    }
}
72
/// How strongly the captured evidence supports a [`Suspect`], derived from its
/// score (ordering: `Low < Medium < High`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Confidence {
    /// Score below `MEDIUM_CONFIDENCE_SCORE_THRESHOLD`.
    Low,
    /// Score at or above `MEDIUM_CONFIDENCE_SCORE_THRESHOLD` but below
    /// `HIGH_CONFIDENCE_SCORE_THRESHOLD`.
    Medium,
    /// Score at or above `HIGH_CONFIDENCE_SCORE_THRESHOLD`.
    High,
}
86
87impl Confidence {
88 fn from_score(score: u8) -> Self {
89 if score >= HIGH_CONFIDENCE_SCORE_THRESHOLD {
90 Self::High
91 } else if score >= MEDIUM_CONFIDENCE_SCORE_THRESHOLD {
92 Self::Medium
93 } else {
94 Self::Low
95 }
96 }
97}
98
/// A ranked hypothesis about what is driving tail latency in a run.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Suspect {
    /// The diagnosed cause category.
    pub kind: DiagnosisKind,
    /// Ranking score; suspects are sorted by descending score.
    pub score: u8,
    /// Confidence band derived from `score` (may be capped afterwards by the
    /// evidence-aware confidence logic).
    pub confidence: Confidence,
    /// Human-readable observations supporting this suspect.
    pub evidence: Vec<String>,
    /// Suggested follow-up actions to confirm or rule out the suspect.
    pub next_checks: Vec<String>,
    /// Notes explaining confidence adjustments; empty when none were applied.
    pub confidence_notes: Vec<String>,
}
117
118impl Suspect {
119 fn new(
120 kind: DiagnosisKind,
121 score: u8,
122 evidence: Vec<String>,
123 next_checks: Vec<String>,
124 ) -> Self {
125 Self {
126 kind,
127 score,
128 confidence: Confidence::from_score(score),
129 evidence,
130 next_checks,
131 confidence_notes: Vec::new(),
132 }
133 }
134}
135
/// Summary of how a single in-flight gauge behaved over the run window.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct InflightTrend {
    /// Name of the in-flight gauge this trend summarizes.
    pub gauge: String,
    /// Number of snapshots recorded for the gauge.
    pub sample_count: usize,
    /// Maximum observed count across all samples.
    pub peak_count: u64,
    /// 95th-percentile observed count.
    pub p95_count: u64,
    /// Last-minus-first count after time ordering, clamped to the `i64` range.
    pub growth_delta: i64,
    /// Growth rate in milli-counts per second; `None` when the sample window
    /// has zero duration.
    pub growth_per_sec_milli: Option<i64>,
}
152
/// The full diagnosis produced by analyzing a run.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Report {
    /// Number of completed requests in the run.
    pub request_count: usize,
    /// Median request latency in microseconds; `None` when no requests completed.
    pub p50_latency_us: Option<u64>,
    /// 95th-percentile request latency in microseconds.
    pub p95_latency_us: Option<u64>,
    /// 99th-percentile request latency in microseconds.
    pub p99_latency_us: Option<u64>,
    /// 95th percentile of per-request queue-wait share, in permille of latency.
    pub p95_queue_share_permille: Option<u64>,
    /// 95th percentile of per-request service-time share, in permille of latency.
    pub p95_service_share_permille: Option<u64>,
    /// Trend of the dominant in-flight gauge, when snapshots were captured.
    pub inflight_trend: Option<InflightTrend>,
    /// Caveats about signal coverage and ranking stability.
    pub warnings: Vec<String>,
    /// Assessment of how complete the captured signals are.
    pub evidence_quality: EvidenceQuality,
    /// The highest-scoring suspect.
    pub primary_suspect: Suspect,
    /// Remaining suspects in descending score order.
    pub secondary_suspects: Vec<Suspect>,
    /// Per-route re-analyses of the same run.
    pub route_breakdowns: Vec<RouteBreakdown>,
    /// Time-sliced re-analyses of the same run.
    pub temporal_segments: Vec<TemporalSegment>,
}
186
/// A diagnosis computed over one time slice of the run.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct TemporalSegment {
    /// Human-readable segment label.
    pub name: String,
    /// Completed requests that fall within this segment.
    pub request_count: usize,
    /// Segment start as Unix milliseconds, when known.
    pub started_at_unix_ms: Option<u64>,
    /// Segment end as Unix milliseconds, when known.
    pub finished_at_unix_ms: Option<u64>,
    /// Median latency within the segment, in microseconds.
    pub p50_latency_us: Option<u64>,
    /// 95th-percentile latency within the segment, in microseconds.
    pub p95_latency_us: Option<u64>,
    /// 99th-percentile latency within the segment, in microseconds.
    pub p99_latency_us: Option<u64>,
    /// 95th percentile of queue-wait share within the segment, in permille.
    pub p95_queue_share_permille: Option<u64>,
    /// 95th percentile of service-time share within the segment, in permille.
    pub p95_service_share_permille: Option<u64>,
    /// Signal-coverage assessment for this segment.
    pub evidence_quality: EvidenceQuality,
    /// Highest-scoring suspect within the segment.
    pub primary_suspect: Suspect,
    /// Remaining segment suspects in descending score order.
    pub secondary_suspects: Vec<Suspect>,
    /// Segment-specific caveats.
    pub warnings: Vec<String>,
}
217
/// A diagnosis computed over the requests of a single route.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct RouteBreakdown {
    /// The route identifier this breakdown covers.
    pub route: String,
    /// Completed requests attributed to the route.
    pub request_count: usize,
    /// Median latency for the route, in microseconds.
    pub p50_latency_us: Option<u64>,
    /// 95th-percentile latency for the route, in microseconds.
    pub p95_latency_us: Option<u64>,
    /// 99th-percentile latency for the route, in microseconds.
    pub p99_latency_us: Option<u64>,
    /// 95th percentile of queue-wait share for the route, in permille.
    pub p95_queue_share_permille: Option<u64>,
    /// 95th percentile of service-time share for the route, in permille.
    pub p95_service_share_permille: Option<u64>,
    /// Signal-coverage assessment for this route.
    pub evidence_quality: EvidenceQuality,
    /// Highest-scoring suspect for the route.
    pub primary_suspect: Suspect,
    /// Remaining route suspects in descending score order.
    pub secondary_suspects: Vec<Suspect>,
    /// Route-specific caveats.
    pub warnings: Vec<String>,
}
244
/// Analyzes a completed [`Run`] into a [`Report`] using `options`.
///
/// Convenience wrapper around [`Analyzer::analyze_run`].
#[must_use]
pub fn analyze_run(run: &Run, options: AnalyzeOptions) -> Report {
    Analyzer::new(options).analyze_run(run)
}
298
/// Renders a [`Report`] as a compact JSON string.
///
/// # Errors
///
/// Returns a [`serde_json::Error`] if serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn render_json(report: &Report) -> Result<String, serde_json::Error> {
    serde_json::to_string(report)
}
310
/// Renders a [`Report`] as a pretty-printed JSON string.
///
/// # Errors
///
/// Returns a [`serde_json::Error`] if serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn render_json_pretty(report: &Report) -> Result<String, serde_json::Error> {
    serde_json::to_string_pretty(report)
}
323
324#[must_use = "The rendered JSON string should be used for output or transport."]
333pub fn analyze_run_json(
334 run: &tailtriage_core::Run,
335 options: AnalyzeOptions,
336) -> Result<String, serde_json::Error> {
337 let report = analyze_run(run, options);
338 render_json(&report)
339}
340
341#[must_use = "The rendered JSON string should be used for output or transport."]
350pub fn analyze_run_json_pretty(
351 run: &tailtriage_core::Run,
352 options: AnalyzeOptions,
353) -> Result<String, serde_json::Error> {
354 let report = analyze_run(run, options);
355 render_json_pretty(&report)
356}
357
/// Options controlling analysis.
///
/// Currently empty; `#[non_exhaustive]` reserves room to add knobs later
/// without a breaking change. Construct via `AnalyzeOptions::default()`.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct AnalyzeOptions {}
362
/// Reusable analyzer carrying a fixed set of [`AnalyzeOptions`] across runs.
#[derive(Debug, Clone, Default)]
pub struct Analyzer {
    // Options applied to every run analyzed by this instance.
    options: AnalyzeOptions,
}
368
impl Analyzer {
    /// Creates an analyzer that will apply `options` to every run it analyzes.
    #[must_use]
    pub const fn new(options: AnalyzeOptions) -> Self {
        Self { options }
    }

    /// Produces a [`Report`] for `run` using this analyzer's options.
    #[must_use]
    pub fn analyze_run(&self, run: &Run) -> Report {
        analyze_run_with_options(run, &self.options)
    }
}
382
/// Full analysis pipeline: build the global report first, then layer route
/// breakdowns and temporal segments on top of it.
fn analyze_run_with_options(run: &Run, _options: &AnalyzeOptions) -> Report {
    let mut report = analyze_run_internal(run);
    // Route analysis can reveal that routes disagree on the primary suspect;
    // surface that as a run-level warning.
    let route_context = route::route_breakdowns(run, &report);
    if route_context.divergent {
        report.warnings.push(ROUTE_DIVERGENCE_WARNING.to_string());
    }
    report.route_breakdowns = route_context.breakdowns;
    // Temporal segmentation may append its own warnings in place.
    report.temporal_segments = temporal::temporal_segments(run, &mut report.warnings);
    report
}
393
/// Builds the run-level report: latency percentiles, queue/service time
/// shares, the dominant in-flight trend, and the ranked suspect list.
///
/// Route breakdowns and temporal segments are left empty here; they are
/// filled in later by `analyze_run_with_options`.
fn analyze_run_internal(run: &Run) -> Report {
    let request_latencies = run
        .requests
        .iter()
        .map(|request| request.latency_us)
        .collect::<Vec<_>>();

    let p50_latency_us = percentile(&request_latencies, 50, 100);
    let p95_latency_us = percentile(&request_latencies, 95, 100);
    let p99_latency_us = percentile(&request_latencies, 99, 100);
    let (queue_shares, service_shares) = request_time_shares(run);
    let p95_queue_share_permille = percentile(&queue_shares, 95, 100);
    let p95_service_share_permille = percentile(&service_shares, 95, 100);
    let inflight_trend = dominant_inflight_trend(&run.inflight);

    // Collect whichever suspects the scoring heuristics can justify.
    let mut suspects = Vec::new();

    if let Some(queue_suspect) = scoring::queue_saturation_suspect(run, inflight_trend.as_ref()) {
        suspects.push(queue_suspect);
    }

    if let Some(blocking_suspect) = scoring::blocking_pressure_suspect(run) {
        suspects.push(blocking_suspect);
    }

    if let Some(executor_suspect) = scoring::executor_pressure_suspect(run, inflight_trend.as_ref())
    {
        suspects.push(executor_suspect);
    }

    if let Some(stage_suspect) = scoring::downstream_stage_suspect(run) {
        suspects.push(stage_suspect);
    }

    // Guarantee at least one suspect so `primary_suspect` is always present.
    if suspects.is_empty() {
        suspects.push(Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec![
                "Not enough queue, stage, or runtime signals to rank a stronger suspect."
                    .to_string(),
            ],
            vec![
                "Wrap critical awaits with queue(...).await_on(...), and use stage(...).await_on(...) for Result-returning work or stage(...).await_value(...) for infallible work.".to_string(),
                "Enable RuntimeSampler during the run to capture runtime pressure signals."
                    .to_string(),
            ],
        ));
    }

    // Highest score ranks first.
    suspects.sort_by_key(|suspect| std::cmp::Reverse(suspect.score));

    let warnings = analysis_warnings(run, &suspects);
    let evidence_quality = evidence::evidence_quality(run);

    // May lower confidence bands (and attach notes) when evidence is weak.
    confidence::apply_evidence_aware_confidence_caps(&mut suspects, run, &evidence_quality);

    let mut ranked = suspects.into_iter();
    // This fallback is unreachable in practice (a suspect is always pushed
    // above), but it keeps the function total without panicking.
    let primary_suspect = ranked.next().unwrap_or_else(|| {
        Suspect::new(
            DiagnosisKind::InsufficientEvidence,
            50,
            vec!["No diagnosis signals were captured for this run.".to_string()],
            vec!["Verify that request, queue, or stage instrumentation is enabled.".to_string()],
        )
    });

    Report {
        request_count: run.requests.len(),
        p50_latency_us,
        p95_latency_us,
        p99_latency_us,
        p95_queue_share_permille,
        p95_service_share_permille,
        inflight_trend,
        warnings,
        evidence_quality,
        primary_suspect,
        secondary_suspects: ranked.collect(),
        // Filled in later by `analyze_run_with_options`.
        route_breakdowns: Vec::new(),
        temporal_segments: Vec::new(),
    }
}
477
478fn ambiguity_warning(suspects: &[Suspect]) -> Option<String> {
479 let mut ranked = suspects
480 .iter()
481 .filter(|s| s.kind != DiagnosisKind::InsufficientEvidence)
482 .collect::<Vec<_>>();
483 ranked.sort_by_key(|s| std::cmp::Reverse(s.score));
484 if ranked.len() >= 2
485 && ranked[0].score >= AMBIGUITY_MIN_SCORE_THRESHOLD
486 && ranked[1].score >= AMBIGUITY_MIN_SCORE_THRESHOLD
487 && ranked[0].score.abs_diff(ranked[1].score) <= AMBIGUITY_SCORE_GAP_THRESHOLD
488 {
489 Some("Top suspects are close in score; treat ranking as ambiguous and validate both with next checks.".to_string())
490 } else {
491 None
492 }
493}
494
/// Collects run-level caveats: truncated capture, small sample size, missing
/// signal sources relative to the chosen suspects, and ambiguous rankings.
///
/// `suspects` is expected to already be sorted best-first (the caller sorts
/// before invoking this), since `first()` is read as the primary suspect.
fn analysis_warnings(run: &Run, suspects: &[Suspect]) -> Vec<String> {
    let mut warnings = evidence::truncation_warnings(run);
    if run.requests.len() < LOW_COMPLETED_REQUEST_THRESHOLD {
        warnings.push(
            "Low completed-request count; diagnosis ranking may be unstable for this run window."
                .to_string(),
        );
    }
    let primary_kind = suspects.first().map(|s| &s.kind);
    // Warn when the primary diagnosis leans on a signal source that captured
    // no events at all.
    if run.queues.is_empty()
        && primary_kind.is_some_and(|kind| *kind == DiagnosisKind::ApplicationQueueSaturation)
    {
        warnings.push(
            "No queue events captured; queue saturation interpretation is limited.".to_string(),
        );
    }
    if run.stages.is_empty()
        && primary_kind.is_some_and(|kind| *kind == DiagnosisKind::DownstreamStageDominates)
    {
        warnings.push(
            "No stage events captured; downstream-stage interpretation is limited.".to_string(),
        );
    }
    // Runtime-snapshot warnings only matter when a runtime-related suspect is
    // in play, or when no strong non-runtime primary exists.
    let runtime_distinction_relevant = suspects.iter().any(|s| {
        s.kind == DiagnosisKind::BlockingPoolPressure
            || s.kind == DiagnosisKind::ExecutorPressureSuspected
    });
    // NOTE(review): the literal 85 duplicates HIGH_CONFIDENCE_SCORE_THRESHOLD;
    // confirm whether these are meant to stay coupled.
    let strong_non_runtime_primary = suspects.first().is_some_and(|s| {
        (s.kind == DiagnosisKind::ApplicationQueueSaturation
            || s.kind == DiagnosisKind::DownstreamStageDominates)
            && s.score >= 85
    });

    if run.runtime_snapshots.is_empty() {
        if !strong_non_runtime_primary {
            warnings.push("No runtime snapshots captured; executor and blocking-pressure interpretation is limited.".to_string());
        }
    } else if runtime_distinction_relevant
        && (run
            .runtime_snapshots
            .iter()
            .all(|s| s.blocking_queue_depth.is_none())
            || run
                .runtime_snapshots
                .iter()
                .all(|s| s.local_queue_depth.is_none()))
    {
        warnings.push("Runtime snapshots are missing blocking_queue_depth or local_queue_depth; separating executor vs blocking pressure is limited.".to_string());
    }
    if let Some(w) = ambiguity_warning(suspects) {
        warnings.push(w);
    }
    warnings
}
549
550fn request_time_shares(run: &Run) -> (Vec<u64>, Vec<u64>) {
551 let mut total_queue_wait_by_request = HashMap::<&str, u64>::new();
552 for queue in &run.queues {
553 *total_queue_wait_by_request
554 .entry(queue.request_id.as_str())
555 .or_default() = total_queue_wait_by_request
556 .get(queue.request_id.as_str())
557 .copied()
558 .unwrap_or_default()
559 .saturating_add(queue.wait_us);
560 }
561
562 let mut queue_shares = Vec::new();
563 let mut service_shares = Vec::new();
564
565 for request in &run.requests {
566 if request.latency_us == 0 {
567 continue;
568 }
569
570 let queue_wait = total_queue_wait_by_request
571 .get(request.request_id.as_str())
572 .copied()
573 .unwrap_or_default()
574 .min(request.latency_us);
575 let service_time = request.latency_us.saturating_sub(queue_wait);
576
577 queue_shares.push(queue_wait.saturating_mul(1_000) / request.latency_us);
578 service_shares.push(service_time.saturating_mul(1_000) / request.latency_us);
579 }
580
581 (queue_shares, service_shares)
582}
583
584fn runtime_metric_series(
585 snapshots: &[RuntimeSnapshot],
586 selector: impl Fn(&RuntimeSnapshot) -> Option<u64>,
587) -> Vec<u64> {
588 snapshots.iter().filter_map(selector).collect::<Vec<_>>()
589}
590
/// Picks the single gauge whose in-flight trend best represents the run.
///
/// Snapshots are grouped per gauge and each group is summarized; the winner
/// is the trend with the highest peak count, then the highest p95 count,
/// then — because the gauge-name comparison is reversed — the
/// lexicographically earliest gauge name on a full tie.
fn dominant_inflight_trend(snapshots: &[InFlightSnapshot]) -> Option<InflightTrend> {
    // BTreeMap gives deterministic (ascending) gauge iteration order.
    let mut by_gauge: BTreeMap<&str, Vec<&InFlightSnapshot>> = BTreeMap::new();
    for snapshot in snapshots {
        by_gauge
            .entry(snapshot.gauge.as_str())
            .or_default()
            .push(snapshot);
    }

    by_gauge
        .into_iter()
        .filter_map(|(gauge, samples)| inflight_trend_for_gauge(gauge, samples))
        .max_by(|left, right| {
            left.peak_count
                .cmp(&right.peak_count)
                .then_with(|| left.p95_count.cmp(&right.p95_count))
                .then_with(|| left.gauge.cmp(&right.gauge).reverse())
        })
}
610
/// Summarizes one gauge's snapshots into an [`InflightTrend`].
///
/// Returns `None` when there are no samples. Growth is measured from the
/// first to the last sample after sorting by timestamp (count breaks ties to
/// keep the order deterministic).
fn inflight_trend_for_gauge(
    gauge: &str,
    mut samples: Vec<&InFlightSnapshot>,
) -> Option<InflightTrend> {
    if samples.is_empty() {
        return None;
    }

    samples.sort_unstable_by(|left, right| {
        left.at_unix_ms
            .cmp(&right.at_unix_ms)
            .then_with(|| left.count.cmp(&right.count))
    });

    let counts = samples
        .iter()
        .map(|sample| sample.count)
        .collect::<Vec<_>>();
    let first = samples.first()?;
    let last = samples.last()?;
    let growth_delta = signed_u64_delta(first.count, last.count);
    let window_ms = last.at_unix_ms.saturating_sub(first.at_unix_ms);
    // Rate in milli-counts per second: delta * 1_000_000 / window_ms equals
    // (delta / window_seconds) * 1000. A zero-length window has no rate.
    let growth_per_sec_milli = if window_ms == 0 {
        None
    } else {
        i64::try_from(window_ms)
            .ok()
            .map(|window_ms_i64| growth_delta.saturating_mul(1_000_000) / window_ms_i64)
    };

    Some(InflightTrend {
        gauge: gauge.to_owned(),
        sample_count: counts.len(),
        peak_count: counts.iter().copied().max().unwrap_or(0),
        p95_count: percentile(&counts, 95, 100).unwrap_or(0),
        growth_delta,
        growth_per_sec_milli,
    })
}
650
/// Converts the change from `start` to `end` into a signed delta, clamping to
/// `i64::MAX` / `-i64::MAX` when the unsigned magnitude does not fit.
fn signed_u64_delta(start: u64, end: u64) -> i64 {
    match end.checked_sub(start) {
        // Non-negative growth: clamp anything above i64::MAX.
        Some(growth) => i64::try_from(growth).unwrap_or(i64::MAX),
        // Shrinkage: negate the magnitude, clamping symmetrically at -i64::MAX.
        None => -i64::try_from(start - end).unwrap_or(i64::MAX),
    }
}
658
/// Returns the `numerator / denominator` percentile of `values` using
/// nearest-rank selection (index rounded up, clamped to the last element).
///
/// Returns `None` for an empty slice or a zero denominator.
fn percentile(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    if values.is_empty() || denominator == 0 {
        return None;
    }
    let max_index = values.len() - 1;
    let index = max_index
        .saturating_mul(numerator)
        .div_ceil(denominator)
        .min(max_index);
    // Select the rank in O(n) instead of fully sorting a copy (O(n log n));
    // the selected element is exactly what a full sort would place at `index`.
    let mut scratch = values.to_vec();
    let (_, value, _) = scratch.select_nth_unstable(index);
    Some(*value)
}
663
/// Returns an ascending-sorted copy of `values`; the input slice is untouched.
fn sorted_u64(values: &[u64]) -> Vec<u64> {
    let mut ordered: Vec<u64> = values.iter().copied().collect();
    ordered.sort_unstable();
    ordered
}
669
/// Nearest-rank percentile lookup over an already-sorted slice.
///
/// The rank index is rounded up (`div_ceil`) and clamped to the last element.
/// Returns `None` for an empty slice or a zero denominator.
fn percentile_sorted_u64(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    if denominator == 0 {
        return None;
    }
    // `checked_sub` doubles as the empty-slice guard: None when len == 0.
    let max_index = values.len().checked_sub(1)?;
    let index = max_index
        .saturating_mul(numerator)
        .div_ceil(denominator)
        .min(max_index);
    values.get(index).copied()
}
685
686pub use render::render_text;
687
688mod render;
689
690#[cfg(test)]
691mod tests;