use std::path::PathBuf;

use anyhow::Context;
use clap::Parser;
use clap::ValueEnum;
use futures::StreamExt;
use indicatif::ProgressBar;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexExpect;
use vortex::error::vortex_err;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteStrategyBuilder;
use vortex::session::VortexSession;
25
26#[derive(Clone, Copy, Debug, Default, ValueEnum)]
28pub enum Strategy {
29 #[default]
31 Btrblocks,
32 Compact,
34}
35
36#[derive(Debug, Clone, Parser)]
38pub struct ConvertArgs {
39 pub file: PathBuf,
41
42 #[arg(short, long, default_value = "btrblocks")]
44 pub strategy: Strategy,
45
46 #[arg(short, long)]
48 pub quiet: bool,
49}
50
/// Number of rows requested per Parquet record batch during conversion.
pub const BATCH_SIZE: usize = 8 * 1024;
53
54pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow::Result<()> {
60 let input_path = flags.file.clone();
61 if !flags.quiet {
62 eprintln!("Converting input Parquet file: {}", input_path.display());
63 }
64
65 let output_path = input_path.with_extension("vortex");
66 let file = File::open(input_path).await?;
67
68 let parquet = ParquetRecordBatchStreamBuilder::new(file)
69 .await?
70 .with_batch_size(BATCH_SIZE);
71 let num_rows = parquet.metadata().file_metadata().num_rows();
72
73 let dtype = DType::from_arrow(parquet.schema().as_ref());
74 let mut vortex_stream = parquet
75 .build()?
76 .map(|record_batch| {
77 record_batch
78 .map_err(|e| vortex_err!(External: e))
79 .and_then(|rb| ArrayRef::from_arrow(rb, false))
80 })
81 .boxed();
82
83 if !flags.quiet {
84 let nbatches = u64::try_from(num_rows)
87 .vortex_expect("negative row count?")
88 .div_ceil(BATCH_SIZE as u64);
89 vortex_stream = ProgressBar::new(nbatches)
90 .wrap_stream(vortex_stream)
91 .boxed();
92 }
93
94 let strategy = WriteStrategyBuilder::default();
95 let strategy = match flags.strategy {
96 Strategy::Btrblocks => strategy,
97 Strategy::Compact => strategy.with_compact_encodings(),
98 };
99
100 let mut file = File::create(output_path).await?;
101 session
102 .write_options()
103 .with_strategy(strategy.build())
104 .write(&mut file, ArrayStreamAdapter::new(dtype, vortex_stream))
105 .await?;
106 file.shutdown().await?;
107
108 Ok(())
109}