tailtriage-analyzer 0.2.0

Heuristic triage analyzer and report rendering for tailtriage runs
Documentation
use std::collections::BTreeMap;

use tailtriage_core::Run;

use crate::{
    percentile, request_time_shares, runtime_metric_series, DiagnosisKind, InflightTrend, Suspect,
    QUEUE_SHARE_TRIGGER_PERMILLE,
};

const DOWNSTREAM_MIN_STAGE_SAMPLES: usize = 3;
const SAMPLE_QUALITY_HIGH_SAMPLE_COUNT: usize = 100;
const SAMPLE_QUALITY_MEDIUM_SAMPLE_COUNT: usize = 40;
const SAMPLE_QUALITY_LOW_SAMPLE_COUNT: usize = 20;
const SAMPLE_QUALITY_MIN_NONZERO_SAMPLE_COUNT: usize = 8;

pub(super) fn queue_saturation_suspect(
    run: &Run,
    inflight_trend: Option<&InflightTrend>,
) -> Option<Suspect> {
    let (queue_shares, _) = request_time_shares(run);
    let p95_queue_share_permille = percentile(&queue_shares, 95, 100)?;
    if p95_queue_share_permille < QUEUE_SHARE_TRIGGER_PERMILLE {
        return None;
    }
    let queue_depths = run
        .queues
        .iter()
        .filter_map(|q| q.depth_at_start)
        .collect::<Vec<_>>();
    let max_depth = max_or_zero(&queue_depths);
    let growth_bonus = inflight_trend
        .filter(|t| t.growth_delta > 0)
        .map_or(0, |_| 5);
    let depth_bonus = (max_depth.min(40) * 2) / 3;
    let base = score_from_permille(22, p95_queue_share_permille, 14);
    let clean_extreme = p95_queue_share_permille >= 985
        && max_depth >= 12
        && queue_shares.len() >= 20
        && inflight_trend.is_some_and(|t| t.growth_delta > 0);
    let score = cap_unless_clean_evidence(
        base + depth_bonus + growth_bonus + u64::from(score_sample_quality(queue_shares.len())),
        clean_extreme,
        95,
    );
    let mut evidence = vec![format!(
        "Queue wait at p95 consumes {}.{}% of request time.",
        p95_queue_share_permille / 10,
        p95_queue_share_permille % 10
    )];
    if max_depth > 0 {
        evidence.push(format!("Observed queue depth sample up to {max_depth}."));
    }
    if let Some(trend) = inflight_trend.filter(|trend| trend.growth_delta > 0) {
        evidence.push(format!(
            "In-flight gauge '{}' grew by {} over the run window (p95={}, peak={}).",
            trend.gauge, trend.growth_delta, trend.p95_count, trend.peak_count
        ));
    }
    Some(Suspect::new(
        DiagnosisKind::ApplicationQueueSaturation,
        score,
        evidence,
        vec![
            "Inspect queue admission limits and producer burst patterns.".to_string(),
            "Compare queue wait distribution before and after increasing worker parallelism."
                .to_string(),
        ],
    ))
}

#[derive(Clone, Copy)]
struct BlockingSignal {
    p95: u64,
    peak: u64,
    nonzero: usize,
    samples: usize,
    nz_share_permille: u64,
}

fn blocking_signal(run: &Run) -> Option<BlockingSignal> {
    let depths = runtime_metric_series(&run.runtime_snapshots, |s| s.blocking_queue_depth);
    let p95 = percentile(&depths, 95, 100)?;
    let nonzero = nonzero_sample_count(&depths);
    if p95 == 0 && nonzero < 2 {
        return None;
    }
    let peak = max_or_zero(&depths);
    let nz_share_permille = if depths.is_empty() {
        0
    } else {
        nonzero as u64 * 1000 / depths.len() as u64
    };
    Some(BlockingSignal {
        p95,
        peak,
        nonzero,
        samples: depths.len(),
        nz_share_permille,
    })
}

fn strong_blocking_signal(signal: BlockingSignal) -> bool {
    signal.p95 >= 12 && signal.peak >= 20 && signal.nz_share_permille >= 700 && signal.samples >= 30
}

pub(super) fn stage_correlates_with_blocking_pool(stage: &str) -> bool {
    let lower = stage.to_ascii_lowercase();
    lower.contains("spawn_blocking")
        || lower.contains("blocking_path")
        || lower.contains("blocking")
}

pub(super) fn blocking_pressure_suspect(run: &Run) -> Option<Suspect> {
    let signal = blocking_signal(run)?;
    let clean_extreme = signal.p95 >= 16 && signal.peak >= 24 && signal.nz_share_permille >= 900;
    let score = cap_unless_clean_evidence(
        32 + signal.p95.min(24)
            + (signal.peak.min(24) / 2)
            + (signal.nz_share_permille / 80)
            + u64::from(score_sample_quality(signal.samples)),
        clean_extreme,
        94,
    );
    Some(Suspect::new(
        DiagnosisKind::BlockingPoolPressure,
        score,
        vec![format!(
            "Blocking queue depth p95 is {}, peak is {}, with {}/{} nonzero samples.",
            signal.p95, signal.peak, signal.nonzero, signal.samples
        )],
        vec![
            "Audit blocking sections and move avoidable synchronous work out of hot paths."
                .to_string(),
            "Inspect spawn_blocking callsites for long-running CPU or I/O work.".to_string(),
        ],
    ))
}

pub(super) fn executor_pressure_suspect(
    run: &Run,
    inflight_trend: Option<&InflightTrend>,
) -> Option<Suspect> {
    let global = runtime_metric_series(&run.runtime_snapshots, |s| s.global_queue_depth);
    let p95_global = percentile(&global, 95, 100)?;
    if p95_global == 0 {
        return None;
    }
    let local = runtime_metric_series(&run.runtime_snapshots, |s| s.local_queue_depth);
    let alive = runtime_metric_series(&run.runtime_snapshots, |s| s.alive_tasks);
    let growth_bonus = inflight_trend
        .filter(|t| t.growth_delta > 0)
        .map_or(0, |_| 4);
    let clean_extreme = p95_global >= 140 && global.len() >= 30;
    let score = cap_unless_clean_evidence(
        34 + (p95_global.min(150) / 4)
            + (percentile(&local, 95, 100).unwrap_or(0).min(60) / 6)
            + (percentile(&alive, 95, 100).unwrap_or(0).min(400) / 40)
            + growth_bonus
            + u64::from(score_sample_quality(global.len())),
        clean_extreme,
        94,
    );
    let mut evidence = vec![format!(
        "Runtime global queue depth p95 is {p95_global}, suggesting scheduler contention."
    )];
    if let Some(lp95) = percentile(&local, 95, 100) {
        evidence.push(format!("Runtime local queue depth p95 is {lp95}."));
    }
    if let Some(ap95) = percentile(&alive, 95, 100) {
        evidence.push(format!("Runtime alive_tasks p95 is {ap95}."));
    }
    Some(Suspect::new(
        DiagnosisKind::ExecutorPressureSuspected,
        score,
        evidence,
        vec![
            "Check for long polls without yielding and uneven task fan-out.".to_string(),
            "Compare with per-stage timings to isolate overloaded async stages.".to_string(),
        ],
    ))
}

#[derive(Clone)]
struct StageCandidate {
    stage: String,
    samples: usize,
    p95: u64,
    cumulative: u64,
    cum_share: u64,
    tail_share: u64,
    score: u8,
}

fn downstream_stage_candidates(run: &Run, p95_req: u64, total_req: u64) -> Vec<StageCandidate> {
    let tail_ids: std::collections::HashMap<&str, u64> = run
        .requests
        .iter()
        .filter(|r| r.latency_us >= p95_req)
        .map(|r| (r.request_id.as_str(), r.latency_us))
        .collect();
    let tail_total = tail_ids.values().copied().fold(0_u64, u64::saturating_add);
    let mut by: BTreeMap<&str, Vec<&tailtriage_core::StageEvent>> = BTreeMap::new();
    for st in &run.stages {
        by.entry(st.stage.as_str()).or_default().push(st);
    }
    let mut cands = Vec::new();
    for (name, ss) in by {
        if ss.len() < DOWNSTREAM_MIN_STAGE_SAMPLES {
            continue;
        }
        let lats = ss.iter().map(|s| s.latency_us).collect::<Vec<_>>();
        let cum = lats.iter().copied().fold(0_u64, u64::saturating_add);
        let p95 = percentile(&lats, 95, 100).unwrap_or(0);
        let cum_share = cum.saturating_mul(1000).checked_div(total_req).unwrap_or(0);
        let tail_stage = ss
            .iter()
            .filter_map(|s| tail_ids.get(s.request_id.as_str()).map(|_| s.latency_us))
            .fold(0_u64, u64::saturating_add);
        let tail_share = if tail_total == 0 {
            0
        } else {
            tail_stage
                .saturating_mul(1000)
                .checked_div(tail_total)
                .unwrap_or(0)
        };
        let clean_extreme = tail_share >= 960 && cum_share >= 920 && ss.len() >= 20;
        let score = cap_unless_clean_evidence(
            score_from_permille(24, tail_share, 11)
                + (cum_share / 35)
                + u64::from(score_sample_quality(ss.len())),
            clean_extreme,
            95,
        );
        cands.push(StageCandidate {
            stage: name.to_string(),
            samples: ss.len(),
            p95,
            cumulative: cum,
            cum_share,
            tail_share,
            score,
        });
    }
    cands
}

pub(super) fn downstream_stage_suspect(run: &Run) -> Option<Suspect> {
    let p95_req = percentile(
        &run.requests
            .iter()
            .map(|r| r.latency_us)
            .collect::<Vec<_>>(),
        95,
        100,
    )?;
    let total_req = run
        .requests
        .iter()
        .map(|r| r.latency_us)
        .fold(0_u64, u64::saturating_add);
    let blocking = blocking_signal(run);
    let blocking_score = blocking.map(|signal| {
        let clean_extreme =
            signal.p95 >= 16 && signal.peak >= 24 && signal.nz_share_permille >= 900;
        cap_unless_clean_evidence(
            32 + signal.p95.min(24)
                + (signal.peak.min(24) / 2)
                + (signal.nz_share_permille / 80)
                + u64::from(score_sample_quality(signal.samples)),
            clean_extreme,
            94,
        )
    });
    let best = downstream_stage_candidates(run, p95_req, total_req)
        .into_iter()
        .max_by(|a, b| {
            a.score
                .cmp(&b.score)
                .then_with(|| a.tail_share.cmp(&b.tail_share))
                .then_with(|| a.cum_share.cmp(&b.cum_share))
                .then_with(|| b.stage.cmp(&a.stage))
        })?;
    let mut downstream_score = best.score;
    let mut correlation_evidence: Option<String> = None;
    if stage_correlates_with_blocking_pool(&best.stage)
        && blocking.is_some_and(strong_blocking_signal)
        && blocking_score.is_some()
    {
        let cap = blocking_score.unwrap_or(downstream_score).saturating_sub(2);
        downstream_score = downstream_score.min(cap);
        correlation_evidence = Some(format!(
            "Stage '{}' looks blocking-correlated; strong runtime blocking-queue evidence keeps blocking_pool_pressure prioritized.",
            best.stage
        ));
    }
    let mut evidence = vec![
        format!(
            "Stage '{}' has p95 latency {} us across {} samples.",
            best.stage, best.p95, best.samples
        ),
        format!(
            "Stage '{}' cumulative latency is {} us ({} permille of request latency).",
            best.stage, best.cumulative, best.cum_share
        ),
        format!(
            "Stage '{}' contributes {} permille of tail request latency.",
            best.stage, best.tail_share
        ),
    ];
    if let Some(extra) = correlation_evidence {
        evidence.push(extra);
    }
    Some(Suspect::new(
        DiagnosisKind::DownstreamStageDominates,
        downstream_score,
        evidence,
        vec![
            format!(
                "Inspect downstream dependency behind stage '{}'.",
                best.stage
            ),
            "Collect downstream service timings and retry behavior during tail windows."
                .to_string(),
            "Review downstream SLO/error budget and align retry budget/backoff with it."
                .to_string(),
        ],
    ))
}

fn clamp_score(value: u64) -> u8 {
    u8::try_from(value.min(100)).unwrap_or(100)
}

fn nonzero_sample_count(values: &[u64]) -> usize {
    values.iter().filter(|&&v| v > 0).count()
}

fn max_or_zero(values: &[u64]) -> u64 {
    values.iter().copied().max().unwrap_or(0)
}

fn score_sample_quality(sample_count: usize) -> u8 {
    if sample_count >= SAMPLE_QUALITY_HIGH_SAMPLE_COUNT {
        8
    } else if sample_count >= SAMPLE_QUALITY_MEDIUM_SAMPLE_COUNT {
        5
    } else if sample_count >= SAMPLE_QUALITY_LOW_SAMPLE_COUNT {
        3
    } else {
        u8::from(sample_count >= SAMPLE_QUALITY_MIN_NONZERO_SAMPLE_COUNT)
    }
}

fn score_from_permille(base: u64, permille: u64, scale: u64) -> u64 {
    base + permille.min(1000) / scale
}

fn cap_unless_clean_evidence(score: u64, clean: bool, soft_cap: u8) -> u8 {
    if clean {
        clamp_score(score)
    } else {
        clamp_score(score.min(u64::from(soft_cap)))
    }
}