use crate::logger::Colors;
/// Classifies a stream of delta sizes into a [`StreamingPattern`].
///
/// * Fewer than two deltas are always reported as `Normal`, since no spread can
///   be measured from a single sample.
/// * When the (integer) average delta size is zero, the stream is `Empty`.
/// * Otherwise the decision is delegated to the coefficient-of-variation
///   classifier.
///
/// `avg_delta_size` is converted to `f64` through `u32`; averages larger than
/// `u32::MAX` are clamped to `u32::MAX` for that conversion.
fn classify_streaming_pattern(
    sizes_vec: &[usize],
    total_deltas: usize,
    avg_delta_size: usize,
) -> StreamingPattern {
    if total_deltas < 2 {
        return StreamingPattern::Normal;
    }

    let mean = match u32::try_from(avg_delta_size) {
        Ok(narrowed) => f64::from(narrowed),
        Err(_) => f64::from(u32::MAX),
    };

    if mean < 0.001 {
        StreamingPattern::Empty
    } else {
        classify_by_coefficient_of_variation(sizes_vec, total_deltas, avg_delta_size, mean)
    }
}
/// Computes the coefficient of variation (standard deviation divided by mean)
/// of the given delta sizes.
///
/// * `sizes_vec` - the individual delta sizes.
/// * `total_deltas` - the sample count used as the variance divisor.
/// * `avg_delta_size` - the integer average that deviations are measured from.
/// * `mean` - the divisor applied to the standard deviation; callers are
///   expected to pass a non-zero value (a zero mean produces NaN or infinity).
///
/// The variance uses truncating integer division, so results match plain
/// integer arithmetic for ordinary inputs. Squared deviations are accumulated
/// in `u128` with saturating arithmetic so large deltas neither overflow nor
/// get clamped before the square root is taken. A `total_deltas` of zero is
/// treated as zero variance rather than dividing by zero.
fn compute_cv(sizes_vec: &[usize], total_deltas: usize, avg_delta_size: usize, mean: f64) -> f64 {
    /// Converts a `u128` to `f64` without an `as` cast by folding 32-bit limbs
    /// from the most significant end. Bits beyond `f64` precision are rounded
    /// away, which is acceptable for a classification heuristic.
    fn to_f64(value: u128) -> f64 {
        const LIMB_RADIX: f64 = 4_294_967_296.0; // 2^32
        (0..4u32).rev().fold(0.0, |acc, limb_index| {
            let limb = (value >> (32 * limb_index)) & u128::from(u32::MAX);
            // The mask guarantees the limb fits in 32 bits; the fallback is unreachable.
            acc * LIMB_RADIX + f64::from(u32::try_from(limb).unwrap_or(u32::MAX))
        })
    }

    let variance_sum = sizes_vec.iter().fold(0u128, |acc, &size| {
        let diff = u128::try_from(size.abs_diff(avg_delta_size)).unwrap_or(u128::MAX);
        acc.saturating_add(diff.saturating_mul(diff))
    });

    let sample_count = u128::try_from(total_deltas).unwrap_or(u128::MAX);
    // `checked_div` maps an empty sample (count of zero) to zero variance
    // instead of panicking.
    let variance = variance_sum.checked_div(sample_count).unwrap_or(0);

    to_f64(variance).sqrt() / mean
}
/// Maps a coefficient of variation onto a [`StreamingPattern`].
///
/// Values below `0.3` are `Smooth`, values below `1.0` are `Normal`, and
/// everything else (including NaN, which fails both comparisons) is `Bursty`.
fn cv_to_pattern(cv: f64) -> StreamingPattern {
    match cv {
        value if value < 0.3 => StreamingPattern::Smooth,
        value if value < 1.0 => StreamingPattern::Normal,
        _ => StreamingPattern::Bursty,
    }
}
/// Classifies the stream by first computing its coefficient of variation and
/// then translating that value into a [`StreamingPattern`].
///
/// All arguments are forwarded unchanged to `compute_cv`.
fn classify_by_coefficient_of_variation(
    sizes_vec: &[usize],
    total_deltas: usize,
    avg_delta_size: usize,
    mean: f64,
) -> StreamingPattern {
    let coefficient = compute_cv(sizes_vec, total_deltas, avg_delta_size, mean);
    cv_to_pattern(coefficient)
}
/// Renders the always-present part of the streaming summary line.
///
/// The output contains the `[Streaming]` tag wrapped in the dim/reset
/// sequences from `colors`, followed by the delta count, the average, minimum
/// and maximum sizes, and the lowercase name of the detected pattern.
fn format_streaming_base(m: &StreamingQualityMetrics, colors: Colors) -> String {
    let pattern_label = match m.pattern {
        StreamingPattern::Bursty => "bursty",
        StreamingPattern::Normal => "normal",
        StreamingPattern::Smooth => "smooth",
        StreamingPattern::Empty => "empty",
    };
    let tag_open = colors.dim();
    let tag_close = colors.reset();

    format!(
        "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
        tag_open,
        tag_close,
        m.total_deltas,
        m.avg_delta_size,
        m.min_delta_size,
        m.max_delta_size,
        pattern_label
    )
}
/// Appends a `label: count` entry to `parts`, wrapped in `color` and `reset`,
/// but only when `count` is greater than zero.
///
/// Zero counts are skipped so the summary only lists things that happened.
fn push_if_nonzero(parts: &mut Vec<String>, count: usize, label: &str, color: &str, reset: &str) {
    if count == 0 {
        return;
    }
    let entry = format!("{}{}: {}{}", color, label, count, reset);
    parts.push(entry);
}
/// Gathers the optional segments of the streaming summary.
///
/// Non-zero snapshot-repair and large-delta counts are added in yellow,
/// non-zero protocol violations in red, and the queue summary is appended
/// last when `format_queue_metrics` produces one. The returned vector is empty
/// when there is nothing to report.
fn collect_streaming_extras(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
    let warn = colors.yellow();
    let alert = colors.red();
    let reset = colors.reset();

    let mut extras = Vec::new();
    push_if_nonzero(&mut extras, m.snapshot_repairs_count, "snapshot repairs", warn, reset);
    push_if_nonzero(&mut extras, m.large_delta_count, "large deltas", warn, reset);
    push_if_nonzero(&mut extras, m.protocol_violations, "protocol violations", alert, reset);

    // `Option<String>` iterates over zero or one item, so this appends the
    // queue summary only when one exists.
    extras.extend(format_queue_metrics(m, colors));
    extras
}
/// Builds the individual entries of the queue summary.
///
/// A non-zero queue depth is listed first without colouring; non-zero dropped
/// event and backpressure counts follow, highlighted in yellow.
fn collect_queue_parts(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
    let mut entries: Vec<String> = Vec::new();

    if m.queue_depth != 0 {
        entries.push(format!("depth: {}", m.queue_depth));
    }

    let warn = colors.yellow();
    let reset = colors.reset();
    let highlighted = [
        (m.queue_dropped_events, "dropped"),
        (m.queue_backpressure_count, "backpressure"),
    ];
    for (count, label) in highlighted {
        push_if_nonzero(&mut entries, count, label, warn, reset);
    }

    entries
}
/// Produces the `queue: ...` segment of the summary, or `None` when the queue
/// depth, dropped-event count, and backpressure count are all zero.
fn format_queue_metrics(m: &StreamingQualityMetrics, colors: Colors) -> Option<String> {
    let queue_counters = [
        m.queue_depth,
        m.queue_dropped_events,
        m.queue_backpressure_count,
    ];
    if queue_counters.iter().all(|&count| count == 0) {
        return None;
    }

    let entries = collect_queue_parts(m, colors);
    (!entries.is_empty()).then(|| format!("queue: {}", entries.join(", ")))
}
/// Summary statistics about the deltas of a stream.
///
/// Values are built by `from_sizes` and rendered as a single line by `format`.
/// The `Default` value (all counters zero, pattern `Empty`) represents a stream
/// with no recorded deltas.
#[derive(Debug, Clone, Default)]
pub struct StreamingQualityMetrics {
/// Number of deltas observed.
pub total_deltas: usize,
/// Average delta size, computed by `from_sizes` with truncating integer
/// division; `format` reports it in bytes.
pub avg_delta_size: usize,
/// Smallest delta size observed.
pub min_delta_size: usize,
/// Largest delta size observed.
pub max_delta_size: usize,
/// Classification of how evenly sized the deltas are.
pub pattern: StreamingPattern,
/// Snapshot repair count; `from_sizes` leaves it at zero and `format` lists
/// it only when non-zero.
pub snapshot_repairs_count: usize,
/// Large delta count; `from_sizes` leaves it at zero and `format` lists it
/// only when non-zero.
pub large_delta_count: usize,
/// Protocol violation count; `from_sizes` leaves it at zero and `format`
/// lists it (in red) only when non-zero.
pub protocol_violations: usize,
/// Queue depth; shown as `depth: N` in the `queue:` segment when non-zero.
pub queue_depth: usize,
/// Dropped queue events; shown as `dropped: N` in the `queue:` segment when
/// non-zero.
pub queue_dropped_events: usize,
/// Queue backpressure count; shown as `backpressure: N` in the `queue:`
/// segment when non-zero.
pub queue_backpressure_count: usize,
}
/// Coarse classification of how evenly sized a stream's deltas are, derived
/// from the coefficient of variation of the delta sizes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingPattern {
/// Default variant; also chosen when two or more deltas have an integer
/// average size of zero.
#[default]
Empty,
/// Coefficient of variation below 0.3.
Smooth,
/// Coefficient of variation from 0.3 up to, but not including, 1.0; also
/// used when fewer than two deltas were seen.
Normal,
/// Any other coefficient of variation (1.0 or more).
Bursty,
}
impl StreamingQualityMetrics {
    /// Builds metrics from a sequence of delta sizes.
    ///
    /// Any `IntoIterator` yielding `usize` is accepted, so a collection can be
    /// passed directly as well as an iterator. An empty input yields
    /// `Self::default()`.
    ///
    /// The average uses truncating integer division. The total is accumulated
    /// with saturating addition so extreme inputs cannot trigger an overflow
    /// panic. Counters that cannot be derived from sizes alone keep their
    /// default value of zero.
    pub fn from_sizes<I: IntoIterator<Item = usize>>(sizes: I) -> Self {
        let sizes_vec: Vec<usize> = sizes.into_iter().collect();
        if sizes_vec.is_empty() {
            return Self::default();
        }

        let total_deltas = sizes_vec.len();
        let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
        let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
        let sum = sizes_vec
            .iter()
            .fold(0usize, |acc, &size| acc.saturating_add(size));
        // `total_deltas` is non-zero here because the empty case returned above.
        let avg_delta_size = sum / total_deltas;
        let pattern = classify_streaming_pattern(&sizes_vec, total_deltas, avg_delta_size);

        Self {
            total_deltas,
            avg_delta_size,
            min_delta_size,
            max_delta_size,
            pattern,
            ..Self::default()
        }
    }

    /// Renders the metrics as a single summary line.
    ///
    /// With no deltas recorded, a fixed "No deltas recorded" message is
    /// returned. Otherwise the base statistics are followed by any non-zero
    /// extra counters, separated by `", "`.
    #[must_use]
    pub fn format(&self, colors: Colors) -> String {
        if self.total_deltas == 0 {
            return format!(
                "{}[Streaming]{} No deltas recorded",
                colors.dim(),
                colors.reset()
            );
        }

        let base = format_streaming_base(self, colors);
        let extras = collect_streaming_extras(self, colors);
        if extras.is_empty() {
            base
        } else {
            format!("{base}, {}", extras.join(", "))
        }
    }
}