vortex_tui/
convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Convert Parquet files to Vortex format.
5
6use std::path::PathBuf;
7
8use clap::Parser;
9use clap::ValueEnum;
10use futures::StreamExt;
11use indicatif::ProgressBar;
12use parquet::arrow::ParquetRecordBatchStreamBuilder;
13use tokio::fs::File;
14use tokio::io::AsyncWriteExt;
15use vortex::array::ArrayRef;
16use vortex::array::arrow::FromArrowArray;
17use vortex::array::stream::ArrayStreamAdapter;
18use vortex::compressor::CompactCompressor;
19use vortex::dtype::DType;
20use vortex::dtype::arrow::FromArrowType;
21use vortex::error::VortexError;
22use vortex::error::VortexExpect;
23use vortex::file::WriteOptionsSessionExt;
24use vortex::file::WriteStrategyBuilder;
25use vortex::session::VortexSession;
26
27/// Compression strategy to use when converting Parquet files to Vortex format.
28#[derive(Clone, Copy, Debug, Default, ValueEnum)]
29pub enum Strategy {
30    /// Use the BtrBlocks compressor strategy (default)
31    #[default]
32    Btrblocks,
33    /// Use the Compact compression strategy for more aggressive compression.
34    Compact,
35}
36
37/// Command-line flags for the convert command.
38#[derive(Debug, Clone, Parser)]
39pub struct ConvertArgs {
40    /// Path to the Parquet file on disk to convert to Vortex.
41    pub file: PathBuf,
42
43    /// Compression strategy.
44    #[arg(short, long, default_value = "btrblocks")]
45    pub strategy: Strategy,
46
47    /// Execute quietly. No output will be printed.
48    #[arg(short, long)]
49    pub quiet: bool,
50}
51
/// Number of rows requested per record batch when streaming the Parquet input
/// (passed to the reader's `with_batch_size`).
pub const BATCH_SIZE: usize = 8 * 1024;
54
55/// Convert Parquet files to Vortex.
56///
57/// # Errors
58///
59/// Returns an error if the input file cannot be read or the output file cannot be written.
60pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow::Result<()> {
61    let input_path = flags.file.clone();
62    if !flags.quiet {
63        eprintln!("Converting input Parquet file: {}", input_path.display());
64    }
65
66    let output_path = input_path.with_extension("vortex");
67    let file = File::open(input_path).await?;
68
69    let parquet = ParquetRecordBatchStreamBuilder::new(file)
70        .await?
71        .with_batch_size(BATCH_SIZE);
72    let num_rows = parquet.metadata().file_metadata().num_rows();
73
74    let dtype = DType::from_arrow(parquet.schema().as_ref());
75    let mut vortex_stream = parquet
76        .build()?
77        .map(|record_batch| {
78            record_batch
79                .map_err(|e| VortexError::generic(e.into()))
80                .map(|rb| ArrayRef::from_arrow(rb, false))
81        })
82        .boxed();
83
84    if !flags.quiet {
85        // Parquet reader returns batches, rather than row groups. So make sure we correctly
86        // configure the progress bar.
87        let nbatches = u64::try_from(num_rows)
88            .vortex_expect("negative row count?")
89            .div_ceil(BATCH_SIZE as u64);
90        vortex_stream = ProgressBar::new(nbatches)
91            .wrap_stream(vortex_stream)
92            .boxed();
93    }
94
95    let strategy = WriteStrategyBuilder::default();
96    let strategy = match flags.strategy {
97        Strategy::Btrblocks => strategy,
98        Strategy::Compact => strategy.with_compressor(CompactCompressor::default()),
99    };
100
101    let mut file = File::create(output_path).await?;
102    session
103        .write_options()
104        .with_strategy(strategy.build())
105        .write(&mut file, ArrayStreamAdapter::new(dtype, vortex_stream))
106        .await?;
107    file.shutdown().await?;
108
109    Ok(())
110}