use std::io;

use std::path::Path;
use std::path::PathBuf;

use io::BufRead;
use io::Write;

use clap::Parser;
use clap::ValueEnum;

use parquet::basic::Encoding;

use parquet::file::properties::EnabledStatistics;
use parquet::file::properties::WriterProperties;
use parquet::file::properties::WriterVersion;

use rs_primitives2rbat::arrow;

use arrow::record_batch::RecordBatch;

use rs_primitives2rbat::conv::sync::fs::FsKvStore;
use rs_primitives2rbat::datatype::BasicSchema;
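
/// Reads a `BasicSchema` from JSON-encoded data on the given reader.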
pub fn reader2basic_schema<R>(rdr: R) -> Result<BasicSchema, io::Error>
where
    R: BufRead,
{
    serde_json::from_reader(rdr).map_err(io::Error::other)
}
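
/// Reads a `BasicSchema` as JSON from standard input.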
pub fn stdin2basic_schema() -> Result<BasicSchema, io::Error> {
    reader2basic_schema(io::stdin().lock())
}
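
/// Reads a `BasicSchema` as JSON from the named file.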
pub fn filename2basic_schema<P>(basic_schema_filename: P) -> Result<BasicSchema, io::Error>
where
    P: AsRef<Path>,
{
    let f = std::fs::File::open(basic_schema_filename)?;
    let br = io::BufReader::new(f);
    reader2basic_schema(br)
}
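
/// Maps a column name to `<dirname>/<colname>.<ext>`.
///
/// Note: `Path::with_extension` replaces anything after the last `.`,
/// so a dotted column name such as `price.raw` becomes `price.<ext>`.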
pub fn colname2fullpath(dirname: &Path, colname: &str, ext: &str) -> PathBuf {
    dirname.join(colname).with_extension(ext)
}
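
/// Creates a closure that maps a column name to its full path under `dirname`.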
pub fn key2filename_new(dirname: PathBuf, ext: String) -> impl Fn(&str) -> PathBuf {
    move |colname: &str| colname2fullpath(&dirname, colname, &ext)
}
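
/// Builds a `RecordBatch` by reading each column named in the schema from
/// its own file (`<dirname>/<colname>.<ext>`) through `FsKvStore`.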
pub fn fs2batch(sch: BasicSchema, dirname: PathBuf, ext: String) -> Result<RecordBatch, io::Error> {
    rs_primitives2rbat::conv::sync::fs::fs2batch(
        sch,
        FsKvStore {
            key2filename: key2filename_new(dirname, ext),
        },
    )
}
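
/// Loads the schema (from the named file, or from stdin when `None`),
/// then assembles a `RecordBatch` from the per-column files.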
pub fn schema2fs2batch<P>(
    basic_schema_filename: Option<P>,
    dirname: PathBuf,
    ext: String,
) -> Result<RecordBatch, io::Error>
where
    P: AsRef<Path>,
{
    let sch: BasicSchema = match basic_schema_filename {
        Some(filename) => filename2basic_schema(filename),
        None => stdin2basic_schema(),
    }?;
    fs2batch(sch, dirname, ext)
}
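
/// Writes a single `RecordBatch` as Parquet to `wtr`, then flushes any
/// buffered data, writes the Parquet footer, and returns the underlying writer.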
pub fn batch2parquet<W>(
    b: &RecordBatch,
    wtr: W,
    props: Option<WriterProperties>,
) -> Result<W, io::Error>
where
    W: Write + Send,
{
    let mut pw = parquet::arrow::arrow_writer::ArrowWriter::try_new(wtr, b.schema(), props)?;
    pw.write(b)?;
    pw.into_inner().map_err(io::Error::other)
}
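
/// No-op sync: durability is left to the OS page cache.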
pub fn fsync_nop(_: &mut std::fs::File) -> Result<(), io::Error> {
    Ok(())
}
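
/// Syncs file contents to disk (`File::sync_data`; fdatasync on Linux).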
pub fn fsync_dat(f: &mut std::fs::File) -> Result<(), io::Error> {
    f.sync_data()
}
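
/// Syncs file contents and metadata to disk (`File::sync_all`; fsync on Linux).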
pub fn fsync_all(f: &mut std::fs::File) -> Result<(), io::Error> {
    f.sync_all()
}
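
/// How aggressively the output file is flushed to stable storage.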
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum FsyncType {
    Nop,

    Dat,

    All,
}
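
// Kept alongside the `ValueEnum` derive for callers that parse strings
// directly; clap itself matches the plain variant names (nop/dat/all).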
impl core::str::FromStr for FsyncType {
    type Err = io::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "nop" | "nope" | "none" => Ok(Self::Nop),
            "dat" | "data" | "fsync_data" => Ok(Self::Dat),
            "all" | "fsync" => Ok(Self::All),
            _ => Err(io::Error::other("invalid fsync type")),
        }
    }
}
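
/// Maps an `FsyncType` to the corresponding sync function.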
pub fn typ2fsync(styp: FsyncType) -> fn(&mut std::fs::File) -> Result<(), io::Error> {
    match styp {
        FsyncType::Nop => fsync_nop,
        FsyncType::Dat => fsync_dat,
        FsyncType::All => fsync_all,
    }
}
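
/// Writes the batch to `parquet_filename`: flushes the `BufWriter`,
/// unwraps it back into the `File`, flushes again, and finally applies
/// the chosen fsync strategy.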
pub fn batch2file<P, F>(
    b: &RecordBatch,
    parquet_filename: P,
    props: Option<WriterProperties>,
    fsync: F,
) -> Result<(), io::Error>
where
    P: AsRef<Path>,
    F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
{
    let f = std::fs::File::create(parquet_filename)?;
    let bw = io::BufWriter::new(f);
    let mut wrote = batch2parquet(b, bw, props)?;
    wrote.flush()?;
    let mut wfile = wrote.into_inner()?;
    wfile.flush()?;
    fsync(&mut wfile)?;
    Ok(())
}
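
/// The subset of Parquet codecs exposed on the command line; none of these
/// take codec-specific parameters, so they map directly onto
/// `parquet::basic::Compression`.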
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum Compression {
    UNCOMPRESSED,
    SNAPPY,
    #[allow(non_camel_case_types)]
    LZ4_RAW,
}

impl From<Compression> for parquet::basic::Compression {
    fn from(c: Compression) -> Self {
        match c {
            Compression::UNCOMPRESSED => Self::UNCOMPRESSED,
            Compression::SNAPPY => Self::SNAPPY,
            Compression::LZ4_RAW => Self::LZ4_RAW,
        }
    }
}
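
/// Command-line arguments; each optional knob corresponds to the matching
/// `WriterProperties` setting.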
#[derive(Debug, Parser)]
pub struct Args {
    #[clap(long)]
    pub basic_schema_filename: Option<String>,

    #[clap(long)]
    pub data_dir: String,

    #[clap(long)]
    pub data_ext: String,

    #[clap(long)]
    pub output_parquet_filename: String,

    #[clap(long, value_enum)]
    pub fsync_type: Option<FsyncType>,

    #[clap(long, value_enum)]
    pub compression: Option<Compression>,

    // The parquet types below do not implement clap's `ValueEnum`;
    // clap falls back to the `FromStr` impls the parquet crate provides
    // (e.g. "PLAIN" for an encoding, "PAGE" for statistics).
    #[clap(long)]
    pub encoding: Option<Encoding>,

    #[clap(long)]
    pub dictionary_enabled: Option<bool>,

    #[clap(long)]
    pub dictionary_page_size_limit: Option<usize>,

    #[clap(long)]
    pub max_row_group_size: Option<usize>,

    #[clap(long)]
    pub data_page_row_count_limit: Option<usize>,

    #[clap(long)]
    pub data_page_size_limit: Option<usize>,

    #[clap(long)]
    pub column_index_truncate_length: Option<usize>,

    #[clap(long)]
    pub write_page_header_statistics: Option<bool>,

    #[clap(long)]
    pub bloom_filter_enabled: Option<bool>,

    #[clap(long)]
    pub bloom_filter_fpp: Option<f64>,

    #[clap(long)]
    pub bloom_filter_ndv: Option<u64>,

    #[clap(long)]
    pub statistics_enabled: Option<EnabledStatistics>,

    #[clap(long)]
    pub writer_version: Option<WriterVersion>,

    #[clap(long)]
    pub coerce_types: Option<bool>,
}

impl Args {
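    /// Builds `WriterProperties`, applying only the options that were set.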
    pub fn to_props(&self) -> WriterProperties {
        let mut bldr = WriterProperties::builder();
        if let Some(v) = self.compression {
            bldr = bldr.set_compression(v.into());
        }
        if let Some(v) = self.encoding {
            bldr = bldr.set_encoding(v);
        }
        if let Some(v) = self.dictionary_enabled {
            bldr = bldr.set_dictionary_enabled(v);
        }
        if let Some(v) = self.dictionary_page_size_limit {
            bldr = bldr.set_dictionary_page_size_limit(v);
        }
        if let Some(v) = self.max_row_group_size {
            bldr = bldr.set_max_row_group_size(v);
        }
        if let Some(v) = self.data_page_row_count_limit {
            bldr = bldr.set_data_page_row_count_limit(v);
        }
        if let Some(v) = self.data_page_size_limit {
            bldr = bldr.set_data_page_size_limit(v);
        }
        if let Some(v) = self.column_index_truncate_length {
            bldr = bldr.set_column_index_truncate_length(Some(v));
        }
        if let Some(v) = self.write_page_header_statistics {
            bldr = bldr.set_write_page_header_statistics(v);
        }
        if let Some(v) = self.bloom_filter_enabled {
            bldr = bldr.set_bloom_filter_enabled(v);
        }
        if let Some(v) = self.bloom_filter_fpp {
            bldr = bldr.set_bloom_filter_fpp(v);
        }
        if let Some(v) = self.bloom_filter_ndv {
            bldr = bldr.set_bloom_filter_ndv(v);
        }
        if let Some(v) = self.statistics_enabled {
            bldr = bldr.set_statistics_enabled(v);
        }
        if let Some(v) = self.writer_version {
            bldr = bldr.set_writer_version(v);
        }
        if let Some(v) = self.coerce_types {
            bldr = bldr.set_coerce_types(v);
        }

        bldr.build()
    }
}

impl Args {
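    /// Runs the whole pipeline: read the schema (file or stdin), assemble a
    /// `RecordBatch` from the per-column files, and write it out as Parquet,
    /// syncing according to `fsync_type` (default: no sync).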
    pub fn schema2fs2batch2parquet2file(self) -> Result<(), io::Error> {
        let props = self.to_props();
        let fsync = typ2fsync(self.fsync_type.unwrap_or(FsyncType::Nop));
        let rb = schema2fs2batch(
            self.basic_schema_filename.as_deref(),
            self.data_dir.into(),
            self.data_ext,
        )?;
        batch2file(&rb, self.output_parquet_filename, Some(props), fsync)
    }
}
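
// A few sanity checks for the pure helpers above. This is a minimal sketch
// written against the behavior shown in this file (note in particular the
// `with_extension` caveat on dotted column names).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn colname2fullpath_joins_dir_name_and_extension() {
        let p = colname2fullpath(Path::new("/data"), "price", "dat");
        assert_eq!(p, PathBuf::from("/data/price.dat"));
    }

    #[test]
    fn colname2fullpath_replaces_anything_after_a_dot() {
        // `Path::with_extension` treats "raw" as the extension of "price.raw",
        // so the existing suffix is replaced rather than appended to.
        let p = colname2fullpath(Path::new("/data"), "price.raw", "dat");
        assert_eq!(p, PathBuf::from("/data/price.dat"));
    }

    #[test]
    fn fsync_type_parses_aliases() {
        assert_eq!("none".parse::<FsyncType>().unwrap(), FsyncType::Nop);
        assert_eq!("fsync_data".parse::<FsyncType>().unwrap(), FsyncType::Dat);
        assert_eq!("fsync".parse::<FsyncType>().unwrap(), FsyncType::All);
        assert!("bogus".parse::<FsyncType>().is_err());
    }
}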