rs_csv2parquet/
lib.rs

1use std::io;
2use std::path::Path;
3
4use io::Read;
5use io::Seek;
6use io::Write;
7
8use arrow::datatypes::Schema;
9use arrow::datatypes::SchemaRef;
10
11use arrow::csv::reader::Format;
12use arrow::record_batch::RecordBatch;
13
14use arrow::csv::reader::ReaderBuilder;
15
16use parquet::arrow::ArrowWriter;
17
18pub fn batch2parquet<I, W>(batch: I, mut wtr: ArrowWriter<W>) -> Result<W, io::Error>
19where
20    W: Write + Send,
21    I: Iterator<Item = Result<RecordBatch, io::Error>>,
22{
23    for rb in batch {
24        let b: RecordBatch = rb?;
25        wtr.write(&b)?;
26    }
27    wtr.into_inner().map_err(io::Error::other)
28}
29
30pub fn batch2parquet_file<I, P, F>(
31    batch: I,
32    sch: SchemaRef,
33    out_pq_filename: P,
34    fsync: F,
35) -> Result<(), io::Error>
36where
37    P: AsRef<Path>,
38    I: Iterator<Item = Result<RecordBatch, io::Error>>,
39    F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
40{
41    let f = std::fs::File::create(out_pq_filename)?;
42    let wtr = ArrowWriter::try_new(f, sch, None)?;
43    let mut f = batch2parquet(batch, wtr)?;
44    f.flush()?;
45    fsync(&mut f)
46}
47
48pub fn fmt2schema<R>(f: &Format, rdr: R, max_records: Option<usize>) -> Result<Schema, io::Error>
49where
50    R: Read,
51{
52    f.infer_schema(rdr, max_records)
53        .map(|t| t.0)
54        .map_err(io::Error::other)
55}
56
57pub fn schema2batch<R>(
58    sch: SchemaRef,
59    f: Format,
60    rdr: R,
61) -> Result<impl Iterator<Item = Result<RecordBatch, io::Error>>, io::Error>
62where
63    R: Read,
64{
65    ReaderBuilder::new(sch)
66        .with_format(f)
67        .build(rdr)
68        .map_err(io::Error::other)
69        .map(|i| i.map(|r| r.map_err(io::Error::other)))
70}
71
72pub fn filename2batch2parquet<P, F>(
73    fm: Format,
74    max_records: Option<usize>,
75    input_csv_filename: P,
76    output_parquet_filename: P,
77    fsync: F,
78) -> Result<(), io::Error>
79where
80    P: AsRef<Path>,
81    F: Fn(&mut std::fs::File) -> Result<(), io::Error>,
82{
83    let mut input_csv = std::fs::File::open(input_csv_filename)?;
84    let s: Schema = fmt2schema(&fm, &input_csv, max_records)?;
85    input_csv.rewind()?;
86    let sr: SchemaRef = s.into();
87    let batch = schema2batch(sr.clone(), fm, input_csv)?;
88
89    batch2parquet_file(batch, sr, output_parquet_filename, fsync)
90}
91
/// Sync callback that does nothing: the file is left untouched and `Ok(())`
/// is always returned.
pub fn fsync_nop(_file: &mut std::fs::File) -> Result<(), io::Error> {
    Result::Ok(())
}
/// Sync callback that calls [`std::fs::File::sync_all`] on `f` and returns
/// its result.
pub fn fsync_all(f: &mut std::fs::File) -> Result<(), io::Error> {
    std::fs::File::sync_all(f)
}