rs_jsons2arrow_ipc_stream/
lib.rs

1use arrow::error::Result;
2use arrow::ipc::writer::StreamWriter;
3use arrow::json::ReaderBuilder;
4use arrow::record_batch::RecordBatchReader;
5use std::io::{BufRead, Cursor, Read, Write};
6use std::sync::Arc;
7
8pub fn jsons_to_record_batch_reader<R: BufRead + 'static>(
9    mut reader: R,
10    num_lines: usize,
11) -> Result<impl RecordBatchReader> {
12    let br2 = &mut reader;
13    let lines = br2.lines();
14    let taken = lines.take(num_lines);
15
16    // 1. Read a few lines for schema inference
17    let mut jsonl: String = String::new();
18    for rline in taken {
19        let line: String = rline?;
20        jsonl.push_str(&line);
21        jsonl.push('\n');
22    }
23
24    // 2. Infer the schema
25    let (schema, _) = arrow::json::reader::infer_json_schema(jsonl.as_bytes(), None)?;
26    let schema = Arc::new(schema);
27
28    // 3. Create a chained reader
29    let chained_reader = Cursor::new(jsonl.into_bytes()).chain(reader);
30
31    // 4. Create a JSON reader
32    let builder = ReaderBuilder::new(schema);
33    let reader = builder.build(chained_reader)?;
34
35    Ok(Box::new(reader))
36}
37
38pub fn write_ipc_stream<W: Write>(writer: W, reader: impl RecordBatchReader) -> Result<()> {
39    let schema = reader.schema();
40    let mut writer = StreamWriter::try_new(writer, &schema)?;
41
42    for batch in reader {
43        writer.write(&batch?)?;
44    }
45
46    Ok(())
47}
48
49pub fn jsons2ipc<R: BufRead + 'static, W: Write>(
50    reader: R,
51    writer: W,
52    num_lines: usize,
53) -> Result<()> {
54    let reader = jsons_to_record_batch_reader(reader, num_lines)?;
55    write_ipc_stream(writer, reader)
56}