mod encoder;
use std::{fmt::Debug, io::Write, sync::Arc};
use crate::StructMode;
use arrow_array::*;
use arrow_schema::*;
pub use encoder::{Encoder, EncoderFactory, EncoderOptions, NullableEncoder, make_encoder};
/// Controls the framing bytes emitted around JSON rows.
///
/// Implementations write only the delimiters (e.g. `[`, `,`, `]` for a JSON
/// array, or newlines for newline-delimited JSON); the row contents
/// themselves are produced by the encoders.
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed at the start of the stream (e.g. `[`).
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed before a row; `_is_first_row` lets
    /// implementations skip a separator before the very first row.
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed after a row (e.g. a newline).
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed at the end of the stream (e.g. `]`).
    // `#[inline]` added for consistency with the other default methods.
    #[inline]
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
/// Produces newline-delimited JSON (NDJSON): one JSON object per line with
/// no enclosing array.
#[derive(Debug, Default)]
pub struct LineDelimited {}

impl JsonFormat for LineDelimited {
    /// Terminate every row with a line feed.
    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
        Ok(writer.write_all(b"\n")?)
    }
}
/// Produces a single JSON array containing one object per row.
#[derive(Debug, Default)]
pub struct JsonArray {}

impl JsonFormat for JsonArray {
    /// Open the enclosing array.
    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
        Ok(writer.write_all(b"[")?)
    }

    /// Separate rows with commas; the first row needs no separator.
    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
        if is_first_row {
            Ok(())
        } else {
            Ok(writer.write_all(b",")?)
        }
    }

    /// Close the enclosing array.
    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
        Ok(writer.write_all(b"]")?)
    }
}
/// A [`Writer`] producing newline-delimited JSON (one object per line).
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A [`Writer`] producing a single JSON array of objects.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
/// Builder for configuring and constructing a [`Writer`].
///
/// A thin fluent wrapper around [`EncoderOptions`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);

impl WriterBuilder {
    /// Create a builder with default options.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if null values will be written as explicit `null`s.
    pub fn explicit_nulls(&self) -> bool {
        self.0.explicit_nulls()
    }

    /// Choose whether nulls are written as explicit `null` values or their
    /// keys are omitted entirely.
    pub fn with_explicit_nulls(self, explicit_nulls: bool) -> Self {
        Self(self.0.with_explicit_nulls(explicit_nulls))
    }

    /// Returns the configured [`StructMode`].
    pub fn struct_mode(&self) -> StructMode {
        self.0.struct_mode()
    }

    /// Set the [`StructMode`] used when encoding struct columns.
    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
        Self(self.0.with_struct_mode(struct_mode))
    }

    /// Install a custom [`EncoderFactory`] to override value encoding.
    pub fn with_encoder_factory(self, factory: Arc<dyn EncoderFactory>) -> Self {
        Self(self.0.with_encoder_factory(factory))
    }

    /// Set the format string used for date values.
    pub fn with_date_format(self, format: String) -> Self {
        Self(self.0.with_date_format(format))
    }

    /// Set the format string used for datetime values.
    pub fn with_datetime_format(self, format: String) -> Self {
        Self(self.0.with_datetime_format(format))
    }

    /// Set the format string used for time values.
    pub fn with_time_format(self, format: String) -> Self {
        Self(self.0.with_time_format(format))
    }

    /// Set the format string used for timestamps without a timezone.
    pub fn with_timestamp_format(self, format: String) -> Self {
        Self(self.0.with_timestamp_format(format))
    }

    /// Set the format string used for timestamps with a timezone.
    pub fn with_timestamp_tz_format(self, tz_format: String) -> Self {
        Self(self.0.with_timestamp_tz_format(tz_format))
    }

    /// Construct a [`Writer`] over `writer` using the configured options.
    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
    where
        W: Write,
        F: JsonFormat,
    {
        let Self(options) = self;
        Writer {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options,
        }
    }
}
/// A JSON writer serializing [`RecordBatch`]es to a [`Write`] sink, with the
/// stream framing (line-delimited vs. JSON array) chosen by `F`.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying byte sink
    writer: W,
    /// Whether `start_stream` has been written
    started: bool,
    /// Whether `end_stream` has been written
    finished: bool,
    /// Framing strategy; both provided formats are empty structs
    format: F,
    /// Encoding options (null handling, struct mode, date/time formats, ...)
    options: EncoderOptions,
}
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer over `writer` with default [`EncoderOptions`].
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` as JSON rows to the underlying writer.
    ///
    /// Rows are encoded into an in-memory buffer that is flushed whenever it
    /// exceeds 8 KiB, so the sink sees large chunked writes rather than
    /// per-value writes.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Empty batches produce no output at all — not even stream framing.
        if batch.num_rows() == 0 {
            return Ok(());
        }
        let mut buffer = Vec::with_capacity(16 * 1024);
        // The first row of the first non-empty batch must not be preceded by
        // a separator (relevant for the JSON-array format).
        let mut is_first_row = !self.started;
        if !self.started {
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }
        // View the whole batch as one non-nullable struct column so a single
        // root encoder can serialize an entire row per call.
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));
        let mut encoder = make_encoder(&field, &array, &self.options)?;
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;
            encoder.encode(idx, &mut buffer);
            // Flushing before `end_row` is fine: the row terminator written
            // below simply lands at the start of the now-empty buffer and is
            // flushed by a later write or the final drain below.
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }
        Ok(())
    }

    /// Serialize `batches` in order, as if each were passed to [`Self::write`].
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Write the trailing stream framing (e.g. `]`). Idempotent; also emits
    /// the opening framing first if no rows were ever written, so an empty
    /// JSON-array stream still produces `[]`.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }
        Ok(())
    }

    /// Returns a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Returns a mutable reference to the underlying writer; writing to it
    /// directly may corrupt the JSON output.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Consume `self`, returning the underlying writer.
    /// Note: this does not call [`Self::finish`].
    pub fn into_inner(self) -> W {
        self.writer
    }
}
impl<W, F> RecordBatchWriter for Writer<W, F>
where
W: Write,
F: JsonFormat,
{
fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
self.write(batch)
}
fn close(mut self) -> Result<(), ArrowError> {
self.finish()
}
}
#[cfg(test)]
mod tests {
use core::str;
use std::collections::HashMap;
use std::fs::{File, read_to_string};
use std::io::{BufReader, Seek};
use std::sync::Arc;
use arrow_array::cast::AsArray;
use serde_json::{Value, json};
use super::LineDelimited;
use super::{Encoder, WriterBuilder};
use arrow_array::builder::*;
use arrow_array::types::*;
use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice, i256};
use arrow_data::ArrayData;
use crate::reader::*;
use super::*;
/// Compare NDJSON output against an expected NDJSON string by parsing each
/// non-empty line into a `serde_json::Value`, so object key order is
/// irrelevant. Empty lines map to `None`, preserving line positions.
///
/// Note: `split('\n')` (not `lines()`) is used deliberately so a trailing
/// newline yields a final empty segment on both sides.
fn assert_json_eq(input: &[u8], expected: &str) {
    let expected: Vec<Option<Value>> = expected
        .split('\n')
        .map(|line| {
            if line.is_empty() {
                None
            } else {
                Some(serde_json::from_str(line).unwrap())
            }
        })
        .collect();
    let actual: Vec<Option<Value>> = input
        .split(|byte| *byte == b'\n')
        .map(|line| {
            if line.is_empty() {
                None
            } else {
                Some(serde_json::from_slice(line).unwrap())
            }
        })
        .collect();
    assert_eq!(actual, expected);
}
// A two-column batch (Int32 + Utf8) round-trips through the line-delimited
// writer; null values are omitted from the output by default.
#[test]
fn write_simple_rows() {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
    ]);
    let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
    let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
    let mut buf = Vec::new();
    {
        // Scope the writer so the mutable borrow of `buf` ends before reading it.
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
    );
}
// Utf8, LargeUtf8 and Utf8View columns all serialize identically as JSON
// strings; a fully-null row serializes as an empty object.
#[test]
fn write_large_utf8_and_utf8_view() {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::LargeUtf8, true),
        Field::new("c3", DataType::Utf8View, true),
    ]);
    let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
    let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
    let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(a), Arc::new(b), Arc::new(c)],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
    );
}
// Dictionary-encoded string columns (Int32 and Int8 keys) are written as
// their decoded string values, not their keys.
#[test]
fn write_dictionary() {
    let schema = Schema::new(vec![
        Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
        Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
    ]);
    let a: DictionaryArray<Int32Type> = vec![
        Some("cupcakes"),
        Some("foo"),
        Some("foo"),
        None,
        Some("cupcakes"),
    ]
    .into_iter()
    .collect();
    let b: DictionaryArray<Int8Type> =
        vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
            .into_iter()
            .collect();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
    );
}
// A LargeList of dictionary-encoded strings: null lists are omitted (row 3
// prints `{}`), while null items inside a list are written as JSON null.
#[test]
fn write_list_of_dictionary() {
    let dict_field = Arc::new(Field::new_dictionary(
        "item",
        DataType::Int32,
        DataType::Utf8,
        true,
    ));
    let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);
    let dict_array: DictionaryArray<Int32Type> =
        vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
            .into_iter()
            .collect();
    // Lists of lengths 3, 2, 0, 1; third list is null via the NullBuffer.
    let list_array = LargeListArray::try_new(
        dict_field,
        OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
        Arc::new(dict_array),
        Some(NullBuffer::from_iter([true, true, false, true])),
    )
    .unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
    );
}
// Same scenario as `write_list_of_dictionary`, but the dictionary values are
// LargeUtf8 and the array is built from explicit keys + values.
#[test]
fn write_list_of_dictionary_large_values() {
    let dict_field = Arc::new(Field::new_dictionary(
        "item",
        DataType::Int32,
        DataType::LargeUtf8,
        true,
    ));
    let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);
    let keys = PrimitiveArray::<Int32Type>::from(vec![
        Some(0),
        Some(1),
        Some(2),
        Some(0),
        None,
        Some(2),
    ]);
    let values = LargeStringArray::from(vec!["a", "b", "c"]);
    let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();
    // Lists of lengths 3, 2, 0, 1; third list is null via the NullBuffer.
    let list_array = LargeListArray::try_new(
        dict_field,
        OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
        Arc::new(dict_array),
        Some(NullBuffer::from_iter([true, true, false, true])),
    )
    .unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
    );
}
// Timestamps of every unit render as ISO-8601 strings by default, and
// `with_timestamp_format` applies a custom chrono format string instead.
#[test]
fn write_timestamps() {
    let ts_string = "2018-11-13T17:11:10.011375885995";
    let ts_nanos = ts_string
        .parse::<chrono::NaiveDateTime>()
        .unwrap()
        .and_utc()
        .timestamp_nanos_opt()
        .unwrap();
    // Derive the coarser units by truncation from the same instant.
    let ts_micros = ts_nanos / 1000;
    let ts_millis = ts_micros / 1000;
    let ts_secs = ts_millis / 1000;
    let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
    let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
    let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
    let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
    let schema = Schema::new(vec![
        Field::new("nanos", arr_nanos.data_type().clone(), true),
        Field::new("micros", arr_micros.data_type().clone(), true),
        Field::new("millis", arr_millis.data_type().clone(), true),
        Field::new("secs", arr_secs.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), true),
    ]);
    let schema = Arc::new(schema);
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_nanos),
            Arc::new(arr_micros),
            Arc::new(arr_millis),
            Arc::new(arr_secs),
            Arc::new(arr_names),
        ],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
    );
    // Same batch with a custom format applied to all timestamp columns.
    let mut buf = Vec::new();
    {
        let mut writer = WriterBuilder::new()
            .with_timestamp_format("%m-%d-%Y".to_string())
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"nanos":"11-13-2018","micros":"11-13-2018","millis":"11-13-2018","secs":"11-13-2018","name":"a"}
{"name":"b"}
"#,
    );
}
// Timezone-aware timestamps render with a trailing `Z` by default (UTC
// offset), and `with_timestamp_tz_format` applies a custom format instead.
#[test]
fn write_timestamps_with_tz() {
    let ts_string = "2018-11-13T17:11:10.011375885995";
    let ts_nanos = ts_string
        .parse::<chrono::NaiveDateTime>()
        .unwrap()
        .and_utc()
        .timestamp_nanos_opt()
        .unwrap();
    // Derive the coarser units by truncation from the same instant.
    let ts_micros = ts_nanos / 1000;
    let ts_millis = ts_micros / 1000;
    let ts_secs = ts_millis / 1000;
    let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
    let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
    let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
    let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
    // Attach a UTC offset to every timestamp column.
    let tz = "+00:00";
    let arr_nanos = arr_nanos.with_timezone(tz);
    let arr_micros = arr_micros.with_timezone(tz);
    let arr_millis = arr_millis.with_timezone(tz);
    let arr_secs = arr_secs.with_timezone(tz);
    let schema = Schema::new(vec![
        Field::new("nanos", arr_nanos.data_type().clone(), true),
        Field::new("micros", arr_micros.data_type().clone(), true),
        Field::new("millis", arr_millis.data_type().clone(), true),
        Field::new("secs", arr_secs.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), true),
    ]);
    let schema = Arc::new(schema);
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_nanos),
            Arc::new(arr_micros),
            Arc::new(arr_millis),
            Arc::new(arr_secs),
            Arc::new(arr_names),
        ],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
    );
    // Same batch with a custom tz-aware format (`%Z` prints the offset).
    let mut buf = Vec::new();
    {
        let mut writer = WriterBuilder::new()
            .with_timestamp_tz_format("%m-%d-%Y %Z".to_string())
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"nanos":"11-13-2018 +00:00","micros":"11-13-2018 +00:00","millis":"11-13-2018 +00:00","secs":"11-13-2018 +00:00","name":"a"}
{"name":"b"}
"#,
    );
}
// Date32 renders as a date, Date64 as a datetime; `with_date_format` /
// `with_datetime_format` override each respectively.
#[test]
fn write_dates() {
    let ts_string = "2018-11-13T17:11:10.011375885995";
    let ts_millis = ts_string
        .parse::<chrono::NaiveDateTime>()
        .unwrap()
        .and_utc()
        .timestamp_millis();
    // Date32 stores days since the epoch.
    let arr_date32 = Date32Array::from(vec![
        Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
        None,
    ]);
    let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
    let schema = Schema::new(vec![
        Field::new("date32", arr_date32.data_type().clone(), true),
        Field::new("date64", arr_date64.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), false),
    ]);
    let schema = Arc::new(schema);
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_date32),
            Arc::new(arr_date64),
            Arc::new(arr_names),
        ],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
    );
    // Same batch with custom date and datetime formats.
    let mut buf = Vec::new();
    {
        let mut writer = WriterBuilder::new()
            .with_date_format("%m-%d-%Y".to_string())
            .with_datetime_format("%m-%d-%Y %Mmin %Ssec %Hhour".to_string())
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"date32":"11-13-2018","date64":"11-13-2018 11min 10sec 17hour","name":"a"}
{"name":"b"}
"#,
    );
}
// Time32/Time64 columns of every unit render as `HH:MM:SS[.frac]` strings by
// default; `with_time_format` applies to all of them.
#[test]
fn write_times() {
    // 120 means 120 of the column's unit: seconds, ms, us, ns respectively.
    let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
    let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
    let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
    let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
    let schema = Schema::new(vec![
        Field::new("time32sec", arr_time32sec.data_type().clone(), true),
        Field::new("time32msec", arr_time32msec.data_type().clone(), true),
        Field::new("time64usec", arr_time64usec.data_type().clone(), true),
        Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), true),
    ]);
    let schema = Arc::new(schema);
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_time32sec),
            Arc::new(arr_time32msec),
            Arc::new(arr_time64usec),
            Arc::new(arr_time64nsec),
            Arc::new(arr_names),
        ],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
    );
    // Same batch with a custom time format (`%f` = nanoseconds).
    let mut buf = Vec::new();
    {
        let mut writer = WriterBuilder::new()
            .with_time_format("%H-%M-%S %f".to_string())
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"time32sec":"00-02-00 000000000","time32msec":"00-00-00 120000000","time64usec":"00-00-00 000120000","time64nsec":"00-00-00 000000120","name":"a"}
{"name":"b"}
"#,
    );
}
// Duration columns of every unit render as ISO-8601 duration strings
// (e.g. `PT120S`).
#[test]
fn write_durations() {
    // 120 means 120 of the column's unit: seconds, ms, us, ns respectively.
    let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
    let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
    let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
    let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
    let schema = Schema::new(vec![
        Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
        Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
        Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
        Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), true),
    ]);
    let schema = Arc::new(schema);
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_durationsec),
            Arc::new(arr_durationmsec),
            Arc::new(arr_durationusec),
            Arc::new(arr_durationnsec),
            Arc::new(arr_names),
        ],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
    );
}
// Nested struct columns serialize as nested JSON objects; a null inner field
// (c11 in row 2) is omitted while the enclosing objects remain.
#[test]
fn write_nested_structs() {
    let schema = Schema::new(vec![
        Field::new(
            "c1",
            DataType::Struct(Fields::from(vec![
                Field::new("c11", DataType::Int32, true),
                Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                ),
            ])),
            false,
        ),
        Field::new("c2", DataType::Utf8, false),
    ]);
    let c1 = StructArray::from(vec![
        (
            Arc::new(Field::new("c11", DataType::Int32, true)),
            Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
        ),
        (
            Arc::new(Field::new(
                "c12",
                DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                false,
            )),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("c121", DataType::Utf8, false)),
                Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
            )])) as ArrayRef,
        ),
    ]);
    let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
    let batch =
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
    );
}
// A list column built from raw ArrayData (offsets [0,2,3,4,5,6], all five
// rows valid) serializes as JSON arrays alongside a primitive column.
#[test]
fn write_struct_with_list_field() {
    let field_c1 = Field::new(
        "c1",
        DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
        false,
    );
    let field_c2 = Field::new("c2", DataType::Int32, false);
    let schema = Schema::new(vec![field_c1.clone(), field_c2]);
    let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
    // First list has two values ["a", "a1"], the rest one each.
    let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
    let a_list_data = ArrayData::builder(field_c1.data_type().clone())
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011111])))
        .build()
        .unwrap();
    let a = ListArray::from(a_list_data);
    let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
    );
}
// A list-of-list column serializes as nested JSON arrays; an empty outer
// list prints `[]`, and a null in a sibling column is omitted.
#[test]
fn write_nested_list() {
    let list_inner_type = Field::new(
        "a",
        DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
        false,
    );
    let field_c1 = Field::new(
        "c1",
        DataType::List(Arc::new(list_inner_type.clone())),
        false,
    );
    let field_c2 = Field::new("c2", DataType::Utf8, true);
    let schema = Schema::new(vec![field_c1.clone(), field_c2]);
    let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    // Inner lists: [1,2], [3], [4,5,6].
    let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
    let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
        .len(3)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000111])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
    // Outer lists: [[1,2],[3]], [], [[4,5,6]].
    let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
    let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
        .len(3)
        .add_buffer(c1_value_offsets)
        .add_child_data(a_list_data)
        .build()
        .unwrap();
    let c1 = ListArray::from(c1_list_data);
    let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);
    let batch =
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
    );
}
#[test]
fn write_list_of_struct() {
let field_c1 = Field::new(
"c1",
DataType::List(Arc::new(Field::new(
"s",
DataType::Struct(Fields::from(vec![
Field::new("c11", DataType::Int32, true),
Field::new(
"c12",
DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
false,
),
])),
false,
))),
true,
);
let field_c2 = Field::new("c2", DataType::Int32, false);
let schema = Schema::new(vec![field_c1.clone(), field_c2]);
let struct_values = StructArray::from(vec![
(
Arc::new(Field::new("c11", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
),
(
Arc::new(Field::new(
"c12",
DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
false,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("c121", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
)])) as ArrayRef,
),
]);
let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
.len(3)
.add_buffer(c1_value_offsets)
.add_child_data(struct_values.into_data())
.null_bit_buffer(Some(Buffer::from([0b00000101])))
.build()
.unwrap();
let c1 = ListArray::from(c1_list_data);
let c2 = Int32Array::from(vec![1, 2, 3]);
let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
);
}
// Shared harness: read an NDJSON file through the reader, write it back out,
// and compare with the original file line by line. With `remove_nulls` the
// default writer omits nulls, so null-valued keys are stripped from the
// expected JSON first; otherwise the writer is built with explicit nulls.
fn test_write_for_file(test_file: &str, remove_nulls: bool) {
    let file = File::open(test_file).unwrap();
    let mut reader = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
    // Schema inference consumed the reader; rewind before reading rows.
    reader.rewind().unwrap();
    let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
    let mut reader = builder.build(reader).unwrap();
    let batch = reader.next().unwrap().unwrap();
    let mut buf = Vec::new();
    {
        if remove_nulls {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        } else {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }
    }
    let result = str::from_utf8(&buf).unwrap();
    let expected = read_to_string(test_file).unwrap();
    for (r, e) in result.lines().zip(expected.lines()) {
        let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
        if remove_nulls {
            // Drop top-level null entries to match the writer's omission.
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
        }
        assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
    }
}
// Round-trip test/data/basic.json, dropping nulls from the expectation.
#[test]
fn write_basic_rows() {
    test_write_for_file("test/data/basic.json", true);
}
// Round-trip test/data/arrays.json, dropping nulls from the expectation.
#[test]
fn write_arrays() {
    test_write_for_file("test/data/arrays.json", true);
}
// Round-trip test/data/basic_nulls.json, dropping nulls from the expectation.
#[test]
fn write_basic_nulls() {
    test_write_for_file("test/data/basic_nulls.json", true);
}
// Round-trip test/data/nested_with_nulls.json with explicit nulls enabled,
// so the output must match the file including null values.
#[test]
fn write_nested_with_nulls() {
    test_write_for_file("test/data/nested_with_nulls.json", false);
}
// Finishing an NDJSON writer without writing any rows produces no output.
#[test]
fn json_line_writer_empty() {
    let mut writer = LineDelimitedWriter::new(Vec::<u8>::new());
    writer.finish().unwrap();
    let output = writer.into_inner();
    assert_eq!(str::from_utf8(&output).unwrap(), "");
}
// Finishing an array writer without writing any rows still emits `[]`.
#[test]
fn json_array_writer_empty() {
    let mut writer = ArrayWriter::new(Vec::<u8>::new());
    writer.finish().unwrap();
    let output = writer.into_inner();
    assert_eq!(str::from_utf8(&output).unwrap(), "[]");
}
// Writing a zero-row batch to an NDJSON writer produces no output.
#[test]
fn json_line_writer_empty_batch() {
    let mut writer = LineDelimitedWriter::new(Vec::<u8>::new());
    let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
    let array = Int32Array::from(Vec::<i32>::new());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    let output = writer.into_inner();
    assert_eq!(str::from_utf8(&output).unwrap(), "");
}
// Writing a zero-row batch to an array writer still emits just `[]`.
#[test]
fn json_array_writer_empty_batch() {
    let mut writer = ArrayWriter::new(Vec::<u8>::new());
    let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
    let array = Int32Array::from(Vec::<i32>::new());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    let output = writer.into_inner();
    assert_eq!(str::from_utf8(&output).unwrap(), "[]");
}
// Struct columns sharing the same child list: column "a" has a struct-level
// validity mask (0b01010111 → rows 3 and 5 null), column "b" is fully valid.
// A null struct is omitted; a valid struct whose child is null prints `{}`.
#[test]
fn json_struct_array_nulls() {
    let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![None]),
        Some(vec![]),
        Some(vec![Some(3), None]),
        Some(vec![Some(4), Some(5)]),
        None,
        None,
    ]);
    let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
    let array = Arc::new(inner) as ArrayRef;
    let struct_array_a = StructArray::from((
        vec![(field.clone(), array.clone())],
        Buffer::from([0b01010111]),
    ));
    let struct_array_b = StructArray::from(vec![(field, array)]);
    let schema = Schema::new(vec![
        Field::new_struct("a", struct_array_a.fields().clone(), true),
        Field::new_struct("b", struct_array_b.fields().clone(), true),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
    )
    .unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
    );
}
// Shared harness for map serialization: builds a MapArray over the given key
// array type with values 10..=50, entry offsets [0,1,1,1,4,5,5] and validity
// 0b00111101 (row 1 null), then checks the NDJSON output. Null maps are
// omitted; empty maps print `{}`.
fn run_json_writer_map_with_keys(keys_array: ArrayRef) {
    let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);
    let keys_field = Arc::new(Field::new("keys", keys_array.data_type().clone(), false));
    let values_field = Arc::new(Field::new("values", DataType::Int64, false));
    let entry_struct = StructArray::from(vec![
        (keys_field, keys_array.clone()),
        (values_field, Arc::new(values_array) as ArrayRef),
    ]);
    let map_data_type = DataType::Map(
        Arc::new(Field::new(
            "entries",
            entry_struct.data_type().clone(),
            false,
        )),
        false,
    );
    // Maps of sizes 1, 0, 0, 3, 1, 0; the second map is null via the mask.
    let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
    let valid_buffer = Buffer::from([0b00111101]);
    let map_data = ArrayData::builder(map_data_type.clone())
        .len(6)
        .null_bit_buffer(Some(valid_buffer))
        .add_buffer(entry_offsets)
        .add_child_data(entry_struct.into_data())
        .build()
        .unwrap();
    let map = MapArray::from(map_data);
    let map_field = Field::new("map", map_data_type, true);
    let schema = Arc::new(Schema::new(vec![map_field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }
    assert_json_eq(
        &buf,
        r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
    );
}
// Maps serialize identically regardless of the key array representation:
// Utf8, LargeUtf8 and Utf8View keys all go through the same harness.
#[test]
fn json_writer_map() {
    let keys_utf8 = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
    run_json_writer_map_with_keys(Arc::new(keys_utf8) as ArrayRef);
    let keys_large = super::LargeStringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
    run_json_writer_map_with_keys(Arc::new(keys_large) as ArrayRef);
    let keys_view = super::StringViewArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
    run_json_writer_map_with_keys(Arc::new(keys_view) as ArrayRef);
}
// Round-trip a single batch via `write` (rather than `write_batches`),
// comparing against the source file with null keys stripped.
#[test]
fn test_write_single_batch() {
    let test_file = "test/data/basic.json";
    let file = File::open(test_file).unwrap();
    let mut reader = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
    // Schema inference consumed the reader; rewind before reading rows.
    reader.rewind().unwrap();
    let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
    let mut reader = builder.build(reader).unwrap();
    let batch = reader.next().unwrap().unwrap();
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write(&batch).unwrap();
    }
    let result = str::from_utf8(&buf).unwrap();
    let expected = read_to_string(test_file).unwrap();
    for (r, e) in result.lines().zip(expected.lines()) {
        let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
        // The writer omits nulls by default; strip them from the expectation.
        if let Value::Object(obj) = expected_json {
            expected_json =
                Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
        }
        assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
    }
}
// Writing [empty batch, batch, batch] yields the file contents twice: the
// empty batch contributes nothing, and each subsequent batch appends rows.
#[test]
fn test_write_multi_batches() {
    let test_file = "test/data/basic.json";
    let schema = SchemaRef::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float64, true),
        Field::new("c", DataType::Boolean, true),
        Field::new("d", DataType::Utf8, true),
        Field::new("e", DataType::Utf8, true),
        Field::new("f", DataType::Utf8, true),
        Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("h", DataType::Float16, true),
    ]));
    let mut reader = ReaderBuilder::new(schema.clone())
        .build(BufReader::new(File::open(test_file).unwrap()))
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&batches).unwrap();
    }
    let result = str::from_utf8(&buf).unwrap();
    let expected = read_to_string(test_file).unwrap();
    // The batch was written twice, so expect the file content doubled.
    let expected = format!("{expected}\n{expected}");
    for (r, e) in result.lines().zip(expected.lines()) {
        let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
        // The writer omits nulls by default; strip them from the expectation.
        if let Value::Object(obj) = expected_json {
            expected_json =
                Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
        }
        assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
    }
}
// Verifies `with_explicit_nulls(true)`: nulls at every nesting level —
// top-level columns, struct fields, list items, dictionary values and map
// entries — are written as explicit JSON `null` instead of being omitted.
#[test]
fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
// List<Int32> with both null items and a null list (row 2).
fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![None, None, None]),
Some(vec![Some(1), Some(2), Some(3)]),
None,
Some(vec![None, None, None]),
]));
let field = Arc::new(Field::new("list", array.data_type().clone(), true));
(array, field)
}
// Dictionary<Int32, Utf8> with a null entry at row 1.
fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
let array = Arc::new(DictionaryArray::from_iter(vec![
Some("cupcakes"),
None,
Some("bear"),
Some("kuma"),
]));
let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
(array, field)
}
// Map<Utf8, Int64> with rows: {foo:10}, null map, {} (empty), 3 entries.
fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
let string_builder = StringBuilder::new();
let int_builder = Int64Builder::new();
let mut builder = MapBuilder::new(None, string_builder, int_builder);
builder.keys().append_value("foo");
builder.values().append_value(10);
builder.append(true).unwrap();
// append(false) produces a null map entry for row 1.
builder.append(false).unwrap();
// append(true) with no keys/values produces an empty map for row 2.
builder.append(true).unwrap();
builder.keys().append_value("bar");
builder.values().append_value(20);
builder.keys().append_value("baz");
builder.values().append_value(30);
builder.keys().append_value("qux");
builder.values().append_value(40);
builder.append(true).unwrap();
let array = Arc::new(builder.finish());
let field = Arc::new(Field::new("map", array.data_type().clone(), true));
(array, field)
}
// Top-level List<Struct<utf8, int32>>; the validity bitmap 0b00000101
// makes rows 1 and 3 null lists.
fn root_list() -> (Arc<ListArray>, Field) {
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("utf8", DataType::Utf8, true)),
Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
),
(
Arc::new(Field::new("int32", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
),
]);
let field = Field::new_list(
"list",
Field::new("struct", struct_array.data_type().clone(), true),
true,
);
// Offsets [0,2,2,3,3]: row 0 holds 2 structs, row 2 holds 1.
let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
let data = ArrayData::builder(field.data_type().clone())
.len(4)
.add_buffer(entry_offsets)
.add_child_data(struct_array.into_data())
.null_bit_buffer(Some([0b00000101].into()))
.build()
.unwrap();
let array = Arc::new(ListArray::from(data));
(array, field)
}
let (nested_list_array, nested_list_field) = nested_list();
let (nested_dict_array, nested_dict_field) = nested_dict();
let (nested_map_array, nested_map_field) = nested_map();
let (root_list_array, root_list_field) = root_list();
let schema = Schema::new(vec![
Field::new("date", DataType::Date32, true),
Field::new("null", DataType::Null, true),
Field::new_struct(
"struct",
vec![
Arc::new(Field::new("utf8", DataType::Utf8, true)),
nested_list_field.clone(),
nested_dict_field.clone(),
nested_map_field.clone(),
],
true,
),
root_list_field,
]);
let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
let arr_null = NullArray::new(4);
let arr_struct = StructArray::from(vec![
(
Arc::new(Field::new("utf8", DataType::Utf8, true)),
Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
),
(nested_list_field, nested_list_array as ArrayRef),
(nested_dict_field, nested_dict_array as ArrayRef),
(nested_map_field, nested_map_array as ArrayRef),
]);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(arr_date32),
Arc::new(arr_null),
Arc::new(arr_struct),
root_list_array,
],
)?;
let mut buf = Vec::new();
{
let mut writer = WriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, JsonArray>(&mut buf);
writer.write_batches(&[&batch])?;
writer.finish()?;
}
let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
// Every null appears explicitly: `"date": null`, `"list": null`,
// `"map": null`, null list items, and the Null-typed column as well.
let expected = serde_json::from_value::<Vec<Value>>(json!([
{
"date": "1970-01-01",
"list": [
{
"int32": 1,
"utf8": "a"
},
{
"int32": null,
"utf8": "b"
}
],
"null": null,
"struct": {
"dict": "cupcakes",
"list": [
null,
null,
null
],
"map": {
"foo": 10
},
"utf8": "a"
}
},
{
"date": null,
"list": null,
"null": null,
"struct": {
"dict": null,
"list": [
1,
2,
3
],
"map": null,
"utf8": null
}
},
{
"date": "1970-01-02",
"list": [
{
"int32": 5,
"utf8": null
}
],
"null": null,
"struct": {
"dict": "bear",
"list": null,
"map": {},
"utf8": null
}
},
{
"date": null,
"list": null,
"null": null,
"struct": {
"dict": "kuma",
"list": [
null,
null,
null
],
"map": {
"bar": 20,
"baz": 30,
"qux": 40
},
"utf8": "b"
}
}
]))
.unwrap();
assert_eq!(actual, expected);
Ok(())
}
/// Builds a single-column batch of `Binary`/`LargeBinary` values (selected
/// by the offset type `O`) for the binary writer tests.
fn build_array_binary<O: OffsetSizeTrait>(values: &[Option<&[u8]>]) -> RecordBatch {
    let schema = SchemaRef::new(Schema::new(vec![Field::new(
        "bytes",
        GenericBinaryType::<O>::DATA_TYPE,
        true,
    )]));
    let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
    for value in values {
        builder.append_option(*value);
    }
    RecordBatch::try_new(schema, vec![Arc::new(builder.finish()) as ArrayRef]).unwrap()
}
/// Builds a single-column batch of `BinaryView` values for the binary
/// writer tests.
fn build_array_binary_view(values: &[Option<&[u8]>]) -> RecordBatch {
    let schema = SchemaRef::new(Schema::new(vec![Field::new(
        "bytes",
        DataType::BinaryView,
        true,
    )]));
    let mut builder = BinaryViewBuilder::new();
    for value in values {
        builder.append_option(*value);
    }
    RecordBatch::try_new(schema, vec![Arc::new(builder.finish()) as ArrayRef]).unwrap()
}
/// Asserts the JSON rendering of a three-row binary batch
/// ("Ned Flanders", null, "Troy McClure") under both null modes.
/// Binary values are rendered as lowercase hex strings.
fn assert_binary_json(batch: &RecordBatch) {
    // Explicit nulls: the null slot appears as `"bytes": null`.
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(&mut buf);
    writer.write(batch).unwrap();
    writer.close().unwrap();
    let parsed: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        json!([
            {"bytes": "4e656420466c616e64657273"},
            {"bytes": null},
            {"bytes": "54726f79204d63436c757265"}
        ]),
        parsed
    );

    // Implicit nulls (the default): the null slot becomes an empty object.
    let mut buf = Vec::new();
    let mut writer = ArrayWriter::new(&mut buf);
    writer.write(batch).unwrap();
    writer.close().unwrap();
    let parsed: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        json!([
            {"bytes": "4e656420466c616e64657273"},
            {},
            {"bytes": "54726f79204d63436c757265"}
        ]),
        parsed
    );
}
#[test]
fn test_writer_binary() {
    // Exercise Binary (i32 offsets), LargeBinary (i64 offsets) and
    // BinaryView with the same data; all must serialize identically.
    let values: [Option<&[u8]>; 3] = [
        Some(b"Ned Flanders".as_slice()),
        None,
        Some(b"Troy McClure".as_slice()),
    ];
    assert_binary_json(&build_array_binary::<i32>(&values));
    assert_binary_json(&build_array_binary::<i64>(&values));
    assert_binary_json(&build_array_binary_view(&values));
}
#[test]
fn test_writer_fixed_size_binary() {
    // FixedSizeBinary values are rendered as lowercase hex strings
    // ("68656c6c6f..." is "hello..."); nulls follow the configured mode.
    let size = 11;
    let schema = SchemaRef::new(Schema::new(vec![Field::new(
        "bytes",
        DataType::FixedSizeBinary(size),
        true,
    )]));
    let mut builder = FixedSizeBinaryBuilder::new(size);
    for entry in [Some(&b"hello world"[..]), None, Some(&b"summer rain"[..])] {
        if let Some(bytes) = entry {
            builder.append_value(bytes).unwrap();
        } else {
            builder.append_null();
        }
    }
    let array = Arc::new(builder.finish()) as ArrayRef;
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    // Explicit nulls: the null slot appears as `"bytes": null`.
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(&mut buf);
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let parsed: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        json!([
            {"bytes": "68656c6c6f20776f726c64"},
            {"bytes": null},
            {"bytes": "73756d6d6572207261696e"}
        ]),
        parsed
    );

    // Implicit nulls: the null slot collapses to an empty object.
    let mut buf = Vec::new();
    let mut writer = ArrayWriter::new(&mut buf);
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let parsed: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        json!([
            {"bytes": "68656c6c6f20776f726c64"},
            {},
            {"bytes": "73756d6d6572207261696e"}
        ]),
        parsed
    );
}
#[test]
fn test_writer_fixed_size_list() {
    // FixedSizeList rows serialize as JSON arrays; a null row is either
    // `"list": null` (explicit) or an empty object (implicit).
    let size = 3;
    let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
    let schema = SchemaRef::new(Schema::new(vec![Field::new(
        "list",
        DataType::FixedSizeList(field, size),
        true,
    )]));
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), size);
    let lists = [
        Some([Some(1), Some(2), None]),
        Some([Some(3), None, Some(4)]),
        Some([None, Some(5), Some(6)]),
        None,
    ];
    for list in lists {
        if let Some(items) = list {
            for item in items {
                list_builder.values().append_option(item);
            }
            list_builder.append(true);
        } else {
            // A null list still requires `size` (masked) child slots.
            for _ in 0..size {
                list_builder.values().append_null();
            }
            list_builder.append(false);
        }
    }
    let array = Arc::new(list_builder.finish()) as ArrayRef;
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    // Explicit nulls.
    let json_value: Value = {
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new()
            .with_explicit_nulls(true)
            .build::<_, JsonArray>(&mut buf);
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        serde_json::from_slice(&buf).unwrap()
    };
    assert_eq!(
        json!([
            {"list": [1, 2, null]},
            {"list": [3, null, 4]},
            {"list": [null, 5, 6]},
            {"list": null},
        ]),
        json_value
    );

    // Implicit nulls.
    let json_value: Value = {
        let mut buf = Vec::new();
        let mut writer = ArrayWriter::new(&mut buf);
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        serde_json::from_slice(&buf).unwrap()
    };
    assert_eq!(
        json!([
            {"list": [1, 2, null]},
            {"list": [3, null, 4]},
            {"list": [null, 5, 6]},
            {},
        ]),
        json_value
    );
}
#[test]
fn test_writer_null_dict() {
    // A dictionary whose values array itself contains a null: the masked
    // key (row 1) is written as null, while the key that points at the null
    // dictionary value (row 2) encodes as "" per the expectation below.
    let dict = DictionaryArray::new(
        Int32Array::from_iter(vec![Some(0), None, Some(1)]),
        Arc::new(StringArray::from_iter(vec![Some("a"), None])),
    );
    let schema = SchemaRef::new(Schema::new(vec![Field::new(
        "my_dict",
        DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dict) as ArrayRef]).unwrap();
    let mut json = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(&mut json);
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    assert_eq!(
        str::from_utf8(&json).unwrap(),
        r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
    )
}
// Decimal32 with precision 8, scale 2: raw 1234 serializes as 12.34.
#[test]
fn test_decimal32_encoder() {
let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
.with_precision_and_scale(8, 2)
.unwrap();
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"decimal":12.34}
{"decimal":56.78}
{"decimal":90.12}
"#,
);
}
// Decimal64 with precision 10, scale 2: raw 1234 serializes as 12.34.
#[test]
fn test_decimal64_encoder() {
let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
.with_precision_and_scale(10, 2)
.unwrap();
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"decimal":12.34}
{"decimal":56.78}
{"decimal":90.12}
"#,
);
}
// Decimal128 with precision 10, scale 2: raw 1234 serializes as 12.34.
#[test]
fn test_decimal128_encoder() {
let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
.with_precision_and_scale(10, 2)
.unwrap();
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"decimal":12.34}
{"decimal":56.78}
{"decimal":90.12}
"#,
);
}
// Decimal256 with precision 10, scale 4: raw 123400 serializes as 12.3400
// (trailing zeros kept to match the scale).
#[test]
fn test_decimal256_encoder() {
let array = Decimal256Array::from_iter_values([
i256::from(123400),
i256::from(567800),
i256::from(901200),
])
.with_precision_and_scale(10, 4)
.unwrap();
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"decimal":12.3400}
{"decimal":56.7800}
{"decimal":90.1200}
"#,
);
}
// A null decimal row is omitted entirely (implicit-null default), leaving
// an empty object on that line.
#[test]
fn test_decimal_encoder_with_nulls() {
let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
.with_precision_and_scale(10, 2)
.unwrap();
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"decimal":12.34}
{}
{"decimal":56.78}
"#,
);
}
// StructMode::ListOnly renders each struct as a positional JSON array
// instead of an object. Nulls are positional, so explicit vs implicit
// null settings produce identical output (checked below with both).
#[test]
fn write_structs_as_list() {
let schema = Schema::new(vec![
Field::new(
"c1",
DataType::Struct(Fields::from(vec![
Field::new("c11", DataType::Int32, true),
Field::new(
"c12",
DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
false,
),
])),
false,
),
Field::new("c2", DataType::Utf8, false),
]);
let c1 = StructArray::from(vec![
(
Arc::new(Field::new("c11", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
),
(
Arc::new(Field::new(
"c12",
DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
false,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("c121", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
)])) as ArrayRef,
),
]);
let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
// Each row is [[c11, [c121]], c2].
let expected = r#"[[1,["e"]],"a"]
[[null,["f"]],"b"]
[[5,["g"]],"c"]
"#;
let mut buf = Vec::new();
{
let builder = WriterBuilder::new()
.with_explicit_nulls(true)
.with_struct_mode(StructMode::ListOnly);
let mut writer = builder.build::<_, LineDelimited>(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(&buf, expected);
let mut buf = Vec::new();
{
let builder = WriterBuilder::new()
.with_explicit_nulls(false)
.with_struct_mode(StructMode::ListOnly);
let mut writer = builder.build::<_, LineDelimited>(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
// Same expectation: ListOnly output does not depend on the null mode.
assert_json_eq(&buf, expected);
}
// Builds a two-column batch (sparse union + float64) and an EncoderFactory
// whose custom encoder handles the union column, for the fallback-factory
// tests below. The union rows are: Int32(1), String("a"), Null.
fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
// Decoded representation of one union slot; None = the Null variant.
#[derive(Debug)]
enum UnionValue {
Int32(i32),
String(String),
}
// Encoder over pre-materialized union values.
#[derive(Debug)]
struct UnionEncoder {
array: Vec<Option<UnionValue>>,
}
impl Encoder for UnionEncoder {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
match &self.array[idx] {
None => out.extend_from_slice(b"null"),
Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
Some(UnionValue::String(v)) => {
out.extend_from_slice(format!("\"{v}\"").as_bytes())
}
}
}
}
#[derive(Debug)]
struct UnionEncoderFactory;
impl EncoderFactory for UnionEncoderFactory {
// Returns Some(encoder) only for sparse unions whose variants are all
// Null/Int32/Utf8; Ok(None) defers to the default encoders.
fn make_default_encoder<'a>(
&self,
_field: &'a FieldRef,
array: &'a dyn Array,
_options: &'a EncoderOptions,
) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
let data_type = array.data_type();
let fields = match data_type {
DataType::Union(fields, UnionMode::Sparse) => fields,
_ => return Ok(None),
};
let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
for f in fields.iter() {
match f.data_type() {
DataType::Null => {}
DataType::Int32 => {}
DataType::Utf8 => {}
_ => return Ok(None),
}
}
// Materialize each slot by looking up the child array selected by
// its type id (type ids 0..n match child order here).
let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
let mut values = Vec::with_capacity(type_ids.len());
for idx in 0..type_ids.len() {
let type_id = type_ids[idx];
let field = &fields[type_id as usize];
let value = match field.data_type() {
DataType::Null => None,
DataType::Int32 => Some(UnionValue::Int32(
buffers[type_id as usize]
.as_primitive::<Int32Type>()
.value(idx),
)),
DataType::Utf8 => Some(UnionValue::String(
buffers[type_id as usize]
.as_string::<i32>()
.value(idx)
.to_string(),
)),
_ => unreachable!(),
};
values.push(value);
}
let array_encoder =
Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
let nulls = array.nulls().cloned();
Ok(Some(NullableEncoder::new(array_encoder, nulls)))
}
}
// Sparse union children: only the slot matching each row's type id is read.
let int_array = Int32Array::from(vec![Some(1), None, None]);
let string_array = StringArray::from(vec![None, Some("a"), None]);
let null_array = NullArray::new(3);
let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();
let union_fields = [
(0, Arc::new(Field::new("A", DataType::Int32, false))),
(1, Arc::new(Field::new("B", DataType::Utf8, false))),
(2, Arc::new(Field::new("C", DataType::Null, false))),
]
.into_iter()
.collect::<UnionFields>();
let children = vec![
Arc::new(int_array) as Arc<dyn Array>,
Arc::new(string_array),
Arc::new(null_array),
];
let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();
// Second column with a real null (row 1) to exercise null handling of
// the default encoders alongside the custom one.
let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);
let fields = vec![
Field::new(
"union",
DataType::Union(union_fields, UnionMode::Sparse),
true,
),
Field::new("float", DataType::Float64, true),
];
let batch = RecordBatch::try_new(
Arc::new(Schema::new(fields)),
vec![
Arc::new(array) as Arc<dyn Array>,
Arc::new(float_array) as Arc<dyn Array>,
],
)
.unwrap();
(batch, Arc::new(UnionEncoderFactory))
}
// With implicit nulls, the null "float" on row 2 is omitted; the union's
// Null variant on row 3 is still written as `null` because the custom
// UnionEncoder emits it directly (the column itself has no validity nulls).
#[test]
fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
let (batch, encoder_factory) = make_fallback_encoder_test_data();
let mut buf = Vec::new();
{
let mut writer = WriterBuilder::new()
.with_encoder_factory(encoder_factory)
.with_explicit_nulls(false)
.build::<_, LineDelimited>(&mut buf);
writer.write_batches(&[&batch]).unwrap();
writer.finish().unwrap();
}
assert_json_eq(
&buf,
r#"{"union":1,"float":1.0}
{"union":"a"}
{"union":null,"float":3.4}
"#,
);
}
// Same data as the implicit-null test, but with explicit nulls the null
// "float" on row 2 is written as `"float": null`.
#[test]
fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
let (batch, encoder_factory) = make_fallback_encoder_test_data();
let mut buf = Vec::new();
{
let mut writer = WriterBuilder::new()
.with_encoder_factory(encoder_factory)
.with_explicit_nulls(true)
.build::<_, LineDelimited>(&mut buf);
writer.write_batches(&[&batch]).unwrap();
writer.finish().unwrap();
}
assert_json_eq(
&buf,
r#"{"union":1,"float":1.0}
{"union":"a","float":null}
{"union":null,"float":3.4}
"#,
);
}
#[test]
fn test_fallback_encoder_factory_array_implicit_nulls() {
    // JsonArray output with the default (implicit) null handling: the null
    // "float" on row 2 is omitted from that object.
    let (batch, factory) = make_fallback_encoder_test_data();
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_encoder_factory(factory)
        .build::<_, JsonArray>(&mut buf);
    writer.write_batches(&[&batch]).unwrap();
    writer.finish().unwrap();
    let actual: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        actual,
        json!([
            {"union": 1, "float": 1.0},
            {"union": "a"},
            {"float": 3.4, "union": null},
        ])
    );
}
#[test]
fn test_fallback_encoder_factory_array_explicit_nulls() {
    // JsonArray output with explicit nulls: every row carries both keys.
    let (batch, factory) = make_fallback_encoder_test_data();
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_encoder_factory(factory)
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(&mut buf);
    writer.write_batches(&[&batch]).unwrap();
    writer.finish().unwrap();
    let actual: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        actual,
        json!([
            {"union": 1, "float": 1.0},
            {"union": "a", "float": null},
            {"union": null, "float": 3.4},
        ])
    );
}
#[test]
fn test_default_encoder_byte_array() {
    // Custom encoder that renders a binary value as a JSON array of byte
    // values (e.g. b"a" -> [97]) instead of the default hex string.
    struct IntArrayBinaryEncoder<B> {
        array: B,
    }
    impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
    where
        B: ArrayAccessor<Item = &'a [u8]>,
    {
        fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
            out.push(b'[');
            let child = self.array.value(idx);
            for (idx, byte) in child.iter().enumerate() {
                write!(out, "{byte}").unwrap();
                // Comma-separate all but the final element.
                if idx < child.len() - 1 {
                    out.push(b',');
                }
            }
            out.push(b']');
        }
    }
    // Factory installing IntArrayBinaryEncoder for Binary columns only;
    // Ok(None) defers every other type to the built-in encoders.
    // (Renamed from the misspelled `IntArayBinaryEncoderFactory`.)
    #[derive(Debug)]
    struct IntArrayBinaryEncoderFactory;
    impl EncoderFactory for IntArrayBinaryEncoderFactory {
        fn make_default_encoder<'a>(
            &self,
            _field: &'a FieldRef,
            array: &'a dyn Array,
            _options: &'a EncoderOptions,
        ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
            match array.data_type() {
                DataType::Binary => {
                    let array = array.as_binary::<i32>();
                    let encoder = IntArrayBinaryEncoder { array };
                    let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                    let nulls = array.nulls().cloned();
                    Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                }
                _ => Ok(None),
            }
        }
    }
    let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
    let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
    let fields = vec![
        Field::new("bytes", DataType::Binary, true),
        Field::new("float", DataType::Float64, true),
    ];
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(binary_array) as Arc<dyn Array>,
            Arc::new(float_array) as Arc<dyn Array>,
        ],
    )
    .unwrap();
    let json_value: Value = {
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new()
            .with_encoder_factory(Arc::new(IntArrayBinaryEncoderFactory))
            .build::<_, JsonArray>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
        writer.finish().unwrap();
        serde_json::from_slice(&buf).unwrap()
    };
    // Implicit nulls: the null slot in each column is simply omitted.
    let expected = json!([
        {"bytes": [97], "float": 1.0},
        {"float": 2.3},
        {"bytes": [98]},
    ]);
    assert_eq!(json_value, expected);
}
// A factory keyed off field metadata ("padded" = "true") that zero-pads
// Int32 values to 8 digits; also checks the factory is consulted for the
// *values* of a dictionary column, not just plain Int32 columns.
#[test]
fn test_encoder_factory_customize_dictionary() {
struct PaddedInt32Encoder {
array: Int32Array,
}
impl Encoder for PaddedInt32Encoder {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
let value = self.array.value(idx);
// Render as a quoted, zero-padded 8-character string.
write!(out, "\"{value:0>8}\"").unwrap();
}
}
#[derive(Debug)]
struct CustomEncoderFactory;
impl EncoderFactory for CustomEncoderFactory {
// Installs PaddedInt32Encoder only when the field carries
// metadata "padded" = "true" and the array is Int32.
fn make_default_encoder<'a>(
&self,
field: &'a FieldRef,
array: &'a dyn Array,
_options: &'a EncoderOptions,
) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
let padded = field
.metadata()
.get("padded")
.map(|v| v == "true")
.unwrap_or_default();
match (array.data_type(), padded) {
(DataType::Int32, true) => {
let array = array.as_primitive::<Int32Type>();
let nulls = array.nulls().cloned();
let encoder = PaddedInt32Encoder {
array: array.clone(),
};
let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
Ok(Some(NullableEncoder::new(array_encoder, nulls)))
}
_ => Ok(None),
}
}
}
// Helper: serialize a batch to a JsonArray Value with the factory.
let to_json = |batch| {
let mut buf = Vec::new();
let mut writer = WriterBuilder::new()
.with_encoder_factory(Arc::new(CustomEncoderFactory))
.build::<_, JsonArray>(&mut buf);
writer.write_batches(&[batch]).unwrap();
writer.finish().unwrap();
serde_json::from_slice::<Value>(&buf).unwrap()
};
// Case 1: plain Int32 column with the "padded" metadata.
let array = Int32Array::from(vec![Some(1), None, Some(2)]);
let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
));
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![field.clone()])),
vec![Arc::new(array)],
)
.unwrap();
let json_value = to_json(&batch);
let expected = json!([
{"int": "00000001"},
{},
{"int": "00000002"},
]);
assert_eq!(json_value, expected);
// Case 2: Dictionary<UInt16, Int32> column with the same metadata —
// the padded encoder must apply to the dictionary values too.
let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
array_builder.append_value(1);
array_builder.append_null();
array_builder.append_value(1);
let array = array_builder.finish();
let field = Field::new(
"int",
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
true,
)
.with_metadata(HashMap::from_iter(vec![(
"padded".to_string(),
"true".to_string(),
)]));
let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
.unwrap();
let json_value = to_json(&batch);
let expected = json!([
{"int": "00000001"},
{},
{"int": "00000001"},
]);
assert_eq!(json_value, expected);
}
// Run-end-encoded strings: runs [a x 2, b x 3, null x 1] are expanded to
// one logical row each; the null run yields empty objects (implicit nulls).
#[test]
fn test_write_run_end_encoded() {
let run_ends = Int32Array::from(vec![2, 5, 6]);
let values = StringArray::from(vec![Some("a"), Some("b"), None]);
let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"c1",
ree.data_type().clone(),
true,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
let mut buf = Vec::new();
{
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[&batch]).unwrap();
}
assert_json_eq(
&buf,
r#"{"c1":"a"}
{"c1":"a"}
{"c1":"b"}
{"c1":"b"}
{"c1":"b"}
{}
"#,
);
}
#[test]
fn test_write_run_end_encoded_int_values() {
    // Runs [10 x 3, 20 x 2] must be expanded back to one value per row.
    let ree = RunArray::<Int32Type>::try_new(
        &Int32Array::from(vec![3, 5]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();
    let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
        "n",
        ree.data_type().clone(),
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new().build::<_, JsonArray>(&mut buf);
    writer.write_batches(&[&batch]).unwrap();
    writer.finish().unwrap();
    let actual: Value = serde_json::from_slice(&buf).unwrap();
    assert_eq!(
        actual,
        json!([
            {"n": 10},
            {"n": 10},
            {"n": 10},
            {"n": 20},
            {"n": 20},
        ])
    );
}
#[test]
fn test_run_end_encoded_roundtrip() {
    // Write a REE column as line-delimited JSON, read it back, and confirm
    // the reader reconstructs the same runs and values.
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let values = StringArray::from(vec![Some("a"), None, Some("b")]);
    let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
    let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
        "c",
        ree.data_type().clone(),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree)]).unwrap();

    let mut encoded = Vec::new();
    {
        let mut writer = super::LineDelimitedWriter::new(&mut encoded);
        writer.write_batches(&[&batch]).unwrap();
    }

    let round_tripped: Vec<RecordBatch> = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .build(std::io::Cursor::new(&encoded))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(round_tripped.len(), 1);

    let run_array = round_tripped[0].column(0).as_run::<Int32Type>();
    assert_eq!(run_array.len(), 7);
    assert_eq!(run_array.run_ends().values(), &[3, 5, 7]);
    let decoded = run_array.values().as_string::<i32>();
    assert_eq!(decoded.len(), 3);
    assert_eq!(decoded.value(0), "a");
    assert!(decoded.is_null(1));
    assert_eq!(decoded.value(2), "b");
}
}