1use crate::StructMode;
137use crate::reader::binary_array::{
138 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153
154use crate::reader::boolean_array::BooleanArrayDecoder;
155use crate::reader::decimal_array::DecimalArrayDecoder;
156use crate::reader::list_array::ListArrayDecoder;
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
161use crate::reader::string_array::StringArrayDecoder;
162use crate::reader::string_view_array::StringViewArrayDecoder;
163use crate::reader::struct_array::StructArrayDecoder;
164use crate::reader::tape::{Tape, TapeDecoder};
165use crate::reader::timestamp_array::TimestampArrayDecoder;
166
167mod binary_array;
168mod boolean_array;
169mod decimal_array;
170mod list_array;
171mod map_array;
172mod null_array;
173mod primitive_array;
174mod run_end_array;
175mod schema;
176mod serializer;
177mod string_array;
178mod string_view_array;
179mod struct_array;
180mod tape;
181mod timestamp_array;
182
/// A builder for constructing a JSON [`Reader`] or push-based [`Decoder`]
/// for a known Arrow schema.
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024)
    batch_size: usize,
    // If true, primitive JSON values may be coerced into string columns
    // (see `make_decoder`, which forwards this to the string decoders)
    coerce_primitive: bool,
    // NOTE(review): presumably rejects JSON fields absent from the schema
    // when true — semantics live in the struct decoders, confirm there
    strict_mode: bool,
    // If true, `schema` wraps a single field and each top-level JSON value
    // is decoded as that field rather than as an object of the schema
    is_field: bool,
    // How JSON structs are decoded (exact semantics defined by `StructMode`)
    struct_mode: StructMode,

    // Schema of the batches produced by the resulting reader/decoder
    schema: SchemaRef,
}
193
194impl ReaderBuilder {
195 pub fn new(schema: SchemaRef) -> Self {
204 Self {
205 batch_size: 1024,
206 coerce_primitive: false,
207 strict_mode: false,
208 is_field: false,
209 struct_mode: Default::default(),
210 schema,
211 }
212 }
213
214 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
245 Self {
246 batch_size: 1024,
247 coerce_primitive: false,
248 strict_mode: false,
249 is_field: true,
250 struct_mode: Default::default(),
251 schema: Arc::new(Schema::new([field.into()])),
252 }
253 }
254
255 pub fn with_batch_size(self, batch_size: usize) -> Self {
257 Self { batch_size, ..self }
258 }
259
260 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
263 Self {
264 coerce_primitive,
265 ..self
266 }
267 }
268
269 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
275 Self {
276 strict_mode,
277 ..self
278 }
279 }
280
281 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
285 Self {
286 struct_mode,
287 ..self
288 }
289 }
290
291 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
293 Ok(Reader {
294 reader,
295 decoder: self.build_decoder()?,
296 })
297 }
298
299 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
301 let (data_type, nullable) = if self.is_field {
302 let field = &self.schema.fields[0];
303 let data_type = Cow::Borrowed(field.data_type());
304 (data_type, field.is_nullable())
305 } else {
306 let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
307 (data_type, false)
308 };
309
310 let ctx = DecoderContext {
311 coerce_primitive: self.coerce_primitive,
312 strict_mode: self.strict_mode,
313 struct_mode: self.struct_mode,
314 };
315 let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
316
317 let num_fields = self.schema.flattened_fields().len();
318
319 Ok(Decoder {
320 decoder,
321 is_field: self.is_field,
322 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
323 batch_size: self.batch_size,
324 schema: self.schema,
325 })
326 }
327}
328
/// Reads JSON data from a [`BufRead`] source into Arrow [`RecordBatch`]es.
pub struct Reader<R> {
    // The underlying byte source
    reader: R,
    // Push-based decoder that the bytes from `reader` are fed into
    decoder: Decoder,
}
336
337impl<R> std::fmt::Debug for Reader<R> {
338 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339 f.debug_struct("Reader")
340 .field("decoder", &self.decoder)
341 .finish()
342 }
343}
344
345impl<R: BufRead> Reader<R> {
346 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
348 loop {
349 let buf = self.reader.fill_buf()?;
350 if buf.is_empty() {
351 break;
352 }
353 let read = buf.len();
354
355 let decoded = self.decoder.decode(buf)?;
356 self.reader.consume(decoded);
357 if decoded != read {
358 break;
359 }
360 }
361 self.decoder.flush()
362 }
363}
364
365impl<R: BufRead> Iterator for Reader<R> {
366 type Item = Result<RecordBatch, ArrowError>;
367
368 fn next(&mut self) -> Option<Self::Item> {
369 self.read().transpose()
370 }
371}
372
373impl<R: BufRead> RecordBatchReader for Reader<R> {
374 fn schema(&self) -> SchemaRef {
375 self.decoder.schema.clone()
376 }
377}
378
/// A push-based decoder: feed it JSON bytes via [`Decoder::decode`] and
/// extract completed batches via [`Decoder::flush`].
pub struct Decoder {
    // Parses raw JSON bytes into an intermediate tape representation
    tape_decoder: TapeDecoder,
    // Decodes the buffered tape rows into Arrow data
    decoder: Box<dyn ArrayDecoder>,
    // Maximum number of rows per batch
    batch_size: usize,
    // If true, output is a single field array rather than a struct of fields
    is_field: bool,
    // Schema of the emitted batches
    schema: SchemaRef,
}
426
427impl std::fmt::Debug for Decoder {
428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429 f.debug_struct("Decoder")
430 .field("schema", &self.schema)
431 .field("batch_size", &self.batch_size)
432 .finish()
433 }
434}
435
436impl Decoder {
437 pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
446 self.tape_decoder.decode(buf)
447 }
448
449 pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
626 self.tape_decoder.serialize(rows)
627 }
628
629 pub fn has_partial_record(&self) -> bool {
631 self.tape_decoder.has_partial_row()
632 }
633
634 pub fn len(&self) -> usize {
636 self.tape_decoder.num_buffered_rows()
637 }
638
639 pub fn is_empty(&self) -> bool {
641 self.len() == 0
642 }
643
644 pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
651 let tape = self.tape_decoder.finish()?;
652
653 if tape.num_rows() == 0 {
654 return Ok(None);
655 }
656
657 let mut next_object = 1;
659 let pos: Vec<_> = (0..tape.num_rows())
660 .map(|_| {
661 let next = tape.next(next_object, "row").unwrap();
662 std::mem::replace(&mut next_object, next)
663 })
664 .collect();
665
666 let decoded = self.decoder.decode(&tape, &pos)?;
667 self.tape_decoder.clear();
668
669 let batch = match self.is_field {
670 true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
671 false => {
672 RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
673 }
674 };
675
676 Ok(Some(batch))
677 }
678}
679
/// Decodes an array's values from rows recorded on a [`Tape`].
trait ArrayDecoder: Send {
    // Decode the values rooted at the tape offsets in `pos` into ArrayData
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
684
/// Configuration shared by all [`ArrayDecoder`] constructors.
pub struct DecoderContext {
    // Allow coercing primitive JSON values into string columns
    coerce_primitive: bool,
    // NOTE(review): presumably controls rejection of fields absent from the
    // schema — semantics live in the individual decoders, confirm there
    strict_mode: bool,
    // How JSON structs are decoded
    struct_mode: StructMode,
}
697
impl DecoderContext {
    /// Returns whether primitive JSON values may be coerced into strings
    pub fn coerce_primitive(&self) -> bool {
        self.coerce_primitive
    }

    /// Returns whether strict field handling is enabled
    pub fn strict_mode(&self) -> bool {
        self.strict_mode
    }

    /// Returns the configured [`StructMode`]
    pub fn struct_mode(&self) -> StructMode {
        self.struct_mode
    }

    /// Construct an [`ArrayDecoder`] for `data_type` using this context;
    /// convenience wrapper over the free function of the same name
    fn make_decoder(
        &self,
        data_type: &DataType,
        is_nullable: bool,
    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
        make_decoder(self, data_type, is_nullable)
    }
}
726
/// Expands to a boxed [`PrimitiveArrayDecoder`] for the given Arrow type;
/// also used as the per-type template by `downcast_integer!` below
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
732
/// Construct the [`ArrayDecoder`] for `data_type`, recursing into nested
/// types (lists, structs, maps, run-end encoding) via `ctx`.
///
/// Returns [`ArrowError::NotYetImplemented`] for unsupported types.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    // downcast_integer! generates one arm per integer type, each expanding
    // the primitive_decoder! template; all other types are matched explicitly
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are parsed relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timestamps with a timezone parse the tz string; an invalid tz is an error
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        // Run-end type is constrained by the schema; other index types cannot occur
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
808
809#[cfg(test)]
810mod tests {
811 use serde_json::json;
812 use std::fs::File;
813 use std::io::{BufReader, Cursor, Seek};
814
815 use arrow_array::cast::AsArray;
816 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
817 use arrow_buffer::{ArrowNativeType, Buffer};
818 use arrow_cast::display::{ArrayFormatter, FormatOptions};
819 use arrow_data::ArrayDataBuilder;
820 use arrow_schema::{Field, Fields};
821
822 use super::*;
823
824 fn do_read(
825 buf: &str,
826 batch_size: usize,
827 coerce_primitive: bool,
828 strict_mode: bool,
829 schema: SchemaRef,
830 ) -> Vec<RecordBatch> {
831 let mut unbuffered = vec![];
832
833 for batch_size in [1, 3, 100, batch_size] {
835 unbuffered = ReaderBuilder::new(schema.clone())
836 .with_batch_size(batch_size)
837 .with_coerce_primitive(coerce_primitive)
838 .build(Cursor::new(buf.as_bytes()))
839 .unwrap()
840 .collect::<Result<Vec<_>, _>>()
841 .unwrap();
842
843 for b in unbuffered.iter().take(unbuffered.len() - 1) {
844 assert_eq!(b.num_rows(), batch_size)
845 }
846
847 for b in [1, 3, 5] {
849 let buffered = ReaderBuilder::new(schema.clone())
850 .with_batch_size(batch_size)
851 .with_coerce_primitive(coerce_primitive)
852 .with_strict_mode(strict_mode)
853 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
854 .unwrap()
855 .collect::<Result<Vec<_>, _>>()
856 .unwrap();
857 assert_eq!(unbuffered, buffered);
858 }
859 }
860
861 unbuffered
862 }
863
    // Smoke test: mixed primitive columns, out-of-order keys, blank lines,
    // numeric coercion between integer forms, and explicit nulls; exercises
    // both the push-based Decoder API and the pull-based do_read path
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Push-based API: decode() reports the number of bytes consumed
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing drains all buffered rows
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
930
    // Utf8 and LargeUtf8 decoding, including escape sequences, surrogate
    // pairs, empty strings, and explicit/implicit nulls
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // \ud83d\ude00 is a surrogate pair decoding to 😀
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
966
    // Utf8View decoding: checks the variadic data buffer is sized for the
    // non-inlined strings (assumes views inline short strings and spill
    // longer ones into the data buffer — TODO confirm threshold)
    #[test]
    fn test_long_string_view_allocation() {
        // Expected minimum bytes spilled to the data buffer by the long values
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Values must round-trip regardless of inline/spilled representation
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1019
    // Utf8View with coerce_primitive: numbers are coerced to their literal
    // text, and long numerals must land in the view data buffer intact
    #[test]
    fn test_numeric_view_allocation() {
        // Expected minimum bytes spilled to the data buffer by the values
        // longer than the inline threshold — TODO confirm derivation
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Coerced text preserves the exact source lexeme
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1064
    // Same dataset as test_string, but column "a" decoded as Utf8View
    // NOTE(review): "uft8view" in the name looks like a typo for "utf8view";
    // left unchanged since renaming alters the harness-visible test name
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1101
1102 #[test]
1103 fn test_complex() {
1104 let buf = r#"
1105 {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1106 {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1107 {"list": null, "nested": {"a": null}}
1108 "#;
1109
1110 let schema = Arc::new(Schema::new(vec![
1111 Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1112 Field::new_struct(
1113 "nested",
1114 vec![
1115 Field::new("a", DataType::Int32, true),
1116 Field::new("b", DataType::Int32, true),
1117 ],
1118 true,
1119 ),
1120 Field::new_struct(
1121 "nested_list",
1122 vec![Field::new_list(
1123 "list2",
1124 Field::new_struct(
1125 "element",
1126 vec![Field::new("c", DataType::Int32, false)],
1127 false,
1128 ),
1129 true,
1130 )],
1131 true,
1132 ),
1133 ]));
1134
1135 let batches = do_read(buf, 1024, false, false, schema);
1136 assert_eq!(batches.len(), 1);
1137
1138 let list = batches[0].column(0).as_list::<i32>();
1139 assert_eq!(list.len(), 3);
1140 assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1141 assert_eq!(list.null_count(), 1);
1142 assert!(list.is_null(2));
1143 let list_values = list.values().as_primitive::<Int32Type>();
1144 assert_eq!(list_values.values(), &[5, 6]);
1145
1146 let nested = batches[0].column(1).as_struct();
1147 let a = nested.column(0).as_primitive::<Int32Type>();
1148 assert_eq!(list.null_count(), 1);
1149 assert_eq!(a.values(), &[1, 7, 0]);
1150 assert!(list.is_null(2));
1151
1152 let b = nested.column(1).as_primitive::<Int32Type>();
1153 assert_eq!(b.null_count(), 2);
1154 assert_eq!(b.len(), 3);
1155 assert_eq!(b.value(0), 2);
1156 assert!(b.is_null(1));
1157 assert!(b.is_null(2));
1158
1159 let nested_list = batches[0].column(2).as_struct();
1160 assert_eq!(nested_list.len(), 3);
1161 assert_eq!(nested_list.null_count(), 1);
1162 assert!(nested_list.is_null(2));
1163
1164 let list2 = nested_list.column(0).as_list::<i32>();
1165 assert_eq!(list2.len(), 3);
1166 assert_eq!(list2.null_count(), 1);
1167 assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1168 assert!(list2.is_null(2));
1169
1170 let list2_values = list2.values().as_struct();
1171
1172 let c = list2_values.column(0).as_primitive::<Int32Type>();
1173 assert_eq!(c.values(), &[3, 4]);
1174 }
1175
    // Schema projection: JSON fields not present in the schema ("list", "b",
    // "c") are silently skipped at every nesting level
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // Only "d" survives projection; missing "d" in the second element is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1231
    // Map decoding: JSON object keys become map keys, values are lists of
    // nullable strings; entries are kept in source order
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // 1, 2, and 2 entries per row respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The null list value for key "c"
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Formatted output round-trips the source structure
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1274
    // Without with_coerce_primitive(true), numbers and booleans in a string
    // column are an error, and the error message names the offending field
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1305
    // With coerce_primitive enabled, numbers and booleans in string columns
    // are stored as their literal source text (e.g. "2E0", "4e0")
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1354
    // Shared body for the decimal tests: `data_type` has scale 2, so values
    // are stored as value * 100; extra fractional digits appear truncated
    // (123.456 -> 12345) and numeric strings like "2.0452" are accepted
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1404
    // Run the shared decimal body across all four decimal widths
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1412
    // Shared body for the timestamp tests: parses RFC3339 and space-separated
    // timestamps, with and without offsets, into columns with and without a
    // target timezone; raw numbers pass through as unit values
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // Column "d" carries an explicit +08:00 timezone; zone-less strings
        // there are interpreted in that zone
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and scaled down
        // to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1494
/// Runs the generic timestamp decoding test at every supported resolution.
#[test]
fn test_timestamps() {
    test_timestamp::<TimestampNanosecondType>();
    test_timestamp::<TimestampMicrosecondType>();
    test_timestamp::<TimestampMillisecondType>();
    test_timestamp::<TimestampSecondType>();
}
1502
/// Generic time-of-day decoding test, driven by `test_times` for each
/// Time32/Time64 type `T`.
///
/// Exercises 12-hour ("AM"/"pm") and 24-hour string formats, fractional
/// seconds, integer passthrough, duplicate keys, explicit nulls and missing
/// keys.
fn test_time<T: ArrowTemporalType>() {
    let buf = r#"
    {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
    {"a": 2, "b": "23:59:59", "c": 123.456}

    {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
    {"b": 40, "c": "13:42:29.190855"}
    {"b": 1234, "a": null, "c": "09:26:56.123"}
    {"c": "14:26:56.123"}
    "#;

    // Pull the time unit out of the concrete Time32/Time64 data type.
    let unit = match T::DATA_TYPE {
        DataType::Time32(unit) | DataType::Time64(unit) => unit,
        _ => unreachable!(),
    };

    // Expected values below are expressed in nanoseconds past midnight
    // and scaled down to the unit under test.
    let unit_in_nanos = match unit {
        TimeUnit::Second => 1_000_000_000,
        TimeUnit::Millisecond => 1_000_000,
        TimeUnit::Microsecond => 1_000,
        TimeUnit::Nanosecond => 1,
    };

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", T::DATA_TYPE, true),
        Field::new("b", T::DATA_TYPE, true),
        Field::new("c", T::DATA_TYPE, true),
    ]));

    let batches = do_read(buf, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    // "a" is absent or null in the last four rows; null slots hold 0.
    let col1 = batches[0].column(0).as_primitive::<T>();
    assert_eq!(col1.null_count(), 4);
    assert!(col1.is_null(2));
    assert!(col1.is_null(3));
    assert!(col1.is_null(4));
    assert!(col1.is_null(5));
    assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

    // "b": duplicate key resolves to "6:00 pm" (= 18:00) in row 3.
    let col2 = batches[0].column(1).as_primitive::<T>();
    assert_eq!(col2.null_count(), 1);
    assert!(col2.is_null(5));
    assert_eq!(
        col2.values(),
        &[
            34016123000000 / unit_in_nanos,
            86399000000000 / unit_in_nanos,
            64800000000000 / unit_in_nanos,
            40,
            1234,
            0
        ]
        .map(T::Native::usize_as)
    );

    // "c": numeric passthrough plus several time string formats.
    let col3 = batches[0].column(2).as_primitive::<T>();
    assert_eq!(col3.null_count(), 0);
    assert_eq!(
        col3.values(),
        &[
            38,
            123,
            34016123000000 / unit_in_nanos,
            49349190855000 / unit_in_nanos,
            34016123000000 / unit_in_nanos,
            52016123000000 / unit_in_nanos
        ]
        .map(T::Native::usize_as)
    );
}
1574
/// Runs the generic time-of-day decoding test for each supported time type.
#[test]
fn test_times() {
    test_time::<Time32SecondType>();
    test_time::<Time32MillisecondType>();
    test_time::<Time64MicrosecondType>();
    test_time::<Time64NanosecondType>();
}
1582
1583 fn test_duration<T: ArrowTemporalType>() {
1584 let buf = r#"
1585 {"a": 1, "b": "2"}
1586 {"a": 3, "b": null}
1587 "#;
1588
1589 let schema = Arc::new(Schema::new(vec![
1590 Field::new("a", T::DATA_TYPE, true),
1591 Field::new("b", T::DATA_TYPE, true),
1592 ]));
1593
1594 let batches = do_read(buf, 1024, true, false, schema);
1595 assert_eq!(batches.len(), 1);
1596
1597 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1598 assert_eq!(col_a.null_count(), 0);
1599 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1600
1601 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1602 assert_eq!(col2.null_count(), 1);
1603 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1604 }
1605
/// Runs the generic duration decoding test for each duration resolution.
#[test]
fn test_durations() {
    test_duration::<DurationSecondType>();
    test_duration::<DurationMillisecondType>();
    test_duration::<DurationMicrosecondType>();
    test_duration::<DurationNanosecondType>();
}
1613
/// Decodes a Delta-Lake-style checkpoint record where only the "protocol"
/// struct is present: the absent "add" struct column (containing a map)
/// must come out as a null struct, verified via the array formatter.
#[test]
fn test_delta_checkpoint() {
    let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
    let schema = Arc::new(Schema::new(vec![
        Field::new_struct(
            "protocol",
            vec![
                Field::new("minReaderVersion", DataType::Int32, true),
                Field::new("minWriterVersion", DataType::Int32, true),
            ],
            true,
        ),
        Field::new_struct(
            "add",
            vec![Field::new_map(
                "partitionValues",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, true),
                false,
                false,
            )],
            true,
        ),
    ]));

    let batches = do_read(json, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    // Render the single row as text and check "add" formats as null.
    let s: StructArray = batches.into_iter().next().unwrap().into();
    let opts = FormatOptions::default().with_null("null");
    let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
    assert_eq!(
        formatter.value(0).to_string(),
        "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
    );
}
1651
/// For each child type: a non-nullable child that is missing or explicitly
/// null inside a *present* struct must produce a decode error, while an
/// entirely-null struct parent masks the child slot and decodes cleanly.
#[test]
fn struct_nullability() {
    let do_test = |child: DataType| {
        // Empty struct: the required child "bar" is absent -> error.
        let non_null = r#"{"foo": {}}"#;
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "foo",
            vec![Field::new("bar", child, false)],
            true,
        )]));
        let mut reader = ReaderBuilder::new(schema.clone())
            .build(Cursor::new(non_null.as_bytes()))
            .unwrap();
        // NOTE(review): the key `bar` in the next literal is unquoted, which
        // is not valid JSON; the error assertion passes either way, but
        // confirm whether `"bar"` was intended.
        assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
        let mut reader = ReaderBuilder::new(schema.clone())
            .build(Cursor::new(null.as_bytes()))
            .unwrap();
        // Explicit null for a required child inside a present struct is also
        // an error; the next literal nulls the struct itself instead.
        assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
        let mut reader = ReaderBuilder::new(schema)
            .build(Cursor::new(null.as_bytes()))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let foo = batch.column(0).as_struct();
        assert_eq!(foo.len(), 1);
        assert!(foo.is_null(0));
        assert_eq!(foo.num_columns(), 1);

        // The child slot is masked by the null parent.
        let bar = foo.column(0);
        assert_eq!(bar.len(), 1);
        assert!(bar.is_null(0));
    };

    do_test(DataType::Boolean);
    do_test(DataType::Int32);
    do_test(DataType::Utf8);
    do_test(DataType::Decimal128(2, 1));
    do_test(DataType::Timestamp(
        TimeUnit::Microsecond,
        Some("+00:00".into()),
    ));
}
1700
/// Verifies i64/u64 extrema survive decoding — from both bare JSON numbers
/// and quoted strings — without truncation.
#[test]
fn test_truncation() {
    let json = r#"
    {"i64": 9223372036854775807, "u64": 18446744073709551615 }
    {"i64": "9223372036854775807", "u64": "18446744073709551615" }
    {"i64": -9223372036854775808, "u64": 0 }
    {"i64": "-9223372036854775808", "u64": 0 }
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("i64", DataType::Int64, true),
        Field::new("u64", DataType::UInt64, true),
    ]));

    let batches = do_read(json, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    let signed = batch.column(0).as_primitive::<Int64Type>();
    assert_eq!(signed.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);

    let unsigned = batch.column(1).as_primitive::<UInt64Type>();
    assert_eq!(unsigned.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
}
1724
/// Nanosecond timestamps at the i64 extrema, plus scientific notation,
/// must decode without truncation or overflow.
#[test]
fn test_timestamp_truncation() {
    let json = r#"
    {"time": 9223372036854775807 }
    {"time": -9223372036854775808 }
    {"time": 9e5 }
    "#;

    let field = Field::new(
        "time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    );
    let schema = Arc::new(Schema::new(vec![field]));

    let batches = do_read(json, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    let col = batches[0]
        .column(0)
        .as_primitive::<TimestampNanosecondType>();
    // 9e5 parses as the exact integer 900000.
    assert_eq!(col.values(), &[i64::MAX, i64::MIN, 900000]);
}
1747
/// Strict mode succeeds when the schema covers every key in the input,
/// both for flat columns and for nested struct columns.
#[test]
fn test_strict_mode_no_missing_columns_in_schema() {
    // Flat schema covering all three top-level keys.
    let json = r#"
    {"a": 1, "b": "2", "c": true}
    {"a": 2E0, "b": "4", "c": false}
    "#;
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Boolean, false),
    ]));
    assert_eq!(do_read(json, 1024, true, true, schema).len(), 1);

    // Nested schema covering both keys of the struct column "c".
    let json = r#"
    {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
    {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
    "#;
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new_struct(
            "c",
            vec![
                Field::new("a", DataType::Boolean, false),
                Field::new("b", DataType::Int16, false),
            ],
            false,
        ),
    ]));
    assert_eq!(do_read(json, 1024, true, true, schema).len(), 1);
}
1785
/// Strict mode must reject input containing keys the schema does not
/// declare, both at the top level and inside nested structs, with an
/// error naming the offending column.
#[test]
fn test_strict_mode_missing_columns_in_schema() {
    // Top-level key "b" is present in the input but absent from the schema.
    let buf = r#"
    {"a": 1, "b": "2", "c": true}
    {"a": 2E0, "b": "4", "c": false}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, true),
        Field::new("c", DataType::Boolean, true),
    ]));

    let err = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .with_strict_mode(true)
        .build(Cursor::new(buf.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Json error: column 'b' missing from schema"
    );

    // Nested key "c"."b" is present in the input but absent from the
    // struct schema; the error is prefixed with the enclosing field.
    let buf = r#"
    {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
    {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
    ]));

    let err = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .with_strict_mode(true)
        .build(Cursor::new(buf.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Json error: whilst decoding field 'c': column 'b' missing from schema"
    );
}
1835
1836 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1837 let file = File::open(path).unwrap();
1838 let mut reader = BufReader::new(file);
1839 let schema = schema.unwrap_or_else(|| {
1840 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1841 reader.rewind().unwrap();
1842 schema
1843 });
1844 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1845 builder.build(reader).unwrap()
1846 }
1847
/// Smoke test over the `basic.json` fixture with an inferred schema:
/// checks column count/order, inferred data types, and spot-checks values
/// in each column.
#[test]
fn test_json_basic() {
    let mut reader = read_file("test/data/basic.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(8, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    // The reader's schema and the batch's schema must agree.
    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    // Inferred types for the first four columns.
    let a = schema.column_with_name("a").unwrap();
    assert_eq!(0, a.0);
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(1, b.0);
    assert_eq!(&DataType::Float64, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(2, c.0);
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(3, d.0);
    assert_eq!(&DataType::Utf8, d.1.data_type());

    // Spot-check decoded values against the fixture contents.
    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(-10, aa.value(1));
    let bb = batch.column(b.0).as_primitive::<Float64Type>();
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-3.5, bb.value(1));
    let cc = batch.column(c.0).as_boolean();
    assert!(!cc.value(0));
    assert!(cc.value(10));
    let dd = batch.column(d.0).as_string::<i32>();
    assert_eq!("4", dd.value(0));
    assert_eq!("text", dd.value(8));
}
1886
/// An empty schema projects zero columns while still counting every row.
#[test]
fn test_json_empty_projection() {
    let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 0);
    assert_eq!(batch.num_rows(), 12);
}
1895
/// Reads the `basic_nulls.json` fixture with an inferred schema and checks
/// the inferred types plus the validity pattern of each column.
#[test]
fn test_json_basic_with_nulls() {
    let mut reader = read_file("test/data/basic_nulls.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    // The reader's schema and the batch's schema must agree.
    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(&DataType::Float64, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    // Spot-check valid/null slots per column against the fixture.
    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(11));
    let bb = batch.column(b.0).as_primitive::<Float64Type>();
    assert!(bb.is_valid(0));
    assert!(!bb.is_valid(2));
    assert!(!bb.is_valid(11));
    let cc = batch.column(c.0).as_boolean();
    assert!(cc.is_valid(0));
    assert!(!cc.is_valid(4));
    assert!(!cc.is_valid(11));
    let dd = batch.column(d.0).as_string::<i32>();
    assert!(!dd.is_valid(0));
    assert!(dd.is_valid(1));
    assert!(!dd.is_valid(4));
    assert!(!dd.is_valid(11));
}
1935
/// Supplies an explicit schema (including a Float32 narrowing for "b")
/// and checks the reader honors it instead of inferring types.
#[test]
fn test_json_basic_schema() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Boolean, false),
        Field::new("d", DataType::Utf8, false),
    ]);

    let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
    // The reader must echo back the supplied schema unchanged.
    let reader_schema = reader.schema();
    assert_eq!(reader_schema.as_ref(), &schema);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = batch.schema();

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(&DataType::Float32, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    // Spot-check values, including a large Int64 and Float32 decoding.
    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(100000000000000, aa.value(11));
    let bb = batch.column(b.0).as_primitive::<Float32Type>();
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-3.5, bb.value(1));
}
1971
/// A schema naming a subset of the file's columns acts as a projection:
/// only those columns appear in the decoded batch, in schema order.
#[test]
fn test_json_basic_schema_projection() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("c", DataType::Boolean, false),
    ]);

    let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.schema().fields().len(), 2);
    assert_eq!(batch.num_rows(), 12);
    assert_eq!(batch.schema().as_ref(), &schema);

    let (a_idx, a_field) = schema.column_with_name("a").unwrap();
    assert_eq!(a_idx, 0);
    assert_eq!(a_field.data_type(), &DataType::Int64);
    let (c_idx, c_field) = schema.column_with_name("c").unwrap();
    assert_eq!(c_idx, 1);
    assert_eq!(c_field.data_type(), &DataType::Boolean);
}
1995
/// Reads the `arrays.json` fixture with an inferred schema: list columns
/// should infer as List<Float64>/List<Boolean> and flatten to the expected
/// child values and validity.
#[test]
fn test_json_arrays() {
    let mut reader = read_file("test/data/arrays.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(3, batch.num_rows());

    let schema = batch.schema();

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(
        &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
        b.1.data_type()
    );
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(
        &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
        c.1.data_type()
    );
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(-10, aa.value(1));
    assert_eq!(1627668684594000000, aa.value(2));
    // "b": check the flattened Float64 child values and a null slot.
    let bb = batch.column(b.0).as_list::<i32>();
    let bb = bb.values().as_primitive::<Float64Type>();
    assert_eq!(9, bb.len());
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-6.1, bb.value(5));
    assert!(!bb.is_valid(7));

    // "c": same checks via an explicit downcast to ListArray.
    let cc = batch
        .column(c.0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let cc = cc.values().as_boolean();
    assert_eq!(6, cc.len());
    assert!(!cc.value(0));
    assert!(!cc.value(4));
    assert!(!cc.is_valid(5));
}
2043
/// An empty array is a valid, non-null list value; an explicit `null` and a
/// missing key both decode as null list entries.
#[test]
fn test_empty_json_arrays() {
    let json = r#"
    {"items": []}
    {"items": null}
    {}
    "#;

    let items = Field::new(
        "items",
        DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
        true,
    );
    let batches = do_read(json, 1024, false, false, Arc::new(Schema::new(vec![items])));
    assert_eq!(batches.len(), 1);

    let col = batches[0].column(0).as_list::<i32>();
    assert_eq!(col.null_count(), 2);
    // Row 0 is an empty — but valid — Null-typed list.
    assert!(col.value(0).is_empty());
    assert_eq!(col.value(0).data_type(), &DataType::Null);
    assert!(col.is_null(1));
    assert!(col.is_null(2));
}
2068
/// Nested lists: empty inner arrays and inner arrays of nulls must decode
/// with correct per-level lengths and no spurious nulls at the outer level.
#[test]
fn test_nested_empty_json_arrays() {
    let json_content = r#"
    {"items": [[],[]]}
    {"items": [[null, null],[null]]}
    "#;

    // items: List<List<Null>>, nullable at every level.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        DataType::List(FieldRef::new(Field::new_list_field(
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        ))),
        true,
    )]));

    let batches = do_read(json_content, 1024, false, false, schema);
    assert_eq!(batches.len(), 1);

    // Row 0: two empty inner lists; row 1: inner lists of lengths 2 and 1.
    let col1 = batches[0].column(0).as_list::<i32>();
    assert_eq!(col1.null_count(), 0);
    assert_eq!(col1.value(0).len(), 2);
    assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
    assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

    assert_eq!(col1.value(1).len(), 2);
    assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
    assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
}
2098
/// Decodes a List<Struct<b: Boolean, c: Struct<d: Utf8>>> column and
/// compares it element-for-element against an expected array built by hand
/// from child data, offsets and validity bitmaps.
#[test]
fn test_nested_list_json_arrays() {
    let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
    let a_struct_field = Field::new_struct(
        "a",
        vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
        true,
    );
    let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
    let schema = Arc::new(Schema::new(vec![a_field.clone()]));
    let builder = ReaderBuilder::new(schema).with_batch_size(64);
    // Six rows: 2 structs, 1 struct, 3 structs, null list, empty list,
    // and a list containing a single null struct — 7 child structs total.
    let json_content = r#"
    {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
    {"a": [{"b": false, "c": null}]}
    {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
    {"a": null}
    {"a": []}
    {"a": [null]}
    "#;
    let mut reader = builder.build(Cursor::new(json_content)).unwrap();

    // Expected flattened "d" values across the 7 child structs.
    let d = StringArray::from(vec![
        Some("a_text"),
        Some("b_text"),
        None,
        Some("c_text"),
        Some("d_text"),
        None,
        None,
    ]);
    // "c" validity 0b00111011: slots 2 (explicit null) and 6 (null struct
    // parent) are null.
    let c = ArrayDataBuilder::new(c_field.data_type().clone())
        .len(7)
        .add_child_data(d.to_data())
        .null_bit_buffer(Some(Buffer::from([0b00111011])))
        .build()
        .unwrap();
    let b = BooleanArray::from(vec![
        Some(true),
        Some(false),
        Some(false),
        Some(true),
        None,
        Some(true),
        None,
    ]);
    // Struct validity 0b00111111: only slot 6 (the `[null]` element) is null.
    let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
        .len(7)
        .add_child_data(b.to_data())
        .add_child_data(c.clone())
        .null_bit_buffer(Some(Buffer::from([0b00111111])))
        .build()
        .unwrap();
    // List offsets [0,2,3,6,6,6,7]; validity 0b00110111 nulls row 3
    // ({"a": null}) while row 4 ([]) stays a valid empty list.
    let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
        .len(6)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
        .add_child_data(a)
        .null_bit_buffer(Some(Buffer::from([0b00110111])))
        .build()
        .unwrap();
    let expected = make_array(a_list);

    let batch = reader.next().unwrap().unwrap();
    let read = batch.column(0);
    assert_eq!(read.len(), 6);
    let read: &ListArray = read.as_list::<i32>();
    let expected = expected.as_list::<i32>();
    assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
    assert_eq!(read.nulls(), expected.nulls());
    // Compare each nesting level individually before the final whole-array
    // equality check, to localize failures.
    let struct_array = read.values().as_struct();
    let expected_struct_array = expected.values().as_struct();

    assert_eq!(7, struct_array.len());
    assert_eq!(1, struct_array.null_count());
    assert_eq!(7, expected_struct_array.len());
    assert_eq!(1, expected_struct_array.null_count());
    assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
    let read_b = struct_array.column(0);
    assert_eq!(read_b.as_ref(), &b);
    let read_c = struct_array.column(1);
    assert_eq!(read_c.to_data(), c);
    let read_c = read_c.as_struct();
    let read_d = read_c.column(0);
    assert_eq!(read_d.as_ref(), &d);

    assert_eq!(read, expected);
}
2192
/// Whitespace-only lines between records must be skipped rather than
/// treated as malformed rows; the string literal's internal layout is the
/// behavior under test, so it must not be reformatted.
#[test]
fn test_skip_empty_lines() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
    let json_content = "
    {\"a\": 1}
    {\"a\": 2}
    {\"a\": 3}";
    let mut reader = builder.build(Cursor::new(json_content)).unwrap();
    let batch = reader.next().unwrap().unwrap();

    // All three records decode; the leading blank line contributes no row.
    assert_eq!(1, batch.num_columns());
    assert_eq!(3, batch.num_rows());

    let schema = reader.schema();
    let c = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, c.1.data_type());
}
2211
/// With batch_size 5, the 12-row fixture must arrive as batches of 5, 5, 2.
#[test]
fn test_with_multiple_batches() {
    let file = File::open("test/data/basic_nulls.json").unwrap();
    let mut buf_reader = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut buf_reader, None).unwrap();
    buf_reader.rewind().unwrap();

    let reader = ReaderBuilder::new(Arc::new(schema))
        .with_batch_size(5)
        .build(buf_reader)
        .unwrap();

    let sizes: Vec<_> = reader.map(|rb| rb.unwrap().num_rows()).collect();
    assert_eq!(sizes, vec![5, 5, 2]);
}
2229
/// Integer JSON values decode directly as second-resolution timestamps
/// when the supplied schema requests Timestamp(Second).
#[test]
fn test_timestamp_from_json_seconds() {
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Second, None),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 12);

    let schema = reader.schema();
    assert_eq!(schema, batch.schema());

    let (idx, field) = schema.column_with_name("a").unwrap();
    assert_eq!(
        field.data_type(),
        &DataType::Timestamp(TimeUnit::Second, None)
    );

    // Nulls in the fixture carry through; values decode unchanged.
    let col = batch.column(idx).as_primitive::<TimestampSecondType>();
    assert!(col.is_valid(0));
    assert!(!col.is_valid(1));
    assert!(!col.is_valid(2));
    assert_eq!(col.value(0), 1);
    assert_eq!(col.value(3), 1);
    assert_eq!(col.value(7), 5);
}
2262
/// Integer JSON values decode directly as millisecond-resolution
/// timestamps when the schema requests Timestamp(Millisecond).
#[test]
fn test_timestamp_from_json_milliseconds() {
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 12);

    let schema = reader.schema();
    assert_eq!(schema, batch.schema());

    let (idx, field) = schema.column_with_name("a").unwrap();
    assert_eq!(
        field.data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );

    // Nulls in the fixture carry through; values decode unchanged.
    let col = batch.column(idx).as_primitive::<TimestampMillisecondType>();
    assert!(col.is_valid(0));
    assert!(!col.is_valid(1));
    assert!(!col.is_valid(2));
    assert_eq!(col.value(0), 1);
    assert_eq!(col.value(3), 1);
    assert_eq!(col.value(7), 5);
}
2295
/// Integer JSON values decode directly as Date64 (milliseconds since
/// epoch) when the schema requests it.
#[test]
fn test_date_from_json_milliseconds() {
    let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 12);

    let schema = reader.schema();
    assert_eq!(schema, batch.schema());

    let (idx, field) = schema.column_with_name("a").unwrap();
    assert_eq!(field.data_type(), &DataType::Date64);

    // Nulls in the fixture carry through; values decode unchanged.
    let col = batch.column(idx).as_primitive::<Date64Type>();
    assert!(col.is_valid(0));
    assert!(!col.is_valid(1));
    assert!(!col.is_valid(2));
    assert_eq!(col.value(0), 1);
    assert_eq!(col.value(3), 1);
    assert_eq!(col.value(7), 5);
}
2321
/// Integer JSON values decode directly as Time64(Nanosecond) values when
/// the schema requests it.
#[test]
fn test_time_from_json_nanoseconds() {
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Time64(TimeUnit::Nanosecond),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 12);

    let schema = reader.schema();
    assert_eq!(schema, batch.schema());

    let (idx, field) = schema.column_with_name("a").unwrap();
    assert_eq!(field.data_type(), &DataType::Time64(TimeUnit::Nanosecond));

    // Nulls in the fixture carry through; values decode unchanged.
    let col = batch.column(idx).as_primitive::<Time64NanosecondType>();
    assert!(col.is_valid(0));
    assert!(!col.is_valid(1));
    assert!(!col.is_valid(2));
    assert_eq!(col.value(0), 1);
    assert_eq!(col.value(3), 1);
    assert_eq!(col.value(7), 5);
}
2351
/// Drives the reader through its `Iterator` impl, accumulating row counts,
/// batch counts and the sum of column "a" across all batches.
#[test]
fn test_json_iterator() {
    let file = File::open("test/data/basic.json").unwrap();
    let mut buf_reader = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut buf_reader, None).unwrap();
    buf_reader.rewind().unwrap();

    let reader = ReaderBuilder::new(Arc::new(schema))
        .with_batch_size(5)
        .build(buf_reader)
        .unwrap();
    let schema = reader.schema();
    let (a_idx, _) = schema.column_with_name("a").unwrap();

    let mut total_rows = 0;
    let mut batch_count = 0;
    let mut a_total = 0i64;
    for batch in reader {
        let batch = batch.unwrap();
        assert_eq!(batch.num_columns(), 8);
        assert_eq!(batch.schema(), schema);
        total_rows += batch.num_rows();
        batch_count += 1;
        let a = batch.column(a_idx).as_primitive::<Int64Type>();
        a_total += a.values().iter().sum::<i64>();
    }
    assert_eq!(total_rows, 12);
    assert_eq!(batch_count, 3);
    assert_eq!(a_total, 100000000000011);
}
2381
/// Verifies decoder error handling: a truncated record stays buffered as a
/// partial row even after a failed flush, and type mismatches report exact,
/// field-qualified error messages (asserted verbatim below).
#[test]
fn test_decoder_error() {
    let schema = Arc::new(Schema::new(vec![Field::new_struct(
        "a",
        vec![Field::new("child", DataType::Int32, false)],
        true,
    )]));

    // Feed a truncated record: the tape decoder must retain it as a
    // partial row both before and after the (failing) flush.
    let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
    let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
    assert!(decoder.tape_decoder.has_partial_row());
    assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
    let _ = decoder.flush().unwrap_err();
    assert!(decoder.tape_decoder.has_partial_row());
    assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

    // Helper: decode a single record and return the error message.
    let parse_err = |s: &str| {
        ReaderBuilder::new(schema.clone())
            .build(Cursor::new(s.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
            .unwrap_err()
            .to_string()
    };

    // Each case supplies a non-object where the struct "a" is expected,
    // or a non-primitive where its Int32 child is expected; the message
    // text (including the echoed JSON) is pinned exactly.
    let err = parse_err(r#"{"a": 123}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': expected { got 123"
    );

    let err = parse_err(r#"{"a": ["bar"]}"#);
    assert_eq!(
        err,
        r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
    );

    let err = parse_err(r#"{"a": []}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': expected { got []"
    );

    let err = parse_err(r#"{"a": [{"child": 234}]}"#);
    assert_eq!(
        err,
        r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
    );

    let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
    assert_eq!(
        err,
        r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
    );

    let err = parse_err(r#"{"a": true}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': expected { got true"
    );

    let err = parse_err(r#"{"a": false}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': expected { got false"
    );

    let err = parse_err(r#"{"a": "foo"}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': expected { got \"foo\""
    );

    let err = parse_err(r#"{"a": {"child": false}}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
    );

    let err = parse_err(r#"{"a": {"child": []}}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
    );

    let err = parse_err(r#"{"a": {"child": [123]}}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
    );

    let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
    assert_eq!(
        err,
        "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
    );
}
2480
/// Serializing serde_json values through the decoder: both a numeric
/// epoch-seconds value and an RFC3339 string land in a Timestamp(Second)
/// column.
#[test]
fn test_serialize_timestamp() {
    let rows = vec![
        json!({"timestamp": 1681319393}),
        json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
    ];
    let field = Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Second, None),
        true,
    );
    let mut decoder = ReaderBuilder::new(Arc::new(Schema::new(vec![field])))
        .build_decoder()
        .unwrap();
    decoder.serialize(&rows).unwrap();

    let batch = decoder.flush().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 2);
    assert_eq!(batch.num_columns(), 1);
    let col = batch.column(0).as_primitive::<TimestampSecondType>();
    // The +02:00 offset places the epoch string two hours before UTC epoch.
    assert_eq!(col.values(), &[1681319393, -7200]);
}
2502
2503 #[test]
2504 fn test_serialize_decimal() {
2505 let json = vec![
2506 json!({"decimal": 1.234}),
2507 json!({"decimal": "1.234"}),
2508 json!({"decimal": 1234}),
2509 json!({"decimal": "1234"}),
2510 ];
2511 let schema = Schema::new(vec![Field::new(
2512 "decimal",
2513 DataType::Decimal128(10, 3),
2514 true,
2515 )]);
2516 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2517 .build_decoder()
2518 .unwrap();
2519 decoder.serialize(&json).unwrap();
2520 let batch = decoder.flush().unwrap().unwrap();
2521 assert_eq!(batch.num_rows(), 4);
2522 assert_eq!(batch.num_columns(), 1);
2523 let values = batch.column(0).as_primitive::<Decimal128Type>();
2524 assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2525 }
2526
2527 #[test]
2528 fn test_serde_field() {
2529 let field = Field::new("int", DataType::Int32, true);
2530 let mut decoder = ReaderBuilder::new_with_field(field)
2531 .build_decoder()
2532 .unwrap();
2533 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2534 let b = decoder.flush().unwrap().unwrap();
2535 let values = b.column(0).as_primitive::<Int32Type>().values();
2536 assert_eq!(values, &[1, 2, 3, 4]);
2537 }
2538
2539 #[test]
2540 fn test_serde_large_numbers() {
2541 let field = Field::new("int", DataType::Int64, true);
2542 let mut decoder = ReaderBuilder::new_with_field(field)
2543 .build_decoder()
2544 .unwrap();
2545
2546 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2547 let b = decoder.flush().unwrap().unwrap();
2548 let values = b.column(0).as_primitive::<Int64Type>().values();
2549 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2550
2551 let field = Field::new(
2552 "int",
2553 DataType::Timestamp(TimeUnit::Microsecond, None),
2554 true,
2555 );
2556 let mut decoder = ReaderBuilder::new_with_field(field)
2557 .build_decoder()
2558 .unwrap();
2559
2560 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2561 let b = decoder.flush().unwrap().unwrap();
2562 let values = b
2563 .column(0)
2564 .as_primitive::<TimestampMicrosecondType>()
2565 .values();
2566 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2567 }
2568
2569 #[test]
2570 fn test_coercing_primitive_into_string_decoder() {
2571 let buf = &format!(
2572 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2573 (i32::MAX as i64 + 10),
2574 i64::MAX - 10
2575 );
2576 let schema = Schema::new(vec![
2577 Field::new("a", DataType::Float64, true),
2578 Field::new("b", DataType::Utf8, true),
2579 Field::new("c", DataType::Utf8, true),
2580 ]);
2581 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2582 let schema_ref = Arc::new(schema);
2583
2584 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2586 let mut decoder = reader.build_decoder().unwrap();
2587 decoder.serialize(json_array.as_slice()).unwrap();
2588 let batch = decoder.flush().unwrap().unwrap();
2589 assert_eq!(
2590 batch,
2591 RecordBatch::try_new(
2592 schema_ref,
2593 vec![
2594 Arc::new(Float64Array::from(vec![
2595 1.0,
2596 2.0,
2597 (i32::MAX as i64 + 10) as f64,
2598 (i64::MAX - 10) as f64
2599 ])),
2600 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2601 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2602 ]
2603 )
2604 .unwrap()
2605 );
2606 }
2607
2608 fn _parse_structs(
2613 row: &str,
2614 struct_mode: StructMode,
2615 fields: Fields,
2616 as_struct: bool,
2617 ) -> Result<RecordBatch, ArrowError> {
2618 let builder = if as_struct {
2619 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2620 } else {
2621 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2622 };
2623 builder
2624 .with_struct_mode(struct_mode)
2625 .build(Cursor::new(row.as_bytes()))
2626 .unwrap()
2627 .next()
2628 .unwrap()
2629 }
2630
    // Checks ListOnly decoding of a two-element row against schemas with too
    // few, exactly matching, and too many fields — both as a wrapped struct
    // column and as top-level columns. The error message text is pinned.
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        let row = "[1, 2]";

        // Build 1-, 2-, and 3-field variants by successively pushing fields.
        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // The batch expected for the exactly-matching schema: one row, a=1, b=2.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // Too few fields: the extra list entry is reported, in both modes.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Exact match succeeds, wrapped as struct column "r"...
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        // ...and flattened to top-level columns.
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // Too many fields: the shortfall is reported with both counts.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2689
    // Verifies that ObjectOnly accepts object notation and rejects list
    // notation for structs (and vice versa for ListOnly), for a struct
    // containing a list child and a map child. Error texts are pinned to the
    // tape decoder's rendering of the offending JSON value.
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // Same logical row in three notations: pure objects, pure lists, and
        // an object whose struct value is written as a list.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // b = [1, 2]
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // c = {"d": 3}
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: objects decode; lists are rejected at top level and
        // within a field, with the field path in the message.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: the mirror image of the above.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        // NOTE(review): no comma between "[1, 2]" and "\"c\"" below — this
        // matches the tape printer's rendering of nested objects; do not "fix".
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2773
2774 #[test]
2780 fn test_struct_decoding_empty_list() {
2781 let int_field = Field::new("a", DataType::Int32, true);
2782 let struct_field = Field::new(
2783 "r",
2784 DataType::Struct(Fields::from(vec![int_field.clone()])),
2785 true,
2786 );
2787
2788 let parse = |row: &str, as_struct: bool, field: Field| {
2789 _parse_structs(
2790 row,
2791 StructMode::ListOnly,
2792 Fields::from(vec![field]),
2793 as_struct,
2794 )
2795 };
2796
2797 assert_eq!(
2799 parse("[]", true, struct_field.clone())
2800 .unwrap_err()
2801 .to_string(),
2802 "Json error: found 0 columns for 1 fields".to_owned()
2803 );
2804 assert_eq!(
2805 parse("[]", false, int_field.clone())
2806 .unwrap_err()
2807 .to_string(),
2808 "Json error: found 0 columns for 1 fields".to_owned()
2809 );
2810 assert_eq!(
2811 parse("[]", false, struct_field.clone())
2812 .unwrap_err()
2813 .to_string(),
2814 "Json error: found 0 columns for 1 fields".to_owned()
2815 );
2816 assert_eq!(
2817 parse("[[]]", false, struct_field.clone())
2818 .unwrap_err()
2819 .to_string(),
2820 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2821 );
2822 }
2823
2824 #[test]
2825 fn test_decode_list_struct_with_wrong_types() {
2826 let int_field = Field::new("a", DataType::Int32, true);
2827 let struct_field = Field::new(
2828 "r",
2829 DataType::Struct(Fields::from(vec![int_field.clone()])),
2830 true,
2831 );
2832
2833 let parse = |row: &str, as_struct: bool, field: Field| {
2834 _parse_structs(
2835 row,
2836 StructMode::ListOnly,
2837 Fields::from(vec![field]),
2838 as_struct,
2839 )
2840 };
2841
2842 assert_eq!(
2844 parse(r#"[["a"]]"#, false, struct_field.clone())
2845 .unwrap_err()
2846 .to_string(),
2847 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2848 );
2849 assert_eq!(
2850 parse(r#"[["a"]]"#, true, struct_field.clone())
2851 .unwrap_err()
2852 .to_string(),
2853 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2854 );
2855 assert_eq!(
2856 parse(r#"["a"]"#, true, int_field.clone())
2857 .unwrap_err()
2858 .to_string(),
2859 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2860 );
2861 assert_eq!(
2862 parse(r#"["a"]"#, false, int_field.clone())
2863 .unwrap_err()
2864 .to_string(),
2865 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2866 );
2867 }
2868
2869 #[test]
2870 fn test_read_run_end_encoded() {
2871 let buf = r#"
2872 {"a": "x"}
2873 {"a": "x"}
2874 {"a": "y"}
2875 {"a": "y"}
2876 {"a": "y"}
2877 "#;
2878
2879 let ree_type = DataType::RunEndEncoded(
2880 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2881 Arc::new(Field::new("values", DataType::Utf8, true)),
2882 );
2883 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2884 let batches = do_read(buf, 1024, false, false, schema);
2885 assert_eq!(batches.len(), 1);
2886
2887 let col = batches[0].column(0);
2888 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2889
2890 assert_eq!(run_array.len(), 5);
2892 assert_eq!(run_array.run_ends().values(), &[2, 5]);
2893
2894 let values = run_array.values().as_string::<i32>();
2895 assert_eq!(values.len(), 2);
2896 assert_eq!(values.value(0), "x");
2897 assert_eq!(values.value(1), "y");
2898 }
2899
2900 #[test]
2901 fn test_read_run_end_encoded_consecutive_nulls() {
2902 let buf = r#"
2903 {"a": "x"}
2904 {}
2905 {}
2906 {}
2907 {"a": "y"}
2908 "#;
2909
2910 let ree_type = DataType::RunEndEncoded(
2911 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2912 Arc::new(Field::new("values", DataType::Utf8, true)),
2913 );
2914 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2915 let batches = do_read(buf, 1024, false, false, schema);
2916 assert_eq!(batches.len(), 1);
2917
2918 let col = batches[0].column(0);
2919 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2920
2921 assert_eq!(run_array.len(), 5);
2923 assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
2924
2925 let values = run_array.values().as_string::<i32>();
2926 assert_eq!(values.len(), 3);
2927 assert_eq!(values.value(0), "x");
2928 assert!(values.is_null(1));
2929 assert_eq!(values.value(2), "y");
2930 }
2931
2932 #[test]
2933 fn test_read_run_end_encoded_all_unique() {
2934 let buf = r#"
2935 {"a": 1}
2936 {"a": 2}
2937 {"a": 3}
2938 "#;
2939
2940 let ree_type = DataType::RunEndEncoded(
2941 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2942 Arc::new(Field::new("values", DataType::Int32, true)),
2943 );
2944 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2945 let batches = do_read(buf, 1024, false, false, schema);
2946 assert_eq!(batches.len(), 1);
2947
2948 let col = batches[0].column(0);
2949 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2950
2951 assert_eq!(run_array.len(), 3);
2953 assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
2954 }
2955
2956 #[test]
2957 fn test_read_run_end_encoded_int16_run_ends() {
2958 let buf = r#"
2959 {"a": "x"}
2960 {"a": "x"}
2961 {"a": "y"}
2962 "#;
2963
2964 let ree_type = DataType::RunEndEncoded(
2965 Arc::new(Field::new("run_ends", DataType::Int16, false)),
2966 Arc::new(Field::new("values", DataType::Utf8, true)),
2967 );
2968 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2969 let batches = do_read(buf, 1024, false, false, schema);
2970 assert_eq!(batches.len(), 1);
2971
2972 let col = batches[0].column(0);
2973 let run_array = col.as_run::<arrow_array::types::Int16Type>();
2974
2975 assert_eq!(run_array.len(), 3);
2976 assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
2977 }
2978}