rs_arrow_ipc_stream2jsonl/
lib.rs

1#![forbid(clippy::unwrap_used)]
2#![forbid(clippy::expect_used)]
3
4use std::io;
5
6use io::BufReader;
7use io::Read;
8
9use io::BufWriter;
10use io::Write;
11
12use arrow::datatypes::SchemaRef;
13use arrow::record_batch::RecordBatch;
14
15use arrow::ipc::reader::StreamReader;
16
17use arrow::json::writer::LineDelimitedWriter;
18
/// Newtype around an arrow IPC [`StreamReader`] whose input is wrapped in a
/// [`BufReader`].
///
/// The inner reader is exposed as the public tuple field `.0`.
pub struct IpcStreamReader<R>(pub StreamReader<BufReader<R>>);
20
21impl<R: Read> IpcStreamReader<R> {
22    pub fn schema_ref(&self) -> SchemaRef {
23        self.0.schema()
24    }
25
26    pub fn into_batch_iter(self) -> impl Iterator<Item = Result<RecordBatch, io::Error>> {
27        self.0.map(|r| r.map_err(io::Error::other))
28    }
29}
30
31impl<R: Read> IpcStreamReader<R> {
32    pub fn to_json_writer<W>(self, mut wtr: JsonLinesWriter<W>) -> Result<(), io::Error>
33    where
34        W: Write,
35    {
36        let values = self.into_batch_iter();
37        wtr.write_all(values)?;
38        wtr.finish()
39    }
40
41    pub fn to_writer<W>(self, mut wtr: W) -> Result<(), io::Error>
42    where
43        W: Write,
44    {
45        let jwtr = JsonLinesWriter(LineDelimitedWriter::new(&mut wtr));
46        self.to_json_writer(jwtr)?;
47        wtr.flush()
48    }
49}
50
51impl<R: Read> IpcStreamReader<R> {
52    pub fn from_reader(rdr: R, projection: Option<Vec<usize>>) -> Result<Self, io::Error> {
53        let srdr: StreamReader<_> =
54            StreamReader::try_new_buffered(rdr, projection).map_err(io::Error::other)?;
55        Ok(Self(srdr))
56    }
57}
58
59pub struct JsonLinesWriter<W: Write>(pub LineDelimitedWriter<W>);
60
61impl<W: Write> JsonLinesWriter<W> {
62    pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
63        self.0.write(b).map_err(io::Error::other)
64    }
65
66    pub fn finish(&mut self) -> Result<(), io::Error> {
67        self.0.finish().map_err(io::Error::other)
68    }
69}
70
71impl<W: Write> JsonLinesWriter<W> {
72    pub fn write_all<I>(&mut self, b: I) -> Result<(), io::Error>
73    where
74        I: Iterator<Item = Result<RecordBatch, io::Error>>,
75    {
76        for rbat in b {
77            let bat: RecordBatch = rbat?;
78            self.write_batch(&bat)?;
79        }
80        Ok(())
81    }
82}
83
84pub fn reader2ipc2jsons2writer<R, W>(
85    rdr: R,
86    wtr: W,
87    projection: Option<Vec<usize>>,
88) -> Result<(), io::Error>
89where
90    R: Read,
91    W: Write,
92{
93    let irdr = IpcStreamReader::from_reader(rdr, projection)?;
94    irdr.to_writer(wtr)
95}
96
97pub fn stdin2ipc2jsons2stdout(projection: Option<Vec<usize>>) -> Result<(), io::Error> {
98    let i = io::stdin().lock();
99    let mut o = io::stdout().lock();
100    let bw = BufWriter::new(&mut o);
101    reader2ipc2jsons2writer(i, bw, projection)?;
102    o.flush()
103}