use crate::config::DestinationConfig;
use crate::error::Result;
use crate::plan::ResolvedRunPlan;
use crate::state::StateStore;
use super::summary::RunSummary;
/// Writes the run report for `summary` and prints follow-up hints to stderr.
///
/// A failed report write is logged as a warning and never aborts the caller.
/// Nothing is printed when `crate::pipeline::ipc::capturing_events()` is true.
/// Otherwise:
///
/// * the `summary.md` path inside the report directory is printed when the
///   report was written and `crate::pipeline::multi_export_mode()` is false;
/// * a `--resume` hint is printed when the run status is `"failed"` and at
///   least one file was committed.
///
/// `kind` is only used as a prefix in the warning log line. Errors while
/// writing to or flushing stderr are ignored.
pub(super) fn finalize_run_report(config_path: &str, summary: &RunSummary, kind: &str) {
    use std::io::Write;

    // Resolved before the write attempt; only needed for the stderr hint below.
    let report_dir = crate::pipeline::report::report_dir(config_path, &summary.run_id);

    let report_written = crate::pipeline::report::write_run_report(config_path, summary)
        .inspect_err(|e| {
            log::warn!(
                "{} '{}': run report write failed (not fatal): {:#}",
                kind,
                summary.export_name,
                e
            );
        })
        .is_ok();

    if crate::pipeline::ipc::capturing_events() {
        return;
    }

    // Lock once so the hint lines are emitted together.
    let mut err_out = std::io::stderr().lock();

    let show_report_path = report_written && !crate::pipeline::multi_export_mode();
    if show_report_path {
        let summary_path = report_dir.join("summary.md");
        let _ = writeln!(err_out, "report: {}", summary_path.display());
    }

    let resumable = summary.status == "failed" && summary.files_committed > 0;
    if resumable {
        let _ = writeln!(
            err_out,
            "resume: rivet run --config {} --resume",
            crate::pipeline::report::shell_quote(config_path)
        );
    }

    let _ = err_out.flush();
}
pub(super) fn finalize_manifest(
plan: &ResolvedRunPlan,
state: &StateStore,
summary: &RunSummary,
kind: &str,
) {
use crate::manifest::ManifestStatus;
use crate::pipeline::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
if cfg!(debug_assertions)
&& let Err(e) = summary.check_post_run_invariants()
{
panic!(
"summary↔manifest coherence violated at finalize_manifest \
for {} '{}': {}",
kind, summary.export_name, e
);
}
let snapshot = match summary.journal.plan_snapshot() {
Some(s) => s,
None => {
log::debug!(
"{} '{}': no plan snapshot, manifest skipped",
kind,
summary.export_name
);
return;
}
};
let status = match summary.status.as_str() {
"success" => ManifestStatus::Success,
"failed" => ManifestStatus::Failed,
_ => ManifestStatus::Interrupted,
};
let schema_fingerprint = summary
.schema_fingerprint
.clone()
.or_else(|| {
state
.get_stored_schema(&summary.export_name)
.ok()
.flatten()
.map(|cols| crate::state::schema_fingerprint(&cols))
})
.unwrap_or_else(|| "xxh3:0000000000000000".to_string());
let source_engine = match plan.source.source_type {
crate::config::SourceType::Postgres => "postgres",
crate::config::SourceType::Mysql => "mysql",
crate::config::SourceType::Mssql => "mssql",
};
let (source_schema, source_table) = match summary.export_name.split_once('.') {
Some((s, t)) if !s.is_empty() && !t.is_empty() => {
(Some(s.to_string()), Some(t.to_string()))
}
_ => (None, None),
};
let started_at = summary
.journal
.entries
.first()
.map(|e| e.recorded_at)
.unwrap_or_else(chrono::Utc::now);
let mut builder = ManifestBuilder::new(
snapshot,
&summary.run_id,
started_at,
schema_fingerprint,
source_engine,
source_schema,
source_table,
destination_uri_for_manifest(&plan.destination),
);
for part in &summary.manifest_parts {
builder.record_part(
part.part_id,
part.path.clone(),
part.rows,
part.size_bytes,
part.content_fingerprint.clone(),
part.content_md5.clone(),
);
}
if !summary.column_checksums.is_empty() {
builder.set_column_checksums(
summary.column_checksums.clone(),
summary.checksum_key_column.clone(),
);
}
let manifest = builder.finalize(status);
let dest = match crate::destination::create_destination(&plan.destination) {
Ok(d) => d,
Err(e) => {
log::warn!(
"{} '{}': could not create destination for manifest write (not fatal): {:#}",
kind,
summary.export_name,
e
);
return;
}
};
match write_manifest(&*dest, &manifest) {
Ok(WriteOutcome::Written { success_marker }) => {
log::info!(
"{} '{}': manifest.json written ({} parts, {} rows){}",
kind,
summary.export_name,
manifest.part_count,
manifest.row_count,
if success_marker { " + _SUCCESS" } else { "" },
);
}
Ok(WriteOutcome::SkippedStreaming) => {
log::info!(
"{} '{}': manifest skipped (streaming destination)",
kind,
summary.export_name,
);
}
Err(e) => {
log::warn!(
"{} '{}': manifest write failed (not fatal): {:#}",
kind,
summary.export_name,
e
);
}
}
}
/// Runs the manifest-aware `--validate` pass against the plan's destination.
///
/// Best-effort: failing to create the destination or to run the verification
/// is logged as a warning and leaves `summary` untouched. Destinations whose
/// commit protocol is `Streaming` are skipped with a debug log.
///
/// When the verification completes, its result is stored in
/// `summary.manifest_verification`. `summary.validated` is downgraded from
/// `Some(true)` to `Some(false)` when a manifest was found but the
/// verification did not pass; it is never upgraded here.
///
/// `kind` is only used as a prefix in log messages.
pub(super) fn finalize_validate_manifest(
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
    kind: &str,
) {
    use crate::destination::WriteCommitProtocol;
    use crate::pipeline::validate_manifest::{ValidateDepth, verify_at_destination};

    let dest = match crate::destination::create_destination(&plan.destination) {
        Err(e) => {
            log::warn!(
                "{} '{}': could not create destination for --validate manifest pass (not fatal): {:#}",
                kind,
                summary.export_name,
                e
            );
            return;
        }
        Ok(created) => created,
    };

    let is_streaming = dest.capabilities().commit_protocol == WriteCommitProtocol::Streaming;
    if is_streaming {
        log::debug!(
            "{} '{}': streaming destination — skipping manifest-aware --validate",
            kind,
            summary.export_name
        );
        return;
    }

    let mut report = match verify_at_destination(&*dest, "", ValidateDepth::Full) {
        Ok(report) => report,
        Err(e) => {
            log::warn!(
                "{} '{}': --validate manifest pass failed (not fatal): {:#}",
                kind,
                summary.export_name,
                e
            );
            return;
        }
    };

    // Apply the plan's content policy before reading `passed`.
    report.enforce_content_policy(plan.verify.requires_content());

    // Only a previously-passing validation can be downgraded, and only when
    // a manifest was actually found.
    let downgrade = summary.validated == Some(true) && report.manifest_found && !report.passed;
    if downgrade {
        summary.validated = Some(false);
    }

    log::info!(
        "{} '{}': --validate manifest pass: {} parts verified, {} failed{}{}",
        kind,
        summary.export_name,
        report.parts_verified,
        report.parts_failed,
        match (report.success_marker_consistent, report.manifest_found) {
            (true, _) => " (_SUCCESS consistent)",
            (false, true) => "",
            (false, false) => " (legacy_run: no manifest)",
        },
        if report.has_failures() {
            format!(" — {} issue(s)", report.failures.len())
        } else {
            String::new()
        },
    );

    summary.manifest_verification = Some(report);
}
/// Refuses `--resume` when the destination already carries a success marker.
///
/// Destinations whose commit protocol is `Streaming` pass the gate
/// unconditionally (debug log only).
///
/// # Errors
///
/// Returns an error when the destination cannot be created, when the `head`
/// lookup for the success marker fails, or when the marker is present.
pub(super) fn check_success_gate_for_resume(plan: &ResolvedRunPlan) -> Result<()> {
    use crate::destination::WriteCommitProtocol;
    use crate::manifest::SUCCESS_FILENAME;

    let dest = crate::destination::create_destination(&plan.destination)?;

    let is_streaming = dest.capabilities().commit_protocol == WriteCommitProtocol::Streaming;
    if is_streaming {
        log::debug!(
            "resume: streaming destination for export '{}' has no prefix; gate skipped",
            plan.export_name
        );
        return Ok(());
    }

    let already_completed = dest.head(SUCCESS_FILENAME)?.is_some();
    if already_completed {
        anyhow::bail!(
            "export '{}': --resume refused — destination prefix already has _SUCCESS \
             from a prior completed run. Re-running would overwrite a verified dataset. \
             Pass --force to override, or use a different destination prefix.",
            plan.export_name
        );
    }

    Ok(())
}
/// Logs a warning when the destination prefix already holds a marker file
/// from an earlier run.
///
/// The success marker is probed first and the manifest file second; the first
/// one found is named in the warning. This is a best-effort pre-run check:
/// failing to create the destination or a failed `head` lookup is logged at
/// debug level and ends the check without a warning. Destinations whose
/// commit protocol is `Streaming` are skipped silently.
pub(super) fn warn_if_prefix_has_completed_run(plan: &ResolvedRunPlan) {
    use crate::destination::WriteCommitProtocol;
    use crate::manifest::{MANIFEST_FILENAME, SUCCESS_FILENAME};

    let dest = match crate::destination::create_destination(&plan.destination) {
        Err(e) => {
            log::debug!(
                "rerun-guard: could not create destination for export '{}' (skipping pre-run check): {:#}",
                plan.export_name,
                e
            );
            return;
        }
        Ok(created) => created,
    };

    if dest.capabilities().commit_protocol == WriteCommitProtocol::Streaming {
        return;
    }

    // Probe in priority order and stop at the first marker that exists.
    let mut found = None;
    for candidate in [SUCCESS_FILENAME, MANIFEST_FILENAME] {
        match dest.head(candidate) {
            Ok(Some(_)) => {
                found = Some(candidate);
                break;
            }
            Ok(None) => {}
            Err(e) => {
                log::debug!(
                    "rerun-guard: stat {} failed for export '{}' (skipping pre-run check): {:#}",
                    candidate,
                    plan.export_name,
                    e
                );
                return;
            }
        }
    }

    let Some(marker) = found else {
        return;
    };
    log::warn!(
        "export '{}': {}",
        plan.export_name,
        rerun_warning_message(&destination_uri_for_manifest(&plan.destination), marker),
    );
}
/// Reports whether the destination described by `dest` has a success marker.
///
/// Returns `true` only when a destination can be created from `dest`, its
/// commit protocol is not `Streaming`, and the `head` lookup for the success
/// marker returns an entry. Every error along the way is reported as `false`.
pub(crate) fn destination_has_success(dest: &crate::config::DestinationConfig) -> bool {
    use crate::destination::WriteCommitProtocol;
    use crate::manifest::SUCCESS_FILENAME;

    match crate::destination::create_destination(dest) {
        Ok(store) if store.capabilities().commit_protocol != WriteCommitProtocol::Streaming => {
            matches!(store.head(SUCCESS_FILENAME), Ok(Some(_)))
        }
        // Creation failure or a streaming destination: no marker to speak of.
        _ => false,
    }
}
/// Builds the warning text used when a destination prefix already holds
/// output from an earlier run.
///
/// `uri` names the destination prefix and `marker` is the file whose presence
/// triggered the warning; both are interpolated verbatim into the message.
fn rerun_warning_message(uri: &str, marker: &str) -> String {
    // What was detected.
    let mut message = format!(
        "destination prefix '{uri}' already has a prior completed run ({marker} present) — "
    );
    // What a plain re-run would do to the prefix.
    message.push_str(
        "re-running WITHOUT --resume appends fresh timestamp-named parts alongside the old ones \
         (nothing is overwritten) and rewrites manifest.json to describe only this run, so a glob \
         reader over the prefix will double-count / orphan the old parts. ",
    );
    // How to proceed safely.
    message.push_str("Use --resume to continue the prior run, or clear the prefix first.");
    message
}
/// Renders a destination config as the URI string handed to the manifest
/// builder and shown in the rerun warning.
///
/// * `Local` → `file://<path>`, falling back to `prefix`, then to `file://.`.
/// * `S3` / `Gcs` / `Azure` → `s3://`, `gs://` or `az://` followed by
///   `<bucket>/<prefix>`. A missing bucket or prefix renders as an empty
///   segment, so a config without a prefix ends in a trailing `/`.
/// * `Stdout` → the literal `stdout`.
///
/// The prefix is appended verbatim; no slashes are added or trimmed.
pub(crate) fn destination_uri_for_manifest(cfg: &DestinationConfig) -> String {
    use crate::config::DestinationType;

    /// Formats `<scheme>://<bucket>/<prefix>` for bucket-style destinations.
    /// An empty prefix naturally leaves just the trailing slash, so no
    /// special case is needed for it.
    fn object_store_uri(scheme: &str, cfg: &DestinationConfig) -> String {
        let bucket = cfg.bucket.as_deref().unwrap_or("");
        let prefix = cfg.prefix.as_deref().unwrap_or("");
        format!("{scheme}://{bucket}/{prefix}")
    }

    match cfg.destination_type {
        // Borrow rather than clone: the strings are only read for formatting.
        DestinationType::Local => match cfg.path.as_deref().or(cfg.prefix.as_deref()) {
            Some(location) => format!("file://{location}"),
            None => "file://.".to_string(),
        },
        DestinationType::S3 => object_store_uri("s3", cfg),
        DestinationType::Gcs => object_store_uri("gs", cfg),
        // Azure reuses the `bucket` field for the container name.
        DestinationType::Azure => object_store_uri("az", cfg),
        DestinationType::Stdout => "stdout".to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::DestinationType;

    /// Local destination with the given optional `path` and `prefix`.
    fn cfg_local(path: Option<&str>, prefix: Option<&str>) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Local,
            path: path.map(String::from),
            prefix: prefix.map(String::from),
            ..Default::default()
        }
    }

    /// S3 destination with a bucket and an optional prefix.
    fn cfg_s3(bucket: &str, prefix: Option<&str>) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::S3,
            bucket: Some(bucket.into()),
            prefix: prefix.map(String::from),
            ..Default::default()
        }
    }

    /// Same fields as `cfg_s3`, retagged as GCS.
    fn cfg_gcs(bucket: &str, prefix: Option<&str>) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Gcs,
            ..cfg_s3(bucket, prefix)
        }
    }

    /// Same fields as `cfg_s3`, retagged as Azure (container stored in `bucket`).
    fn cfg_azure(container: &str, prefix: Option<&str>) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Azure,
            ..cfg_s3(container, prefix)
        }
    }

    #[test]
    fn destination_uri_local_uses_path() {
        let cfg = cfg_local(Some("/tmp/out"), None);
        assert_eq!(destination_uri_for_manifest(&cfg), "file:///tmp/out");
    }

    #[test]
    fn destination_uri_local_falls_back_to_prefix_then_dot() {
        let cases = [(Some("/var/data"), "file:///var/data"), (None, "file://.")];
        for (prefix, expected) in cases {
            assert_eq!(
                destination_uri_for_manifest(&cfg_local(None, prefix)),
                expected
            );
        }
    }

    #[test]
    fn destination_uri_s3_with_and_without_prefix() {
        let cases = [(None, "s3://b/"), (Some("k/"), "s3://b/k/")];
        for (prefix, expected) in cases {
            assert_eq!(destination_uri_for_manifest(&cfg_s3("b", prefix)), expected);
        }
    }

    #[test]
    fn destination_uri_gcs_with_and_without_prefix() {
        let cases = [(None, "gs://b/"), (Some("k/"), "gs://b/k/")];
        for (prefix, expected) in cases {
            assert_eq!(
                destination_uri_for_manifest(&cfg_gcs("b", prefix)),
                expected
            );
        }
    }

    #[test]
    fn destination_uri_azure_with_and_without_prefix() {
        let cases = [(None, "az://c/"), (Some("runs/2026/"), "az://c/runs/2026/")];
        for (prefix, expected) in cases {
            assert_eq!(
                destination_uri_for_manifest(&cfg_azure("c", prefix)),
                expected
            );
        }
    }

    #[test]
    fn destination_uri_stdout_is_stable() {
        let cfg = DestinationConfig {
            destination_type: DestinationType::Stdout,
            ..cfg_local(None, None)
        };
        assert_eq!(destination_uri_for_manifest(&cfg), "stdout");
    }

    /// Local stand-in for the audit matcher named in the tests below:
    /// accepts a message that contains any of these phrases, case-insensitively.
    fn audit_matcher_accepts(s: &str) -> bool {
        const NEEDLES: [&str; 5] = [
            "_success",
            "already has",
            "prior completed run",
            "would overwrite",
            "orphan",
        ];
        let lowered = s.to_lowercase();
        NEEDLES.iter().any(|needle| lowered.contains(*needle))
    }

    #[test]
    fn rerun_warning_message_matches_live_audit_matcher_for_success_marker() {
        let msg = rerun_warning_message("file:///tmp/out", "_SUCCESS");

        let accepted = audit_matcher_accepts(&msg);
        assert!(
            accepted,
            "rerun warning must trip the live audit matcher; message was: {msg}"
        );

        let names_prefix = msg.contains("file:///tmp/out");
        assert!(names_prefix, "must name the prefix: {msg}");

        let mentions_resume = msg.contains("--resume");
        assert!(mentions_resume, "must point at the safe recovery: {msg}");
    }

    #[test]
    fn rerun_warning_message_matches_live_audit_matcher_for_manifest_marker() {
        let msg = rerun_warning_message("file:///tmp/out", "manifest.json");
        let accepted = audit_matcher_accepts(&msg);
        assert!(
            accepted,
            "manifest-only rerun warning must still trip the live audit matcher; message was: {msg}"
        );
    }
}