use std::path::PathBuf;

use anyhow::Context;
use clap::Parser;
use clap::ValueEnum;
use futures::StreamExt;
use indicatif::ProgressBar;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::compressor::CompactCompressor;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexError;
use vortex::error::VortexExpect;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteStrategyBuilder;
use vortex::session::VortexSession;
26
27#[derive(Clone, Copy, Debug, Default, ValueEnum)]
29pub enum Strategy {
30 #[default]
32 Btrblocks,
33 Compact,
35}
36
37#[derive(Debug, Clone, Parser)]
39pub struct ConvertArgs {
40 pub file: PathBuf,
42
43 #[arg(short, long, default_value = "btrblocks")]
45 pub strategy: Strategy,
46
47 #[arg(short, long)]
49 pub quiet: bool,
50}
51
/// Maximum number of rows per Arrow record batch requested from the Parquet reader
/// (8 Ki rows); also the divisor used to size the conversion progress bar.
pub const BATCH_SIZE: usize = 8 * 1024;
54
55pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow::Result<()> {
61 let input_path = flags.file.clone();
62 if !flags.quiet {
63 eprintln!("Converting input Parquet file: {}", input_path.display());
64 }
65
66 let output_path = input_path.with_extension("vortex");
67 let file = File::open(input_path).await?;
68
69 let parquet = ParquetRecordBatchStreamBuilder::new(file)
70 .await?
71 .with_batch_size(BATCH_SIZE);
72 let num_rows = parquet.metadata().file_metadata().num_rows();
73
74 let dtype = DType::from_arrow(parquet.schema().as_ref());
75 let mut vortex_stream = parquet
76 .build()?
77 .map(|record_batch| {
78 record_batch
79 .map_err(|e| VortexError::generic(e.into()))
80 .map(|rb| ArrayRef::from_arrow(rb, false))
81 })
82 .boxed();
83
84 if !flags.quiet {
85 let nbatches = u64::try_from(num_rows)
88 .vortex_expect("negative row count?")
89 .div_ceil(BATCH_SIZE as u64);
90 vortex_stream = ProgressBar::new(nbatches)
91 .wrap_stream(vortex_stream)
92 .boxed();
93 }
94
95 let strategy = WriteStrategyBuilder::default();
96 let strategy = match flags.strategy {
97 Strategy::Btrblocks => strategy,
98 Strategy::Compact => strategy.with_compressor(CompactCompressor::default()),
99 };
100
101 let mut file = File::create(output_path).await?;
102 session
103 .write_options()
104 .with_strategy(strategy.build())
105 .write(&mut file, ArrayStreamAdapter::new(dtype, vortex_stream))
106 .await?;
107 file.shutdown().await?;
108
109 Ok(())
110}