1use std::io;
2use std::path::Path;
3
4use io::Read;
5use io::Seek;
6use io::Write;
7
8use arrow::datatypes::Schema;
9use arrow::datatypes::SchemaRef;
10
11use arrow::csv::reader::Format;
12use arrow::record_batch::RecordBatch;
13
14use arrow::csv::reader::ReaderBuilder;
15
16use parquet::arrow::ArrowWriter;
17
18pub fn batch2parquet<I, W>(batch: I, mut wtr: ArrowWriter<W>) -> Result<W, io::Error>
19where
20 W: Write + Send,
21 I: Iterator<Item = Result<RecordBatch, io::Error>>,
22{
23 for rb in batch {
24 let b: RecordBatch = rb?;
25 wtr.write(&b)?;
26 }
27 wtr.into_inner().map_err(io::Error::other)
28}
29
30pub fn batch2parquet_file<I, P, F>(
31 batch: I,
32 sch: SchemaRef,
33 out_pq_filename: P,
34 fsync: F,
35) -> Result<(), io::Error>
36where
37 P: AsRef<Path>,
38 I: Iterator<Item = Result<RecordBatch, io::Error>>,
39 F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
40{
41 let f = std::fs::File::create(out_pq_filename)?;
42 let wtr = ArrowWriter::try_new(f, sch, None)?;
43 let mut f = batch2parquet(batch, wtr)?;
44 f.flush()?;
45 fsync(&mut f)
46}
47
48pub fn fmt2schema<R>(f: &Format, rdr: R, max_records: Option<usize>) -> Result<Schema, io::Error>
49where
50 R: Read,
51{
52 f.infer_schema(rdr, max_records)
53 .map(|t| t.0)
54 .map_err(io::Error::other)
55}
56
57pub fn schema2batch<R>(
58 sch: SchemaRef,
59 f: Format,
60 rdr: R,
61) -> Result<impl Iterator<Item = Result<RecordBatch, io::Error>>, io::Error>
62where
63 R: Read,
64{
65 ReaderBuilder::new(sch)
66 .with_format(f)
67 .build(rdr)
68 .map_err(io::Error::other)
69 .map(|i| i.map(|r| r.map_err(io::Error::other)))
70}
71
72pub fn filename2batch2parquet<P, F>(
73 fm: Format,
74 max_records: Option<usize>,
75 input_csv_filename: P,
76 output_parquet_filename: P,
77 fsync: F,
78) -> Result<(), io::Error>
79where
80 P: AsRef<Path>,
81 F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
82{
83 let mut input_csv = std::fs::File::open(input_csv_filename)?;
84 let s: Schema = fmt2schema(&fm, &input_csv, max_records)?;
85 input_csv.rewind()?;
86 let sr: SchemaRef = s.into();
87 let batch = schema2batch(sr.clone(), fm, input_csv)?;
88
89 batch2parquet_file(batch, sr, output_parquet_filename, fsync)
90}
91
/// Sync callback that does nothing: the file is left untouched and `Ok(())`
/// is always returned.
pub fn fsync_nop(_file: &mut std::fs::File) -> Result<(), io::Error> {
    Ok(())
}
/// Sync callback that calls `File::sync_all` on `f` and returns its result.
pub fn fsync_all(f: &mut std::fs::File) -> Result<(), io::Error> {
    std::fs::File::sync_all(f)
}