use std::io::{self, IsTerminal, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use owo_colors::OwoColorize;
use owo_colors::Stream::Stderr;
use sonda_core::schedule::stats::ScenarioStats;
/// How long the TTY render loop sleeps between redraws of the progress lines.
const TTY_POLL_INTERVAL: Duration = Duration::from_millis(200);
/// Minimum time between two batches of `[progress]` lines when stderr is not
/// a terminal.
const NON_TTY_INTERVAL: Duration = Duration::from_secs(5);
/// One scenario tracked by the progress thread.
///
/// Built from the `(name, stats, target_rate)` tuples handed to
/// `ProgressDisplay::start`.
struct MonitoredScenario {
/// Label printed at the start of the scenario's progress line.
name: String,
/// Shared statistics handle; the progress thread only takes read locks on it.
stats: Arc<RwLock<ScenarioStats>>,
/// Rate the live rate is compared against when choosing the rate colour.
target_rate: f64,
}
/// Handle to the background thread that prints scenario progress to stderr.
///
/// Created with `ProgressDisplay::start`. Call `stop` to signal the thread and
/// wait for it; dropping the handle only signals it.
pub struct ProgressDisplay {
/// Join handle of the progress thread; taken (set to `None`) by `stop`.
thread: Option<JoinHandle<()>>,
/// Set to `true` to ask the progress thread to exit its loop.
stop_flag: Arc<AtomicBool>,
}
impl ProgressDisplay {
pub fn start(scenarios: Vec<(String, Arc<RwLock<ScenarioStats>>, f64)>) -> Self {
let stop_flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&stop_flag);
let monitored: Vec<MonitoredScenario> = scenarios
.into_iter()
.map(|(name, stats, target_rate)| MonitoredScenario {
name,
stats,
target_rate,
})
.collect();
let is_tty = io::stderr().is_terminal();
let thread = thread::Builder::new()
.name("sonda-progress".to_string())
.spawn(move || {
if is_tty {
run_tty_loop(&monitored, &flag_clone);
} else {
run_non_tty_loop(&monitored, &flag_clone);
}
})
.expect("failed to spawn progress monitoring thread");
ProgressDisplay {
thread: Some(thread),
stop_flag,
}
}
pub fn stop(mut self) {
self.stop_flag.store(true, Ordering::SeqCst);
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}
/// Dropping the handle asks the progress thread to stop but does not wait for
/// it: the thread is signalled through the stop flag and left to exit on its
/// own. Use `stop` when the caller needs the thread to have finished.
impl Drop for ProgressDisplay {
fn drop(&mut self) {
// Signal only; the join handle (if still present) is dropped without joining.
self.stop_flag.store(true, Ordering::SeqCst);
}
}
/// Returns a cloned snapshot of the shared scenario statistics.
///
/// A poisoned lock is tolerated: the data left behind by the panicking writer
/// is read as-is.
fn read_stats(stats: &Arc<RwLock<ScenarioStats>>) -> ScenarioStats {
    let guard = stats
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.clone()
}
fn run_tty_loop(scenarios: &[MonitoredScenario], stop_flag: &AtomicBool) {
let mut first_render = true;
let start = Instant::now();
while !stop_flag.load(Ordering::SeqCst) {
thread::sleep(TTY_POLL_INTERVAL);
if stop_flag.load(Ordering::SeqCst) {
break;
}
let elapsed = start.elapsed();
let mut stderr = io::stderr().lock();
if !first_render {
for _ in 0..scenarios.len() {
let _ = write!(stderr, "\x1b[A");
}
}
for scenario in scenarios {
let stats = read_stats(&scenario.stats);
let line = format_tty_line(&scenario.name, &stats, scenario.target_rate, elapsed);
let _ = write!(stderr, "\x1b[2K{line}\r\n");
}
let _ = stderr.flush();
first_render = false;
}
let mut stderr = io::stderr().lock();
if !first_render {
for _ in 0..scenarios.len() {
let _ = write!(stderr, "\x1b[A\x1b[2K");
}
}
let _ = stderr.flush();
}
/// Prints one `[progress]` line per scenario to stderr at most once every
/// `NON_TTY_INTERVAL`, until `stop_flag` is set.
///
/// The loop wakes every 200 ms so that a stop request is noticed quickly even
/// though output is emitted far less often.
fn run_non_tty_loop(scenarios: &[MonitoredScenario], stop_flag: &AtomicBool) {
    let should_stop = || stop_flag.load(Ordering::SeqCst);
    let started_at = Instant::now();
    let mut previous_emit = Instant::now();
    let wake_every = Duration::from_millis(200);

    loop {
        if should_stop() {
            return;
        }
        thread::sleep(wake_every);
        if should_stop() {
            return;
        }

        if previous_emit.elapsed() >= NON_TTY_INTERVAL {
            previous_emit = Instant::now();
            let elapsed = started_at.elapsed();
            scenarios
                .iter()
                .map(|scenario| {
                    let snapshot = read_stats(&scenario.stats);
                    format_non_tty_line(&scenario.name, &snapshot, scenario.target_rate, elapsed)
                })
                .for_each(|line| eprintln!("{line}"));
        }
    }
}
/// Builds the coloured single-line status shown for one scenario on a TTY.
///
/// The line contains, in order: state indicator, scenario name, event count,
/// current rate (coloured relative to `target_rate`), bytes emitted, elapsed
/// time, and any active window tags. Colours are applied only when the stderr
/// stream supports them.
fn format_tty_line(
    name: &str,
    stats: &ScenarioStats,
    target_rate: f64,
    elapsed: Duration,
) -> String {
    // Renders static chrome (labels, separators) in the dimmed style.
    fn dim(text: &str) -> String {
        text.if_supports_color(Stderr, |t| t.dimmed()).to_string()
    }

    let pipe = dim("|");
    let events_label = dim("events:");
    let rate_label = dim("rate:");
    let bytes_label = dim("bytes:");
    let elapsed_label = dim("elapsed:");

    let indicator = format_indicator(stats);
    let bold_name = name.if_supports_color(Stderr, |t| t.bold()).to_string();
    let events_value = format_count(stats.total_events);
    let rate_value = format_rate_with_target(stats.current_rate, target_rate);
    let bytes_value = format_bytes(stats.bytes_emitted)
        .if_supports_color(Stderr, |t| t.cyan())
        .to_string();
    let elapsed_value = format_elapsed(elapsed);
    let window_tag = format_window_tag(stats);

    format!(
        " {indicator} {bold_name} {events_label} {events_value} {pipe} {rate_label} {rate_value} {pipe} {bytes_label} {bytes_value} {pipe} {elapsed_label} {elapsed_value}{window_tag}"
    )
}
/// Builds the uncoloured `[progress]` line emitted for one scenario when
/// stderr is not a terminal.
///
/// The event count is printed as a raw integer (no thousands separators);
/// `target_rate` is forwarded to the plain rate formatter.
fn format_non_tty_line(
    name: &str,
    stats: &ScenarioStats,
    target_rate: f64,
    elapsed: Duration,
) -> String {
    format!(
        "[progress] {name} events: {events} | rate: {rate} | bytes: {bytes} | elapsed: {elapsed_str}{window}",
        events = stats.total_events,
        rate = format_rate_plain(stats.current_rate, target_rate),
        bytes = format_bytes(stats.bytes_emitted),
        elapsed_str = format_elapsed_plain(elapsed),
        window = format_window_tag_plain(stats),
    )
}
fn format_indicator(stats: &ScenarioStats) -> String {
if stats.in_gap {
format!("{}", "~".if_supports_color(Stderr, |t| t.yellow()))
} else if stats.in_burst {
format!("{}", "~".if_supports_color(Stderr, |t| t.magenta()))
} else {
format!("{}", "~".if_supports_color(Stderr, |t| t.green()))
}
}
/// Returns the coloured window tags for the TTY line.
///
/// Each active window contributes a tag preceded by a single space, in the
/// order `[gap]` (yellow), `[burst]` (magenta), `[spike]` (red). The result
/// is empty when no window is active.
fn format_window_tag(stats: &ScenarioStats) -> String {
    let mut rendered = String::new();

    if stats.in_gap {
        rendered.push(' ');
        rendered.push_str(&"[gap]".if_supports_color(Stderr, |t| t.yellow()).to_string());
    }
    if stats.in_burst {
        rendered.push(' ');
        rendered.push_str(&"[burst]".if_supports_color(Stderr, |t| t.magenta()).to_string());
    }
    if stats.in_cardinality_spike {
        rendered.push(' ');
        rendered.push_str(&"[spike]".if_supports_color(Stderr, |t| t.red()).to_string());
    }

    rendered
}
/// Returns the uncoloured window tags for the non-TTY line.
///
/// Each active window contributes a tag preceded by a single space, in the
/// order `[gap]`, `[burst]`, `[spike]`. The result is empty when no window is
/// active.
fn format_window_tag_plain(stats: &ScenarioStats) -> String {
    let windows = [
        (stats.in_gap, "[gap]"),
        (stats.in_burst, "[burst]"),
        (stats.in_cardinality_spike, "[spike]"),
    ];

    windows
        .iter()
        .filter(|(active, _)| *active)
        .fold(String::new(), |mut rendered, (_, tag)| {
            rendered.push(' ');
            rendered.push_str(tag);
            rendered
        })
}
/// Formats `current` as `<rate>/s` with one decimal, coloured by how close it
/// is to `target`.
///
/// The rate is green when `current / target` lies within `0.8..=1.2` and
/// yellow otherwise. When `target` is not greater than zero the ratio is
/// treated as exactly 1.0, so the rate is shown in green.
fn format_rate_with_target(current: f64, target: f64) -> String {
    let ratio = match target > 0.0 {
        true => current / target,
        false => 1.0,
    };
    let on_target = (0.8..=1.2).contains(&ratio);
    let text = format!("{current:.1}/s");

    if on_target {
        text.if_supports_color(Stderr, |t| t.green()).to_string()
    } else {
        text.if_supports_color(Stderr, |t| t.yellow()).to_string()
    }
}
/// Formats `current` as `<rate>/s` with one decimal and no colour.
///
/// `_target` is accepted for signature parity with the coloured variant and
/// is not used.
fn format_rate_plain(current: f64, _target: f64) -> String {
    format!("{current:.1}/s")
}
/// Formats `n` with `,` thousands separators (e.g. `1,234,567`), coloured
/// green when the stderr stream supports colour.
fn format_count(n: u64) -> String {
    let digits = n.to_string();
    let width = digits.len();
    let mut grouped = String::with_capacity(width + width / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that starts a full group of
        // three counted from the right, except at the very front.
        if idx != 0 && (width - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }

    grouped.if_supports_color(Stderr, |t| t.green()).to_string()
}
/// Formats an elapsed duration compactly for the progress line.
///
/// * below one minute: seconds with one decimal, e.g. `5.3s`
/// * below one hour: whole minutes and seconds, e.g. `1m30s`
/// * otherwise: whole hours and minutes, e.g. `1h1m`
///
/// From the minutes form upwards the duration is rounded to the nearest whole
/// second *before* it is split into units, so a rounded-up remainder carries
/// into the next unit. This avoids impossible readings such as `1m60s` or
/// `59m60s`, and `60.0s` for values just under a minute.
fn format_elapsed(elapsed: Duration) -> String {
    let secs = elapsed.as_secs_f64();

    // 59.95 is the smallest value that one-decimal formatting would print as
    // "60.0s"; from there on the minutes form is used instead.
    if secs < 59.95 {
        return format!("{secs:.1}s");
    }

    // Round once, then split with integer arithmetic so carries propagate.
    let total = secs.round() as u64;
    let hours = total / 3600;
    let minutes = (total % 3600) / 60;
    let seconds = total % 60;

    if hours == 0 {
        format!("{minutes}m{seconds}s")
    } else {
        format!("{hours}h{minutes}m")
    }
}
/// Elapsed-time formatter used for the non-TTY line.
///
/// Delegates to `format_elapsed` unchanged, so both output modes show the
/// same elapsed text.
fn format_elapsed_plain(elapsed: Duration) -> String {
format_elapsed(elapsed)
}
/// Formats a byte count with a binary-scaled unit.
///
/// Values below 1024 are printed as whole bytes (`500 B`); larger values are
/// divided by 1024, 1024² or 1024³ and printed with one decimal as `KB`, `MB`
/// or `GB`. `GB` is the largest unit used.
fn format_bytes(bytes: u64) -> String {
    // Largest unit first so the first match is the right scale.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    UNITS
        .iter()
        .find(|(threshold, _)| bytes >= *threshold)
        .map_or_else(
            || format!("{bytes} B"),
            |(threshold, unit)| format!("{:.1} {unit}", bytes as f64 / *threshold as f64),
        )
}
// Unit tests for the formatting helpers plus smoke tests for the progress
// thread lifecycle.
#[cfg(test)]
mod tests {
use super::*;
// ---- format_bytes: unit selection at each boundary ----
#[test]
fn format_bytes_zero() {
assert_eq!(format_bytes(0), "0 B");
}
#[test]
fn format_bytes_below_kb() {
assert_eq!(format_bytes(500), "500 B");
}
#[test]
fn format_bytes_one_kb() {
assert_eq!(format_bytes(1024), "1.0 KB");
}
#[test]
fn format_bytes_one_mb() {
assert_eq!(format_bytes(1_048_576), "1.0 MB");
}
#[test]
fn format_bytes_one_gb() {
assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
}
// ---- format_elapsed: seconds / minutes / hours forms ----
#[test]
fn format_elapsed_seconds_only() {
let d = Duration::from_secs_f64(5.3);
assert_eq!(format_elapsed(d), "5.3s");
}
#[test]
fn format_elapsed_minutes_and_seconds() {
let d = Duration::from_secs(90);
assert_eq!(format_elapsed(d), "1m30s");
}
#[test]
fn format_elapsed_hours_and_minutes() {
let d = Duration::from_secs(3661);
assert_eq!(format_elapsed(d), "1h1m");
}
#[test]
fn format_elapsed_zero() {
let d = Duration::ZERO;
assert_eq!(format_elapsed(d), "0.0s");
}
#[test]
fn format_elapsed_exactly_one_minute() {
let d = Duration::from_secs(60);
assert_eq!(format_elapsed(d), "1m0s");
}
#[test]
fn format_elapsed_exactly_one_hour() {
let d = Duration::from_secs(3600);
assert_eq!(format_elapsed(d), "1h0m");
}
// Test helper: removes escape sequences so coloured output can be compared
// as plain text. Everything from an ESC byte up to and including the next
// ASCII letter is dropped.
fn strip_ansi(s: &str) -> String {
let mut result = String::new();
let mut in_escape = false;
for ch in s.chars() {
if ch == '\x1b' {
in_escape = true;
} else if in_escape {
if ch.is_ascii_alphabetic() {
in_escape = false;
}
} else {
result.push(ch);
}
}
result
}
// ---- format_count: thousands separators (colour stripped before comparing) ----
#[test]
fn format_count_small_number() {
let s = strip_ansi(&format_count(42));
assert_eq!(s, "42");
}
#[test]
fn format_count_thousands() {
let s = strip_ansi(&format_count(1234));
assert_eq!(s, "1,234");
}
#[test]
fn format_count_millions() {
let s = strip_ansi(&format_count(1_234_567));
assert_eq!(s, "1,234,567");
}
#[test]
fn format_count_zero() {
let s = strip_ansi(&format_count(0));
assert_eq!(s, "0");
}
#[test]
fn format_count_exactly_one_thousand() {
let s = strip_ansi(&format_count(1000));
assert_eq!(s, "1,000");
}
// ---- format_window_tag_plain: one tag per active window, space separated ----
#[test]
fn window_tag_plain_no_windows_active() {
let stats = ScenarioStats::default();
assert_eq!(format_window_tag_plain(&stats), "");
}
#[test]
fn window_tag_plain_gap_active() {
let mut stats = ScenarioStats::default();
stats.in_gap = true;
assert_eq!(format_window_tag_plain(&stats), " [gap]");
}
#[test]
fn window_tag_plain_burst_active() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
assert_eq!(format_window_tag_plain(&stats), " [burst]");
}
#[test]
fn window_tag_plain_spike_active() {
let mut stats = ScenarioStats::default();
stats.in_cardinality_spike = true;
assert_eq!(format_window_tag_plain(&stats), " [spike]");
}
#[test]
fn window_tag_plain_multiple_windows_active() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
stats.in_cardinality_spike = true;
assert_eq!(format_window_tag_plain(&stats), " [burst] [spike]");
}
// ---- format_rate_plain ----
#[test]
fn format_rate_plain_normal_rate() {
assert_eq!(format_rate_plain(99.5, 100.0), "99.5/s");
}
#[test]
fn format_rate_plain_zero_rate() {
assert_eq!(format_rate_plain(0.0, 100.0), "0.0/s");
}
// ---- format_non_tty_line: content checks on the assembled line ----
#[test]
fn non_tty_line_contains_scenario_name() {
let mut stats = ScenarioStats::default();
stats.total_events = 42;
stats.bytes_emitted = 1024;
stats.current_rate = 10.0;
let line = format_non_tty_line("cpu_usage", &stats, 10.0, Duration::from_secs(5));
assert!(
line.contains("cpu_usage"),
"line must contain scenario name"
);
assert!(
line.contains("[progress]"),
"line must contain [progress] prefix"
);
assert!(line.contains("42"), "line must contain event count");
}
#[test]
fn non_tty_line_shows_window_state() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
let line = format_non_tty_line("test", &stats, 10.0, Duration::from_secs(1));
assert!(
line.contains("[burst]"),
"line must show burst window state"
);
}
// ---- ProgressDisplay lifecycle: smoke tests that pass when nothing panics
// or hangs; they do not inspect what was written to stderr ----
#[test]
fn progress_display_starts_and_stops_cleanly() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let display = ProgressDisplay::start(vec![("test".to_string(), stats, 10.0)]);
thread::sleep(Duration::from_millis(50));
display.stop();
}
#[test]
fn progress_display_handles_multiple_scenarios() {
let stats1 = Arc::new(RwLock::new(ScenarioStats::default()));
let stats2 = Arc::new(RwLock::new(ScenarioStats::default()));
let display = ProgressDisplay::start(vec![
("scenario-1".to_string(), stats1, 10.0),
("scenario-2".to_string(), stats2, 20.0),
]);
thread::sleep(Duration::from_millis(50));
display.stop();
}
#[test]
fn progress_display_drop_signals_stop_without_panic() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let display = ProgressDisplay::start(vec![("drop-test".to_string(), stats, 10.0)]);
drop(display);
// Drop does not join, so wait longer than one 200 ms poll interval to give
// the thread time to observe the stop flag.
thread::sleep(Duration::from_millis(300));
}
#[test]
fn progress_display_reads_updated_stats() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let stats_writer = Arc::clone(&stats);
let display = ProgressDisplay::start(vec![("live-test".to_string(), stats, 100.0)]);
// Scope the write guard so it is released before the progress thread polls.
{
let mut s = stats_writer.write().expect("lock must not be poisoned");
s.total_events = 500;
s.bytes_emitted = 4096;
s.current_rate = 95.0;
}
thread::sleep(Duration::from_millis(300));
display.stop();
}
// Compile-time check: the handle can be moved to another thread.
#[test]
fn progress_display_is_send() {
fn assert_send<T: Send>() {}
assert_send::<ProgressDisplay>();
}
}