use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Instant;
use sozu_command_lib::proto::command::{
AggregatedMetrics, ClusterMetrics, FilteredMetrics, filtered_metrics,
};
use sozu_lib::metrics::names;
use super::theme::GlyphMode;
use super::transport::{CertsSnapshot, ListenersSnapshot, Snapshot, TopEvent};
/// Capacity used for sparkline rings built through `SparkRing::default` and for the H2 trend rings.
pub const SPARKLINE_DEPTH: usize = 60;
/// Number of events `App::ingest_event` keeps before it starts dropping the oldest one.
pub const EVENT_RING_DEPTH: usize = 200;
/// Countdown value a pulse entry starts with when `PulseTracker` records a change.
pub const PULSE_PERSIST_FRAMES: u32 = 5;
/// The tabs of the dashboard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActiveTab {
    Overview,
    Clusters,
    Backends,
    Listeners,
    Certs,
    H2,
    Events,
}

impl ActiveTab {
    /// Every tab, in the order used by [`ActiveTab::from_digit`] and [`ActiveTab::cycle`].
    pub const ALL: &'static [Self] = &[
        Self::Overview,
        Self::Clusters,
        Self::Backends,
        Self::Listeners,
        Self::Certs,
        Self::H2,
        Self::Events,
    ];

    /// Upper-case caption of the tab.
    pub fn label(self) -> &'static str {
        match self {
            Self::Overview => "OVERVIEW",
            Self::Clusters => "CLUSTERS",
            Self::Backends => "BACKENDS",
            Self::Listeners => "LISTENERS",
            Self::Certs => "CERTS",
            Self::H2 => "H2",
            Self::Events => "EVENTS",
        }
    }

    /// Maps a 1-based digit to the tab at that position in [`ActiveTab::ALL`].
    ///
    /// Returns `None` for `0` and for digits past the last tab.
    pub fn from_digit(d: u8) -> Option<Self> {
        let index = usize::from(d).checked_sub(1)?;
        Self::ALL.get(index).copied()
    }

    /// Resolves a command alias (exact, case-sensitive match) to a tab.
    pub fn from_alias(alias: &str) -> Option<Self> {
        // One row per tab: the accepted spellings and the tab they select.
        const ALIASES: &[(&[&str], ActiveTab)] = &[
            (&["overview", "o"], ActiveTab::Overview),
            (&["cluster", "clusters", "c"], ActiveTab::Clusters),
            (&["backend", "backends", "b"], ActiveTab::Backends),
            (&["listener", "listeners", "l"], ActiveTab::Listeners),
            (&["cert", "certs"], ActiveTab::Certs),
            (&["h2"], ActiveTab::H2),
            (&["event", "events", "e"], ActiveTab::Events),
        ];
        ALIASES
            .iter()
            .find(|(spellings, _)| spellings.iter().any(|spelling| *spelling == alias))
            .map(|(_, tab)| *tab)
    }

    /// Returns the neighbouring tab, wrapping around at both ends of [`ActiveTab::ALL`].
    pub fn cycle(self, forward: bool) -> Self {
        let count = Self::ALL.len();
        let current = Self::ALL
            .iter()
            .position(|tab| *tab == self)
            .unwrap_or(0);
        // Stepping back by one equals stepping forward by `count - 1` modulo `count`.
        let step = if forward { 1 } else { count - 1 };
        Self::ALL[(current + step) % count]
    }
}
/// Bounded FIFO of `u64` samples: once `capacity` samples are stored, each
/// new sample evicts the oldest one.
#[derive(Debug, Clone)]
pub struct SparkRing {
    samples: VecDeque<u64>,
    capacity: usize,
}

impl SparkRing {
    /// Creates an empty ring that retains at most `capacity` samples.
    ///
    /// A ring created with `capacity == 0` never stores anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Appends `value`, evicting the oldest samples so that at most
    /// `capacity` samples are retained.
    pub fn push(&mut self, value: u64) {
        // A zero-capacity ring must stay empty; an equality-only check would
        // let it grow without bound after the first push.
        if self.capacity == 0 {
            return;
        }
        while self.samples.len() >= self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(value);
    }

    /// Iterates over the retained samples, oldest first.
    pub fn samples(&self) -> std::collections::vec_deque::Iter<'_, u64> {
        self.samples.iter()
    }

    /// Most recently pushed sample, if any.
    pub fn last(&self) -> Option<u64> {
        self.samples.back().copied()
    }

    /// Largest retained sample, or `0` when the ring is empty.
    pub fn max(&self) -> u64 {
        self.samples.iter().copied().max().unwrap_or(0)
    }

    /// `true` when no sample is retained.
    pub fn is_empty(&self) -> bool {
        self.samples.is_empty()
    }

    /// Copies the retained samples, oldest first, into a `Vec`.
    pub fn to_vec(&self) -> Vec<u64> {
        self.samples.iter().copied().collect()
    }
}
/// Turns successive readings of monotonically increasing counters into
/// per-second rates, keyed by an arbitrary string.
#[derive(Debug, Default)]
pub struct RateCalculator {
    /// Last `(value, timestamp)` seen for each key.
    history: HashMap<String, (i64, Instant)>,
}

impl RateCalculator {
    /// Records a new reading for `key` and returns the rate since the
    /// previous reading.
    ///
    /// * `None` on the first reading for `key`.
    /// * `Some(0.0)` when the counter went backwards (treated as a reset) or
    ///   when no time elapsed between the two readings.
    /// * `Some(delta / seconds)` otherwise.
    ///
    /// The new reading always replaces the stored one.
    pub fn record(&mut self, key: &str, value: i64, sampled_at: Instant) -> Option<f64> {
        match self.history.get_mut(key) {
            Some(entry) => {
                // Update in place so that an existing key costs no allocation.
                let (prev_value, prev_at) = std::mem::replace(entry, (value, sampled_at));
                if value < prev_value {
                    return Some(0.0);
                }
                let dt = sampled_at.saturating_duration_since(prev_at).as_secs_f64();
                if dt > 0.0 {
                    // Widen before subtracting: `value - prev_value` can exceed
                    // the `i64` range when the previous reading is negative.
                    let delta = i128::from(value) - i128::from(prev_value);
                    Some(delta as f64 / dt)
                } else {
                    Some(0.0)
                }
            }
            None => {
                self.history.insert(key.to_owned(), (value, sampled_at));
                None
            }
        }
    }

    /// Drops the stored reading of every key for which `keep` returns `false`.
    pub fn retain<F: FnMut(&str) -> bool>(&mut self, mut keep: F) {
        self.history.retain(|k, _| keep(k.as_str()));
    }
}
/// Remembers which clusters and backends changed state between two metric
/// snapshots, each with a frame countdown after which the marker is dropped.
#[derive(Debug, Default)]
pub struct PulseTracker {
/// Countdown per cluster id that was present in the previous snapshot but not in the latest.
cluster_disappeared: HashMap<String, u32>,
/// Countdown per cluster id that is present in the latest snapshot but was not in the previous.
cluster_appeared: HashMap<String, u32>,
/// Countdown per `(cluster_id, backend_id)` that was counted as up before and no longer is.
backend_down: HashMap<(String, String), u32>,
/// Cluster ids seen in the most recently diffed snapshot.
last_clusters: HashSet<String>,
/// `(cluster_id, backend_id)` pairs whose availability gauge was >= 1 in the most recently diffed snapshot.
last_backend_up: HashSet<(String, String)>,
}
impl PulseTracker {
    /// Advances the countdown of every pulse by one frame.
    fn tick(&mut self) {
        Self::tick_map(&mut self.cluster_disappeared);
        Self::tick_map(&mut self.cluster_appeared);
        Self::tick_map(&mut self.backend_down);
    }

    /// Decrements every countdown in `map`; entries already at zero are removed.
    fn tick_map<K>(map: &mut HashMap<K, u32>) {
        map.retain(|_, frames| match frames.checked_sub(1) {
            Some(left) => {
                *frames = left;
                true
            }
            None => false,
        });
    }

    /// Compares `m` with the previously diffed snapshot and starts a pulse
    /// for every cluster that appeared or disappeared and every backend that
    /// stopped being available. `m` then becomes the new baseline.
    fn diff(&mut self, m: &AggregatedMetrics) {
        let current_clusters: HashSet<String> =
            m.clusters.iter().map(|(id, _)| id.clone()).collect();

        // A backend counts as up when its availability gauge reads >= 1;
        // a missing or non-gauge metric counts as down.
        let mut current_up: HashSet<(String, String)> = HashSet::new();
        for (cluster_id, cm) in &m.clusters {
            for bm in &cm.backends {
                let is_up = matches!(
                    bm.metrics
                        .get(names::backend::AVAILABLE)
                        .and_then(|metric| metric.inner.as_ref()),
                    Some(filtered_metrics::Inner::Gauge(level)) if *level >= 1
                );
                if is_up {
                    current_up.insert((cluster_id.clone(), bm.backend_id.clone()));
                }
            }
        }

        // With an empty baseline (e.g. the very first snapshot) no cluster
        // pulse is raised.
        let previous = &self.last_clusters;
        if !previous.is_empty() {
            for gone in previous.iter().filter(|id| !current_clusters.contains(*id)) {
                self.cluster_disappeared
                    .insert(gone.clone(), PULSE_PERSIST_FRAMES);
            }
            for fresh in current_clusters.iter().filter(|id| !previous.contains(*id)) {
                self.cluster_appeared
                    .insert(fresh.clone(), PULSE_PERSIST_FRAMES);
            }
        }

        for lost in self.last_backend_up.difference(&current_up) {
            self.backend_down.insert(lost.clone(), PULSE_PERSIST_FRAMES);
        }

        self.last_clusters = current_clusters;
        self.last_backend_up = current_up;
    }

    /// Active pulse for `cluster_id`, if any; a "disappeared" pulse takes
    /// precedence over an "appeared" one.
    pub fn cluster_pulse(&self, cluster_id: &str) -> Option<PulseKind> {
        if self.cluster_disappeared.contains_key(cluster_id) {
            Some(PulseKind::Disappeared)
        } else if self.cluster_appeared.contains_key(cluster_id) {
            Some(PulseKind::Appeared)
        } else {
            None
        }
    }

    /// Active "went down" pulse for the given backend, if any.
    pub fn backend_pulse(&self, cluster_id: &str, backend_id: &str) -> Option<PulseKind> {
        let key = (cluster_id.to_owned(), backend_id.to_owned());
        if self.backend_down.contains_key(&key) {
            Some(PulseKind::WentDown)
        } else {
            None
        }
    }

    /// `true` while at least one pulse of any kind is still counting down.
    pub fn has_active(&self) -> bool {
        let all_idle = self.cluster_disappeared.is_empty()
            && self.cluster_appeared.is_empty()
            && self.backend_down.is_empty();
        !all_idle
    }
}
/// Kind of state change reported by `PulseTracker::cluster_pulse` and `PulseTracker::backend_pulse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PulseKind {
/// A cluster that was in the previous snapshot is missing from a later one.
Disappeared,
/// A cluster is in a snapshot but was not in the previous one.
Appeared,
/// A backend that was counted as available no longer is.
WentDown,
}
/// Critical thresholds compared against the latest overview samples.
#[derive(Debug, Clone)]
pub struct ThresholdTable {
/// Error-ratio threshold, in percent per the field name.
/// NOTE(review): not read by `critical_message` in this file — confirm where it is consumed.
pub error_ratio_critical_pct: f64,
/// Threshold compared against the latest saturation sample.
pub slab_critical_pct: f64,
/// Threshold compared against both the latest cluster p99 and the latest service-time p99 samples.
pub latency_p99_critical_ms: f64,
}
impl Default for ThresholdTable {
/// Built-in thresholds: 1.0 for the error ratio, 80.0 for saturation, 500.0 for p99 latency.
fn default() -> Self {
Self {
error_ratio_critical_pct: 1.0,
slab_critical_pct: 80.0,
latency_p99_critical_ms: 500.0,
}
}
}
impl ThresholdTable {
    /// Returns the banner text for the first breached threshold, or `None`
    /// when the latest samples are all below their thresholds.
    ///
    /// Checks run in this order: cluster p99 latency, service-time p99
    /// (both against `latency_p99_critical_ms`), then saturation (against
    /// `slab_critical_pct`). A ring without samples never triggers.
    pub fn critical_message(&self, overview: &OverviewState) -> Option<&'static str> {
        // (latest sample, threshold, banner) in priority order.
        let checks = [
            (
                overview.latency_p99_ms.last(),
                self.latency_p99_critical_ms,
                "HIGH LATENCY",
            ),
            (
                overview.service_time_p99_ms.last(),
                self.latency_p99_critical_ms,
                "SOZU SLOW",
            ),
            (
                overview.saturation_pct.last(),
                self.slab_critical_pct,
                "SATURATION",
            ),
        ];
        checks.iter().find_map(|&(latest, limit, banner)| {
            latest
                .filter(|sample| *sample as f64 >= limit)
                .map(|_| banner)
        })
    }
}
/// Status-code metric names whose counts are summed as "5xx errors" when
/// computing a cluster's error rate in `App::cluster_rows`.
const ERRORS_5XX: [&str; 5] = [
names::http_status::S500,
names::http_status::S502,
names::http_status::S503,
names::http_status::S504,
names::http_status::S507,
];
/// Rate-history key for the proxy-wide request counter.
const OVERVIEW_REQUESTS_KEY: &str = "__overview.requests";

/// Rate-history key for the request counter of one cluster.
fn cluster_rate_key(cluster_id: &str) -> String {
    format!("__cluster.{cluster_id}.requests")
}

/// Rate-history key for one counter (`suffix`) of one backend.
///
/// The cluster id is prefixed with its byte length so that ids containing
/// `.` cannot alias each other: without it, `("a.b", "c")` and
/// `("a", "b.c")` would share one key and therefore one rate history.
/// `suffix` is expected to contain no `.` (callers pass fixed names).
fn backend_rate_key(cluster_id: &str, backend_id: &str, suffix: &str) -> String {
    format!(
        "__backend.{}:{cluster_id}.{backend_id}.{suffix}",
        cluster_id.len()
    )
}
/// Proxy-level metric names for which `App::fold_h2_trends` keeps one
/// sparkline ring each, sampled on every ingested snapshot.
const H2_TRACKED_KEYS: &[&str] = &[
names::h2::CONNECTION_ACTIVE_STREAMS,
names::http::ALPN_H2,
names::http::ALPN_HTTP11,
names::client::CONNECTIONS,
names::h2::CONNECTION_WINDOW_BYTES,
names::h2::CONNECTION_PENDING_WINDOW_UPDATES,
names::h2::FLOW_CONTROL_STALL,
names::h2::FRAMES_TX_WINDOW_UPDATE,
names::h2::FRAMES_TX_RST_STREAM,
names::h2::FRAMES_TX_GOAWAY,
names::h2::HEADERS_REJECTED_BUDGET_OVERRUN,
names::h2::FLOOD_VIOLATION_GLITCH_WINDOW,
names::h2::FLOOD_VIOLATION_RAPID_RESET,
names::h2::FLOOD_VIOLATION_CONTINUATION,
names::h2::FLOOD_VIOLATION_MADE_YOU_RESET,
names::h2::FLOOD_VIOLATION_PING,
names::h2::FLOOD_VIOLATION_SETTINGS,
names::h2::FLOOD_VIOLATION_PRIORITY,
names::h2::WINDOW_UPDATE_DROPPED,
names::h2::CLOSE_WITH_ACTIVE_STREAMS,
];
/// Renders `samples` as one glyph per sample, scaled so that the largest
/// sample maps to the last glyph of `alphabet` and `0` to the first.
///
/// Returns `"—"` when there are no samples or `alphabet` is empty. When all
/// samples are `0` every sample maps to the first glyph.
fn render_spark_bars<I: IntoIterator<Item = u64>>(samples: I, alphabet: &[char]) -> String {
    let samples: Vec<u64> = samples.into_iter().collect();
    let Some(last_idx) = alphabet.len().checked_sub(1) else {
        return "—".to_owned();
    };
    if samples.is_empty() {
        return "—".to_owned();
    }
    // Clamp the divisor to 1 so an all-zero series does not divide by zero.
    let max = u128::from(samples.iter().copied().max().unwrap_or(0).max(1));
    let top = last_idx as u128;
    samples
        .iter()
        .map(|&v| {
            // Scale in 128-bit space: `v * last_idx` overflows `u64` for
            // samples in the upper range.
            let idx = (u128::from(v) * top / max) as usize;
            alphabet[idx.min(last_idx)]
        })
        .collect()
}
/// Whole state of the dashboard: current tab, latest snapshots, derived
/// rates and trends, and UI flags.
#[derive(Debug)]
pub struct App {
/// Tab currently selected.
pub active_tab: ActiveTab,
/// Sparkline rings and gauges refreshed on every ingested metrics snapshot.
pub overview: OverviewState,
/// Most recent events, oldest first, capped at `EVENT_RING_DEPTH` by `ingest_event`.
pub events: VecDeque<TopEvent>,
/// Critical thresholds for the overview.
pub thresholds: ThresholdTable,
/// `received_at` of the last metrics snapshot that was ingested.
pub last_snapshot_at: Option<Instant>,
/// Copy of the metrics from the last ingested snapshot; source of the cluster and backend rows.
pub last_metrics: Option<AggregatedMetrics>,
/// Last listeners snapshot received.
pub last_listeners: Option<ListenersSnapshot>,
/// Last certificates snapshot received.
pub last_certs: Option<CertsSnapshot>,
/// Free-form status text; starts empty.
pub status: String,
/// Set by the `quit` / `q` palette command.
pub should_quit: bool,
/// Toggled by the `help` / `h` / `?` palette command.
pub help_visible: bool,
/// Sort column for `cluster_rows`.
pub cluster_sort: ClusterSortKey,
/// When set, `cluster_rows` inverts the ordering of the sort column.
pub cluster_sort_reverse: bool,
/// Sort column for `backend_rows`.
pub backend_sort: BackendSortKey,
/// When set, `backend_rows` returns its sorted rows in reverse.
pub backend_sort_reverse: bool,
/// Appear/disappear/went-down markers derived from consecutive snapshots.
pub pulse: PulseTracker,
/// Glyph set; supplies the alphabet used by `h2_trend_bars`.
pub glyphs: GlyphMode,
/// Whether the command palette is open.
pub palette_open: bool,
/// Text being typed in the command palette.
pub palette_input: tui_input::Input,
/// Error left by the last palette command, if it was not recognised.
pub palette_error: Option<String>,
/// Change flag: set by mutating methods, read and cleared by `take_dirty`.
is_dirty: bool,
/// Previous counter readings used to derive per-second rates.
rates: RateCalculator,
/// Request rate per cluster id, recomputed on every snapshot.
cluster_rps: HashMap<String, u64>,
/// Rate of the `BYTES_IN` counter per `(cluster_id, backend_id)`, recomputed on every snapshot.
backend_rate_in_bps: HashMap<(String, String), f64>,
/// Rate of the `BYTES_OUT` counter per `(cluster_id, backend_id)`, recomputed on every snapshot.
backend_rate_out_bps: HashMap<(String, String), f64>,
/// One sparkline ring per entry of `H2_TRACKED_KEYS`.
h2_trends: HashMap<&'static str, SparkRing>,
/// While set, `ingest_snapshot` ignores incoming metrics snapshots.
pub paused: bool,
}
/// Column by which the cluster table is ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterSortKey {
    ClusterId,
    Rps,
    ErrorRate,
    LatencyP99,
    BackendsAvailable,
}

impl ClusterSortKey {
    /// Sort keys in the order [`ClusterSortKey::cycle`] walks through them.
    pub const ALL: &'static [Self] = &[
        Self::ErrorRate,
        Self::Rps,
        Self::LatencyP99,
        Self::BackendsAvailable,
        Self::ClusterId,
    ];

    /// Short column caption for this key.
    pub fn label(self) -> &'static str {
        match self {
            Self::ErrorRate => "err%",
            Self::Rps => "rps",
            Self::LatencyP99 => "p99",
            Self::BackendsAvailable => "backends",
            Self::ClusterId => "id",
        }
    }

    /// Next key in [`ClusterSortKey::ALL`], wrapping back to the first after the last.
    pub fn cycle(self) -> Self {
        let keys = Self::ALL;
        let here = keys.iter().position(|key| *key == self).unwrap_or(0);
        match keys.get(here + 1) {
            Some(next) => *next,
            None => keys[0],
        }
    }
}
/// Proxy-wide figures refreshed by `App::fold_overview` on every ingested snapshot.
#[derive(Debug, Default)]
pub struct OverviewState {
/// Request rate samples (truncated to whole units).
pub rps: SparkRing,
/// Highest p99 response time across clusters, one sample per snapshot.
pub latency_p99_ms: SparkRing,
/// p99 of the event-loop service-time metric, one sample per snapshot.
pub service_time_p99_ms: SparkRing,
/// Slab usage gauge (buffer usage gauge as fallback), capped at 100.
pub saturation_pct: SparkRing,
/// Latest value of the active-requests gauge.
pub active_sessions: u64,
/// Latest value of the client-connections gauge.
pub client_connections: u64,
}
impl Default for SparkRing {
/// An empty ring with room for `SPARKLINE_DEPTH` samples.
fn default() -> Self {
Self::new(SPARKLINE_DEPTH)
}
}
impl App {
/// Creates the initial state: Overview tab, no snapshot yet, default
/// thresholds, cluster rows sorted by error rate, backend rows sorted by
/// bandwidth, block glyphs, palette closed, not paused.
pub fn new() -> Self {
Self {
active_tab: ActiveTab::Overview,
overview: OverviewState::default(),
events: VecDeque::with_capacity(EVENT_RING_DEPTH),
thresholds: ThresholdTable::default(),
last_snapshot_at: None,
last_metrics: None,
last_listeners: None,
last_certs: None,
status: String::new(),
should_quit: false,
help_visible: false,
cluster_sort: ClusterSortKey::ErrorRate,
cluster_sort_reverse: false,
backend_sort: BackendSortKey::Bandwidth,
backend_sort_reverse: false,
pulse: PulseTracker::default(),
glyphs: GlyphMode::Block,
palette_open: false,
palette_input: tui_input::Input::default(),
palette_error: None,
// Starts dirty so the first `take_dirty` call reports a change.
is_dirty: true,
rates: RateCalculator::default(),
cluster_rps: HashMap::new(),
backend_rate_in_bps: HashMap::new(),
backend_rate_out_bps: HashMap::new(),
h2_trends: HashMap::new(),
paused: false,
}
}
/// Returns whether the state changed since the previous call, and clears
/// the flag.
pub fn take_dirty(&mut self) -> bool {
    let was_dirty = self.is_dirty;
    self.is_dirty = false;
    was_dirty
}

/// Flags the state as changed.
pub fn mark_dirty(&mut self) {
    self.is_dirty = true;
}
/// Folds a metrics snapshot into the overview, H2 trends and pulse tracker,
/// then keeps a copy of its metrics for the row builders.
///
/// Does nothing at all while `paused` is set.
pub fn ingest_snapshot(&mut self, snap: &Snapshot) {
    if !self.paused {
        let metrics = &snap.metrics;
        let received_at = snap.received_at;
        self.last_snapshot_at = Some(received_at);
        self.fold_overview(metrics, received_at);
        self.fold_h2_trends(metrics);
        self.pulse.diff(metrics);
        self.last_metrics = Some(metrics.clone());
        self.is_dirty = true;
    }
}
/// Pushes one sample per tracked H2 metric into its trend ring.
///
/// A gauge is used as-is; otherwise a count is used with negative values
/// clamped to zero; a metric that is missing or of another kind samples as `0`.
fn fold_h2_trends(&mut self, m: &AggregatedMetrics) {
    for &key in H2_TRACKED_KEYS {
        let metric = m.proxying.get(key);
        let sample = match gauge_value(metric) {
            Some(gauge) => gauge,
            None => count_value(metric).map_or(0, |count| count.max(0) as u64),
        };
        let ring = self
            .h2_trends
            .entry(key)
            .or_insert_with(|| SparkRing::new(SPARKLINE_DEPTH));
        ring.push(sample);
    }
}
/// Sparkline text for the trend of `key`, or `"—"` when no sample has been
/// recorded for it.
pub fn h2_trend_bars(&self, key: &str) -> String {
    let Some(ring) = self.h2_trends.get(key).filter(|ring| !ring.is_empty()) else {
        return "—".to_owned();
    };
    render_spark_bars(ring.samples().copied(), self.glyphs.trend_alphabet())
}
/// Advances all pulse countdowns by one frame, flagging the state as
/// changed when any pulse was still active before the tick.
pub fn tick_pulses(&mut self) {
    self.is_dirty |= self.pulse.has_active();
    self.pulse.tick();
}
/// Builds one table row per cluster from the last ingested metrics, sorted
/// by `cluster_sort` (inverted when `cluster_sort_reverse` is set).
///
/// Returns an empty list until a snapshot has been ingested. Rows that
/// compare equal on the sort column are ordered by ascending cluster id,
/// so the order does not depend on the iteration order of the metrics map.
pub fn cluster_rows(&self) -> Vec<ClusterRow> {
    use std::cmp::Ordering;

    let Some(metrics) = self.last_metrics.as_ref() else {
        return Vec::new();
    };
    let mut rows: Vec<ClusterRow> = metrics
        .clusters
        .iter()
        .map(|(id, cm)| {
            let requests = cluster_count_total(cm, names::backend::REQUESTS);
            let errors_5xx: i64 = ERRORS_5XX.iter().map(|k| cluster_count_total(cm, k)).sum();
            let p99_ms = cluster_p99_max(cm);
            let p50_ms = cluster_p50_max(cm);
            let rollup_total =
                gauge_value(cm.cluster.get(names::cluster::TOTAL_BACKENDS)).unwrap_or(0) as u32;
            let rollup_available =
                gauge_value(cm.cluster.get(names::cluster::AVAILABLE_BACKENDS)).unwrap_or(0)
                    as u32;
            let backend_available_sum: u32 = cm
                .backends
                .iter()
                .filter_map(|b| gauge_value(b.metrics.get(names::backend::AVAILABLE)))
                .map(|v| v as u32)
                .sum();
            // Prefer the cluster-level rollup; fall back to counting the
            // backend entries when the rollup is absent or zero.
            let backends_total = if rollup_total > 0 {
                rollup_total
            } else {
                cm.backends.len() as u32
            };
            // With neither a rollup nor any per-backend availability, every
            // listed backend is reported as available.
            let backends_available = if rollup_total == 0 && backend_available_sum == 0 {
                cm.backends.len() as u32
            } else {
                rollup_available.max(backend_available_sum)
            };
            let error_rate_pct = if requests > 0 {
                (errors_5xx as f64 / requests as f64) * 100.0
            } else {
                0.0
            };
            ClusterRow {
                cluster_id: id.clone(),
                rps: self.cluster_rps.get(id).copied().unwrap_or(0),
                error_rate_pct,
                p50_ms,
                p99_ms,
                backends_total,
                backends_available,
            }
        })
        .collect();
    rows.sort_by(|a, b| {
        // Numeric columns sort highest first, except available backends,
        // which sorts fewest first; ids sort ascending.
        let by_column = match self.cluster_sort {
            ClusterSortKey::ClusterId => a.cluster_id.cmp(&b.cluster_id),
            ClusterSortKey::Rps => a.rps.cmp(&b.rps).reverse(),
            ClusterSortKey::ErrorRate => a
                .error_rate_pct
                .partial_cmp(&b.error_rate_pct)
                .unwrap_or(Ordering::Equal)
                .reverse(),
            ClusterSortKey::LatencyP99 => a.p99_ms.cmp(&b.p99_ms).reverse(),
            ClusterSortKey::BackendsAvailable => {
                a.backends_available.cmp(&b.backends_available)
            }
        };
        let by_column = if self.cluster_sort_reverse {
            by_column.reverse()
        } else {
            by_column
        };
        // Deterministic tie-break, matching what `backend_rows` does.
        by_column.then_with(|| a.cluster_id.cmp(&b.cluster_id))
    });
    rows
}
/// Builds one table row per backend from the last ingested metrics, sorted
/// by `backend_sort` with `(cluster_id, backend_id)` as tie-break; the
/// whole list is reversed when `backend_sort_reverse` is set.
///
/// Returns an empty list until a snapshot has been ingested. Missing
/// metrics read as zero, and a negative request count is clamped to zero.
pub fn backend_rows(&self) -> Vec<BackendRow> {
    use std::cmp::Ordering;

    let Some(metrics) = self.last_metrics.as_ref() else {
        return Vec::new();
    };
    let mut rows: Vec<BackendRow> = Vec::new();
    for (cluster_id, cm) in &metrics.clusters {
        for bm in &cm.backends {
            let key = (cluster_id.clone(), bm.backend_id.clone());
            let bw_in_bps = self.backend_rate_in_bps.get(&key).copied().unwrap_or(0.0);
            let bw_out_bps = self.backend_rate_out_bps.get(&key).copied().unwrap_or(0.0);
            let response_time = bm.metrics.get(names::backend::RESPONSE_TIME);
            // The lookup key is no longer needed: move its parts into the
            // row instead of cloning both ids a second time.
            let (cluster_id, backend_id) = key;
            rows.push(BackendRow {
                cluster_id,
                backend_id,
                bw_in_bps,
                bw_out_bps,
                connections: gauge_value(
                    bm.metrics.get(names::backend::CONNECTIONS_PER_BACKEND),
                )
                .unwrap_or(0),
                p50_ms: percentile_p50_ms(response_time).unwrap_or(0),
                p99_ms: percentile_p99_ms(response_time).unwrap_or(0),
                // Clamp before casting: a negative `i64` would otherwise
                // wrap to a huge `u64` and dominate the "req" sort.
                requests_total: count_value(bm.metrics.get(names::backend::REQUESTS))
                    .map_or(0, |count| count.max(0) as u64),
            });
        }
    }
    rows.sort_by(|a, b| {
        let ord = match self.backend_sort {
            BackendSortKey::ClusterId => a
                .cluster_id
                .cmp(&b.cluster_id)
                .then(a.backend_id.cmp(&b.backend_id)),
            BackendSortKey::BackendId => a.backend_id.cmp(&b.backend_id),
            BackendSortKey::Bandwidth => {
                let abw = a.bw_in_bps + a.bw_out_bps;
                let bbw = b.bw_in_bps + b.bw_out_bps;
                abw.partial_cmp(&bbw).unwrap_or(Ordering::Equal).reverse()
            }
            BackendSortKey::Connections => a.connections.cmp(&b.connections).reverse(),
            BackendSortKey::LatencyP99 => a.p99_ms.cmp(&b.p99_ms).reverse(),
            BackendSortKey::Requests => a.requests_total.cmp(&b.requests_total).reverse(),
        };
        ord.then_with(|| a.cluster_id.cmp(&b.cluster_id))
            .then_with(|| a.backend_id.cmp(&b.backend_id))
    });
    if self.backend_sort_reverse {
        rows.reverse();
    }
    rows
}
/// Recomputes all rate tables and pushes one new sample into every overview
/// ring from the metrics `m` sampled at `sampled_at`.
///
/// Per-cluster request rates and per-backend byte rates are rebuilt from
/// scratch; rate history for clusters/backends absent from `m` is dropped.
fn fold_overview(&mut self, m: &AggregatedMetrics, sampled_at: Instant) {
// Both accumulators receive the same positive per-cluster totals; the
// second one only decides whether the proxy-level fallback below applies.
let mut total_requests: i64 = 0;
let mut total_requests_observed: i64 = 0;
self.cluster_rps.clear();
self.backend_rate_in_bps.clear();
self.backend_rate_out_bps.clear();
// Keys recorded during this pass; anything else is pruned from the history.
let mut live_rate_keys: HashSet<String> = HashSet::new();
live_rate_keys.insert(OVERVIEW_REQUESTS_KEY.to_owned());
for (id, cm) in &m.clusters {
let cluster_requests = cluster_count_total(cm, names::backend::REQUESTS);
if cluster_requests > 0 {
total_requests = total_requests.saturating_add(cluster_requests);
total_requests_observed = total_requests_observed.saturating_add(cluster_requests);
}
let key = cluster_rate_key(id);
// The first reading of a key yields no rate; it is shown as 0.
let rate = self
.rates
.record(&key, cluster_requests, sampled_at)
.unwrap_or(0.0)
.max(0.0);
// The cast drops the fractional part of the rate.
self.cluster_rps.insert(id.clone(), rate as u64);
live_rate_keys.insert(key);
for bm in &cm.backends {
let bid = &bm.backend_id;
let bytes_in = count_value(bm.metrics.get(names::backend::BYTES_IN)).unwrap_or(0);
let bytes_out = count_value(bm.metrics.get(names::backend::BYTES_OUT)).unwrap_or(0);
let in_key = backend_rate_key(id, bid, "bytes_in");
let out_key = backend_rate_key(id, bid, "bytes_out");
let rate_in = self
.rates
.record(&in_key, bytes_in, sampled_at)
.unwrap_or(0.0)
.max(0.0);
let rate_out = self
.rates
.record(&out_key, bytes_out, sampled_at)
.unwrap_or(0.0)
.max(0.0);
let map_key = (id.clone(), bid.clone());
self.backend_rate_in_bps.insert(map_key.clone(), rate_in);
self.backend_rate_out_bps.insert(map_key, rate_out);
live_rate_keys.insert(in_key);
live_rate_keys.insert(out_key);
}
}
// The overview key was added to the live set up front, so its history
// survives this prune even though it is only recorded afterwards.
self.rates.retain(|k| live_rate_keys.contains(k));
// No cluster reported a positive request total: fall back to the
// proxy-level request counter, when present.
if total_requests_observed == 0 {
if let Some(v) = count_value(m.proxying.get(names::http::REQUESTS)) {
total_requests = total_requests.saturating_add(v);
}
}
let rps = self
.rates
.record(OVERVIEW_REQUESTS_KEY, total_requests, sampled_at)
.unwrap_or(0.0)
.max(0.0);
self.overview.rps.push(rps as u64);
let service_p99 =
percentile_p99_ms(m.proxying.get(names::event_loop::SERVICE_TIME)).unwrap_or(0);
self.overview.service_time_p99_ms.push(service_p99);
// Worst p99 across all clusters; 0 when there are no clusters.
let max_p99_ms = m.clusters.values().map(cluster_p99_max).max().unwrap_or(0);
self.overview.latency_p99_ms.push(max_p99_ms);
// Slab usage gauge first, buffer usage gauge as fallback, capped at 100.
let saturation = gauge_value(m.proxying.get(names::slab::USAGE_PERCENT))
.or_else(|| gauge_value(m.proxying.get(names::buffer::USAGE_PERCENT)))
.map(|v| v.min(100))
.unwrap_or(0);
self.overview.saturation_pct.push(saturation as u64);
self.overview.active_sessions =
gauge_value(m.proxying.get(names::http::ACTIVE_REQUESTS)).unwrap_or(0) as u64;
self.overview.client_connections =
gauge_value(m.proxying.get(names::client::CONNECTIONS)).unwrap_or(0) as u64;
}
/// Appends `event` to the event ring, dropping the oldest entry first when
/// the ring already holds exactly `EVENT_RING_DEPTH` events.
pub fn ingest_event(&mut self, event: TopEvent) {
    let ring = &mut self.events;
    if ring.len() == EVENT_RING_DEPTH {
        ring.pop_front();
    }
    ring.push_back(event);
    self.is_dirty = true;
}

/// Replaces the stored listeners snapshot with `snap`.
pub fn ingest_listeners(&mut self, snap: ListenersSnapshot) {
    drop(self.last_listeners.replace(snap));
    self.is_dirty = true;
}

/// Replaces the stored certificates snapshot with `snap`.
pub fn ingest_certs(&mut self, snap: CertsSnapshot) {
    drop(self.last_certs.replace(snap));
    self.is_dirty = true;
}
/// Opens the command palette with an empty input and no error.
pub fn open_palette(&mut self) {
    self.palette_error = None;
    self.palette_input = tui_input::Input::default();
    self.palette_open = true;
    self.is_dirty = true;
}

/// Closes the command palette and discards its input; a previously stored
/// palette error is left untouched.
pub fn cancel_palette(&mut self) {
    self.palette_input = tui_input::Input::default();
    self.palette_open = false;
    self.is_dirty = true;
}
/// Executes the command typed in the palette, then closes and clears it.
///
/// The input is trimmed and leading `:` characters are stripped. A tab
/// alias switches tab; `help`/`h`/`?` toggles help; `quit`/`q` requests
/// exit; an empty command does nothing. Any other text is stored in
/// `palette_error` as `unknown command: :<text>`; on success the error is
/// cleared.
pub fn apply_palette(&mut self) {
    let typed = self.palette_input.value().trim().to_owned();
    let command = typed.trim_start_matches(':');

    // Run the command; the result is the error message to keep, if any.
    let failure = match ActiveTab::from_alias(command) {
        Some(tab) => {
            self.active_tab = tab;
            None
        }
        None => match command {
            "help" | "h" | "?" => {
                self.help_visible = !self.help_visible;
                None
            }
            "quit" | "q" => {
                self.should_quit = true;
                None
            }
            "" => None,
            other => Some(format!("unknown command: :{other}")),
        },
    };

    // Common epilogue for both the success and the failure path.
    self.palette_error = failure;
    self.palette_open = false;
    self.palette_input = tui_input::Input::default();
    self.is_dirty = true;
}
}
impl Default for App {
/// Same as `App::new`.
fn default() -> Self {
Self::new()
}
}
/// Value of `metric` when it is present and holds a count; `None` otherwise.
pub(super) fn count_value(metric: Option<&FilteredMetrics>) -> Option<i64> {
    match metric.and_then(|m| m.inner.as_ref()) {
        Some(filtered_metrics::Inner::Count(count)) => Some(*count),
        _ => None,
    }
}

/// Value of `metric` when it is present and holds a gauge; `None` otherwise.
pub(super) fn gauge_value(metric: Option<&FilteredMetrics>) -> Option<u64> {
    match metric.and_then(|m| m.inner.as_ref()) {
        Some(filtered_metrics::Inner::Gauge(gauge)) => Some(*gauge),
        _ => None,
    }
}
/// 99th percentile of `metric` when it is present and holds percentiles;
/// `None` otherwise. (Unit presumably milliseconds, per the name.)
fn percentile_p99_ms(metric: Option<&FilteredMetrics>) -> Option<u64> {
    if let Some(filtered_metrics::Inner::Percentiles(p)) = metric?.inner.as_ref() {
        Some(p.p_99)
    } else {
        None
    }
}

/// 50th percentile of `metric` when it is present and holds percentiles;
/// `None` otherwise. (Unit presumably milliseconds, per the name.)
fn percentile_p50_ms(metric: Option<&FilteredMetrics>) -> Option<u64> {
    if let Some(filtered_metrics::Inner::Percentiles(p)) = metric?.inner.as_ref() {
        Some(p.p_50)
    } else {
        None
    }
}
/// Total of the count metric `key` for a cluster: the cluster-level value
/// plus the value reported by each of its backends.
///
/// Missing or non-count metrics contribute nothing. Every addition
/// saturates, so an extreme set of counters cannot overflow the sum.
fn cluster_count_total(cm: &ClusterMetrics, key: &str) -> i64 {
    let cluster_level = count_value(cm.cluster.get(key)).unwrap_or(0);
    cm.backends
        .iter()
        .filter_map(|b| count_value(b.metrics.get(key)))
        .fold(cluster_level, i64::saturating_add)
}
fn cluster_p99_max(cm: &ClusterMetrics) -> u64 {
let cluster_level = percentile_p99_ms(cm.cluster.get(names::backend::RESPONSE_TIME));
let backend_max = cm
.backends
.iter()
.filter_map(|b| percentile_p99_ms(b.metrics.get(names::backend::RESPONSE_TIME)))
.max();
cluster_level
.into_iter()
.chain(backend_max)
.max()
.unwrap_or(0)
}
fn cluster_p50_max(cm: &ClusterMetrics) -> u64 {
let cluster_level = percentile_p50_ms(cm.cluster.get(names::backend::RESPONSE_TIME));
let backend_max = cm
.backends
.iter()
.filter_map(|b| percentile_p50_ms(b.metrics.get(names::backend::RESPONSE_TIME)))
.max();
cluster_level
.into_iter()
.chain(backend_max)
.max()
.unwrap_or(0)
}
/// One row of the cluster table, as produced by `App::cluster_rows`.
#[derive(Debug, Clone)]
pub struct ClusterRow {
/// Cluster identifier.
pub cluster_id: String,
/// Request rate computed at the last snapshot (0 when unknown).
pub rps: u64,
/// Share of tracked 5xx responses among total requests, in percent (0 when there are no requests).
pub error_rate_pct: f64,
/// Highest p50 response time among the cluster and its backends.
pub p50_ms: u64,
/// Highest p99 response time among the cluster and its backends.
pub p99_ms: u64,
/// Backend count: the cluster rollup gauge, or the number of backend entries when that gauge is absent or zero.
pub backends_total: u32,
/// Number of backends counted as available.
pub backends_available: u32,
}
/// One row of the backend table, as produced by `App::backend_rows`.
#[derive(Debug, Clone)]
pub struct BackendRow {
/// Identifier of the cluster the backend belongs to.
pub cluster_id: String,
/// Backend identifier.
pub backend_id: String,
/// Rate of the backend's `BYTES_IN` counter at the last snapshot.
pub bw_in_bps: f64,
/// Rate of the backend's `BYTES_OUT` counter at the last snapshot.
pub bw_out_bps: f64,
/// Value of the connections-per-backend gauge.
pub connections: u64,
/// p50 of the backend's response-time metric.
pub p50_ms: u64,
/// p99 of the backend's response-time metric.
pub p99_ms: u64,
/// Value of the backend's request counter.
pub requests_total: u64,
}
/// Column by which the backend table is ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendSortKey {
    ClusterId,
    BackendId,
    Bandwidth,
    Connections,
    LatencyP99,
    Requests,
}

impl BackendSortKey {
    /// Sort keys in the order [`BackendSortKey::cycle`] walks through them.
    pub const ALL: &'static [Self] = &[
        Self::Bandwidth,
        Self::LatencyP99,
        Self::Connections,
        Self::Requests,
        Self::ClusterId,
        Self::BackendId,
    ];

    /// Short column caption for this key.
    pub fn label(self) -> &'static str {
        match self {
            Self::Bandwidth => "bw",
            Self::LatencyP99 => "p99",
            Self::Connections => "conn",
            Self::Requests => "req",
            Self::ClusterId => "cluster",
            Self::BackendId => "backend",
        }
    }

    /// Next key in [`BackendSortKey::ALL`], wrapping back to the first after the last.
    pub fn cycle(self) -> Self {
        let keys = Self::ALL;
        let here = keys.iter().position(|key| *key == self).unwrap_or(0);
        match keys.get(here + 1) {
            Some(next) => *next,
            None => keys[0],
        }
    }
}
// Unit tests for the std-only building blocks of this module.
#[cfg(test)]
mod tests {
use super::*;
// A full ring evicts its oldest sample on push.
#[test]
fn spark_ring_drops_oldest_at_capacity() {
let mut r = SparkRing::new(3);
r.push(1);
r.push(2);
r.push(3);
r.push(4);
assert_eq!(r.to_vec(), vec![2, 3, 4]);
assert_eq!(r.last(), Some(4));
assert_eq!(r.max(), 4);
}
// The first reading of a key has no predecessor, so no rate is produced.
#[test]
fn rate_calculator_first_observation_returns_none() {
let mut rc = RateCalculator::default();
let now = Instant::now();
assert!(rc.record("k", 100, now).is_none());
}
// +50 over one second is a rate of 50 per second.
#[test]
fn rate_calculator_handles_monotonic_increase() {
let mut rc = RateCalculator::default();
let t0 = Instant::now();
let _ = rc.record("k", 100, t0);
let t1 = t0 + std::time::Duration::from_secs(1);
let r = rc.record("k", 150, t1).unwrap();
assert!((r - 50.0).abs() < 0.001);
}
// A counter that goes backwards yields a zero rate rather than a negative one.
#[test]
fn rate_calculator_emits_zero_on_hourly_reset() {
let mut rc = RateCalculator::default();
let t0 = Instant::now();
let _ = rc.record("k", 500, t0);
let t1 = t0 + std::time::Duration::from_secs(1);
let r = rc.record("k", 10, t1).unwrap();
assert_eq!(r, 0.0);
}
// Digits are 1-based and cycling wraps around in both directions.
#[test]
fn active_tab_round_trips_digits_and_cycle() {
assert_eq!(ActiveTab::from_digit(1), Some(ActiveTab::Overview));
assert_eq!(ActiveTab::from_digit(7), Some(ActiveTab::Events));
assert_eq!(ActiveTab::from_digit(8), None);
assert_eq!(ActiveTab::Overview.cycle(true), ActiveTab::Clusters);
assert_eq!(ActiveTab::Overview.cycle(false), ActiveTab::Events);
}
}