rs_jsons2arrow_ipc_stream/
lib.rs1use arrow::error::Result;
2use arrow::ipc::writer::StreamWriter;
3use arrow::json::ReaderBuilder;
4use arrow::record_batch::RecordBatchReader;
5use std::io::{BufRead, Cursor, Read, Write};
6use std::sync::Arc;
7
8pub fn jsons_to_record_batch_reader<R: BufRead + 'static>(
9 mut reader: R,
10 num_lines: usize,
11) -> Result<impl RecordBatchReader> {
12 let br2 = &mut reader;
13 let lines = br2.lines();
14 let taken = lines.take(num_lines);
15
16 let mut jsonl: String = String::new();
18 for rline in taken {
19 let line: String = rline?;
20 jsonl.push_str(&line);
21 jsonl.push('\n');
22 }
23
24 let (schema, _) = arrow::json::reader::infer_json_schema(jsonl.as_bytes(), None)?;
26 let schema = Arc::new(schema);
27
28 let chained_reader = Cursor::new(jsonl.into_bytes()).chain(reader);
30
31 let builder = ReaderBuilder::new(schema);
33 let reader = builder.build(chained_reader)?;
34
35 Ok(Box::new(reader))
36}
37
38pub fn write_ipc_stream<W: Write>(writer: W, reader: impl RecordBatchReader) -> Result<()> {
39 let schema = reader.schema();
40 let mut writer = StreamWriter::try_new(writer, &schema)?;
41
42 for batch in reader {
43 writer.write(&batch?)?;
44 }
45
46 Ok(())
47}
48
49pub fn jsons2ipc<R: BufRead + 'static, W: Write>(
50 reader: R,
51 writer: W,
52 num_lines: usize,
53) -> Result<()> {
54 let reader = jsons_to_record_batch_reader(reader, num_lines)?;
55 write_ipc_stream(writer, reader)
56}