1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// How the validation report is rendered.
///
/// Derives the common traits so the value can be logged, duplicated and
/// compared by callers and tests, in line with [`ValidateTarget`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable report written to stdout.
    Pretty,
    /// JSON report. `Some(path)` writes the document to that file;
    /// `None` prints it to stdout.
    Json(Option<String>),
}
48
49#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56 pub date: Option<NaiveDate>,
58 pub run_id: Option<String>,
60 pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67 let mut ctx = match self.date {
68 Some(d) => PlaceholderContext::for_date(d, export_name),
69 None => PlaceholderContext::for_today(export_name),
70 };
71 if let Some(rid) = &self.run_id {
72 ctx = ctx.with_run_id(rid.clone());
73 }
74 ctx
75 }
76}
77
78pub fn run_validate_command(
87 config_path: &str,
88 export_name: Option<&str>,
89 format: ValidateOutputFormat,
90 target: ValidateTarget,
91) -> Result<()> {
92 let config = Config::load_with_params(config_path, None)?;
93
94 let exports: Vec<&crate::config::ExportConfig> = match export_name {
95 Some(name) => match config.exports.iter().find(|e| e.name == name) {
96 Some(e) => vec![e],
97 None => anyhow::bail!("export '{}' not found in config", name),
98 },
99 None => config.exports.iter().collect(),
100 };
101
102 if exports.is_empty() {
103 anyhow::bail!("no exports defined in config — nothing to validate");
104 }
105
106 if target.prefix_override.is_some() && exports.len() > 1 {
111 anyhow::bail!(
112 "--prefix requires --export <name>: cannot apply one override to {} exports",
113 exports.len()
114 );
115 }
116
117 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118 let mut hard_failures: Vec<String> = Vec::new();
119
120 for export in &exports {
121 let ctx = target.placeholder_context(&export.name);
125 let mut expanded_dest =
126 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127 if let Some(p) = &target.prefix_override {
128 expanded_dest.path = Some(p.clone());
132 expanded_dest.prefix = Some(p.clone());
133 }
134 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135 let dest = match crate::destination::create_destination(&expanded_dest) {
136 Ok(d) => d,
137 Err(e) => {
138 let msg = format!(
139 "export '{}' (prefix: {}): could not open destination: {:#}",
140 export.name, resolved_prefix, e
141 );
142 hard_failures.push(msg);
143 continue;
144 }
145 };
146 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148 {
149 log::info!(
150 "export '{}': streaming destination, skipping (nothing to verify)",
151 export.name
152 );
153 continue;
154 }
155 match verify_at_destination(&*dest, "") {
156 Ok(mut v) => {
157 v.enforce_content_policy(export.verify.requires_content());
160 all_results.push(ExportVerdict {
161 name: export.name.clone(),
162 resolved_prefix,
163 verification: v,
164 });
165 }
166 Err(e) => {
167 hard_failures.push(format!(
168 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
169 export.name, resolved_prefix, e
170 ));
171 }
172 }
173 }
174
175 match format {
176 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
177 ValidateOutputFormat::Json(out_path) => {
178 render_json(&all_results, &hard_failures, out_path)?
179 }
180 }
181
182 let any_failed = all_results
196 .iter()
197 .any(|r| r.verification.manifest_found && !r.verification.passed);
198 if !hard_failures.is_empty() || any_failed {
199 anyhow::bail!(
200 "rivet validate: {} export(s) failed verification",
201 hard_failures.len()
202 + all_results
203 .iter()
204 .filter(|r| r.verification.manifest_found && !r.verification.passed)
205 .count()
206 );
207 }
208 Ok(())
209}
210
211struct ExportVerdict {
215 name: String,
216 resolved_prefix: String,
217 verification: ManifestVerification,
218}
219
220fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
227 dest.prefix
228 .clone()
229 .or_else(|| dest.path.clone())
230 .unwrap_or_else(|| "<unresolved>".into())
231}
232
233fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
234 use std::io::Write;
235 let stdout = std::io::stdout();
236 let mut h = stdout.lock();
237
238 for r in results {
239 let _ = writeln!(h, "── {} ──", r.name);
240 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
241 let v = &r.verification;
242 if v.legacy_run {
243 let _ = writeln!(
244 h,
245 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
246 );
247 continue;
248 }
249 if !v.manifest_found {
250 let _ = writeln!(h, " status: NO MANIFEST");
251 continue;
252 }
253 let _ = writeln!(
254 h,
255 " status: {}",
256 if v.passed { "PASSED" } else { "FAILED" }
257 );
258 let _ = writeln!(
259 h,
260 " parts: {} verified ({} md5, {} size-only), {} failed",
261 v.parts_verified,
262 v.parts_md5_verified,
263 v.parts_verified.saturating_sub(v.parts_md5_verified),
264 v.parts_failed
265 );
266 let _ = writeln!(
267 h,
268 " _SUCCESS: {}",
269 if v.success_marker_consistent {
270 "consistent"
271 } else if v.failures.iter().any(|f| matches!(
272 f,
273 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
274 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
275 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
276 )) {
277 "INCONSISTENT (see failures)"
278 } else {
279 "absent (no signal)"
280 }
281 );
282 let _ = writeln!(
283 h,
284 " manifest: {}",
285 if v.manifest_self_consistent {
286 "self-consistent"
287 } else {
288 "INCONSISTENT (see failures)"
289 }
290 );
291 for failure in &v.failures {
292 let _ = writeln!(h, " failure: {}", failure);
295 }
296 }
297
298 if !hard_failures.is_empty() {
299 let _ = writeln!(h);
300 let _ = writeln!(h, "── errors ──");
301 for e in hard_failures {
302 let _ = writeln!(h, " {}", e);
303 }
304 }
305 let _ = h.flush();
306}
307
308fn render_json(
309 results: &[ExportVerdict],
310 hard_failures: &[String],
311 out_path: Option<String>,
312) -> Result<()> {
313 let payload = serde_json::json!({
314 "exports": results
315 .iter()
316 .map(|r| {
317 serde_json::json!({
318 "export_name": r.name,
319 "resolved_prefix": r.resolved_prefix,
320 "verification": r.verification,
321 })
322 })
323 .collect::<Vec<_>>(),
324 "errors": hard_failures,
325 });
326 let serialized = serde_json::to_string_pretty(&payload)?;
327 match out_path {
328 Some(p) => {
329 std::fs::write(Path::new(&p), &serialized)?;
330 log::info!("rivet validate: wrote JSON report to {}", p);
331 }
332 None => {
333 println!("{}", serialized);
334 }
335 }
336 Ok(())
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed calendar date shared by the date-override tests.
    fn sample_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2026, 5, 21).expect("2026-05-21 is a valid calendar date")
    }

    /// With no explicit date the context carries today's date.
    ///
    /// The clock is sampled before and after building the context so the test
    /// cannot fail spuriously when the UTC date rolls over between the two
    /// reads.
    // NOTE(review): like the original assertion, this assumes
    // `PlaceholderContext::for_today` uses the UTC calendar date — confirm.
    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        let before = chrono::Utc::now().date_naive();
        let ctx = target.placeholder_context("orders");
        let after = chrono::Utc::now().date_naive();
        assert!(
            ctx.date == before || ctx.date == after,
            "expected today's UTC date ({:?} or {:?}), got {:?}",
            before,
            after,
            ctx.date
        );
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    /// An explicit date replaces the "today" default and leaves the run id unset.
    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(sample_date()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, sample_date());
        assert!(ctx.run_id.is_none());
    }

    /// Date and run id are both carried into the context.
    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(sample_date()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, sample_date());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    /// When both fields are set, `prefix` is shown rather than `path`.
    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    /// Without a `prefix`, the `path` is shown.
    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }
}