use std::collections::HashSet;
use std::io::{self, IsTerminal, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use owo_colors::OwoColorize;
use owo_colors::Stream::Stderr;
use sonda_core::schedule::stats::{ScenarioState, ScenarioStats};
/// Sleep between iterations of the TTY rendering loop (one redraw per wake-up).
const TTY_POLL_INTERVAL: Duration = Duration::from_millis(200);
/// Minimum spacing between periodic status lines when stderr is not a terminal.
const NON_TTY_INTERVAL: Duration = Duration::from_secs(5);
/// One scenario tracked by the progress thread.
struct MonitoredScenario {
/// Display name; also the key used to remember which scenarios already
/// received a `STOPPED` banner.
name: String,
/// Shared stats handle; the progress code only read-locks it and clones a snapshot.
stats: Arc<RwLock<ScenarioStats>>,
/// Target rate that the TTY line compares `current_rate` against when colouring it.
target_rate: f64,
/// When this reads `false` the display prints the `STOPPED` banner for the
/// scenario. NOTE(review): presumably cleared by the scenario runner — it is
/// never written in this file.
alive: Arc<AtomicBool>,
}
/// Handle to the background thread that renders scenario progress to stderr.
pub struct ProgressDisplay {
/// Join handle of the worker thread; `stop` takes it out, leaving `None`.
thread: Option<JoinHandle<()>>,
/// Set to `true` to ask the worker loop to exit.
stop_flag: Arc<AtomicBool>,
}
impl ProgressDisplay {
#[allow(clippy::type_complexity)]
pub fn start(
scenarios: Vec<(String, Arc<RwLock<ScenarioStats>>, f64, Arc<AtomicBool>)>,
) -> Self {
let stop_flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&stop_flag);
let monitored: Vec<MonitoredScenario> = scenarios
.into_iter()
.map(|(name, stats, target_rate, alive)| MonitoredScenario {
name,
stats,
target_rate,
alive,
})
.collect();
let is_tty = io::stderr().is_terminal();
let thread = thread::Builder::new()
.name("sonda-progress".to_string())
.spawn(move || {
if is_tty {
run_tty_loop(&monitored, &flag_clone);
} else {
run_non_tty_loop(&monitored, &flag_clone);
}
})
.expect("failed to spawn progress monitoring thread");
ProgressDisplay {
thread: Some(thread),
stop_flag,
}
}
pub fn stop(mut self) {
self.stop_flag.store(true, Ordering::SeqCst);
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}
// Dropping the handle only raises the stop flag; it does not join the worker.
// The worker loops check the flag after each sleep, so the thread winds down
// on its own shortly afterwards.
impl Drop for ProgressDisplay {
fn drop(&mut self) {
// Also runs after `stop`, where storing `true` a second time is harmless.
self.stop_flag.store(true, Ordering::SeqCst);
}
}
/// Returns a cloned snapshot of the shared stats.
///
/// A poisoned lock is not treated as an error: the guard is recovered from
/// the poison error and the stats it protects are cloned as usual.
fn read_stats(stats: &Arc<RwLock<ScenarioStats>>) -> ScenarioStats {
    let guard = stats
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.clone()
}
/// Rendering loop used when stderr is a terminal.
///
/// Every `TTY_POLL_INTERVAL` the loop redraws a "live region" at the bottom of
/// the output: one line per scenario that has not yet received a `STOPPED`
/// banner. A scenario whose `alive` flag reads `false` gets a one-off banner
/// and is dropped from the live region afterwards.
///
/// Layout invariant: banners are written *above* the live lines of the same
/// frame. The next frame only moves the cursor up by the number of live lines,
/// so anything written below or between them would be overwritten; writing the
/// banners first lets them scroll out of the redraw area and stay on screen.
///
/// Each frame is formatted before stderr is locked, so the stats locks are
/// never taken while the stderr lock is held. All terminal writes are
/// best-effort: I/O errors are ignored.
///
/// On exit the live region is erased and a banner is printed for every
/// scenario that stopped without having been announced yet.
fn run_tty_loop(scenarios: &[MonitoredScenario], stop_flag: &AtomicBool) {
    let start = Instant::now();
    let mut banner_emitted: HashSet<String> = HashSet::new();
    // Height of the live region drawn by the previous frame (0 before the first frame).
    let mut live_lines_last_render: usize = 0;

    while !stop_flag.load(Ordering::SeqCst) {
        thread::sleep(TTY_POLL_INTERVAL);
        if stop_flag.load(Ordering::SeqCst) {
            break;
        }
        let elapsed = start.elapsed();

        // Build the whole frame first; nothing touches the terminal yet.
        let mut new_banners: Vec<String> = Vec::new();
        let mut live_lines: Vec<String> = Vec::with_capacity(scenarios.len());
        for scenario in scenarios {
            if banner_emitted.contains(&scenario.name) {
                continue;
            }
            let stats = read_stats(&scenario.stats);
            if !scenario.alive.load(Ordering::SeqCst) {
                new_banners.push(format_stopped_line_tty(&scenario.name, &stats, elapsed));
                banner_emitted.insert(scenario.name.clone());
            } else if stats.state == ScenarioState::Paused {
                live_lines.push(format_paused_line_tty(&scenario.name, &stats, elapsed));
            } else {
                live_lines.push(format_tty_line(
                    &scenario.name,
                    &stats,
                    scenario.target_rate,
                    elapsed,
                ));
            }
        }

        let mut stderr = io::stderr().lock();
        // Return to the top of the previous live region.
        for _ in 0..live_lines_last_render {
            let _ = write!(stderr, "\x1b[A");
        }
        // Banners first (they become permanent), then the new live region.
        for line in new_banners.iter().chain(live_lines.iter()) {
            let _ = write!(stderr, "\x1b[2K{line}\r\n");
        }
        let _ = stderr.flush();
        live_lines_last_render = live_lines.len();
    }

    // Collect the final banners before taking the stderr lock.
    let final_elapsed = start.elapsed();
    let mut final_banners: Vec<String> = Vec::new();
    drain_stopped_scenarios(scenarios, &banner_emitted, |name, stats| {
        final_banners.push(format_stopped_line_tty(name, stats, final_elapsed));
    });

    let mut stderr = io::stderr().lock();
    // Erase the live region bottom-up; the cursor ends on its first row.
    for _ in 0..live_lines_last_render {
        let _ = write!(stderr, "\x1b[A\x1b[2K");
    }
    for banner in &final_banners {
        let _ = write!(stderr, "\x1b[2K{banner}\r\n");
    }
    let _ = stderr.flush();
}
fn run_non_tty_loop(scenarios: &[MonitoredScenario], stop_flag: &AtomicBool) {
let start = Instant::now();
let mut last_emit = Instant::now();
let check_interval = Duration::from_millis(200);
let mut banner_emitted: HashSet<String> = HashSet::new();
let mut paused_announced: HashSet<String> = HashSet::new();
while !stop_flag.load(Ordering::SeqCst) {
thread::sleep(check_interval);
if stop_flag.load(Ordering::SeqCst) {
break;
}
for scenario in scenarios {
if banner_emitted.contains(&scenario.name) {
continue;
}
if !scenario.alive.load(Ordering::SeqCst) {
let stats = read_stats(&scenario.stats);
let banner = format_stopped_line_plain(&scenario.name, &stats, start.elapsed());
eprintln!("{banner}");
banner_emitted.insert(scenario.name.clone());
}
}
for scenario in scenarios {
if banner_emitted.contains(&scenario.name) {
continue;
}
let stats = read_stats(&scenario.stats);
if stats.state == ScenarioState::Paused && scenario.alive.load(Ordering::SeqCst) {
if !paused_announced.contains(&scenario.name) {
let line = format_paused_line_plain(&scenario.name, &stats, start.elapsed());
eprintln!("{line}");
paused_announced.insert(scenario.name.clone());
}
} else {
paused_announced.remove(&scenario.name);
}
}
if last_emit.elapsed() < NON_TTY_INTERVAL {
continue;
}
last_emit = Instant::now();
let elapsed = start.elapsed();
for scenario in scenarios {
if banner_emitted.contains(&scenario.name) {
continue;
}
let stats = read_stats(&scenario.stats);
if stats.state == ScenarioState::Paused && scenario.alive.load(Ordering::SeqCst) {
continue;
}
let line = format_non_tty_line(&scenario.name, &stats, scenario.target_rate, elapsed);
eprintln!("{line}");
}
}
let final_elapsed = start.elapsed();
drain_stopped_scenarios(scenarios, &banner_emitted, |name, stats| {
let banner = format_stopped_line_plain(name, stats, final_elapsed);
eprintln!("{banner}");
});
}
/// Calls `emit` once for every scenario that is no longer alive and whose
/// name is not in `banner_emitted`.
///
/// `emit` receives the scenario name and a fresh snapshot of its stats.
/// Scenarios are visited in slice order; live scenarios and already-announced
/// ones are skipped. `banner_emitted` itself is not modified.
fn drain_stopped_scenarios<F>(
    scenarios: &[MonitoredScenario],
    banner_emitted: &HashSet<String>,
    mut emit: F,
) where
    F: FnMut(&str, &ScenarioStats),
{
    scenarios
        .iter()
        .filter(|candidate| !banner_emitted.contains(&candidate.name))
        .filter(|candidate| !candidate.alive.load(Ordering::SeqCst))
        .for_each(|stopped| {
            let snapshot = read_stats(&stopped.stats);
            emit(&stopped.name, &snapshot);
        });
}
/// Builds the uncoloured `STOPPED` banner for a scenario.
///
/// When the stats carry a `last_sink_error`, it is appended right after the
/// label as ` (sink: <error>)`; otherwise nothing is inserted there.
fn format_stopped_line_plain(name: &str, stats: &ScenarioStats, elapsed: Duration) -> String {
    let error_clause = stats
        .last_sink_error
        .as_ref()
        .map(|err| format!(" (sink: {err})"))
        .unwrap_or_default();
    let events = stats.total_events;
    let bytes = format_bytes(stats.bytes_emitted);
    let elapsed_str = format_elapsed_plain(elapsed);
    format!(
        "[progress] {name} STOPPED{error_clause} | events: {events} | bytes: {bytes} | elapsed: {elapsed_str}"
    )
}
/// Builds the uncoloured `PAUSED` line for a scenario.
///
/// The rate is always reported as the fixed text `0.0/s`; only the event
/// count and the elapsed time come from the arguments.
fn format_paused_line_plain(name: &str, stats: &ScenarioStats, elapsed: Duration) -> String {
    let events = stats.total_events;
    let elapsed_str = format_elapsed_plain(elapsed);
    format!("[progress] {name} PAUSED events: {events} | rate: 0.0/s | elapsed: {elapsed_str}")
}
/// Builds the coloured `STOPPED` banner shown on a terminal.
///
/// Colours are applied only when stderr supports them: red label, bold name,
/// dimmed field labels and separators, cyan byte count. A recorded
/// `last_sink_error` is appended after the name as ` (sink: <error>)`.
fn format_stopped_line_tty(name: &str, stats: &ScenarioStats, elapsed: Duration) -> String {
    // Shared styling for field labels and separators.
    let dim = |text: &str| text.if_supports_color(Stderr, |t| t.dimmed()).to_string();

    let label = "STOPPED".if_supports_color(Stderr, |t| t.red()).to_string();
    let bold_name = name.if_supports_color(Stderr, |t| t.bold()).to_string();
    let error_clause = stats
        .last_sink_error
        .as_ref()
        .map(|err| format!(" (sink: {err})"))
        .unwrap_or_default();

    let pipe = dim("|");
    let events_label = dim("events:");
    let bytes_label = dim("bytes:");
    let elapsed_label = dim("elapsed:");

    let events_value = format_count(stats.total_events);
    let bytes_value = format_bytes(stats.bytes_emitted)
        .if_supports_color(Stderr, |t| t.cyan())
        .to_string();
    let elapsed_value = format_elapsed(elapsed);

    format!(
        " {label} {bold_name}{error_clause} {events_label} {events_value} {pipe} {bytes_label} {bytes_value} {pipe} {elapsed_label} {elapsed_value}"
    )
}
/// Builds the coloured `PAUSED` line shown on a terminal.
///
/// The label is yellow and the name bold when stderr supports colour; the
/// rate is always the fixed, dimmed text `0.0/s`.
fn format_paused_line_tty(name: &str, stats: &ScenarioStats, elapsed: Duration) -> String {
    // Shared styling for field labels, separators and the zero rate.
    let dim = |text: &str| text.if_supports_color(Stderr, |t| t.dimmed()).to_string();

    let label = "PAUSED".if_supports_color(Stderr, |t| t.yellow()).to_string();
    let bold_name = name.if_supports_color(Stderr, |t| t.bold()).to_string();

    let pipe = dim("|");
    let events_label = dim("events:");
    let rate_label = dim("rate:");
    let rate_value = dim("0.0/s");
    let elapsed_label = dim("elapsed:");

    let events_value = format_count(stats.total_events);
    let elapsed_value = format_elapsed(elapsed);

    format!(
        " {label} {bold_name} {events_label} {events_value} {pipe} {rate_label} {rate_value} {pipe} {elapsed_label} {elapsed_value}"
    )
}
/// Builds the coloured live status line for a running scenario.
///
/// Fields, in order: state indicator, bold name, event count, current rate
/// (coloured relative to `target_rate`), bytes emitted, elapsed time, and any
/// active window tags appended at the end.
fn format_tty_line(
    name: &str,
    stats: &ScenarioStats,
    target_rate: f64,
    elapsed: Duration,
) -> String {
    // Shared styling for field labels and separators.
    let dim = |text: &str| text.if_supports_color(Stderr, |t| t.dimmed()).to_string();

    let indicator = format_indicator(stats);
    let bold_name = name.if_supports_color(Stderr, |t| t.bold()).to_string();

    let pipe = dim("|");
    let events_label = dim("events:");
    let rate_label = dim("rate:");
    let bytes_label = dim("bytes:");
    let elapsed_label = dim("elapsed:");

    let events_value = format_count(stats.total_events);
    let rate_value = format_rate_with_target(stats.current_rate, target_rate);
    let bytes_value = format_bytes(stats.bytes_emitted)
        .if_supports_color(Stderr, |t| t.cyan())
        .to_string();
    let elapsed_value = format_elapsed(elapsed);
    let window_tag = format_window_tag(stats);

    format!(
        " {indicator} {bold_name} {events_label} {events_value} {pipe} {rate_label} {rate_value} {pipe} {bytes_label} {bytes_value} {pipe} {elapsed_label} {elapsed_value}{window_tag}"
    )
}
/// Builds the uncoloured periodic status line for a running scenario.
///
/// The event count is printed as a raw number (no thousands separators), and
/// any active window tags are appended after the elapsed time.
fn format_non_tty_line(
    name: &str,
    stats: &ScenarioStats,
    target_rate: f64,
    elapsed: Duration,
) -> String {
    format!(
        "[progress] {name} events: {events} | rate: {rate} | bytes: {bytes} | elapsed: {elapsed_str}{window}",
        events = stats.total_events,
        rate = format_rate_plain(stats.current_rate, target_rate),
        bytes = format_bytes(stats.bytes_emitted),
        elapsed_str = format_elapsed_plain(elapsed),
        window = format_window_tag_plain(stats),
    )
}
/// Returns the `~` state indicator, coloured by the scenario's current window.
///
/// Yellow while in a gap, magenta while in a burst, green otherwise. A gap
/// takes precedence when both flags are set.
fn format_indicator(stats: &ScenarioStats) -> String {
    match (stats.in_gap, stats.in_burst) {
        (true, _) => "~".if_supports_color(Stderr, |t| t.yellow()).to_string(),
        (false, true) => "~".if_supports_color(Stderr, |t| t.magenta()).to_string(),
        (false, false) => "~".if_supports_color(Stderr, |t| t.green()).to_string(),
    }
}
/// Returns the coloured window tags for the TTY line.
///
/// Tags appear in the fixed order `[gap]` (yellow), `[burst]` (magenta),
/// `[spike]` (red), separated by single spaces and prefixed with one space.
/// Returns an empty string when no window is active.
fn format_window_tag(stats: &ScenarioStats) -> String {
    let gap = stats
        .in_gap
        .then(|| "[gap]".if_supports_color(Stderr, |t| t.yellow()).to_string());
    let burst = stats
        .in_burst
        .then(|| "[burst]".if_supports_color(Stderr, |t| t.magenta()).to_string());
    let spike = stats
        .in_cardinality_spike
        .then(|| "[spike]".if_supports_color(Stderr, |t| t.red()).to_string());

    // Chaining the options keeps the fixed gap/burst/spike order.
    let active: Vec<String> = gap.into_iter().chain(burst).chain(spike).collect();
    if active.is_empty() {
        return String::new();
    }
    format!(" {}", active.join(" "))
}
/// Returns the uncoloured window tags for the plain-text line.
///
/// Tags appear in the fixed order `[gap]`, `[burst]`, `[spike]`, separated by
/// single spaces and prefixed with one space. Returns an empty string when no
/// window is active.
fn format_window_tag_plain(stats: &ScenarioStats) -> String {
    let windows = [
        (stats.in_gap, "[gap]"),
        (stats.in_burst, "[burst]"),
        (stats.in_cardinality_spike, "[spike]"),
    ];
    let active: Vec<&str> = windows
        .iter()
        .filter(|(is_active, _)| *is_active)
        .map(|(_, tag)| *tag)
        .collect();
    if active.is_empty() {
        return String::new();
    }
    format!(" {}", active.join(" "))
}
/// Formats `current` as `<rate>/s` with one decimal, coloured by how close it
/// is to `target`.
///
/// The rate is green when `current / target` lies within `0.8..=1.2` and
/// yellow otherwise. A non-positive `target` is treated as a ratio of exactly
/// 1.0, so the rate is always green in that case.
fn format_rate_with_target(current: f64, target: f64) -> String {
    let rate_str = format!("{current:.1}/s");
    let ratio = if target > 0.0 { current / target } else { 1.0 };
    let off_target = !(0.8..=1.2).contains(&ratio);
    if off_target {
        return rate_str.if_supports_color(Stderr, |t| t.yellow()).to_string();
    }
    rate_str.if_supports_color(Stderr, |t| t.green()).to_string()
}
/// Formats `current` as `<rate>/s` with one decimal place and no colour.
///
/// `_target` is accepted so the signature mirrors `format_rate_with_target`,
/// but it does not influence the output.
fn format_rate_plain(current: f64, _target: f64) -> String {
    format!("{current:.1}/s")
}
/// Formats `n` with `,` thousands separators, in green when stderr supports
/// colour.
///
/// Values below 1,000 come out as the bare digits.
fn format_count(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);
    for (idx, ch) in digits.chars().enumerate() {
        // A comma goes before every digit that starts a full group of three.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(ch);
    }
    grouped.if_supports_color(Stderr, |t| t.green()).to_string()
}
/// Formats an elapsed duration compactly for the progress lines.
///
/// * under one minute: seconds with one decimal, e.g. `5.3s`
/// * under one hour: whole minutes and whole seconds, e.g. `1m30s`
/// * otherwise: whole hours and whole minutes, e.g. `1h1m`
///
/// The minute and hour forms truncate to whole seconds using integer
/// arithmetic. Rounding the fractional remainder instead would render
/// 119.7 s as `1m60s`.
fn format_elapsed(elapsed: Duration) -> String {
    let secs = elapsed.as_secs_f64();
    if secs < 60.0 {
        return format!("{secs:.1}s");
    }

    // From here on only whole seconds matter.
    let total_secs = elapsed.as_secs();
    if total_secs < 3600 {
        let mins = total_secs / 60;
        let remaining = total_secs % 60;
        format!("{mins}m{remaining}s")
    } else {
        let hours = total_secs / 3600;
        let remaining_mins = (total_secs % 3600) / 60;
        format!("{hours}h{remaining_mins}m")
    }
}
/// Elapsed-time formatter for the plain-text lines.
///
/// Delegates to `format_elapsed`, so plain and TTY lines show the same text.
fn format_elapsed_plain(elapsed: Duration) -> String {
format_elapsed(elapsed)
}
/// Formats a byte count using 1024-based units labelled `B`, `KB`, `MB`, `GB`.
///
/// Counts below 1024 are printed as whole bytes; larger values are printed
/// with one decimal place. `GB` is the largest unit used.
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1 << 10;
    const MB: u64 = 1 << 20;
    const GB: u64 = 1 << 30;

    let scaled = |unit: u64| bytes as f64 / unit as f64;
    match bytes {
        b if b < KB => format!("{b} B"),
        b if b < MB => format!("{:.1} KB", scaled(KB)),
        b if b < GB => format!("{:.1} MB", scaled(MB)),
        _ => format!("{:.1} GB", scaled(GB)),
    }
}
// Unit tests for the formatting helpers, the `ProgressDisplay` lifecycle and
// the final-banner drain helper.
#[cfg(test)]
mod tests {
use super::*;
// ---- format_bytes ----
#[test]
fn format_bytes_zero() {
assert_eq!(format_bytes(0), "0 B");
}
#[test]
fn format_bytes_below_kb() {
assert_eq!(format_bytes(500), "500 B");
}
#[test]
fn format_bytes_one_kb() {
assert_eq!(format_bytes(1024), "1.0 KB");
}
#[test]
fn format_bytes_one_mb() {
assert_eq!(format_bytes(1_048_576), "1.0 MB");
}
#[test]
fn format_bytes_one_gb() {
assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
}
// ---- format_elapsed ----
#[test]
fn format_elapsed_seconds_only() {
let d = Duration::from_secs_f64(5.3);
assert_eq!(format_elapsed(d), "5.3s");
}
#[test]
fn format_elapsed_minutes_and_seconds() {
let d = Duration::from_secs(90);
assert_eq!(format_elapsed(d), "1m30s");
}
#[test]
fn format_elapsed_hours_and_minutes() {
let d = Duration::from_secs(3661);
assert_eq!(format_elapsed(d), "1h1m");
}
#[test]
fn format_elapsed_zero() {
let d = Duration::ZERO;
assert_eq!(format_elapsed(d), "0.0s");
}
#[test]
fn format_elapsed_exactly_one_minute() {
let d = Duration::from_secs(60);
assert_eq!(format_elapsed(d), "1m0s");
}
#[test]
fn format_elapsed_exactly_one_hour() {
let d = Duration::from_secs(3600);
assert_eq!(format_elapsed(d), "1h0m");
}
// Removes escape sequences so assertions can compare visible text only.
// A sequence starts at ESC and ends at the first ASCII letter after it.
fn strip_ansi(s: &str) -> String {
let mut result = String::new();
let mut in_escape = false;
for ch in s.chars() {
if ch == '\x1b' {
in_escape = true;
} else if in_escape {
if ch.is_ascii_alphabetic() {
in_escape = false;
}
} else {
result.push(ch);
}
}
result
}
// ---- format_count ----
#[test]
fn format_count_small_number() {
let s = strip_ansi(&format_count(42));
assert_eq!(s, "42");
}
#[test]
fn format_count_thousands() {
let s = strip_ansi(&format_count(1234));
assert_eq!(s, "1,234");
}
#[test]
fn format_count_millions() {
let s = strip_ansi(&format_count(1_234_567));
assert_eq!(s, "1,234,567");
}
#[test]
fn format_count_zero() {
let s = strip_ansi(&format_count(0));
assert_eq!(s, "0");
}
#[test]
fn format_count_exactly_one_thousand() {
let s = strip_ansi(&format_count(1000));
assert_eq!(s, "1,000");
}
// ---- format_window_tag_plain ----
#[test]
fn window_tag_plain_no_windows_active() {
let stats = ScenarioStats::default();
assert_eq!(format_window_tag_plain(&stats), "");
}
#[test]
fn window_tag_plain_gap_active() {
let mut stats = ScenarioStats::default();
stats.in_gap = true;
assert_eq!(format_window_tag_plain(&stats), " [gap]");
}
#[test]
fn window_tag_plain_burst_active() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
assert_eq!(format_window_tag_plain(&stats), " [burst]");
}
#[test]
fn window_tag_plain_spike_active() {
let mut stats = ScenarioStats::default();
stats.in_cardinality_spike = true;
assert_eq!(format_window_tag_plain(&stats), " [spike]");
}
#[test]
fn window_tag_plain_multiple_windows_active() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
stats.in_cardinality_spike = true;
assert_eq!(format_window_tag_plain(&stats), " [burst] [spike]");
}
// ---- format_rate_plain ----
#[test]
fn format_rate_plain_normal_rate() {
assert_eq!(format_rate_plain(99.5, 100.0), "99.5/s");
}
#[test]
fn format_rate_plain_zero_rate() {
assert_eq!(format_rate_plain(0.0, 100.0), "0.0/s");
}
// ---- format_non_tty_line ----
#[test]
fn non_tty_line_contains_scenario_name() {
let mut stats = ScenarioStats::default();
stats.total_events = 42;
stats.bytes_emitted = 1024;
stats.current_rate = 10.0;
let line = format_non_tty_line("cpu_usage", &stats, 10.0, Duration::from_secs(5));
assert!(
line.contains("cpu_usage"),
"line must contain scenario name"
);
assert!(
line.contains("[progress]"),
"line must contain [progress] prefix"
);
assert!(line.contains("42"), "line must contain event count");
}
#[test]
fn non_tty_line_shows_window_state() {
let mut stats = ScenarioStats::default();
stats.in_burst = true;
let line = format_non_tty_line("test", &stats, 10.0, Duration::from_secs(1));
assert!(
line.contains("[burst]"),
"line must show burst window state"
);
}
// ---- ProgressDisplay lifecycle ----
// Fresh `alive` flag set to `true`, i.e. a scenario that is still running.
fn alive_flag() -> Arc<AtomicBool> {
Arc::new(AtomicBool::new(true))
}
#[test]
fn progress_display_starts_and_stops_cleanly() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let display = ProgressDisplay::start(vec![("test".to_string(), stats, 10.0, alive_flag())]);
thread::sleep(Duration::from_millis(50));
display.stop();
}
#[test]
fn progress_display_handles_multiple_scenarios() {
let stats1 = Arc::new(RwLock::new(ScenarioStats::default()));
let stats2 = Arc::new(RwLock::new(ScenarioStats::default()));
let display = ProgressDisplay::start(vec![
("scenario-1".to_string(), stats1, 10.0, alive_flag()),
("scenario-2".to_string(), stats2, 20.0, alive_flag()),
]);
thread::sleep(Duration::from_millis(50));
display.stop();
}
#[test]
fn progress_display_drop_signals_stop_without_panic() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let display =
ProgressDisplay::start(vec![("drop-test".to_string(), stats, 10.0, alive_flag())]);
drop(display);
// Drop does not join, so wait longer than one 200 ms loop interval for the
// worker to observe the flag.
thread::sleep(Duration::from_millis(300));
}
#[test]
fn progress_display_reads_updated_stats() {
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let stats_writer = Arc::clone(&stats);
let display =
ProgressDisplay::start(vec![("live-test".to_string(), stats, 100.0, alive_flag())]);
{
let mut s = stats_writer.write().expect("lock must not be poisoned");
s.total_events = 500;
s.bytes_emitted = 4096;
s.current_rate = 95.0;
}
thread::sleep(Duration::from_millis(300));
display.stop();
}
// ---- STOPPED banners ----
#[test]
fn format_stopped_line_plain_includes_stopped_label() {
let mut stats = ScenarioStats::default();
stats.total_events = 100;
stats.bytes_emitted = 1024;
let line = format_stopped_line_plain("svc", &stats, Duration::from_secs(5));
assert!(line.contains("STOPPED"), "missing STOPPED in: {line}");
assert!(line.contains("svc"));
assert!(line.contains("events: 100"));
}
#[test]
fn format_stopped_line_plain_with_error_includes_sink_clause() {
let mut stats = ScenarioStats::default();
stats.last_sink_error = Some("connection refused".to_string());
let line = format_stopped_line_plain("svc", &stats, Duration::from_secs(1));
assert!(line.contains("(sink: connection refused)"), "got: {line}");
}
#[test]
fn format_stopped_line_plain_clean_shutdown_has_no_parenthetical() {
let stats = ScenarioStats::default();
let line = format_stopped_line_plain("svc", &stats, Duration::from_secs(1));
assert!(
!line.contains("(sink:"),
"must not include sink clause for clean shutdown: {line}"
);
}
#[test]
fn format_stopped_line_tty_includes_stopped_label() {
let stats = ScenarioStats::default();
let line = format_stopped_line_tty("svc", &stats, Duration::from_secs(1));
assert!(strip_ansi(&line).contains("STOPPED"));
assert!(line.contains("svc"));
}
// ---- PAUSED lines ----
#[test]
fn format_paused_line_plain_includes_paused_label_and_zero_rate() {
let mut stats = ScenarioStats::default();
stats.total_events = 42;
stats.state = ScenarioState::Paused;
let line = format_paused_line_plain("svc", &stats, Duration::from_secs(7));
assert!(line.contains("PAUSED"), "missing PAUSED in: {line}");
assert!(line.contains("svc"));
assert!(line.contains("events: 42"));
assert!(line.contains("rate: 0.0/s"));
assert!(line.contains("elapsed: 7.0s"));
}
#[test]
fn format_paused_line_plain_does_not_include_stopped_label() {
let stats = ScenarioStats::default();
let line = format_paused_line_plain("svc", &stats, Duration::from_secs(1));
assert!(!line.contains("STOPPED"));
}
#[test]
fn format_paused_line_tty_includes_paused_label_and_zero_rate() {
let mut stats = ScenarioStats::default();
stats.total_events = 100;
stats.state = ScenarioState::Paused;
let line = format_paused_line_tty("svc", &stats, Duration::from_secs(3));
let plain = strip_ansi(&line);
assert!(plain.contains("PAUSED"), "missing PAUSED in: {plain}");
assert!(plain.contains("svc"));
assert!(plain.contains("rate: 0.0/s"));
}
#[test]
fn format_paused_line_tty_distinct_from_stopped() {
let stats = ScenarioStats::default();
let paused = strip_ansi(&format_paused_line_tty(
"svc",
&stats,
Duration::from_secs(1),
));
let stopped = strip_ansi(&format_stopped_line_tty(
"svc",
&stats,
Duration::from_secs(1),
));
assert!(paused.contains("PAUSED") && !paused.contains("STOPPED"));
assert!(stopped.contains("STOPPED") && !stopped.contains("PAUSED"));
}
// Compile-time check: the handle can be moved to another thread.
#[test]
fn progress_display_is_send() {
fn assert_send<T: Send>() {}
assert_send::<ProgressDisplay>();
}
// ---- drain_stopped_scenarios ----
// Scenario with default stats whose `alive` flag is already `false`.
fn dead_scenario(name: &str) -> MonitoredScenario {
MonitoredScenario {
name: name.to_string(),
stats: Arc::new(RwLock::new(ScenarioStats::default())),
target_rate: 10.0,
alive: Arc::new(AtomicBool::new(false)),
}
}
// Scenario with default stats whose `alive` flag is `true`.
fn live_scenario(name: &str) -> MonitoredScenario {
MonitoredScenario {
name: name.to_string(),
stats: Arc::new(RwLock::new(ScenarioStats::default())),
target_rate: 10.0,
alive: Arc::new(AtomicBool::new(true)),
}
}
#[test]
fn drain_stopped_scenarios_emits_for_dead_unbannered() {
let scenarios = vec![dead_scenario("died_fast")];
let emitted = HashSet::new();
let mut names: Vec<String> = Vec::new();
drain_stopped_scenarios(&scenarios, &emitted, |name, _stats| {
names.push(name.to_string());
});
assert_eq!(names, vec!["died_fast".to_string()]);
}
#[test]
fn drain_stopped_scenarios_skips_already_bannered() {
let scenarios = vec![dead_scenario("died_first"), dead_scenario("died_second")];
let mut emitted = HashSet::new();
emitted.insert("died_first".to_string());
let mut names: Vec<String> = Vec::new();
drain_stopped_scenarios(&scenarios, &emitted, |name, _stats| {
names.push(name.to_string());
});
assert_eq!(names, vec!["died_second".to_string()]);
}
#[test]
fn drain_stopped_scenarios_skips_live_scenarios() {
let scenarios = vec![live_scenario("still_running")];
let emitted = HashSet::new();
let mut names: Vec<String> = Vec::new();
drain_stopped_scenarios(&scenarios, &emitted, |name, _stats| {
names.push(name.to_string());
});
assert!(
names.is_empty(),
"live scenarios must not receive a final banner"
);
}
#[test]
fn drain_stopped_scenarios_passes_stats_snapshot() {
let scenario = dead_scenario("with_error");
{
let mut s = scenario.stats.write().expect("lock");
s.last_sink_error = Some("boom".to_string());
s.total_events = 7;
}
let scenarios = vec![scenario];
let emitted = HashSet::new();
let mut captured: Option<ScenarioStats> = None;
drain_stopped_scenarios(&scenarios, &emitted, |_name, stats| {
captured = Some(stats.clone());
});
let snap = captured.expect("must emit once");
assert_eq!(snap.total_events, 7);
assert_eq!(snap.last_sink_error.as_deref(), Some("boom"));
}
}