// rs_ints2parquet/lib.rs

1use std::io;
2
3use std::path::Path;
4use std::path::PathBuf;
5
6use io::BufRead;
7use io::Write;
8
9use clap::Parser;
10use clap::ValueEnum;
11
12use parquet::basic::Encoding;
13
14use parquet::file::properties::EnabledStatistics;
15use parquet::file::properties::WriterProperties;
16use parquet::file::properties::WriterVersion;
17
18use rs_primitives2rbat::arrow;
19
20use arrow::record_batch::RecordBatch;
21
22use rs_primitives2rbat::conv::sync::fs::FsKvStore;
23use rs_primitives2rbat::datatype::BasicSchema;
24
25pub fn reader2basic_schema<R>(rdr: R) -> Result<BasicSchema, io::Error>
26where
27    R: BufRead,
28{
29    serde_json::from_reader(rdr).map_err(io::Error::other)
30}
31
32pub fn stdin2basic_schema() -> Result<BasicSchema, io::Error> {
33    reader2basic_schema(io::stdin().lock())
34}
35
36pub fn filename2basic_schema<P>(basic_schema_filename: P) -> Result<BasicSchema, io::Error>
37where
38    P: AsRef<Path>,
39{
40    let f = std::fs::File::open(basic_schema_filename)?;
41    let br = io::BufReader::new(f);
42    reader2basic_schema(br)
43}
44
/// Builds the full path `dirname/colname.ext` for a column's data file.
///
/// NOTE(review): `set_extension` replaces everything after the last `.` in
/// `colname`, so a column name containing a dot would be truncated — confirm
/// column names never contain dots.
pub fn colname2fullpath(dirname: &Path, colname: &str, ext: &str) -> PathBuf {
    let mut full = dirname.to_path_buf();
    full.push(colname);
    full.set_extension(ext);
    full
}
48
49pub fn key2filename_new(dirname: PathBuf, ext: String) -> impl Fn(&str) -> PathBuf {
50    move |colname: &str| colname2fullpath(&dirname, colname, &ext)
51}
52
53pub fn fs2batch(sch: BasicSchema, dirname: PathBuf, ext: String) -> Result<RecordBatch, io::Error> {
54    rs_primitives2rbat::conv::sync::fs::fs2batch(
55        sch,
56        FsKvStore {
57            key2filename: key2filename_new(dirname, ext),
58        },
59    )
60}
61
62pub fn schema2fs2batch<P>(
63    basic_schema_filename: Option<P>,
64    dirname: PathBuf,
65    ext: String,
66) -> Result<RecordBatch, io::Error>
67where
68    P: AsRef<Path>,
69{
70    let sch: BasicSchema = match basic_schema_filename {
71        Some(filename) => filename2basic_schema(filename),
72        None => stdin2basic_schema(),
73    }?;
74    fs2batch(sch, dirname, ext)
75}
76
77pub fn batch2parquet<W>(
78    b: &RecordBatch,
79    wtr: W,
80    props: Option<WriterProperties>,
81) -> Result<W, io::Error>
82where
83    W: Write + Send,
84{
85    let mut pw = parquet::arrow::arrow_writer::ArrowWriter::try_new(wtr, b.schema(), props)?;
86    pw.write(b)?;
87    pw.into_inner().map_err(io::Error::other)
88}
89
/// Durability no-op: used when the caller opts out of fsync entirely.
pub fn fsync_nop(_: &mut std::fs::File) -> Result<(), io::Error> {
    // Intentionally empty — no syscall is issued.
    Ok(())
}
93
/// Flushes file *data* to disk via [`std::fs::File::sync_data`]
/// (`fdatasync` on Linux); metadata updates may be deferred.
pub fn fsync_dat(f: &mut std::fs::File) -> Result<(), io::Error> {
    std::fs::File::sync_data(f)
}
97
/// Flushes both file data and metadata to disk via
/// [`std::fs::File::sync_all`] (full `fsync`).
pub fn fsync_all(f: &mut std::fs::File) -> Result<(), io::Error> {
    std::fs::File::sync_all(f)
}
101
102#[derive(Debug, Clone, Copy, PartialEq, Parser)]
103pub enum FsyncType {
104    /// No fsync.
105    Nop,
106
107    /// fsync_data(for linux).
108    Dat,
109
110    /// fsync.
111    All,
112}
113
114impl core::str::FromStr for FsyncType {
115    type Err = io::Error;
116
117    fn from_str(s: &str) -> Result<Self, Self::Err> {
118        match s {
119            "nop" | "nope" | "none" => Ok(Self::Nop),
120            "dat" | "data" | "fsync_data" => Ok(Self::Dat),
121            "all" | "fsync" => Ok(Self::All),
122            _ => Err(io::Error::other("invalid fsync type")),
123        }
124    }
125}
126
127pub fn typ2fsync(styp: FsyncType) -> fn(&mut std::fs::File) -> Result<(), io::Error> {
128    match styp {
129        FsyncType::Nop => fsync_nop,
130        FsyncType::Dat => fsync_dat,
131        FsyncType::All => fsync_all,
132    }
133}
134
135pub fn batch2file<P, F>(
136    b: &RecordBatch,
137    parquet_filename: P,
138    props: Option<WriterProperties>,
139    fsync: F,
140) -> Result<(), io::Error>
141where
142    P: AsRef<Path>,
143    F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
144{
145    let f = std::fs::File::create(parquet_filename)?;
146    let bw = io::BufWriter::new(f);
147    let mut wrote = batch2parquet(b, bw, props)?;
148    wrote.flush()?;
149    let mut wfile = wrote.into_inner()?;
150    wfile.flush()?;
151    fsync(&mut wfile)?;
152    Ok(())
153}
154
// Parquet compression codecs selectable from the CLI.
// Plain `//` comments are used deliberately: doc comments on `ValueEnum`
// variants would change the generated `--help` output. The SCREAMING_CASE
// variant names mirror `parquet::basic::Compression` and define the CLI
// values clap derives — do not rename them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum Compression {
    // No compression.
    UNCOMPRESSED,
    // Snappy compression.
    SNAPPY,
    // LZ4 raw (unframed) block format.
    #[allow(non_camel_case_types)]
    LZ4_RAW,
}
162
163impl From<Compression> for parquet::basic::Compression {
164    fn from(c: Compression) -> Self {
165        match c {
166            Compression::UNCOMPRESSED => Self::UNCOMPRESSED,
167            Compression::SNAPPY => Self::SNAPPY,
168            Compression::LZ4_RAW => Self::LZ4_RAW,
169        }
170    }
171}
172
173#[derive(Debug, Parser)]
174pub struct Args {
175    /// The path to the basic schema. Keep this empty to use stdin instead.
176    #[clap(short, long)]
177    pub basic_schema_filename: Option<String>,
178
179    /// The path to the directory containing the raw data.
180    #[clap(short, long)]
181    pub data_dir: String,
182
183    /// The extension of the data files.
184    #[clap(short, long)]
185    pub data_ext: String,
186
187    /// The save path for the parquet file.
188    #[clap(short, long)]
189    pub output_parquet_filename: String,
190
191    /// The type of the fsync.
192    #[clap(short, long, value_enum)]
193    pub fsync_type: Option<FsyncType>,
194
195    /// The type of the compression.
196    #[clap(short, long, value_enum)]
197    pub compression: Option<Compression>,
198
199    /// The type of the encoding when dictionary is not enabled.
200    #[clap(short, long, value_enum)]
201    pub encoding: Option<Encoding>,
202
203    /// Enables dictionary encoding.
204    #[clap(short, long)]
205    pub dictionary_enabled: Option<bool>,
206
207    /// The size of the dictionary page in bytes.
208    #[clap(short, long)]
209    pub dictionary_page_size_limit: Option<usize>,
210
211    /// Max number of rows in a row group.
212    #[clap(short, long)]
213    pub max_row_group_size: Option<usize>,
214
215    /// Max number of rows in a data page.
216    #[clap(short, long)]
217    pub data_page_row_count_limit: Option<usize>,
218
219    /// Max size of a data page in bytes.
220    #[clap(short, long)]
221    pub data_page_size_limit: Option<usize>,
222
223    /// Max length of statistics in the column index.
224    #[clap(short, long)]
225    pub column_index_truncate_length: Option<usize>,
226
227    /// Writes statistics to the data page headers.
228    #[clap(short, long)]
229    pub write_page_header_statistics: Option<bool>,
230
231    /// Enables bloom filter.
232    #[clap(short, long)]
233    pub bloom_filter_enabled: Option<bool>,
234
235    /// The probability parameter for the bloom filter.
236    #[clap(short, long)]
237    pub bloom_filter_fpp: Option<f64>,
238
239    /// The estimated number of distinct values for the bloom filter.
240    #[clap(short, long)]
241    pub bloom_filter_ndv: Option<u64>,
242
243    /// Enables statistics.
244    #[clap(short, long, value_enum)]
245    pub statistics_enabled: Option<EnabledStatistics>,
246
247    /// The version of the writer.
248    #[clap(short, long, value_enum)]
249    pub writer_version: Option<WriterVersion>,
250
251    /// Enables coerce arrow types.
252    #[clap(short, long)]
253    pub coerce_types: Option<bool>,
254}
255
256impl Args {
257    pub fn to_props(&self) -> WriterProperties {
258        let mut bldr = WriterProperties::builder();
259        if let Some(v) = self.compression {
260            bldr = bldr.set_compression(v.into());
261        }
262        if let Some(v) = self.encoding {
263            bldr = bldr.set_encoding(v);
264        }
265        if let Some(v) = self.dictionary_enabled {
266            bldr = bldr.set_dictionary_enabled(v);
267        }
268        if let Some(v) = self.dictionary_page_size_limit {
269            bldr = bldr.set_dictionary_page_size_limit(v);
270        }
271        if let Some(v) = self.max_row_group_size {
272            bldr = bldr.set_max_row_group_size(v);
273        }
274        if let Some(v) = self.data_page_row_count_limit {
275            bldr = bldr.set_data_page_row_count_limit(v);
276        }
277        if let Some(v) = self.data_page_size_limit {
278            bldr = bldr.set_data_page_size_limit(v);
279        }
280        if let Some(v) = self.column_index_truncate_length {
281            bldr = bldr.set_column_index_truncate_length(Some(v));
282        }
283        if let Some(v) = self.write_page_header_statistics {
284            bldr = bldr.set_write_page_header_statistics(v);
285        }
286        if let Some(v) = self.bloom_filter_enabled {
287            bldr = bldr.set_bloom_filter_enabled(v);
288        }
289        if let Some(v) = self.bloom_filter_fpp {
290            bldr = bldr.set_bloom_filter_fpp(v);
291        }
292        if let Some(v) = self.bloom_filter_ndv {
293            bldr = bldr.set_bloom_filter_ndv(v);
294        }
295        if let Some(v) = self.statistics_enabled {
296            bldr = bldr.set_statistics_enabled(v);
297        }
298        if let Some(v) = self.writer_version {
299            bldr = bldr.set_writer_version(v);
300        }
301        if let Some(v) = self.coerce_types {
302            bldr = bldr.set_coerce_types(v);
303        }
304
305        bldr.build()
306    }
307}
308
309impl Args {
310    pub fn schema2fs2batch2parquet2file(self) -> Result<(), io::Error> {
311        let props = self.to_props();
312        let fsync = typ2fsync(self.fsync_type.unwrap_or(FsyncType::Nop));
313        let rb = schema2fs2batch(
314            self.basic_schema_filename.as_deref(),
315            self.data_dir.into(),
316            self.data_ext,
317        )?;
318        batch2file(&rb, self.output_parquet_filename, Some(props), fsync)
319    }
320}