parqcat 1.0.1

A lightweight Unix-style CLI for inspecting Parquet files.
Documentation
use std::fs::File;
use std::io::Write;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use crate::cli::{Mode, OutputFormat};
use crate::error::Result;

/// Batch size handed to the Parquet record-batch reader builder in `row_reader`.
const BATCH_SIZE: usize = 8192;

pub fn run(
    file: File,
    mode: Mode,
    output_format: OutputFormat,
    output: &mut impl Write,
) -> Result<()> {
    match mode {
        Mode::Schema => {
            let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
            crate::render::write_schema(builder.schema(), output)
        }
        Mode::Cat => {
            let Some((builder, _)) = row_reader(ParquetRecordBatchReaderBuilder::try_new(file)?)
            else {
                return Ok(());
            };
            stream(builder, output_format, output)
        }
        Mode::Head { lines } => {
            if lines == 0 {
                return Ok(());
            }
            let Some((builder, _)) = row_reader(ParquetRecordBatchReaderBuilder::try_new(file)?)
            else {
                return Ok(());
            };
            stream(builder.with_limit(lines), output_format, output)
        }
        Mode::Tail { lines } => {
            if lines == 0 {
                return Ok(());
            }
            let Some((builder, total_rows)) =
                row_reader(ParquetRecordBatchReaderBuilder::try_new(file)?)
            else {
                return Ok(());
            };
            let plan = tail_plan(&builder, lines.min(total_rows));
            stream(
                builder
                    .with_row_groups(plan.row_groups)
                    .with_offset(plan.offset)
                    .with_limit(lines),
                output_format,
                output,
            )
        }
    }
}

/// Prepares `builder` for row streaming and reports the file's total row count.
///
/// Returns `None` when the file metadata reports zero rows, or a row count
/// that cannot be represented as `usize` (such counts are treated as zero).
/// Otherwise returns the builder configured with `BATCH_SIZE` together with
/// the row count.
fn row_reader(
    builder: ParquetRecordBatchReaderBuilder<File>,
) -> Option<(ParquetRecordBatchReaderBuilder<File>, usize)> {
    let reported_rows = builder.metadata().file_metadata().num_rows();

    match usize::try_from(reported_rows) {
        Ok(total_rows) if total_rows > 0 => {
            Some((builder.with_batch_size(BATCH_SIZE), total_rows))
        }
        // Zero rows, or a count that failed the conversion: nothing to read.
        _ => None,
    }
}

/// Builds the reader from `builder` and renders every batch it yields.
///
/// With `OutputFormat::Jsonl` each batch goes through
/// `crate::render::write_json_batch`. With `OutputFormat::Table` a single
/// `crate::render::TableWriter` is created from the first batch and reused for
/// all following batches.
///
/// # Errors
///
/// Propagates failures from building the reader, from reading a batch, and
/// from the renderers.
fn stream(
    builder: ParquetRecordBatchReaderBuilder<File>,
    output_format: OutputFormat,
    output: &mut impl Write,
) -> Result<()> {
    // Created lazily so it can be initialised from the first batch seen.
    let mut table_writer = None;

    for next_batch in builder.build()? {
        let record_batch = next_batch?;
        match output_format {
            OutputFormat::Jsonl => crate::render::write_json_batch(&record_batch, output)?,
            OutputFormat::Table => {
                table_writer
                    .get_or_insert_with(|| crate::render::TableWriter::new(&record_batch))
                    .write_batch(&record_batch, output)?;
            }
        }
    }

    Ok(())
}

/// Read plan produced by `tail_plan` for emitting the last rows of a file.
struct TailPlan {
    /// Indices of the row groups to read: a contiguous range ending at the
    /// file's last row group.
    row_groups: Vec<usize>,
    /// Number of leading rows within the selected row groups to skip.
    offset: usize,
}

/// Works out which trailing row groups cover the last `wanted_rows` rows.
///
/// Walks the row groups from the end of the file towards the start, adding up
/// their row counts until at least `wanted_rows` rows are covered or no groups
/// remain. A per-group row count that does not fit in `usize` counts as zero.
///
/// The returned plan lists the covered group indices in ascending order, plus
/// the number of surplus rows at the front of those groups to skip.
fn tail_plan(builder: &ParquetRecordBatchReaderBuilder<File>, wanted_rows: usize) -> TailPlan {
    let metadata = builder.metadata();
    let group_count = metadata.num_row_groups();

    let mut covered_rows = 0usize;
    let mut start_group = group_count;

    for group_index in (0..group_count).rev() {
        if covered_rows >= wanted_rows {
            break;
        }
        covered_rows += usize::try_from(metadata.row_group(group_index).num_rows()).unwrap_or(0);
        start_group = group_index;
    }

    TailPlan {
        row_groups: (start_group..group_count).collect(),
        // Rows covered beyond what was asked for sit at the front.
        offset: covered_rows.saturating_sub(wanted_rows),
    }
}