#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use std::collections::{BTreeMap, HashMap};
use serde::{Serialize, Serializer};
mod confidence;
mod evidence;
mod route;
mod scoring;
mod temporal;
pub use evidence::{EvidenceQuality, EvidenceQualityLevel, SignalCoverageStatus};
use tailtriage_core::{InFlightSnapshot, Run, RuntimeSnapshot};
/// Request count below which `analysis_warnings` adds a low-sample warning.
const LOW_COMPLETED_REQUEST_THRESHOLD: usize = 20;
// NOTE(review): not referenced in this file; presumably the queue-share level
// (in permille) consumed by a sibling module such as `scoring` — confirm.
const QUEUE_SHARE_TRIGGER_PERMILLE: u64 = 300;
/// Minimum score that `Confidence::from_score` maps to `Confidence::Medium`.
const MEDIUM_CONFIDENCE_SCORE_THRESHOLD: u8 = 65;
/// Minimum score that `Confidence::from_score` maps to `Confidence::High`.
const HIGH_CONFIDENCE_SCORE_THRESHOLD: u8 = 85;
/// Both top suspects must score at least this much for `ambiguity_warning` to fire.
const AMBIGUITY_MIN_SCORE_THRESHOLD: u8 = 60;
/// Largest score gap between the top two suspects that `ambiguity_warning` still
/// treats as ambiguous.
const AMBIGUITY_SCORE_GAP_THRESHOLD: u8 = 4;
// NOTE(review): the route and temporal limits below are not referenced in this
// file; presumably they are read by the `route` and `temporal` modules — confirm.
const ROUTE_MIN_REQUEST_COUNT: usize = 3;
const ROUTE_BREAKDOWN_LIMIT: usize = 10;
const TEMPORAL_MIN_REQUEST_COUNT: usize = 20;
const TEMPORAL_MIN_SEGMENT_REQUEST_COUNT: usize = 8;
const TEMPORAL_SHARE_SHIFT_PERMILLE: u64 = 200;
/// Warning appended to the report by `analyze_run_with_options` when the route
/// breakdown is flagged as divergent.
const ROUTE_DIVERGENCE_WARNING: &str =
"Different routes show different primary suspects; inspect route_breakdowns before acting on the global suspect.";
// NOTE(review): not referenced in this file; presumably attached to per-route
// breakdowns by the `route` module — confirm.
const ROUTE_RUNTIME_ATTRIBUTION_WARNING: &str =
"Runtime and in-flight signals are global and are not attributed to this route.";
/// Category of bottleneck that a [`Suspect`] points at.
///
/// Each variant has a stable snake_case name, available through
/// [`DiagnosisKind::as_str`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiagnosisKind {
    /// Queue-related suspect; its interpretation relies on captured queue events.
    ApplicationQueueSaturation,
    /// Blocking-pool suspect; its interpretation relies on runtime snapshots.
    BlockingPoolPressure,
    /// Executor suspect; its interpretation relies on runtime snapshots.
    ExecutorPressureSuspected,
    /// Stage-related suspect; its interpretation relies on captured stage events.
    DownstreamStageDominates,
    /// Fallback used when no stronger suspect could be ranked.
    InsufficientEvidence,
}

impl DiagnosisKind {
    /// Returns the stable snake_case identifier of this diagnosis kind.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            DiagnosisKind::ApplicationQueueSaturation => "application_queue_saturation",
            DiagnosisKind::BlockingPoolPressure => "blocking_pool_pressure",
            DiagnosisKind::ExecutorPressureSuspected => "executor_pressure_suspected",
            DiagnosisKind::DownstreamStageDominates => "downstream_stage_dominates",
            DiagnosisKind::InsufficientEvidence => "insufficient_evidence",
        }
    }
}
impl Serialize for DiagnosisKind {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}
/// Confidence bucket attached to a [`Suspect`].
///
/// Variants are declared in ascending order, so the derived ordering gives
/// `Low < Medium < High`. Serialized in snake_case (`low`, `medium`, `high`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Confidence {
/// Score below the medium threshold.
Low,
/// Score at or above the medium threshold but below the high threshold.
Medium,
/// Score at or above the high threshold.
High,
}
impl Confidence {
fn from_score(score: u8) -> Self {
if score >= HIGH_CONFIDENCE_SCORE_THRESHOLD {
Self::High
} else if score >= MEDIUM_CONFIDENCE_SCORE_THRESHOLD {
Self::Medium
} else {
Self::Low
}
}
}
/// One ranked diagnosis candidate together with its supporting material.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Suspect {
/// Which kind of bottleneck this suspect represents.
pub kind: DiagnosisKind,
/// Ranking score; suspects are ordered by descending score.
pub score: u8,
/// Confidence bucket, initially derived from `score` by `Suspect::new`.
// NOTE(review): `confidence::apply_evidence_aware_confidence_caps` receives the
// suspects mutably and presumably lowers this afterwards — confirm.
pub confidence: Confidence,
/// Human-readable evidence lines supporting this suspect.
pub evidence: Vec<String>,
/// Suggested follow-up checks.
pub next_checks: Vec<String>,
/// Notes about the confidence value; empty when created by `Suspect::new`.
pub confidence_notes: Vec<String>,
}
impl Suspect {
fn new(
kind: DiagnosisKind,
score: u8,
evidence: Vec<String>,
next_checks: Vec<String>,
) -> Self {
Self {
kind,
score,
confidence: Confidence::from_score(score),
evidence,
next_checks,
confidence_notes: Vec::new(),
}
}
}
/// Summary of the in-flight samples recorded for a single gauge.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct InflightTrend {
/// Name of the gauge the samples belong to.
pub gauge: String,
/// Number of samples recorded for the gauge.
pub sample_count: usize,
/// Largest sampled count.
pub peak_count: u64,
/// 95th-percentile sampled count.
pub p95_count: u64,
/// Count of the latest sample minus the count of the earliest sample (by
/// timestamp), clamped to the `i64` range.
pub growth_delta: i64,
/// `growth_delta * 1_000_000 / window_ms`, i.e. growth per second scaled by
/// 1000. `None` when the first and last sample share a timestamp or the
/// window does not fit in `i64`.
pub growth_per_sec_milli: Option<i64>,
}
/// Run-level analysis result produced by [`analyze_run`].
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct Report {
/// Number of requests in the analyzed run.
pub request_count: usize,
/// 50th-percentile request latency; `None` when the run has no requests.
pub p50_latency_us: Option<u64>,
/// 95th-percentile request latency; `None` when the run has no requests.
pub p95_latency_us: Option<u64>,
/// 99th-percentile request latency; `None` when the run has no requests.
pub p99_latency_us: Option<u64>,
/// 95th percentile of the per-request queue-wait share of latency, in
/// permille; `None` when no request has a non-zero latency.
pub p95_queue_share_permille: Option<u64>,
/// 95th percentile of the per-request non-queue share of latency, in
/// permille; `None` when no request has a non-zero latency.
pub p95_service_share_permille: Option<u64>,
/// Trend of the dominant in-flight gauge, if any in-flight samples exist.
pub inflight_trend: Option<InflightTrend>,
/// Caveats about the analysis (sample size, missing signals, ambiguity,
/// route divergence, and any warnings added by the temporal analysis).
pub warnings: Vec<String>,
/// Evidence-quality assessment computed for the run.
pub evidence_quality: EvidenceQuality,
/// Highest-scoring suspect.
pub primary_suspect: Suspect,
/// Remaining suspects in descending score order.
pub secondary_suspects: Vec<Suspect>,
/// Per-route breakdowns computed by the `route` module.
pub route_breakdowns: Vec<RouteBreakdown>,
/// Temporal segments computed by the `temporal` module.
pub temporal_segments: Vec<TemporalSegment>,
}
/// Analysis result for one temporal slice of a run.
///
/// Instances are built by the `temporal` module, which is not part of this
/// file; the metric fields have the same names and types as those on [`Report`].
// NOTE(review): presumably each field is computed like its `Report` counterpart
// but restricted to the segment's requests — confirm against `temporal`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct TemporalSegment {
/// Segment label.
pub name: String,
/// Number of requests attributed to this segment.
pub request_count: usize,
/// Segment start timestamp, if known (presumably Unix milliseconds — confirm).
pub started_at_unix_ms: Option<u64>,
/// Segment end timestamp, if known (presumably Unix milliseconds — confirm).
pub finished_at_unix_ms: Option<u64>,
/// 50th-percentile latency for the segment.
pub p50_latency_us: Option<u64>,
/// 95th-percentile latency for the segment.
pub p95_latency_us: Option<u64>,
/// 99th-percentile latency for the segment.
pub p99_latency_us: Option<u64>,
/// 95th-percentile queue share for the segment, in permille.
pub p95_queue_share_permille: Option<u64>,
/// 95th-percentile service share for the segment, in permille.
pub p95_service_share_permille: Option<u64>,
/// Evidence-quality assessment for the segment.
pub evidence_quality: EvidenceQuality,
/// Top suspect for the segment.
pub primary_suspect: Suspect,
/// Other suspects for the segment.
pub secondary_suspects: Vec<Suspect>,
/// Caveats specific to the segment.
pub warnings: Vec<String>,
}
/// Analysis result for a single route.
///
/// Instances are built by the `route` module, which is not part of this file;
/// the metric fields have the same names and types as those on [`Report`].
// NOTE(review): presumably each field is computed like its `Report` counterpart
// but restricted to the route's requests — confirm against `route`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct RouteBreakdown {
/// Route identifier.
pub route: String,
/// Number of requests attributed to this route.
pub request_count: usize,
/// 50th-percentile latency for the route.
pub p50_latency_us: Option<u64>,
/// 95th-percentile latency for the route.
pub p95_latency_us: Option<u64>,
/// 99th-percentile latency for the route.
pub p99_latency_us: Option<u64>,
/// 95th-percentile queue share for the route, in permille.
pub p95_queue_share_permille: Option<u64>,
/// 95th-percentile service share for the route, in permille.
pub p95_service_share_permille: Option<u64>,
/// Evidence-quality assessment for the route.
pub evidence_quality: EvidenceQuality,
/// Top suspect for the route.
pub primary_suspect: Suspect,
/// Other suspects for the route.
pub secondary_suspects: Vec<Suspect>,
/// Caveats specific to the route.
pub warnings: Vec<String>,
}
/// Analyzes `run` and returns the resulting [`Report`].
///
/// Convenience wrapper that builds an [`Analyzer`] from `options` and runs it
/// once.
#[must_use]
pub fn analyze_run(run: &Run, options: AnalyzeOptions) -> Report {
    let analyzer = Analyzer::new(options);
    analyzer.analyze_run(run)
}
/// Renders `report` as compact JSON.
///
/// # Errors
///
/// Returns the `serde_json::Error` reported by `serde_json::to_string` when
/// serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn render_json(report: &Report) -> Result<String, serde_json::Error> {
    let rendered = serde_json::to_string(report)?;
    Ok(rendered)
}
/// Renders `report` as pretty-printed JSON.
///
/// # Errors
///
/// Returns the `serde_json::Error` reported by `serde_json::to_string_pretty`
/// when serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn render_json_pretty(report: &Report) -> Result<String, serde_json::Error> {
    let rendered = serde_json::to_string_pretty(report)?;
    Ok(rendered)
}
/// Analyzes `run` and renders the resulting report as compact JSON.
///
/// # Errors
///
/// Returns the `serde_json::Error` from [`render_json`] when serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn analyze_run_json(
    run: &tailtriage_core::Run,
    options: AnalyzeOptions,
) -> Result<String, serde_json::Error> {
    render_json(&analyze_run(run, options))
}
/// Analyzes `run` and renders the resulting report as pretty-printed JSON.
///
/// # Errors
///
/// Returns the `serde_json::Error` from [`render_json_pretty`] when
/// serialization fails.
#[must_use = "The rendered JSON string should be used for output or transport."]
pub fn analyze_run_json_pretty(
    run: &tailtriage_core::Run,
    options: AnalyzeOptions,
) -> Result<String, serde_json::Error> {
    render_json_pretty(&analyze_run(run, options))
}
/// Options controlling an analysis.
///
/// The struct currently has no fields. Because it is `#[non_exhaustive]`, code
/// outside this crate cannot build it with a struct literal and should use
/// `AnalyzeOptions::default()` instead.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct AnalyzeOptions {}
/// Reusable analyzer that holds a set of [`AnalyzeOptions`].
#[derive(Debug, Clone, Default)]
pub struct Analyzer {
// Options handed to every `analyze_run` call made through this analyzer.
options: AnalyzeOptions,
}
impl Analyzer {
#[must_use]
pub const fn new(options: AnalyzeOptions) -> Self {
Self { options }
}
#[must_use]
pub fn analyze_run(&self, run: &Run) -> Report {
analyze_run_with_options(run, &self.options)
}
}
/// Builds the full report: the run-level analysis first, then the route
/// breakdowns and temporal segments layered on top of it.
///
/// `_options` is accepted for forward compatibility and is currently unused.
fn analyze_run_with_options(run: &Run, _options: &AnalyzeOptions) -> Report {
    let mut report = analyze_run_internal(run);

    // Route analysis reads the run-level report, so it runs before any of the
    // route/temporal fields are filled in.
    let routes = route::route_breakdowns(run, &report);
    if routes.divergent {
        report.warnings.push(ROUTE_DIVERGENCE_WARNING.to_string());
    }
    report.route_breakdowns = routes.breakdowns;

    // Temporal analysis may append its own warnings to the report.
    let segments = temporal::temporal_segments(run, &mut report.warnings);
    report.temporal_segments = segments;

    report
}
fn analyze_run_internal(run: &Run) -> Report {
let request_latencies = run
.requests
.iter()
.map(|request| request.latency_us)
.collect::<Vec<_>>();
let p50_latency_us = percentile(&request_latencies, 50, 100);
let p95_latency_us = percentile(&request_latencies, 95, 100);
let p99_latency_us = percentile(&request_latencies, 99, 100);
let (queue_shares, service_shares) = request_time_shares(run);
let p95_queue_share_permille = percentile(&queue_shares, 95, 100);
let p95_service_share_permille = percentile(&service_shares, 95, 100);
let inflight_trend = dominant_inflight_trend(&run.inflight);
let mut suspects = Vec::new();
if let Some(queue_suspect) = scoring::queue_saturation_suspect(run, inflight_trend.as_ref()) {
suspects.push(queue_suspect);
}
if let Some(blocking_suspect) = scoring::blocking_pressure_suspect(run) {
suspects.push(blocking_suspect);
}
if let Some(executor_suspect) = scoring::executor_pressure_suspect(run, inflight_trend.as_ref())
{
suspects.push(executor_suspect);
}
if let Some(stage_suspect) = scoring::downstream_stage_suspect(run) {
suspects.push(stage_suspect);
}
if suspects.is_empty() {
suspects.push(Suspect::new(
DiagnosisKind::InsufficientEvidence,
50,
vec![
"Not enough queue, stage, or runtime signals to rank a stronger suspect."
.to_string(),
],
vec![
"Wrap critical awaits with queue(...).await_on(...), and use stage(...).await_on(...) for Result-returning work or stage(...).await_value(...) for infallible work.".to_string(),
"Enable RuntimeSampler during the run to capture runtime pressure signals."
.to_string(),
],
));
}
suspects.sort_by_key(|suspect| std::cmp::Reverse(suspect.score));
let warnings = analysis_warnings(run, &suspects);
let evidence_quality = evidence::evidence_quality(run);
confidence::apply_evidence_aware_confidence_caps(&mut suspects, run, &evidence_quality);
let mut ranked = suspects.into_iter();
let primary_suspect = ranked.next().unwrap_or_else(|| {
Suspect::new(
DiagnosisKind::InsufficientEvidence,
50,
vec!["No diagnosis signals were captured for this run.".to_string()],
vec!["Verify that request, queue, or stage instrumentation is enabled.".to_string()],
)
});
Report {
request_count: run.requests.len(),
p50_latency_us,
p95_latency_us,
p99_latency_us,
p95_queue_share_permille,
p95_service_share_permille,
inflight_trend,
warnings,
evidence_quality,
primary_suspect,
secondary_suspects: ranked.collect(),
route_breakdowns: Vec::new(),
temporal_segments: Vec::new(),
}
}
/// Returns a warning when the two best-scoring suspects are nearly tied.
///
/// `InsufficientEvidence` suspects are ignored. The warning fires only when
/// both top scores reach `AMBIGUITY_MIN_SCORE_THRESHOLD` and differ by at most
/// `AMBIGUITY_SCORE_GAP_THRESHOLD`.
fn ambiguity_warning(suspects: &[Suspect]) -> Option<String> {
    let mut contenders: Vec<&Suspect> = suspects
        .iter()
        .filter(|candidate| !matches!(candidate.kind, DiagnosisKind::InsufficientEvidence))
        .collect();
    contenders.sort_by_key(|candidate| std::cmp::Reverse(candidate.score));

    // Fewer than two real suspects means there is nothing to be ambiguous about.
    let [leader, runner_up, ..] = contenders.as_slice() else {
        return None;
    };

    let both_credible = leader.score >= AMBIGUITY_MIN_SCORE_THRESHOLD
        && runner_up.score >= AMBIGUITY_MIN_SCORE_THRESHOLD;
    let score_gap = leader.score.abs_diff(runner_up.score);
    if both_credible && score_gap <= AMBIGUITY_SCORE_GAP_THRESHOLD {
        Some("Top suspects are close in score; treat ranking as ambiguous and validate both with next checks.".to_string())
    } else {
        None
    }
}
fn analysis_warnings(run: &Run, suspects: &[Suspect]) -> Vec<String> {
let mut warnings = evidence::truncation_warnings(run);
if run.requests.len() < LOW_COMPLETED_REQUEST_THRESHOLD {
warnings.push(
"Low completed-request count; diagnosis ranking may be unstable for this run window."
.to_string(),
);
}
let primary_kind = suspects.first().map(|s| &s.kind);
if run.queues.is_empty()
&& primary_kind.is_some_and(|kind| *kind == DiagnosisKind::ApplicationQueueSaturation)
{
warnings.push(
"No queue events captured; queue saturation interpretation is limited.".to_string(),
);
}
if run.stages.is_empty()
&& primary_kind.is_some_and(|kind| *kind == DiagnosisKind::DownstreamStageDominates)
{
warnings.push(
"No stage events captured; downstream-stage interpretation is limited.".to_string(),
);
}
let runtime_distinction_relevant = suspects.iter().any(|s| {
s.kind == DiagnosisKind::BlockingPoolPressure
|| s.kind == DiagnosisKind::ExecutorPressureSuspected
});
let strong_non_runtime_primary = suspects.first().is_some_and(|s| {
(s.kind == DiagnosisKind::ApplicationQueueSaturation
|| s.kind == DiagnosisKind::DownstreamStageDominates)
&& s.score >= 85
});
if run.runtime_snapshots.is_empty() {
if !strong_non_runtime_primary {
warnings.push("No runtime snapshots captured; executor and blocking-pressure interpretation is limited.".to_string());
}
} else if runtime_distinction_relevant
&& (run
.runtime_snapshots
.iter()
.all(|s| s.blocking_queue_depth.is_none())
|| run
.runtime_snapshots
.iter()
.all(|s| s.local_queue_depth.is_none()))
{
warnings.push("Runtime snapshots are missing blocking_queue_depth or local_queue_depth; separating executor vs blocking pressure is limited.".to_string());
}
if let Some(w) = ambiguity_warning(suspects) {
warnings.push(w);
}
warnings
}
/// Computes, per request, how latency splits between queue wait and the rest.
///
/// Returns `(queue_shares, service_shares)` in permille of each request's
/// latency. Queue waits are summed per request id (saturating) and capped at
/// the request latency, so the two shares never exceed 1000 each. Requests with
/// a zero latency are skipped to avoid dividing by zero.
fn request_time_shares(run: &Run) -> (Vec<u64>, Vec<u64>) {
    let mut queue_wait_by_request = HashMap::<&str, u64>::new();
    for queue in &run.queues {
        // One hash lookup per event: update the entry in place.
        let total = queue_wait_by_request
            .entry(queue.request_id.as_str())
            .or_default();
        *total = total.saturating_add(queue.wait_us);
    }

    let mut queue_shares = Vec::with_capacity(run.requests.len());
    let mut service_shares = Vec::with_capacity(run.requests.len());
    for request in &run.requests {
        if request.latency_us == 0 {
            continue;
        }
        let queue_wait = queue_wait_by_request
            .get(request.request_id.as_str())
            .copied()
            .unwrap_or_default()
            .min(request.latency_us);
        let service_time = request.latency_us.saturating_sub(queue_wait);
        queue_shares.push(queue_wait.saturating_mul(1_000) / request.latency_us);
        service_shares.push(service_time.saturating_mul(1_000) / request.latency_us);
    }
    (queue_shares, service_shares)
}
/// Extracts one metric from every snapshot, dropping snapshots where
/// `selector` returns `None`. Snapshot order is preserved.
fn runtime_metric_series(
    snapshots: &[RuntimeSnapshot],
    selector: impl Fn(&RuntimeSnapshot) -> Option<u64>,
) -> Vec<u64> {
    let mut series = Vec::new();
    for snapshot in snapshots {
        if let Some(value) = selector(snapshot) {
            series.push(value);
        }
    }
    series
}
/// Picks the in-flight gauge with the strongest trend.
///
/// Samples are grouped by gauge name and summarized per gauge. The winner is
/// the gauge with the highest peak count, then the highest p95 count; remaining
/// ties go to the lexicographically smallest gauge name. Returns `None` when
/// there are no samples.
fn dominant_inflight_trend(snapshots: &[InFlightSnapshot]) -> Option<InflightTrend> {
    let grouped = snapshots.iter().fold(
        BTreeMap::<&str, Vec<&InFlightSnapshot>>::new(),
        |mut groups, sample| {
            groups.entry(sample.gauge.as_str()).or_default().push(sample);
            groups
        },
    );

    grouped
        .into_iter()
        .filter_map(|(gauge, samples)| inflight_trend_for_gauge(gauge, samples))
        .max_by(|left, right| {
            // The gauge name is reversed so that the smaller name ranks higher.
            let left_rank = (
                left.peak_count,
                left.p95_count,
                std::cmp::Reverse(&left.gauge),
            );
            let right_rank = (
                right.peak_count,
                right.p95_count,
                std::cmp::Reverse(&right.gauge),
            );
            left_rank.cmp(&right_rank)
        })
}
/// Summarizes the samples of one gauge into an [`InflightTrend`].
///
/// Samples are ordered by timestamp (then count) so that "first" and "last"
/// are well defined. Returns `None` when `samples` is empty.
fn inflight_trend_for_gauge(
    gauge: &str,
    mut samples: Vec<&InFlightSnapshot>,
) -> Option<InflightTrend> {
    samples.sort_unstable_by_key(|sample| (sample.at_unix_ms, sample.count));
    let (Some(earliest), Some(latest)) = (samples.first(), samples.last()) else {
        return None;
    };

    let growth_delta = signed_u64_delta(earliest.count, latest.count);
    let window_ms = latest.at_unix_ms.saturating_sub(earliest.at_unix_ms);
    // A zero-length or unrepresentable window has no meaningful rate.
    let growth_per_sec_milli = match i64::try_from(window_ms) {
        Ok(window) if window != 0 => Some(growth_delta.saturating_mul(1_000_000) / window),
        _ => None,
    };

    let counts: Vec<u64> = samples.iter().map(|sample| sample.count).collect();
    let peak_count = counts.iter().copied().max().unwrap_or(0);
    let p95_count = percentile(&counts, 95, 100).unwrap_or(0);

    Some(InflightTrend {
        gauge: gauge.to_owned(),
        sample_count: counts.len(),
        peak_count,
        p95_count,
        growth_delta,
        growth_per_sec_milli,
    })
}
/// Returns `end - start` as a signed value.
///
/// The magnitude is clamped to `i64::MAX`, so the result lies in
/// `-i64::MAX..=i64::MAX`.
fn signed_u64_delta(start: u64, end: u64) -> i64 {
    let magnitude = i64::try_from(start.abs_diff(end)).unwrap_or(i64::MAX);
    if end < start {
        -magnitude
    } else {
        magnitude
    }
}
/// Returns the `numerator / denominator` percentile of `values`, which do not
/// need to be sorted.
///
/// `None` when `values` is empty or `denominator` is zero.
fn percentile(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    percentile_sorted_u64(&sorted_u64(values), numerator, denominator)
}
/// Returns an ascending-sorted copy of `values`; the input is left untouched.
fn sorted_u64(values: &[u64]) -> Vec<u64> {
    let mut ordered: Vec<u64> = values.iter().copied().collect();
    ordered.sort_unstable();
    ordered
}
/// Returns the `numerator / denominator` percentile of an already sorted slice.
///
/// The selected index is `ceil((len - 1) * numerator / denominator)`, clamped
/// to the last element. `None` when `values` is empty or `denominator` is zero.
fn percentile_sorted_u64(values: &[u64], numerator: usize, denominator: usize) -> Option<u64> {
    if values.is_empty() || denominator == 0 {
        return None;
    }
    let last = values.len() - 1;
    let rank = last
        .saturating_mul(numerator)
        .div_ceil(denominator)
        .min(last);
    values.get(rank).copied()
}
pub use render::render_text;
mod render;
#[cfg(test)]
mod tests;