Module datafusion::common::arrow::json::reader

source ·
Expand description

JSON reader

This JSON reader allows JSON records to be read into the Arrow memory model. Records are loaded in batches and are then converted from the record-oriented representation to the columnar arrow data model.

The reader ignores whitespace between JSON values, including \n and \r, allowing parsing of sequences of one or more arbitrarily formatted JSON values, including but not limited to newline-delimited JSON.

§Basic Usage

Reader can be used directly with synchronous data sources, such as std::fs::File


let schema = Arc::new(Schema::new(vec![
    Field::new("a", DataType::Float64, false),
    Field::new("b", DataType::Float64, false),
    Field::new("c", DataType::Boolean, true),
]));

let file = File::open("test/data/basic.json").unwrap();

let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
let batch = json.next().unwrap().unwrap();

§Async Usage

The lower-level Decoder can be integrated with various forms of async data streams, and is designed to be agnostic to the various different kinds of async IO primitives found within the Rust ecosystem.

For example, see below for how it can be used with an arbitrary Stream of Bytes

fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            if buffered.is_empty() {
                buffered = match ready!(input.poll_next_unpin(cx)) {
                    Some(b) => b,
                    None => break,
                };
            }
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            let read = buffered.len();
            buffered.advance(decoded);
            if decoded != read {
                break
            }
        }

        Poll::Ready(decoder.flush().transpose())
    })
}

In a similar vein, it can also be used with tokio-based IO primitives

fn decode_stream<R: AsyncBufRead + Unpin>(
    mut decoder: Decoder,
    mut reader: R,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    futures::stream::poll_fn(move |cx| {
        loop {
            let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
                Ok(b) if b.is_empty() => break,
                Ok(b) => b,
                Err(e) => return Poll::Ready(Some(Err(e.into()))),
            };
            let read = b.len();
            let decoded = match decoder.decode(b) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            Pin::new(&mut reader).consume(decoded);
            if decoded != read {
                break;
            }
        }

        Poll::Ready(decoder.flush().transpose())
    })
}

Structs§

Functions§

  • Infer the fields of a JSON file by reading the first n records of the buffer, with max_read_records controlling the maximum number of records to read.
  • Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
  • Infer the fields of a JSON file by reading the first n records of the file, with max_read_records controlling the maximum number of records to read.