use std::collections::HashMap;
use std::path::Path;
use chrono::{DateTime, Utc};
use serde::Serialize;
use crate::error::Result;
use crate::state::{ExportMetric, RunAggregate, RunAggregateEntry, StateStore};
use super::summary::RunSummary;
use super::{format_bytes, strip_chunked_recovery_hint};
/// JSON-serializable view of one [`ExportMetric`] row, used by `metrics_to_json`.
///
/// Every field is copied from the identically named `ExportMetric` field by the
/// `From<&ExportMetric>` impl. Field declaration order is the key order of the
/// serialized JSON object, and `Option` fields serialize as `null` when absent.
// NOTE(review): `dead_code` is allowed here, presumably because not every build/command
// path constructs this type — confirm before removing the attribute.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize)]
pub(super) struct MetricRowJson {
pub export_name: String,
pub run_id: Option<String>,
/// Raw timestamp string as stored in the metric row (not re-parsed here).
pub run_at: String,
pub duration_ms: i64,
pub total_rows: i64,
pub peak_rss_mb: Option<i64>,
/// Status string as stored (e.g. "success" / "failed").
pub status: String,
pub error_message: Option<String>,
pub tuning_profile: Option<String>,
pub format: Option<String>,
pub mode: Option<String>,
pub files_produced: i64,
pub bytes_written: i64,
pub retries: i64,
pub validated: Option<bool>,
pub schema_changed: Option<bool>,
}
impl From<&ExportMetric> for MetricRowJson {
fn from(m: &ExportMetric) -> Self {
Self {
export_name: m.export_name.clone(),
run_id: m.run_id.clone(),
run_at: m.run_at.clone(),
duration_ms: m.duration_ms,
total_rows: m.total_rows,
peak_rss_mb: m.peak_rss_mb,
status: m.status.clone(),
error_message: m.error_message.clone(),
tuning_profile: m.tuning_profile.clone(),
format: m.format.clone(),
mode: m.mode.clone(),
files_produced: m.files_produced,
bytes_written: m.bytes_written,
retries: m.retries,
validated: m.validated,
schema_changed: m.schema_changed,
}
}
}
/// Serialize `metrics` as a pretty-printed JSON array of [`MetricRowJson`] objects.
///
/// An empty slice produces `[]`. A serialization failure is returned as an
/// error prefixed with `serde_json:`.
// NOTE(review): `dead_code` is allowed, presumably because only some command paths
// (or tests) call this — confirm before removing.
#[allow(dead_code)]
pub(super) fn metrics_to_json(metrics: &[ExportMetric]) -> Result<String> {
    let rows = metrics
        .iter()
        .map(Into::into)
        .collect::<Vec<MetricRowJson>>();
    serde_json::to_string_pretty(&rows).map_err(|err| anyhow::anyhow!("serde_json: {:#}", err))
}
pub(super) fn entry_from_summary(s: &RunSummary) -> RunAggregateEntry {
RunAggregateEntry {
export_name: s.export_name.clone(),
status: s.status.clone(),
run_id: s.run_id.clone(),
rows: s.total_rows,
files: s.files_produced as i64,
bytes: s.bytes_written,
duration_ms: s.duration_ms,
mode: s.mode.clone(),
error_message: s.error_message.clone(),
}
}
/// Fold per-export results into a single [`RunAggregate`].
///
/// Entries with status `"success"` or `"failed"` are counted as such, and every
/// other status (for example a still-`"running"` entry) counts as skipped.
/// Rows, files and bytes are summed over all entries regardless of status.
/// The aggregate id is `agg_` followed by `started_at` formatted as
/// `%Y%m%dT%H%M%S%3f`. `duration_ms` is the wall-clock span
/// `finished_at - started_at`.
pub(super) fn build(
    entries: Vec<RunAggregateEntry>,
    started_at: DateTime<Utc>,
    finished_at: DateTime<Utc>,
    config_path: Option<&str>,
    parallel_mode: &str,
) -> RunAggregate {
    // Single pass: tally statuses and accumulate totals together.
    let mut success_count = 0;
    let mut failed_count = 0;
    let mut total_rows = 0;
    let mut total_files = 0;
    let mut total_bytes = 0;
    for entry in &entries {
        match entry.status.as_str() {
            "success" => success_count += 1,
            "failed" => failed_count += 1,
            _ => {}
        }
        total_rows += entry.rows;
        total_files += entry.files;
        total_bytes += entry.bytes;
    }

    let total_exports = entries.len();
    let skipped_count = total_exports
        .saturating_sub(success_count)
        .saturating_sub(failed_count);

    RunAggregate {
        run_aggregate_id: format!("agg_{}", started_at.format("%Y%m%dT%H%M%S%3f")),
        started_at: started_at.to_rfc3339(),
        finished_at: finished_at.to_rfc3339(),
        duration_ms: (finished_at - started_at).num_milliseconds(),
        config_path: config_path.map(str::to_string),
        parallel_mode: parallel_mode.to_owned(),
        total_exports,
        success_count,
        failed_count,
        skipped_count,
        total_rows,
        total_files,
        total_bytes,
        per_export: entries,
    }
}
/// Print a human-readable summary of `agg` to stderr.
///
/// The summary always includes the id, status counts, row/file totals,
/// wall-clock duration and parallel mode, plus the config path when set.
/// Bytes are shown only when non-zero. Throughput is shown only when both
/// duration and row count are positive. If any export failed, each failure is
/// listed with its error: the chunked-recovery hint is stripped and the text is
/// truncated to 200 characters. Recovery commands follow when any of those
/// errors carried the chunked hint.
pub(super) fn print(agg: &RunAggregate) {
    eprintln!();
    eprintln!("════════════════════════════════════════════════════════");
    eprintln!(" Run summary ({} exports)", agg.total_exports);
    eprintln!("════════════════════════════════════════════════════════");
    eprintln!(" id: {}", agg.run_aggregate_id);

    let skipped_suffix = if agg.skipped_count > 0 {
        format!(" · {} skipped", agg.skipped_count)
    } else {
        String::new()
    };
    eprintln!(
        " status: {} success · {} failed{}",
        agg.success_count, agg.failed_count, skipped_suffix
    );
    eprintln!(" rows: {}", agg.total_rows);
    eprintln!(" files: {}", agg.total_files);
    if agg.total_bytes > 0 {
        eprintln!(" bytes: {}", format_bytes(agg.total_bytes));
    }
    eprintln!(" duration: {} (wall clock)", format_duration(agg.duration_ms));

    let show_throughput = agg.duration_ms > 0 && agg.total_rows > 0;
    if show_throughput {
        let rows_per_sec = agg.total_rows as f64 * 1000.0 / agg.duration_ms as f64;
        eprintln!(" throughput: {} rows/s", format_rate(rows_per_sec));
    }
    eprintln!(" mode: {}", agg.parallel_mode);
    if let Some(config) = &agg.config_path {
        eprintln!(" config: {}", config);
    }

    if agg.failed_count == 0 {
        return;
    }

    eprintln!();
    eprintln!(" failed exports:");
    let mut needs_recovery: Vec<&str> = Vec::new();
    for failed in agg.per_export.iter().filter(|e| e.status == "failed") {
        let raw = failed
            .error_message
            .as_deref()
            .unwrap_or("(no error message recorded)");
        let (cause, has_chunked_hint) = strip_chunked_recovery_hint(raw);
        if has_chunked_hint {
            needs_recovery.push(failed.export_name.as_str());
        }
        eprintln!(" - {}: {}", failed.export_name, truncate(cause, 200));
    }

    if !needs_recovery.is_empty() {
        print_chunked_recovery(&needs_recovery, agg.config_path.as_deref());
    }
}
/// Print copy-pasteable shell commands for recovering failed chunked exports.
///
/// `exports` names the failed exports whose error carried the chunked-recovery
/// hint. `config_path` is the config used for the run; a `<CONFIG>` placeholder
/// is printed when it is absent or empty. Names and the path are shell-quoted
/// when they contain characters a POSIX shell would interpret (spaces, quotes,
/// `$`, …), so the printed commands stay runnable as-is. Plain identifiers and
/// paths are printed unchanged.
fn print_chunked_recovery(exports: &[&str], config_path: Option<&str>) {
    let cfg = match config_path {
        Some(p) if !p.is_empty() => format!("--config {}", shell_quote(p)),
        _ => "--config <CONFIG>".to_string(),
    };
    let names_spaced = exports
        .iter()
        .map(|name| shell_quote(name))
        .collect::<Vec<_>>()
        .join(" ");
    eprintln!();
    eprintln!(" recovery ({} chunked export(s)):", exports.len());
    eprintln!(" resume in-progress checkpoint runs:");
    eprintln!(" rivet run {} --resume", cfg);
    eprintln!(
        " or reset stuck checkpoints for every export in this config (chunk_run.status = in_progress), then resume:"
    );
    eprintln!(
        " rivet state reset-chunks {} --stuck-checkpoints && rivet run {} --resume",
        cfg, cfg
    );
    eprintln!(" or reset only the exports listed above, then resume:");
    eprintln!(
        " for e in {}; do rivet state reset-chunks {} --export \"$e\"; done && rivet run {} --resume",
        names_spaced, cfg, cfg
    );
}

/// Quote `s` as a single POSIX-shell word, leaving it untouched when it needs no quoting.
///
/// Uses single quotes, which disable all expansion. An embedded `'` is written
/// as `'\''` (close, escaped quote, reopen). The empty string becomes `''`.
fn shell_quote(s: &str) -> String {
    let needs_no_quoting =
        |c: char| c.is_ascii_alphanumeric() || "-_./:=@%+,".contains(c);
    if !s.is_empty() && s.chars().all(needs_no_quoting) {
        s.to_string()
    } else {
        format!("'{}'", s.replace('\'', r"'\''"))
    }
}
/// Render a millisecond duration for humans.
///
/// * below 1000 (including negative values): `"{ms}ms"`
/// * below one minute: seconds with one decimal, e.g. `"1.5s"`
/// * below one hour: `"{m}m {s}s"`
/// * otherwise: `"{h}h {m}m {s}s"`
///
/// Sub-units are truncated, never rounded, so a value cannot roll over into
/// the next unit. For example, 59 999 ms renders as `"59.9s"`, not `"60.0s"`.
fn format_duration(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    let total_secs = ms / 1000;
    let h = total_secs / 3600;
    let m = (total_secs % 3600) / 60;
    let s = total_secs % 60;
    if h > 0 {
        format!("{}h {}m {}s", h, m, s)
    } else if m > 0 {
        format!("{}m {}s", m, s)
    } else {
        // Integer tenths rather than `{:.1}` on a float: float rounding would
        // turn 59.95..59.999 s into "60.0s", which belongs to the minute branch,
        // and truncation matches how the minute/hour branches drop remainders.
        format!("{}.{}s", s, (ms % 1000) / 100)
    }
}
/// Render a rate compactly: `"2.5M"`, `"1.5K"`, or a whole number below one thousand.
///
/// The unit thresholds sit at the rounding boundary of the smaller unit
/// (999.5 and 999 950). A value that would otherwise print as `"1000"` or
/// `"1000.0K"` therefore moves up to `"1.0K"` / `"1.0M"`.
fn format_rate(r: f64) -> String {
    if r >= 999_950.0 {
        format!("{:.1}M", r / 1_000_000.0)
    } else if r >= 999.5 {
        format!("{:.1}K", r / 1_000.0)
    } else {
        format!("{:.0}", r)
    }
}
/// Shorten `s` to at most `max_chars` characters, appending `…` when anything was cut.
///
/// The limit counts Unicode scalar values (not bytes), so the cut always lands
/// on a character boundary. When a cut happens, the result is `max_chars`
/// characters plus the ellipsis.
fn truncate(s: &str, max_chars: usize) -> String {
    // Byte offset of the first character past the limit; absent => fits entirely.
    let Some((cut_at, _)) = s.char_indices().nth(max_chars) else {
        return s.to_owned();
    };
    let mut shortened = String::with_capacity(cut_at + '…'.len_utf8());
    shortened.push_str(&s[..cut_at]);
    shortened.push('…');
    shortened
}
/// Best-effort persistence of a finished run aggregate.
///
/// Records `agg` in `state`. When `summary_output` is given, it also writes
/// `agg` as pretty JSON to that path, creating parent directories. Both steps
/// are observational: a failure is logged as a warning and never propagated.
/// A successful file write is announced on stderr.
pub(super) fn persist(state: &StateStore, agg: &RunAggregate, summary_output: Option<&Path>) {
    match state.record_run_aggregate(agg) {
        Ok(_) => log::info!(
            "aggregate: recorded {} ({} exports, {} success, {} failed)",
            agg.run_aggregate_id,
            agg.total_exports,
            agg.success_count,
            agg.failed_count,
        ),
        Err(e) => log::warn!(
            "aggregate: failed to record run_aggregate (observational, ignored): {:#}",
            e
        ),
    }

    let Some(path) = summary_output else {
        return;
    };
    if let Err(e) = write_json(path, agg) {
        log::warn!(
            "aggregate: failed to write summary JSON to {}: {:#}",
            path.display(),
            e
        );
    } else {
        eprintln!(" written: {}", path.display());
    }
}
/// Write `agg` as pretty-printed JSON to `path`, creating missing parent directories.
///
/// # Errors
/// Returns an error naming the failing step if the parent directory cannot be
/// created, serialization fails, or the file cannot be written.
fn write_json(path: &Path, agg: &RunAggregate) -> Result<()> {
    // A bare file name has an empty parent; there is no directory to create then.
    let parent_dir = path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        std::fs::create_dir_all(dir)
            .map_err(|e| anyhow::anyhow!("create_dir_all({}): {:#}", dir.display(), e))?;
    }

    let body = serde_json::to_string_pretty(agg)
        .map_err(|e| anyhow::anyhow!("serde_json: {:#}", e))?;
    std::fs::write(path, body)
        .map_err(|e| anyhow::anyhow!("write({}): {:#}", path.display(), e))?;
    Ok(())
}
/// Build one [`RunAggregateEntry`] per export from the metrics recorded by child runs.
///
/// For each export, the newest metric row is used only when its `run_at`
/// parses as RFC 3339 and is not earlier than `started_at`, i.e. it belongs to
/// this aggregate run. Otherwise the export is reported as `failed`. The error
/// message comes from `child_failures` for that export name, or defaults to
/// "no metric recorded for this run". Query and timestamp-parse problems are
/// logged as warnings.
///
/// The returned entries are in the same order as `exports`.
pub(super) fn collect_child_entries(
    state: &StateStore,
    exports: &[&crate::config::ExportConfig],
    started_at: DateTime<Utc>,
    child_failures: &HashMap<String, String>,
) -> Vec<RunAggregateEntry> {
    exports
        .iter()
        .map(|export| {
            recent_metric_entry(state, export, started_at)
                .unwrap_or_else(|| missing_metric_entry(export, child_failures))
        })
        .collect()
}

/// Entry built from `export`'s newest metric, if that metric was recorded at or after `started_at`.
fn recent_metric_entry(
    state: &StateStore,
    export: &crate::config::ExportConfig,
    started_at: DateTime<Utc>,
) -> Option<RunAggregateEntry> {
    let rows = match state.get_metrics(Some(&export.name), 1) {
        Ok(rows) => rows,
        Err(e) => {
            log::warn!(
                "aggregate: metric query failed for '{}': {:#} (treating as failed)",
                export.name,
                e
            );
            return None;
        }
    };
    let m = rows.into_iter().next()?;

    let run_at = match chrono::DateTime::parse_from_rfc3339(&m.run_at) {
        Ok(parsed) => parsed.with_timezone(&Utc),
        Err(e) => {
            // Surface this explicitly: otherwise the export would be reported as
            // having no metric at all, hiding the real (timestamp format) cause.
            log::warn!(
                "aggregate: could not parse run_at '{}' for '{}': {} (treating as failed)",
                m.run_at,
                export.name,
                e
            );
            return None;
        }
    };
    if run_at < started_at {
        // The newest row predates this aggregate run, so it is a stale result.
        return None;
    }

    Some(RunAggregateEntry {
        export_name: m.export_name,
        status: m.status,
        run_id: m.run_id.unwrap_or_default(),
        rows: m.total_rows,
        files: m.files_produced,
        // Clamp to zero before widening; a negative byte count is not meaningful.
        bytes: m.bytes_written.max(0) as u64,
        duration_ms: m.duration_ms,
        mode: m.mode.unwrap_or_default(),
        error_message: m.error_message,
    })
}

/// Synthetic `failed` entry for an export whose run left no usable metric.
fn missing_metric_entry(
    export: &crate::config::ExportConfig,
    child_failures: &HashMap<String, String>,
) -> RunAggregateEntry {
    let reason = child_failures
        .get(export.name.as_str())
        .cloned()
        .unwrap_or_else(|| "no metric recorded for this run".into());
    RunAggregateEntry {
        export_name: export.name.clone(),
        status: "failed".into(),
        run_id: String::new(),
        rows: 0,
        files: 0,
        bytes: 0,
        duration_ms: 0,
        mode: String::new(),
        error_message: Some(reason),
    }
}
// Unit tests for aggregation, the formatting helpers, metric JSON export and persistence.
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
/// Synthetic entry fixture: fixed 1000 ms duration, mode "full", run id "{name}_run",
/// and error message "boom" only when `status` is "failed".
fn entry(name: &str, status: &str, rows: i64, files: i64, bytes: u64) -> RunAggregateEntry {
RunAggregateEntry {
export_name: name.into(),
status: status.into(),
run_id: format!("{name}_run"),
rows,
files,
bytes,
duration_ms: 1000,
mode: "full".into(),
error_message: if status == "failed" {
Some("boom".into())
} else {
None
},
}
}
// Mixed success/failed run: status counts, summed totals, pass-through fields, id prefix.
#[test]
fn build_aggregates_counts_and_totals() {
let started = Utc::now();
let finished = started + Duration::seconds(120);
let agg = build(
vec![
entry("a", "success", 100, 1, 1024),
entry("b", "failed", 0, 0, 0),
entry("c", "success", 50, 2, 2048),
],
started,
finished,
Some("conf.yaml"),
"sequential",
);
assert_eq!(agg.total_exports, 3);
assert_eq!(agg.success_count, 2);
assert_eq!(agg.failed_count, 1);
assert_eq!(agg.skipped_count, 0);
assert_eq!(agg.total_rows, 150);
assert_eq!(agg.total_files, 3);
assert_eq!(agg.total_bytes, 3072);
assert_eq!(agg.duration_ms, 120_000);
assert_eq!(agg.parallel_mode, "sequential");
assert_eq!(agg.config_path.as_deref(), Some("conf.yaml"));
assert!(
agg.run_aggregate_id.starts_with("agg_"),
"id should start with `agg_`, got {}",
agg.run_aggregate_id
);
}
// Any status other than "success"/"failed" must be counted as skipped.
#[test]
fn build_handles_unknown_status_as_skipped() {
let started = Utc::now();
let finished = started + Duration::seconds(1);
let agg = build(
vec![
entry("a", "success", 1, 0, 0),
// "running" is neither success nor failed.
entry("b", "running", 0, 0, 0), ],
started,
finished,
None,
"sequential",
);
assert_eq!(agg.success_count, 1);
assert_eq!(agg.failed_count, 0);
assert_eq!(agg.skipped_count, 1);
}
// An empty run still yields a well-formed, all-zero aggregate.
#[test]
fn build_with_zero_exports_is_well_formed() {
let now = Utc::now();
let agg = build(vec![], now, now, None, "sequential");
assert_eq!(agg.total_exports, 0);
assert_eq!(agg.total_rows, 0);
assert_eq!(agg.success_count, 0);
assert_eq!(agg.failed_count, 0);
assert_eq!(agg.skipped_count, 0);
}
// One sample per unit branch: ms, fractional seconds, minutes, hours.
#[test]
fn format_duration_picks_unit() {
assert_eq!(format_duration(500), "500ms");
assert_eq!(format_duration(1500), "1.5s");
assert_eq!(format_duration(65_000), "1m 5s");
assert_eq!(format_duration(3_725_000), "1h 2m 5s");
}
// One sample per suffix branch: none, K, M.
#[test]
fn format_rate_scales() {
assert_eq!(format_rate(42.0), "42");
assert_eq!(format_rate(1500.0), "1.5K");
assert_eq!(format_rate(2_500_000.0), "2.5M");
}
// Multi-byte characters: truncation must cut on a char boundary, not a byte offset.
#[test]
fn truncate_respects_char_boundary_with_unicode() {
let s = "αβγδ".repeat(100); let t = truncate(&s, 10);
// 10 kept characters plus the appended '…'.
assert_eq!(t.chars().count(), 11); }
/// Synthetic `ExportMetric` fixture with every optional field populated;
/// `error_message` is "boom" only when `status` is "failed".
fn metric(name: &str, status: &str) -> ExportMetric {
ExportMetric {
export_name: name.into(),
run_id: Some(format!("{name}_run")),
run_at: "2026-06-09T12:00:00+00:00".into(),
duration_ms: 1500,
total_rows: 42,
peak_rss_mb: Some(64),
status: status.into(),
error_message: if status == "failed" {
Some("boom".into())
} else {
None
},
tuning_profile: Some("balanced".into()),
format: Some("parquet".into()),
mode: Some("full".into()),
files_produced: 2,
bytes_written: 4096,
retries: 1,
validated: Some(true),
schema_changed: Some(false),
}
}
// No metrics must still be a JSON array (`[]`), not a human-readable placeholder message.
#[test]
fn metrics_to_json_empty_is_valid_array() {
let json = metrics_to_json(&[]).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(parsed.is_array(), "empty metrics must serialize as []");
assert_eq!(parsed.as_array().unwrap().len(), 0);
assert!(!json.contains("No metrics recorded yet"));
}
// Every MetricRowJson key must be present (absent options as null) and values carried over.
#[test]
fn metrics_to_json_carries_all_fields() {
let json =
metrics_to_json(&[metric("orders", "success"), metric("users", "failed")]).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
let rows = parsed.as_array().unwrap();
assert_eq!(rows.len(), 2);
let first = &rows[0];
for key in [
"export_name",
"run_id",
"run_at",
"duration_ms",
"total_rows",
"peak_rss_mb",
"status",
"error_message",
"tuning_profile",
"format",
"mode",
"files_produced",
"bytes_written",
"retries",
"validated",
"schema_changed",
] {
assert!(
first.get(key).is_some(),
"metrics JSON row must carry `{key}`; got {first}"
);
}
assert_eq!(first["export_name"], "orders");
assert_eq!(first["status"], "success");
assert_eq!(first["total_rows"], 42);
assert_eq!(first["files_produced"], 2);
assert_eq!(first["validated"], true);
assert!(first["error_message"].is_null());
assert_eq!(rows[1]["status"], "failed");
assert_eq!(rows[1]["error_message"], "boom");
}
// In-memory store plus a nested temp path (exercises parent-directory creation);
// checks both the state record and the JSON file round-trip.
#[test]
fn persist_records_to_state_and_writes_file() {
use crate::state::StateStore;
let s = StateStore::open_in_memory().unwrap();
let now = Utc::now();
let agg = build(
vec![entry("a", "success", 10, 1, 100)],
now - Duration::seconds(5),
now,
Some("test.yaml"),
"sequential",
);
let tmp = tempfile::tempdir().unwrap();
let out = tmp.path().join("nested").join("summary.json");
persist(&s, &agg, Some(&out));
let rows = s.get_recent_run_aggregates(1).unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].run_aggregate_id, agg.run_aggregate_id);
assert_eq!(rows[0].total_rows, 10);
let body = std::fs::read_to_string(&out).unwrap();
let round: RunAggregate = serde_json::from_str(&body).unwrap();
assert_eq!(round.run_aggregate_id, agg.run_aggregate_id);
assert_eq!(round.per_export.len(), 1);
assert_eq!(round.per_export[0].export_name, "a");
}
}