rs_arrow_ipc_stream2jsonl/
lib.rs1#![forbid(clippy::unwrap_used)]
2#![forbid(clippy::expect_used)]
3
4use std::io;
5
6use io::BufReader;
7use io::Read;
8
9use io::BufWriter;
10use io::Write;
11
12use arrow::datatypes::SchemaRef;
13use arrow::record_batch::RecordBatch;
14
15use arrow::ipc::reader::StreamReader;
16
17use arrow::json::writer::LineDelimitedWriter;
18
19pub struct IpcStreamReader<R>(pub StreamReader<BufReader<R>>);
20
21impl<R: Read> IpcStreamReader<R> {
22 pub fn schema_ref(&self) -> SchemaRef {
23 self.0.schema()
24 }
25
26 pub fn into_batch_iter(self) -> impl Iterator<Item = Result<RecordBatch, io::Error>> {
27 self.0.map(|r| r.map_err(io::Error::other))
28 }
29}
30
31impl<R: Read> IpcStreamReader<R> {
32 pub fn to_json_writer<W>(self, mut wtr: JsonLinesWriter<W>) -> Result<(), io::Error>
33 where
34 W: Write,
35 {
36 let values = self.into_batch_iter();
37 wtr.write_all(values)?;
38 wtr.finish()
39 }
40
41 pub fn to_writer<W>(self, mut wtr: W) -> Result<(), io::Error>
42 where
43 W: Write,
44 {
45 let jwtr = JsonLinesWriter(LineDelimitedWriter::new(&mut wtr));
46 self.to_json_writer(jwtr)?;
47 wtr.flush()
48 }
49}
50
51impl<R: Read> IpcStreamReader<R> {
52 pub fn from_reader(rdr: R, projection: Option<Vec<usize>>) -> Result<Self, io::Error> {
53 let srdr: StreamReader<_> =
54 StreamReader::try_new_buffered(rdr, projection).map_err(io::Error::other)?;
55 Ok(Self(srdr))
56 }
57}
58
59pub struct JsonLinesWriter<W: Write>(pub LineDelimitedWriter<W>);
60
61impl<W: Write> JsonLinesWriter<W> {
62 pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
63 self.0.write(b).map_err(io::Error::other)
64 }
65
66 pub fn finish(&mut self) -> Result<(), io::Error> {
67 self.0.finish().map_err(io::Error::other)
68 }
69}
70
71impl<W: Write> JsonLinesWriter<W> {
72 pub fn write_all<I>(&mut self, b: I) -> Result<(), io::Error>
73 where
74 I: Iterator<Item = Result<RecordBatch, io::Error>>,
75 {
76 for rbat in b {
77 let bat: RecordBatch = rbat?;
78 self.write_batch(&bat)?;
79 }
80 Ok(())
81 }
82}
83
84pub fn reader2ipc2jsons2writer<R, W>(
85 rdr: R,
86 wtr: W,
87 projection: Option<Vec<usize>>,
88) -> Result<(), io::Error>
89where
90 R: Read,
91 W: Write,
92{
93 let irdr = IpcStreamReader::from_reader(rdr, projection)?;
94 irdr.to_writer(wtr)
95}
96
97pub fn stdin2ipc2jsons2stdout(projection: Option<Vec<usize>>) -> Result<(), io::Error> {
98 let i = io::stdin().lock();
99 let mut o = io::stdout().lock();
100 let bw = BufWriter::new(&mut o);
101 reader2ipc2jsons2writer(i, bw, projection)?;
102 o.flush()
103}