rivet_cli/pipeline/
validate.rs1use std::path::Path;
2
3use crate::config::FormatType;
4use crate::error::Result;
5
6pub fn validate_output(path: &Path, format: FormatType, expected_rows: usize) -> Result<()> {
7 let actual = match format {
8 FormatType::Parquet => {
9 let file = std::fs::File::open(path)?;
10 let builder =
11 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
12 let reader = builder.build()?;
13 let mut count = 0usize;
14 for batch in reader {
15 count += batch?.num_rows();
16 }
17 count
18 }
19 FormatType::Csv => {
20 let content = std::fs::read_to_string(path)?;
21 let lines = content.lines().count();
22 lines.saturating_sub(1)
23 }
24 };
25
26 if actual != expected_rows {
27 anyhow::bail!(
28 "validation failed: expected {} rows, got {} in {}",
29 expected_rows,
30 actual,
31 path.display()
32 );
33 }
34
35 log::info!("validation passed: {} rows verified", actual);
36 Ok(())
37}