Skip to main content

rivet_cli/pipeline/
validate.rs

1use std::path::Path;
2
3use crate::config::FormatType;
4use crate::error::Result;
5
6pub fn validate_output(path: &Path, format: FormatType, expected_rows: usize) -> Result<()> {
7    let actual = match format {
8        FormatType::Parquet => {
9            let file = std::fs::File::open(path)?;
10            let builder =
11                parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)?;
12            let reader = builder.build()?;
13            let mut count = 0usize;
14            for batch in reader {
15                count += batch?.num_rows();
16            }
17            count
18        }
19        FormatType::Csv => {
20            let content = std::fs::read_to_string(path)?;
21            let lines = content.lines().count();
22            lines.saturating_sub(1)
23        }
24    };
25
26    if actual != expected_rows {
27        anyhow::bail!(
28            "validation failed: expected {} rows, got {} in {}",
29            expected_rows,
30            actual,
31            path.display()
32        );
33    }
34
35    log::info!("validation passed: {} rows verified", actual);
36    Ok(())
37}