use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};
use super::ipc::ChildEvent;
use super::{clamp_line, format_bytes, strip_chunked_recovery_hint};
pub(crate) fn sanitize_terminal(text: &str) -> String {
let needs_escape = |c: char| {
let cp = c as u32;
(cp <= 0x1f && c != '\t' && c != '\n') || cp == 0x7f || (0x80..=0x9f).contains(&cp)
};
if !text.chars().any(needs_escape) {
return text.to_string();
}
let mut out = String::with_capacity(text.len());
for c in text.chars() {
if needs_escape(c) {
out.push_str(&format!("\\u{{{:02x}}}", c as u32));
} else {
out.push(c);
}
}
out
}
#[derive(Debug)]
pub(crate) enum UiMessage {
Event(ChildEvent),
ChildClosed {
export_name: String,
wait_status: ChildWaitStatus,
},
}
#[derive(Debug, Clone)]
pub(crate) enum ChildWaitStatus {
#[allow(dead_code)]
Pending,
Success,
Failed(String),
}
const IDLE_REDRAW_INTERVAL: Duration = Duration::from_millis(200);
const MIN_NAME_COL: usize = 12;
const MIN_MODE_COL: usize = 7;
pub(crate) fn run_ui(rx: Receiver<UiMessage>) {
let width = pick_width();
if console::Term::stderr().features().is_attended() {
run_ui_interactive(rx, width);
} else {
run_ui_linear(rx, width);
}
}
fn run_ui_interactive(rx: Receiver<UiMessage>, width: usize) {
let mut renderer = Renderer::new(width);
loop {
match rx.recv_timeout(IDLE_REDRAW_INTERVAL) {
Ok(msg) => renderer.process_message(msg),
Err(RecvTimeoutError::Timeout) => {
}
Err(RecvTimeoutError::Disconnected) => break,
}
renderer.redraw();
}
for card in renderer.cards.values_mut() {
if !card.finished {
card.finalize_synthetic("failed", Some("child exited without a final summary"));
}
}
renderer.redraw();
renderer.commit();
}
fn run_ui_linear(rx: Receiver<UiMessage>, width: usize) {
let mut renderer = Renderer::new(width);
let mut printed: HashSet<String> = HashSet::new();
while let Ok(msg) = rx.recv() {
renderer.process_message(msg);
flush_finished_cards(&renderer, &mut printed, width);
}
let pending: Vec<String> = renderer
.order
.iter()
.filter(|n| !printed.contains(n.as_str()))
.cloned()
.collect();
for name in &pending {
if let Some(card) = renderer.cards.get_mut(name)
&& !card.finished
{
card.finalize_synthetic("failed", Some("child exited without a final summary"));
}
}
flush_finished_cards(&renderer, &mut printed, width);
}
fn flush_finished_cards(renderer: &Renderer, printed: &mut HashSet<String>, width: usize) {
let (name_col, mode_col) = column_widths(&renderer.cards);
let mut out = String::new();
for name in &renderer.order {
if printed.contains(name) {
continue;
}
let Some(card) = renderer.cards.get(name) else {
continue;
};
if !card.finished {
continue;
}
for line in card.live_lines(width, name_col, mode_col) {
out.push_str(&line);
out.push('\n');
}
printed.insert(name.clone());
}
if out.is_empty() {
return;
}
let mut handle = std::io::stderr().lock();
let _ = handle.write_all(out.as_bytes());
let _ = handle.flush();
}
fn column_widths(cards: &HashMap<String, CardState>) -> (usize, usize) {
let mut name = 0usize;
let mut mode = 0usize;
for c in cards.values() {
name = name.max(c.export_name.chars().count());
mode = mode.max(c.mode.chars().count());
}
(name.max(MIN_NAME_COL), mode.max(MIN_MODE_COL))
}
struct Renderer {
cards: HashMap<String, CardState>,
order: Vec<String>,
last_drawn_lines: usize,
width: usize,
}
impl Renderer {
fn new(width: usize) -> Self {
Self {
cards: HashMap::new(),
order: Vec::new(),
last_drawn_lines: 0,
width,
}
}
fn process_message(&mut self, msg: UiMessage) {
match msg {
UiMessage::Event(ev) => self.handle_event(ev),
UiMessage::ChildClosed {
export_name,
wait_status,
} => {
if let Some(card) = self.cards.get_mut(&export_name)
&& !card.finished
{
let (status, err) = match wait_status {
ChildWaitStatus::Success => ("success".to_string(), None),
ChildWaitStatus::Failed(msg) => ("failed".to_string(), Some(msg)),
ChildWaitStatus::Pending => (
"failed".to_string(),
Some("child exited without a final summary".into()),
),
};
card.finalize_synthetic(&status, err.as_deref());
}
}
}
}
fn handle_event(&mut self, ev: ChildEvent) {
match ev {
ChildEvent::Started {
export_name,
run_id: _,
mode,
tuning_profile: _,
batch_size: _,
} => {
if !self.cards.contains_key(&export_name) {
self.order.push(export_name.clone());
}
let card = CardState {
export_name: export_name.clone(),
mode,
status: "running".into(),
chunks_done: 0,
total_chunks: 0,
rows: 0,
started_at: Instant::now(),
finished: false,
final_line: String::new(),
};
self.cards.insert(export_name, card);
}
ChildEvent::ProgressInit {
export_name,
total_chunks,
} => {
if let Some(card) = self.cards.get_mut(&export_name) {
card.total_chunks = total_chunks;
}
}
ChildEvent::Progress {
export_name,
chunks_done,
rows,
} => {
if let Some(card) = self.cards.get_mut(&export_name) {
card.chunks_done = chunks_done;
card.rows = rows;
}
}
ChildEvent::Finished {
export_name,
run_id: _,
status,
total_rows,
files_produced,
bytes_written,
duration_ms,
peak_rss_mb,
error_message,
} => {
if let Some(card) = self.cards.get_mut(&export_name) {
card.finalize(
&status,
total_rows,
files_produced,
bytes_written,
duration_ms,
peak_rss_mb,
error_message.as_deref(),
);
}
}
}
}
fn redraw(&mut self) {
let mut out = String::new();
if self.last_drawn_lines > 0 {
out.push_str(&format!("\x1b[{}A", self.last_drawn_lines));
out.push('\r');
}
let (name_col, mode_col) = column_widths(&self.cards);
let mut new_lines = 0usize;
for name in &self.order {
if let Some(card) = self.cards.get(name) {
for line in card.live_lines(self.width, name_col, mode_col) {
out.push_str("\x1b[2K");
out.push_str(&line);
out.push('\n');
new_lines += 1;
}
}
}
if self.last_drawn_lines > new_lines {
let extra = self.last_drawn_lines - new_lines;
for _ in 0..extra {
out.push_str("\x1b[2K\n");
}
out.push_str(&format!("\x1b[{extra}A"));
}
let mut handle = std::io::stderr().lock();
let _ = handle.write_all(out.as_bytes());
let _ = handle.flush();
self.last_drawn_lines = new_lines;
}
fn commit(&mut self) {
self.last_drawn_lines = 0;
}
}
struct CardState {
export_name: String,
mode: String,
status: String,
chunks_done: u64,
total_chunks: u64,
rows: i64,
started_at: Instant,
finished: bool,
final_line: String,
}
impl CardState {
#[allow(clippy::too_many_arguments)]
fn finalize(
&mut self,
status: &str,
total_rows: i64,
files_produced: u64,
bytes_written: u64,
duration_ms: i64,
peak_rss_mb: i64,
error_message: Option<&str>,
) {
self.finished = true;
self.status = status.to_string();
self.rows = total_rows;
self.final_line = render_final_line(
total_rows,
files_produced,
bytes_written,
duration_ms,
peak_rss_mb,
error_message,
);
}
fn finalize_synthetic(&mut self, status: &str, error_message: Option<&str>) {
self.finished = true;
self.status = format!("synthetic-{}", status);
self.final_line = match error_message {
Some(msg) => sanitize_terminal(msg),
None => "child exited without a final summary".to_string(),
};
}
fn live_lines(&self, width: usize, name_col: usize, mode_col: usize) -> Vec<String> {
vec![clamp_line(&self.compact_line(name_col, mode_col), width)]
}
fn compact_line(&self, name_col: usize, mode_col: usize) -> String {
let icon = self.status_icon();
let body = if self.finished {
self.final_line.clone()
} else {
self.running_body()
};
format!(
"{} {:<name$} {:<mode$} {}",
icon,
self.export_name,
self.mode,
body,
name = name_col,
mode = mode_col,
)
}
fn status_icon(&self) -> &'static str {
if !self.finished {
return "▸";
}
match self.status.as_str() {
"success" => "✓",
"failed" => "✗",
s if s.starts_with("synthetic-") => "⚠",
_ => "•",
}
}
fn running_body(&self) -> String {
let elapsed_ms = self.started_at.elapsed().as_millis() as i64;
let chunks_label = if self.total_chunks > 0 {
format!("{}/{} chunks", self.chunks_done, self.total_chunks)
} else {
"preparing…".to_string()
};
let eta_label = if self.chunks_done > 0 && self.total_chunks > self.chunks_done {
let total_ms_est =
elapsed_ms as f64 * (self.total_chunks as f64 / self.chunks_done as f64);
let remaining = (total_ms_est - elapsed_ms as f64).max(0.0) as i64;
fmt_duration_ms(remaining)
} else if self.total_chunks > 0 && self.chunks_done >= self.total_chunks {
"0s".to_string()
} else {
"—".to_string()
};
let rate_label = if elapsed_ms >= 500 && self.rows > 0 {
let rps = self.rows as f64 * 1000.0 / elapsed_ms as f64;
format!(" {}", fmt_rate(rps))
} else {
String::new()
};
format!(
"{chunks} {rows}{rate} {elapsed} ETA {eta}",
chunks = chunks_label,
rows = fmt_rows(self.rows),
rate = rate_label,
elapsed = fmt_duration_ms(elapsed_ms),
eta = eta_label,
)
}
}
fn render_final_line(
total_rows: i64,
files_produced: u64,
bytes_written: u64,
duration_ms: i64,
peak_rss_mb: i64,
error_message: Option<&str>,
) -> String {
if let Some(err) = error_message {
let (cause, _) = strip_chunked_recovery_hint(err);
return sanitize_terminal(cause);
}
let rss = if peak_rss_mb > 0 {
format!(" RSS {} MB", fmt_thousands(peak_rss_mb))
} else {
String::new()
};
format!(
"{} rows {} files {} {}{}",
fmt_thousands(total_rows),
fmt_thousands(files_produced as i64),
format_bytes(bytes_written),
fmt_duration_ms(duration_ms),
rss
)
}
fn pick_width() -> usize {
let (_, cols) = console::Term::stderr().size();
(cols as usize).clamp(60, 100)
}
fn fmt_rows(rows: i64) -> String {
if rows >= 1_000_000 {
format!("{:.1}M rows", rows as f64 / 1_000_000.0)
} else if rows >= 1_000 {
format!("{:.0}K rows", rows as f64 / 1_000.0)
} else {
format!("{rows} rows")
}
}
fn fmt_rate(rps: f64) -> String {
if rps >= 1_000_000.0 {
format!("{:.1}M r/s", rps / 1_000_000.0)
} else if rps >= 1_000.0 {
format!("{:.1}K r/s", rps / 1_000.0)
} else {
format!("{:.0} r/s", rps)
}
}
fn fmt_thousands(n: i64) -> String {
let abs = n.unsigned_abs();
let raw = abs.to_string();
let mut buf = String::with_capacity(raw.len() + raw.len() / 3);
for (from_end, ch) in raw.chars().rev().enumerate() {
if from_end > 0 && from_end.is_multiple_of(3) {
buf.push(',');
}
buf.push(ch);
}
let s: String = buf.chars().rev().collect();
if n < 0 { format!("-{s}") } else { s }
}
fn fmt_duration_ms(ms: i64) -> String {
if ms < 0 {
return "0s".into();
}
let total_secs_f = ms as f64 / 1000.0;
if ms < 60_000 {
return format!("{:.1}s", total_secs_f);
}
let mut secs = total_secs_f as i64;
let hours = secs / 3600;
secs %= 3600;
let mins = secs / 60;
let rem_secs = total_secs_f - (hours as f64 * 3600.0) - (mins as f64 * 60.0);
if hours > 0 {
format!("{}h {:02}m {:04.1}s", hours, mins, rem_secs)
} else {
format!("{}m {:04.1}s", mins, rem_secs)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fresh_card(name: &str, mode: &str) -> CardState {
CardState {
export_name: name.into(),
mode: mode.into(),
status: "running".into(),
chunks_done: 0,
total_chunks: 0,
rows: 0,
started_at: Instant::now(),
finished: false,
final_line: String::new(),
}
}
#[test]
fn final_line_includes_metrics() {
let s = render_final_line(123_456, 5, 1024 * 1024, 9_400, 30, None);
assert!(s.contains("123,456 rows"));
assert!(s.contains("5 files"));
assert!(s.contains("9.4s"));
assert!(s.contains("RSS 30 MB"));
}
#[test]
fn final_line_uses_error_message() {
let s = render_final_line(0, 0, 0, 0, 0, Some("connection reset"));
assert_eq!(s, "connection reset");
}
#[test]
fn final_line_strips_chunked_recovery_hint() {
let s = render_final_line(
0,
0,
0,
0,
0,
Some(
"export 'x': chunk checkpoint run 'rid' still in progress; \
use `rivet run --config foo.yaml --export x --resume` or \
`rivet state reset-chunks --config foo.yaml --export x`",
),
);
assert_eq!(
s,
"export 'x': chunk checkpoint run 'rid' still in progress"
);
}
#[test]
fn pick_width_in_bounds() {
let w = pick_width();
assert!((60..=100).contains(&w));
}
#[test]
fn fmt_rows_buckets() {
assert_eq!(fmt_rows(0), "0 rows");
assert_eq!(fmt_rows(999), "999 rows");
assert_eq!(fmt_rows(1_500), "2K rows");
assert_eq!(fmt_rows(1_500_000), "1.5M rows");
}
#[test]
fn fmt_thousands_basic() {
assert_eq!(fmt_thousands(0), "0");
assert_eq!(fmt_thousands(1234), "1,234");
assert_eq!(fmt_thousands(-1234), "-1,234");
assert_eq!(fmt_thousands(1_000_000), "1,000,000");
}
#[test]
fn fmt_duration_buckets() {
assert_eq!(fmt_duration_ms(0), "0.0s");
assert_eq!(fmt_duration_ms(9_400), "9.4s");
assert!(fmt_duration_ms(60_000).starts_with("1m"));
assert!(fmt_duration_ms(3_600_000).starts_with("1h"));
}
#[test]
fn fmt_rate_picks_unit() {
assert_eq!(fmt_rate(42.0), "42 r/s");
assert_eq!(fmt_rate(1_500.0), "1.5K r/s");
assert_eq!(fmt_rate(2_500_000.0), "2.5M r/s");
}
#[test]
fn running_body_shows_rate_after_warmup() {
let mut card = fresh_card("orders", "chunked");
card.total_chunks = 10;
card.chunks_done = 5;
card.rows = 50_000;
let line = card.running_body();
assert!(line.contains("50K rows"));
assert!(!line.contains("r/s"), "rate shown too early: {line}");
}
#[test]
fn compact_running_card_shows_progress_or_preparing() {
let mut card = fresh_card("orders", "chunked");
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(line.starts_with("▸ "), "running uses ▸: {}", line);
assert!(line.contains("orders"));
assert!(line.contains("chunked"));
assert!(line.contains("preparing"));
card.total_chunks = 10;
card.chunks_done = 4;
card.rows = 40_000;
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(line.contains("4/10 chunks"));
assert!(line.contains("40K rows"));
assert!(line.contains("ETA "));
}
#[test]
fn compact_finished_card_shows_metrics_with_check() {
let mut card = fresh_card("orders", "chunked");
card.finalize("success", 100, 1, 1024, 1234, 30, None);
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(line.starts_with("✓ "), "success uses ✓: {}", line);
assert!(line.contains("100 rows"));
assert!(line.contains("1 files"));
assert!(line.contains("1.2s"));
assert!(line.contains("RSS 30 MB"));
}
#[test]
fn compact_failed_card_shows_cause_only() {
let mut card = fresh_card("metric_samples", "chunked");
card.finalize(
"failed",
0,
0,
0,
0,
0,
Some(
"export 'metric_samples': chunk checkpoint run 'rid' still in progress; \
use `rivet run --config foo.yaml --export metric_samples --resume` or \
`rivet state reset-chunks --config foo.yaml --export metric_samples`",
),
);
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(line.starts_with("✗ "), "failed uses ✗: {}", line);
assert!(line.contains("still in progress"));
assert!(!line.contains("`rivet "), "hint must be stripped: {}", line);
}
#[test]
fn compact_synthetic_failure_uses_warning_glyph() {
let mut card = fresh_card("orders", "chunked");
card.finalize_synthetic("failed", Some("child crashed"));
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(card.finished);
assert!(line.starts_with("⚠ "), "synthetic uses ⚠: {}", line);
assert!(line.contains("child crashed"));
}
#[test]
fn compact_line_pads_columns_for_alignment() {
let mut a = fresh_card("a", "full");
a.finalize("success", 1, 1, 0, 1000, 0, None);
let mut b = fresh_card("longer_name", "incremental");
b.finalize("success", 1, 1, 0, 1000, 0, None);
let name_col = "longer_name".chars().count();
let mode_col = "incremental".chars().count();
let la = a.compact_line(name_col, mode_col);
let lb = b.compact_line(name_col, mode_col);
let pa = la.find(" rows").expect("body present in a");
let pb = lb.find(" rows").expect("body present in b");
assert_eq!(pa, pb, "alignment broken: a={la:?} b={lb:?}");
}
fn dangerous_control_bytes(s: &str) -> Vec<char> {
s.chars()
.filter(|&c| {
let cp = c as u32;
(cp <= 0x1f && c != '\t' && c != '\n') || cp == 0x7f || (0x80..=0x9f).contains(&cp)
})
.collect()
}
#[test]
fn sanitize_terminal_escapes_ansi_and_osc_payload() {
let payload = "\u{1b}]0;pwned\u{07}\u{1b}[2Jboom";
let clean = sanitize_terminal(payload);
assert!(
dangerous_control_bytes(&clean).is_empty(),
"escape bytes survived: {clean:?}"
);
assert!(clean.contains("pwned"));
assert!(clean.contains("boom"));
}
#[test]
fn sanitize_terminal_preserves_printable_and_layout_whitespace() {
let s = "ok\tline1\nline2 ✓ … ──";
assert_eq!(sanitize_terminal(s), s);
}
#[test]
fn sanitize_terminal_strips_del_c1_and_cr() {
let s = "a\u{7f}b\u{85}c\rd";
let clean = sanitize_terminal(s);
assert!(dangerous_control_bytes(&clean).is_empty());
assert!(clean.contains('a') && clean.contains('d'));
}
#[test]
fn render_final_line_sanitises_db_error_escapes() {
let err = "db error: invalid input: \u{1b}]0;pwned\u{07}\u{1b}[2Jboom";
let line = render_final_line(0, 0, 0, 0, 0, Some(err));
assert!(
dangerous_control_bytes(&line).is_empty(),
"render_final_line leaked control bytes: {line:?}"
);
}
#[test]
fn finalize_synthetic_sanitises_escapes() {
let mut card = fresh_card("orders", "chunked");
card.finalize_synthetic("failed", Some("crash \u{1b}[2J\u{07}"));
let line = card.compact_line(MIN_NAME_COL, MIN_MODE_COL);
assert!(
dangerous_control_bytes(&line).is_empty(),
"synthetic final line leaked control bytes: {line:?}"
);
}
}