Skip to main content

vortex_tui/
convert.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Convert Parquet files to Vortex format.
5
use std::path::PathBuf;

use anyhow::Context;
use clap::Parser;
use clap::ValueEnum;
use futures::StreamExt;
use indicatif::ProgressBar;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexExpect;
use vortex::error::vortex_err;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteStrategyBuilder;
use vortex::session::VortexSession;
25
26/// Compression strategy to use when converting Parquet files to Vortex format.
27#[derive(Clone, Copy, Debug, Default, ValueEnum)]
28pub enum Strategy {
29    /// Use the BtrBlocks compressor strategy (default)
30    #[default]
31    Btrblocks,
32    /// Use the Compact compression strategy for more aggressive compression.
33    Compact,
34}
35
36/// Command-line flags for the convert command.
37#[derive(Debug, Clone, Parser)]
38pub struct ConvertArgs {
39    /// Path to the Parquet file on disk to convert to Vortex.
40    pub file: PathBuf,
41
42    /// Compression strategy.
43    #[arg(short, long, default_value = "btrblocks")]
44    pub strategy: Strategy,
45
46    /// Execute quietly. No output will be printed.
47    #[arg(short, long)]
48    pub quiet: bool,
49}
50
/// Number of rows requested per record batch when reading the Parquet input (8 Ki rows).
pub const BATCH_SIZE: usize = 8 * 1024;
53
54/// Convert Parquet files to Vortex.
55///
56/// # Errors
57///
58/// Returns an error if the input file cannot be read or the output file cannot be written.
59pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow::Result<()> {
60    let input_path = flags.file.clone();
61    if !flags.quiet {
62        eprintln!("Converting input Parquet file: {}", input_path.display());
63    }
64
65    let output_path = input_path.with_extension("vortex");
66    let file = File::open(input_path).await?;
67
68    let parquet = ParquetRecordBatchStreamBuilder::new(file)
69        .await?
70        .with_batch_size(BATCH_SIZE);
71    let num_rows = parquet.metadata().file_metadata().num_rows();
72
73    let dtype = DType::from_arrow(parquet.schema().as_ref());
74    let mut vortex_stream = parquet
75        .build()?
76        .map(|record_batch| {
77            record_batch
78                .map_err(|e| vortex_err!(External: e))
79                .and_then(|rb| ArrayRef::from_arrow(rb, false))
80        })
81        .boxed();
82
83    if !flags.quiet {
84        // Parquet reader returns batches, rather than row groups. So make sure we correctly
85        // configure the progress bar.
86        let nbatches = u64::try_from(num_rows)
87            .vortex_expect("negative row count?")
88            .div_ceil(BATCH_SIZE as u64);
89        vortex_stream = ProgressBar::new(nbatches)
90            .wrap_stream(vortex_stream)
91            .boxed();
92    }
93
94    let strategy = WriteStrategyBuilder::default();
95    let strategy = match flags.strategy {
96        Strategy::Btrblocks => strategy,
97        Strategy::Compact => strategy.with_compact_encodings(),
98    };
99
100    let mut file = File::create(output_path).await?;
101    session
102        .write_options()
103        .with_strategy(strategy.build())
104        .write(&mut file, ArrayStreamAdapter::new(dtype, vortex_stream))
105        .await?;
106    file.shutdown().await?;
107
108    Ok(())
109}