1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// Output format for the report produced by [`run_validate_command`].
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the value can be logged,
/// copied, and compared in tests like any other small public option type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable text report written to stdout.
    Pretty,
    /// JSON report. `Some(path)` writes the report to that file; `None`
    /// writes it to stdout.
    Json(Option<String>),
}
48
49#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56 pub date: Option<NaiveDate>,
58 pub run_id: Option<String>,
60 pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67 let mut ctx = match self.date {
68 Some(d) => PlaceholderContext::for_date(d, export_name),
69 None => PlaceholderContext::for_today(export_name),
70 };
71 if let Some(rid) = &self.run_id {
72 ctx = ctx.with_run_id(rid.clone());
73 }
74 ctx
75 }
76}
77
78pub fn run_validate_command(
87 config_path: &str,
88 export_name: Option<&str>,
89 format: ValidateOutputFormat,
90 target: ValidateTarget,
91) -> Result<()> {
92 let config = Config::load_with_params(config_path, None)?;
93
94 let exports: Vec<&crate::config::ExportConfig> = match export_name {
95 Some(name) => match config.exports.iter().find(|e| e.name == name) {
96 Some(e) => vec![e],
97 None => anyhow::bail!("export '{}' not found in config", name),
98 },
99 None => config.exports.iter().collect(),
100 };
101
102 if exports.is_empty() {
103 anyhow::bail!("no exports defined in config — nothing to validate");
104 }
105
106 if target.prefix_override.is_some() && exports.len() > 1 {
111 anyhow::bail!(
112 "--prefix requires --export <name>: cannot apply one override to {} exports",
113 exports.len()
114 );
115 }
116
117 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118 let mut hard_failures: Vec<String> = Vec::new();
119
120 for export in &exports {
121 let ctx = target.placeholder_context(&export.name);
125 let mut expanded_dest =
126 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127 if let Some(p) = &target.prefix_override {
128 expanded_dest.path = Some(p.clone());
132 expanded_dest.prefix = Some(p.clone());
133 }
134 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135 let dest = match crate::destination::create_destination(&expanded_dest) {
136 Ok(d) => d,
137 Err(e) => {
138 let msg = format!(
139 "export '{}' (prefix: {}): could not open destination: {:#}",
140 export.name, resolved_prefix, e
141 );
142 hard_failures.push(msg);
143 continue;
144 }
145 };
146 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148 {
149 log::info!(
150 "export '{}': streaming destination, skipping (nothing to verify)",
151 export.name
152 );
153 continue;
154 }
155 match verify_at_destination(&*dest, "") {
156 Ok(v) => {
157 all_results.push(ExportVerdict {
158 name: export.name.clone(),
159 resolved_prefix,
160 verification: v,
161 });
162 }
163 Err(e) => {
164 hard_failures.push(format!(
165 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
166 export.name, resolved_prefix, e
167 ));
168 }
169 }
170 }
171
172 match format {
173 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
174 ValidateOutputFormat::Json(out_path) => {
175 render_json(&all_results, &hard_failures, out_path)?
176 }
177 }
178
179 let any_failed = all_results
193 .iter()
194 .any(|r| r.verification.manifest_found && !r.verification.passed);
195 if !hard_failures.is_empty() || any_failed {
196 anyhow::bail!(
197 "rivet validate: {} export(s) failed verification",
198 hard_failures.len()
199 + all_results
200 .iter()
201 .filter(|r| r.verification.manifest_found && !r.verification.passed)
202 .count()
203 );
204 }
205 Ok(())
206}
207
208struct ExportVerdict {
212 name: String,
213 resolved_prefix: String,
214 verification: ManifestVerification,
215}
216
217fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
224 dest.prefix
225 .clone()
226 .or_else(|| dest.path.clone())
227 .unwrap_or_else(|| "<unresolved>".into())
228}
229
230fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
231 use std::io::Write;
232 let stdout = std::io::stdout();
233 let mut h = stdout.lock();
234
235 for r in results {
236 let _ = writeln!(h, "── {} ──", r.name);
237 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
238 let v = &r.verification;
239 if v.legacy_run {
240 let _ = writeln!(
241 h,
242 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
243 );
244 continue;
245 }
246 if !v.manifest_found {
247 let _ = writeln!(h, " status: NO MANIFEST");
248 continue;
249 }
250 let _ = writeln!(
251 h,
252 " status: {}",
253 if v.passed { "PASSED" } else { "FAILED" }
254 );
255 let _ = writeln!(
256 h,
257 " parts: {} verified, {} failed",
258 v.parts_verified, v.parts_failed
259 );
260 let _ = writeln!(
261 h,
262 " _SUCCESS: {}",
263 if v.success_marker_consistent {
264 "consistent"
265 } else if v.failures.iter().any(|f| matches!(
266 f,
267 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
268 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
269 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
270 )) {
271 "INCONSISTENT (see failures)"
272 } else {
273 "absent (no signal)"
274 }
275 );
276 let _ = writeln!(
277 h,
278 " manifest: {}",
279 if v.manifest_self_consistent {
280 "self-consistent"
281 } else {
282 "INCONSISTENT (see failures)"
283 }
284 );
285 for failure in &v.failures {
286 let _ = writeln!(h, " failure: {}", failure);
289 }
290 }
291
292 if !hard_failures.is_empty() {
293 let _ = writeln!(h);
294 let _ = writeln!(h, "── errors ──");
295 for e in hard_failures {
296 let _ = writeln!(h, " {}", e);
297 }
298 }
299 let _ = h.flush();
300}
301
302fn render_json(
303 results: &[ExportVerdict],
304 hard_failures: &[String],
305 out_path: Option<String>,
306) -> Result<()> {
307 let payload = serde_json::json!({
308 "exports": results
309 .iter()
310 .map(|r| {
311 serde_json::json!({
312 "export_name": r.name,
313 "resolved_prefix": r.resolved_prefix,
314 "verification": r.verification,
315 })
316 })
317 .collect::<Vec<_>>(),
318 "errors": hard_failures,
319 });
320 let serialized = serde_json::to_string_pretty(&payload)?;
321 match out_path {
322 Some(p) => {
323 std::fs::write(Path::new(&p), &serialized)?;
324 log::info!("rivet validate: wrote JSON report to {}", p);
325 }
326 None => {
327 println!("{}", serialized);
328 }
329 }
330 Ok(())
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// With no explicit date the context falls back to today's date and
    /// carries no run id.
    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        // Bracket the call with two clock reads so the test cannot fail when
        // the UTC date rolls over between building the context and reading
        // the clock for the comparison.
        let before = chrono::Utc::now().date_naive();
        let ctx = target.placeholder_context("orders");
        let after = chrono::Utc::now().date_naive();
        assert!(
            ctx.date == before || ctx.date == after,
            "expected today's date ({} or {}), got {}",
            before,
            after,
            ctx.date
        );
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    /// An explicit date takes precedence over today's date.
    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    /// Date and run id can be supplied together and both reach the context.
    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    /// When both `prefix` and `path` are set, `prefix` is the one displayed.
    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    /// Without a `prefix`, the display value falls back to `path`.
    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }
}