Skip to main content

arrow_json/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use std::borrow::Cow;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use arrow_array::cast::AsArray;
141use arrow_array::timezone::Tz;
142use arrow_array::types::*;
143use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer};
144use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
145use chrono::Utc;
146use serde_core::Serialize;
147
148use crate::StructMode;
149use crate::reader::binary_array::{
150    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
151};
152use crate::reader::boolean_array::BooleanArrayDecoder;
153use crate::reader::decimal_array::DecimalArrayDecoder;
154use crate::reader::list_array::{
155    FixedSizeListArrayDecoder, ListArrayDecoder, ListViewArrayDecoder,
156};
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
161use crate::reader::string_array::StringArrayDecoder;
162use crate::reader::string_view_array::StringViewArrayDecoder;
163use crate::reader::struct_array::StructArrayDecoder;
164use crate::reader::tape::{Tape, TapeDecoder};
165use crate::reader::timestamp_array::TimestampArrayDecoder;
166
167pub use schema::*;
168pub use value_iter::ValueIter;
169
170mod binary_array;
171mod boolean_array;
172mod decimal_array;
173mod list_array;
174mod map_array;
175mod null_array;
176mod primitive_array;
177mod run_end_array;
178mod schema;
179mod serializer;
180mod string_array;
181mod string_view_array;
182mod struct_array;
183mod tape;
184mod timestamp_array;
185mod value_iter;
186
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows parsed per `RecordBatch` (default 1024)
    batch_size: usize,
    // Coerce JSON primitives (bool/number) to string for Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // Error on JSON columns that are not present in `schema`
    strict_mode: bool,
    // Decode values that conflict with the column type as NULL instead of erroring
    ignore_type_conflicts: bool,
    // True when built via `ReaderBuilder::new_with_field`, i.e. the JSON root is
    // an arbitrary value rather than an object matching `schema`
    is_field: bool,
    // Whether structs are decoded from JSON objects or JSON lists
    struct_mode: StructMode,

    // Target schema of the produced `RecordBatch`es
    schema: SchemaRef,
}
198
199impl ReaderBuilder {
200    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
201    ///
202    /// This could be obtained using [`infer_json_schema`] if not known
203    ///
204    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
205    /// In this case, an error is returned when a column is missing from `schema`.
206    ///
207    /// [`infer_json_schema`]: crate::reader::infer_json_schema
208    pub fn new(schema: SchemaRef) -> Self {
209        Self {
210            batch_size: 1024,
211            coerce_primitive: false,
212            strict_mode: false,
213            ignore_type_conflicts: false,
214            is_field: false,
215            struct_mode: Default::default(),
216            schema,
217        }
218    }
219
220    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
221    ///
222    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
223    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
224    ///
225    /// ```
226    /// # use std::sync::Arc;
227    /// # use arrow_array::cast::AsArray;
228    /// # use arrow_array::types::Int32Type;
229    /// # use arrow_json::ReaderBuilder;
230    /// # use arrow_schema::{DataType, Field};
231    /// // Root of JSON schema is a numeric type
232    /// let data = "1\n2\n3\n";
233    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
234    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
235    /// let b = reader.next().unwrap().unwrap();
236    /// let values = b.column(0).as_primitive::<Int32Type>().values();
237    /// assert_eq!(values, &[1, 2, 3]);
238    ///
239    /// // Root of JSON schema is a list type
240    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
241    /// let field = Field::new_list("int", field.clone(), true);
242    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
243    /// let b = reader.next().unwrap().unwrap();
244    /// let list = b.column(0).as_list::<i32>();
245    ///
246    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
247    /// let list_values = list.values().as_primitive::<Int32Type>();
248    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
249    /// ```
250    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
251        Self {
252            batch_size: 1024,
253            coerce_primitive: false,
254            strict_mode: false,
255            ignore_type_conflicts: false,
256            is_field: true,
257            struct_mode: Default::default(),
258            schema: Arc::new(Schema::new([field.into()])),
259        }
260    }
261
262    /// Sets the batch size in rows to read
263    pub fn with_batch_size(self, batch_size: usize) -> Self {
264        Self { batch_size, ..self }
265    }
266
267    /// Sets if the decoder should coerce primitive values (bool and number) into string
268    /// when the Schema's column is Utf8 or LargeUtf8.
269    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
270        Self {
271            coerce_primitive,
272            ..self
273        }
274    }
275
276    /// Sets if the decoder should return an error if it encounters a column not
277    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
278    /// `strict_mode` is effectively `true`. It is required for all fields of
279    /// the struct to be in the list: without field names, there is no way to
280    /// determine which field is missing.
281    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
282        Self {
283            strict_mode,
284            ..self
285        }
286    }
287
288    /// Set the [`StructMode`] for the reader, which determines whether structs
289    /// can be decoded from JSON as objects or lists. For more details refer to
290    /// the enum documentation. Default is to use `ObjectOnly`.
291    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
292        Self {
293            struct_mode,
294            ..self
295        }
296    }
297
298    /// Sets whether the decoder should produce NULL instead of returning an error if it encounters
299    /// value that can not be parsed into the specified column type.
300    ///
301    /// For example, if the type is declared to be a nullable array of `DataType::Int32` but the
302    /// reader encounters a string value `"foo"` and the value `ignore_type_conflicts` is:
303    ///
304    /// * `false` (the default): The reader will return an error.
305    ///
306    /// * `true`: The reader will fill in NULL value for that array element.
307    ///
308    /// NOTE: An inferred NULL due to a type conflict will still produce parsing errors for
309    /// non-nullable fields, the same as any other NULL or missing value.
310    pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self {
311        Self {
312            ignore_type_conflicts,
313            ..self
314        }
315    }
316
317    /// Create a [`Reader`] with the provided [`BufRead`]
318    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
319        Ok(Reader {
320            reader,
321            decoder: self.build_decoder()?,
322        })
323    }
324
325    /// Create a [`Decoder`]
326    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
327        let (data_type, nullable) = if self.is_field {
328            let field = &self.schema.fields[0];
329            let data_type = Cow::Borrowed(field.data_type());
330            (data_type, field.is_nullable())
331        } else {
332            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
333            (data_type, false)
334        };
335
336        let ctx = DecoderContext {
337            coerce_primitive: self.coerce_primitive,
338            strict_mode: self.strict_mode,
339            struct_mode: self.struct_mode,
340            ignore_type_conflicts: self.ignore_type_conflicts,
341        };
342        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
343
344        let num_fields = self.schema.flattened_fields().len();
345
346        Ok(Decoder {
347            decoder,
348            is_field: self.is_field,
349            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
350            batch_size: self.batch_size,
351            schema: self.schema,
352        })
353    }
354}
355
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Buffered source of JSON bytes
    reader: R,
    // Push-based decoder that the buffered bytes are fed into
    decoder: Decoder,
}
363
364impl<R> std::fmt::Debug for Reader<R> {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct("Reader")
367            .field("decoder", &self.decoder)
368            .finish()
369    }
370}
371
372impl<R: BufRead> Reader<R> {
373    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
374    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
375        loop {
376            let buf = self.reader.fill_buf()?;
377            if buf.is_empty() {
378                break;
379            }
380            let read = buf.len();
381
382            let decoded = self.decoder.decode(buf)?;
383            self.reader.consume(decoded);
384            if decoded != read {
385                break;
386            }
387        }
388        self.decoder.flush()
389    }
390}
391
392impl<R: BufRead> Iterator for Reader<R> {
393    type Item = Result<RecordBatch, ArrowError>;
394
395    fn next(&mut self) -> Option<Self::Item> {
396        self.read().transpose()
397    }
398}
399
400impl<R: BufRead> RecordBatchReader for Reader<R> {
401    fn schema(&self) -> SchemaRef {
402        self.decoder.schema.clone()
403    }
404}
405
406/// A low-level interface for reading JSON data from a byte stream
407///
408/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
409///
410/// The push-based interface facilitates integration with sources that yield arbitrarily
411/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
412/// object storage
413///
414/// ```
415/// # use std::io::BufRead;
416/// # use arrow_array::RecordBatch;
417/// # use arrow_json::reader::{Decoder, ReaderBuilder};
418/// # use arrow_schema::{ArrowError, SchemaRef};
419/// #
420/// fn read_from_json<R: BufRead>(
421///     mut reader: R,
422///     schema: SchemaRef,
423/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
424///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
425///     let mut next = move || {
426///         loop {
427///             // Decoder is agnostic that buf doesn't contain whole records
428///             let buf = reader.fill_buf()?;
429///             if buf.is_empty() {
430///                 break; // Input exhausted
431///             }
432///             let read = buf.len();
433///             let decoded = decoder.decode(buf)?;
434///
435///             // Consume the number of bytes read
436///             reader.consume(decoded);
437///             if decoded != read {
438///                 break; // Read batch size
439///             }
440///         }
441///         decoder.flush()
442///     };
443///     Ok(std::iter::from_fn(move || next().transpose()))
444/// }
445/// ```
pub struct Decoder {
    // Parses raw JSON bytes into an intermediate tape representation
    tape_decoder: TapeDecoder,
    // Converts the buffered tape into the root arrow array on `flush`
    decoder: Box<dyn ArrayDecoder>,
    // Target number of rows per flushed `RecordBatch`
    batch_size: usize,
    // True if the JSON root is a bare value rather than an object
    // (see `ReaderBuilder::new_with_field`)
    is_field: bool,
    // Schema of the produced batches
    schema: SchemaRef,
}
453
454impl std::fmt::Debug for Decoder {
455    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456        f.debug_struct("Decoder")
457            .field("schema", &self.schema)
458            .field("batch_size", &self.batch_size)
459            .finish()
460    }
461}
462
impl Decoder {
    /// Read JSON objects from `buf`, returning the number of bytes read
    ///
    /// This method returns once `batch_size` objects have been parsed since the
    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
    /// should be included in the next call to [`Self::decode`]
    ///
    /// There is no requirement that `buf` contains a whole number of records, facilitating
    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        // Bytes are buffered on the tape; arrays are only built on `flush`
        self.tape_decoder.decode(buf)
    }

    /// Serialize `rows` to this [`Decoder`]
    ///
    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
    /// [`RecordBatch`].
    ///
    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
    /// especially where the schema is known at compile-time, however, this provides a mechanism
    /// to get something up and running quickly
    ///
    /// It can be used with [`serde_json::Value`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use serde_json::{Value, json};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Float32Type;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
    ///
    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    ///
    /// decoder.serialize(&json).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 2);
    /// assert_eq!(batch.num_columns(), 1);
    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
    /// assert_eq!(values, &[2.3, 5.7])
    /// ```
    ///
    /// Or with arbitrary [`Serialize`] types
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use serde::Serialize;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::{Float32Type, Int32Type};
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     float: f32,
    /// }
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new("int32", DataType::Int32, false),
    ///     Field::new("float", DataType::Float32, false),
    /// ]);
    ///
    /// let rows = vec![
    ///     MyStruct{ int32: 0, float: 3. },
    ///     MyStruct{ int32: 4, float: 67.53 },
    /// ];
    ///
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&rows).unwrap();
    ///
    /// let batch = decoder.flush().unwrap().unwrap();
    ///
    /// // Expect batch containing two columns
    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
    /// assert_eq!(int32.values(), &[0, 4]);
    ///
    /// let float = batch.column(1).as_primitive::<Float32Type>();
    /// assert_eq!(float.values(), &[3., 67.53]);
    /// ```
    ///
    /// Or even complex nested types
    ///
    /// ```
    /// # use std::collections::BTreeMap;
    /// # use std::sync::Arc;
    /// # use arrow_array::StructArray;
    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Fields, Schema};
    /// # use serde::Serialize;
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     list: Vec<f64>,
    ///     nested: Vec<Option<Nested>>,
    /// }
    ///
    /// impl MyStruct {
    ///     /// Returns the [`Fields`] for [`MyStruct`]
    ///     fn fields() -> Fields {
    ///         let nested = DataType::Struct(Nested::fields());
    ///         Fields::from([
    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
    ///             Arc::new(Field::new_list(
    ///                 "list",
    ///                 Field::new("element", DataType::Float64, false),
    ///                 false,
    ///             )),
    ///             Arc::new(Field::new_list(
    ///                 "nested",
    ///                 Field::new("element", nested, true),
    ///                 true,
    ///             )),
    ///         ])
    ///     }
    /// }
    ///
    /// #[derive(Serialize)]
    /// struct Nested {
    ///     map: BTreeMap<String, Vec<String>>
    /// }
    ///
    /// impl Nested {
    ///     /// Returns the [`Fields`] for [`Nested`]
    ///     fn fields() -> Fields {
    ///         let element = Field::new("element", DataType::Utf8, false);
    ///         Fields::from([
    ///             Arc::new(Field::new_map(
    ///                 "map",
    ///                 "entries",
    ///                 Field::new("key", DataType::Utf8, false),
    ///                 Field::new_list("value", element, false),
    ///                 false, // sorted
    ///                 false, // nullable
    ///             ))
    ///         ])
    ///     }
    /// }
    ///
    /// let data = vec![
    ///     MyStruct {
    ///         int32: 34,
    ///         list: vec![1., 2., 34.],
    ///         nested: vec![
    ///             None,
    ///             Some(Nested {
    ///                 map: vec![
    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
    ///                     ("key2".to_string(), vec!["baz".to_string()])
    ///                 ].into_iter().collect()
    ///             })
    ///         ]
    ///     },
    ///     MyStruct {
    ///         int32: 56,
    ///         list: vec![],
    ///         nested: vec![]
    ///     },
    ///     MyStruct {
    ///         int32: 24,
    ///         list: vec![-1., 245.],
    ///         nested: vec![None]
    ///     }
    /// ];
    ///
    /// let schema = Schema::new(MyStruct::fields());
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&data).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 3);
    /// assert_eq!(batch.num_columns(), 3);
    ///
    /// // Convert to StructArray to format
    /// let s = StructArray::from(batch);
    /// let options = FormatOptions::default().with_null("null");
    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
    ///
    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
    /// ```
    ///
    /// Note: this ignores any batch size setting, and always decodes all rows
    ///
    /// [serde]: https://docs.rs/serde/latest/serde/
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// True if the decoder is currently part way through decoding a record.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// The number of unflushed records, including the partially decoded record (if any).
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the currently buffered data to a [`RecordBatch`]
    ///
    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
    ///
    /// Note: This will return an error if called part way through decoding a record,
    /// i.e. [`Self::has_partial_record`] is true.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape to compute the starting index of each buffered row:
        // `pos[i]` holds the index of row i's root element, while `next_object`
        // is advanced past each row in turn via `mem::replace`.
        // First offset is null sentinel
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        // Decode all rows into the root array, then release the tape buffers
        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        // `is_field` roots produce a single-column batch; otherwise `decoded`
        // is a StructArray whose children become the batch columns
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?,
            false => {
                RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
706
/// Type-erased decoder from a JSON [`Tape`] to an arrow [`ArrayRef`]
///
/// Implemented per [`DataType`] by the decoders in the sibling modules
/// (e.g. `primitive_array`, `struct_array`, `list_array`).
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError>;
}
711
/// Context for decoder creation, containing configuration.
///
/// This context is passed through the decoder creation process and contains
/// all the configuration needed to create decoders recursively.
pub struct DecoderContext {
    /// Whether to coerce primitives to strings
    coerce_primitive: bool,
    /// Whether to validate struct fields strictly
    strict_mode: bool,
    /// How to decode struct fields
    struct_mode: StructMode,
    /// Whether to treat columns with incompatible types as missing (i.e. NULL)
    ignore_type_conflicts: bool,
}
726
impl DecoderContext {
    /// Returns whether to coerce primitive types (e.g., number to string)
    pub fn coerce_primitive(&self) -> bool {
        self.coerce_primitive
    }

    /// Returns whether to validate struct fields strictly
    pub fn strict_mode(&self) -> bool {
        self.strict_mode
    }

    /// Returns how to decode struct fields
    pub fn struct_mode(&self) -> StructMode {
        self.struct_mode
    }

    /// Returns whether to treat columns with incompatible types as missing (i.e. NULL)
    pub fn ignore_type_conflicts(&self) -> bool {
        self.ignore_type_conflicts
    }

    /// Create a decoder for a type.
    ///
    /// This is the standard way to create child decoders from within a decoder
    /// implementation: it forwards to the free function [`make_decoder`],
    /// passing `self` so that nested decoders inherit this configuration.
    fn make_decoder(
        &self,
        data_type: &DataType,
        is_nullable: bool,
    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
        make_decoder(self, data_type, is_nullable)
    }
}
760
761fn make_decoder(
762    ctx: &DecoderContext,
763    data_type: &DataType,
764    is_nullable: bool,
765) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
766    macro_rules! primitive_decoder {
767        ($t:ty, $data_type:expr) => {
768            Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
769        };
770    }
771    macro_rules! timestamp_decoder {
772        ($t:ty, $data_type:expr, $tz:expr) => {{
773            Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
774                ctx, $data_type, $tz,
775            )))
776        }};
777    }
778    macro_rules! decimal_decoder {
779        ($t:ty, $p:expr, $s:expr) => {
780            Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
781        };
782    }
783
784    downcast_integer! {
785        *data_type => (primitive_decoder, data_type),
786        DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
787        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
788        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
789        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
790        DataType::Timestamp(TimeUnit::Second, None) => {
791            timestamp_decoder!(TimestampSecondType, data_type, Utc)
792        },
793        DataType::Timestamp(TimeUnit::Millisecond, None) => {
794            timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
795        },
796        DataType::Timestamp(TimeUnit::Microsecond, None) => {
797            timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
798        },
799        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
800            timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
801        },
802        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
803            let tz: Tz = tz.parse()?;
804            timestamp_decoder!(TimestampSecondType, data_type, tz)
805        },
806        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
807            let tz: Tz = tz.parse()?;
808            timestamp_decoder!(TimestampMillisecondType, data_type, tz)
809        },
810        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
811            let tz: Tz = tz.parse()?;
812            timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
813        },
814        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
815            let tz: Tz = tz.parse()?;
816            timestamp_decoder!(TimestampNanosecondType, data_type, tz)
817        },
818        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
819        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
820        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
821        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
822        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
823        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
824        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
825        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
826        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
827        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
828        DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
829        DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
830        DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
831        DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
832        DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
833        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
834        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
835        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
836        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
837        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
838        DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
839        DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
840        DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArrayDecoder::new(ctx, data_type, is_nullable)?)),
841        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
842        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
843        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
844        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
845        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
846        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
847        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
848            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
849            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
850            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
851            d => unreachable!("unsupported run end index type: {d}"),
852        },
853        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use arrow_array::cast::AsArray;
860    use arrow_array::{
861        Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray,
862        NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray,
863    };
864    use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
865    use arrow_cast::display::{ArrayFormatter, FormatOptions};
866    use arrow_schema::{Field, Fields};
867    use serde_json::json;
868    use std::fs::File;
869    use std::io::{BufReader, Cursor, Seek};
870
871    use super::*;
872
873    fn do_read(
874        buf: &str,
875        batch_size: usize,
876        coerce_primitive: bool,
877        strict_mode: bool,
878        schema: SchemaRef,
879    ) -> Vec<RecordBatch> {
880        let mut unbuffered = vec![];
881
882        // Test with different batch sizes to test for boundary conditions
883        for batch_size in [1, 3, 100, batch_size] {
884            unbuffered = ReaderBuilder::new(schema.clone())
885                .with_batch_size(batch_size)
886                .with_coerce_primitive(coerce_primitive)
887                .build(Cursor::new(buf.as_bytes()))
888                .unwrap()
889                .collect::<Result<Vec<_>, _>>()
890                .unwrap();
891
892            for b in unbuffered.iter().take(unbuffered.len() - 1) {
893                assert_eq!(b.num_rows(), batch_size)
894            }
895
896            // Test with different buffer sizes to test for boundary conditions
897            for b in [1, 3, 5] {
898                let buffered = ReaderBuilder::new(schema.clone())
899                    .with_batch_size(batch_size)
900                    .with_coerce_primitive(coerce_primitive)
901                    .with_strict_mode(strict_mode)
902                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
903                    .unwrap()
904                    .collect::<Result<Vec<_>, _>>()
905                    .unwrap();
906                assert_eq!(unbuffered, buffered);
907            }
908        }
909
910        unbuffered
911    }
912
913    #[test]
914    fn test_basic() {
915        let buf = r#"
916        {"a": 1, "b": 2, "c": true, "d": 1}
917        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}
918
919        {"b": 6, "a": 2.0, "d": 45}
920        {"b": "5", "a": 2}
921        {"b": 4e0}
922        {"b": 7, "a": null}
923        "#;
924
925        let schema = Arc::new(Schema::new(vec![
926            Field::new("a", DataType::Int64, true),
927            Field::new("b", DataType::Int32, true),
928            Field::new("c", DataType::Boolean, true),
929            Field::new("d", DataType::Date32, true),
930            Field::new("e", DataType::Date64, true),
931        ]));
932
933        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
934        assert!(decoder.is_empty());
935        assert_eq!(decoder.len(), 0);
936        assert!(!decoder.has_partial_record());
937        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
938        assert!(!decoder.is_empty());
939        assert_eq!(decoder.len(), 6);
940        assert!(!decoder.has_partial_record());
941        let batch = decoder.flush().unwrap().unwrap();
942        assert_eq!(batch.num_rows(), 6);
943        assert!(decoder.is_empty());
944        assert_eq!(decoder.len(), 0);
945        assert!(!decoder.has_partial_record());
946
947        let batches = do_read(buf, 1024, false, false, schema);
948        assert_eq!(batches.len(), 1);
949
950        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
951        assert_eq!(col1.null_count(), 2);
952        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
953        assert!(col1.is_null(4));
954        assert!(col1.is_null(5));
955
956        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
957        assert_eq!(col2.null_count(), 0);
958        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);
959
960        let col3 = batches[0].column(2).as_boolean();
961        assert_eq!(col3.null_count(), 4);
962        assert!(col3.value(0));
963        assert!(!col3.is_null(0));
964        assert!(!col3.value(1));
965        assert!(!col3.is_null(1));
966
967        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
968        assert_eq!(col4.null_count(), 3);
969        assert!(col4.is_null(3));
970        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);
971
972        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
973        assert_eq!(col5.null_count(), 5);
974        assert!(col5.is_null(0));
975        assert!(col5.is_null(2));
976        assert!(col5.is_null(3));
977        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
978    }
979
980    #[test]
981    fn test_string() {
982        let buf = r#"
983        {"a": "1", "b": "2"}
984        {"a": "hello", "b": "shoo"}
985        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
986
987        {"b": null}
988        {"b": "", "a": null}
989
990        "#;
991        let schema = Arc::new(Schema::new(vec![
992            Field::new("a", DataType::Utf8, true),
993            Field::new("b", DataType::LargeUtf8, true),
994        ]));
995
996        let batches = do_read(buf, 1024, false, false, schema);
997        assert_eq!(batches.len(), 1);
998
999        let col1 = batches[0].column(0).as_string::<i32>();
1000        assert_eq!(col1.null_count(), 2);
1001        assert_eq!(col1.value(0), "1");
1002        assert_eq!(col1.value(1), "hello");
1003        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1004        assert!(col1.is_null(3));
1005        assert!(col1.is_null(4));
1006
1007        let col2 = batches[0].column(1).as_string::<i64>();
1008        assert_eq!(col2.null_count(), 1);
1009        assert_eq!(col2.value(0), "2");
1010        assert_eq!(col2.value(1), "shoo");
1011        assert_eq!(col2.value(2), "\t😁foo");
1012        assert!(col2.is_null(3));
1013        assert_eq!(col2.value(4), "");
1014    }
1015
1016    #[test]
1017    fn test_long_string_view_allocation() {
1018        // The JSON input contains field "a" with different string lengths.
1019        // According to the implementation in the decoder:
1020        // - For a string, capacity is only increased if its length > 12 bytes.
1021        // Therefore, for:
1022        // Row 1: "short" (5 bytes) -> capacity += 0
1023        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
1024        // Row 3: "hello" (5 bytes) -> capacity += 0
1025        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
1026        // Expected total capacity = 24 + 17 = 41
1027        let expected_capacity: usize = 41;
1028
1029        let buf = r#"
1030        {"a": "short", "b": "dummy"}
1031        {"a": "this is definitely long", "b": "dummy"}
1032        {"a": "hello", "b": "dummy"}
1033        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
1034        "#;
1035
1036        let schema = Arc::new(Schema::new(vec![
1037            Field::new("a", DataType::Utf8View, true),
1038            Field::new("b", DataType::LargeUtf8, true),
1039        ]));
1040
1041        let batches = do_read(buf, 1024, false, false, schema);
1042        assert_eq!(batches.len(), 1, "Expected one record batch");
1043
1044        // Get the first column ("a") as a StringViewArray.
1045        let col_a = batches[0].column(0);
1046        let string_view_array = col_a
1047            .as_any()
1048            .downcast_ref::<StringViewArray>()
1049            .expect("Column should be a StringViewArray");
1050
1051        // Retrieve the underlying data buffer from the array.
1052        // The builder pre-allocates capacity based on the sum of lengths for long strings.
1053        let data_buffer = string_view_array.to_data().buffers()[0].len();
1054
1055        // Check that the allocated capacity is at least what we expected.
1056        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
1057        assert!(
1058            data_buffer >= expected_capacity,
1059            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1060        );
1061
1062        // Additionally, verify that the decoded values are correct.
1063        assert_eq!(string_view_array.value(0), "short");
1064        assert_eq!(string_view_array.value(1), "this is definitely long");
1065        assert_eq!(string_view_array.value(2), "hello");
1066        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
1067    }
1068
1069    /// Test the memory capacity allocation logic when converting numeric types to strings.
1070    #[test]
1071    fn test_numeric_view_allocation() {
1072        // For numeric types, the expected capacity calculation is as follows:
1073        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
1074        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
1075        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
1076        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
1077        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
1078        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
1079        let expected_capacity: usize = 33;
1080
1081        let buf = r#"
1082    {"n": 123456789}
1083    {"n": 1000000000000}
1084    {"n": 3.1415}
1085    {"n": 2.718281828459045}
1086    "#;
1087
1088        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));
1089
1090        let batches = do_read(buf, 1024, true, false, schema);
1091        assert_eq!(batches.len(), 1, "Expected one record batch");
1092
1093        let col_n = batches[0].column(0);
1094        let string_view_array = col_n
1095            .as_any()
1096            .downcast_ref::<StringViewArray>()
1097            .expect("Column should be a StringViewArray");
1098
1099        // Check that the underlying data buffer capacity is at least the expected value.
1100        let data_buffer = string_view_array.to_data().buffers()[0].len();
1101        assert!(
1102            data_buffer >= expected_capacity,
1103            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1104        );
1105
1106        // Verify that the converted string values are correct.
1107        // Note: The format of the number converted to a string should match the actual implementation.
1108        assert_eq!(string_view_array.value(0), "123456789");
1109        assert_eq!(string_view_array.value(1), "1000000000000");
1110        assert_eq!(string_view_array.value(2), "3.1415");
1111        assert_eq!(string_view_array.value(3), "2.718281828459045");
1112    }
1113
1114    #[test]
1115    fn test_string_with_uft8view() {
1116        let buf = r#"
1117        {"a": "1", "b": "2"}
1118        {"a": "hello", "b": "shoo"}
1119        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1120
1121        {"b": null}
1122        {"b": "", "a": null}
1123
1124        "#;
1125        let schema = Arc::new(Schema::new(vec![
1126            Field::new("a", DataType::Utf8View, true),
1127            Field::new("b", DataType::LargeUtf8, true),
1128        ]));
1129
1130        let batches = do_read(buf, 1024, false, false, schema);
1131        assert_eq!(batches.len(), 1);
1132
1133        let col1 = batches[0].column(0).as_string_view();
1134        assert_eq!(col1.null_count(), 2);
1135        assert_eq!(col1.value(0), "1");
1136        assert_eq!(col1.value(1), "hello");
1137        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1138        assert!(col1.is_null(3));
1139        assert!(col1.is_null(4));
1140        assert_eq!(col1.data_type(), &DataType::Utf8View);
1141
1142        let col2 = batches[0].column(1).as_string::<i64>();
1143        assert_eq!(col2.null_count(), 1);
1144        assert_eq!(col2.value(0), "2");
1145        assert_eq!(col2.value(1), "shoo");
1146        assert_eq!(col2.value(2), "\t😁foo");
1147        assert!(col2.is_null(3));
1148        assert_eq!(col2.value(4), "");
1149    }
1150
1151    #[test]
1152    fn test_complex() {
1153        let buf = r#"
1154           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1155           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1156           {"list": null, "nested": {"a": null}}
1157        "#;
1158
1159        let schema = Arc::new(Schema::new(vec![
1160            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1161            Field::new_struct(
1162                "nested",
1163                vec![
1164                    Field::new("a", DataType::Int32, true),
1165                    Field::new("b", DataType::Int32, true),
1166                ],
1167                true,
1168            ),
1169            Field::new_struct(
1170                "nested_list",
1171                vec![Field::new_list(
1172                    "list2",
1173                    Field::new_struct(
1174                        "element",
1175                        vec![Field::new("c", DataType::Int32, false)],
1176                        false,
1177                    ),
1178                    true,
1179                )],
1180                true,
1181            ),
1182        ]));
1183
1184        let batches = do_read(buf, 1024, false, false, schema);
1185        assert_eq!(batches.len(), 1);
1186
1187        let list = batches[0].column(0).as_list::<i32>();
1188        assert_eq!(list.len(), 3);
1189        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1190        assert_eq!(list.null_count(), 1);
1191        assert!(list.is_null(2));
1192        let list_values = list.values().as_primitive::<Int32Type>();
1193        assert_eq!(list_values.values(), &[5, 6]);
1194
1195        let nested = batches[0].column(1).as_struct();
1196        let a = nested.column(0).as_primitive::<Int32Type>();
1197        assert_eq!(list.null_count(), 1);
1198        assert_eq!(a.values(), &[1, 7, 0]);
1199        assert!(list.is_null(2));
1200
1201        let b = nested.column(1).as_primitive::<Int32Type>();
1202        assert_eq!(b.null_count(), 2);
1203        assert_eq!(b.len(), 3);
1204        assert_eq!(b.value(0), 2);
1205        assert!(b.is_null(1));
1206        assert!(b.is_null(2));
1207
1208        let nested_list = batches[0].column(2).as_struct();
1209        assert_eq!(nested_list.len(), 3);
1210        assert_eq!(nested_list.null_count(), 1);
1211        assert!(nested_list.is_null(2));
1212
1213        let list2 = nested_list.column(0).as_list::<i32>();
1214        assert_eq!(list2.len(), 3);
1215        assert_eq!(list2.null_count(), 1);
1216        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1217        assert!(list2.is_null(2));
1218
1219        let list2_values = list2.values().as_struct();
1220
1221        let c = list2_values.column(0).as_primitive::<Int32Type>();
1222        assert_eq!(c.values(), &[3, 4]);
1223    }
1224
1225    #[test]
1226    fn test_projection() {
1227        let buf = r#"
1228           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
1229           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1230        "#;
1231
1232        let schema = Arc::new(Schema::new(vec![
1233            Field::new_struct(
1234                "nested",
1235                vec![Field::new("a", DataType::Int32, false)],
1236                true,
1237            ),
1238            Field::new_struct(
1239                "nested_list",
1240                vec![Field::new_list(
1241                    "list2",
1242                    Field::new_struct(
1243                        "element",
1244                        vec![Field::new("d", DataType::Int32, true)],
1245                        false,
1246                    ),
1247                    true,
1248                )],
1249                true,
1250            ),
1251        ]));
1252
1253        let batches = do_read(buf, 1024, false, false, schema);
1254        assert_eq!(batches.len(), 1);
1255
1256        let nested = batches[0].column(0).as_struct();
1257        assert_eq!(nested.num_columns(), 1);
1258        let a = nested.column(0).as_primitive::<Int32Type>();
1259        assert_eq!(a.null_count(), 0);
1260        assert_eq!(a.values(), &[1, 7]);
1261
1262        let nested_list = batches[0].column(1).as_struct();
1263        assert_eq!(nested_list.num_columns(), 1);
1264        assert_eq!(nested_list.null_count(), 0);
1265
1266        let list2 = nested_list.column(0).as_list::<i32>();
1267        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
1268        assert_eq!(list2.null_count(), 0);
1269
1270        let child = list2.values().as_struct();
1271        assert_eq!(child.num_columns(), 1);
1272        assert_eq!(child.len(), 2);
1273        assert_eq!(child.null_count(), 0);
1274
1275        let c = child.column(0).as_primitive::<Int32Type>();
1276        assert_eq!(c.values(), &[5, 0]);
1277        assert_eq!(c.null_count(), 1);
1278        assert!(c.is_null(1));
1279    }
1280
1281    #[test]
1282    fn test_map() {
1283        let buf = r#"
1284           {"map": {"a": ["foo", null]}}
1285           {"map": {"a": [null], "b": []}}
1286           {"map": {"c": null, "a": ["baz"]}}
1287        "#;
1288        let map = Field::new_map(
1289            "map",
1290            "entries",
1291            Field::new("key", DataType::Utf8, false),
1292            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
1293            false,
1294            true,
1295        );
1296
1297        let schema = Arc::new(Schema::new(vec![map]));
1298
1299        let batches = do_read(buf, 1024, false, false, schema);
1300        assert_eq!(batches.len(), 1);
1301
1302        let map = batches[0].column(0).as_map();
1303        let map_keys = map.keys().as_string::<i32>();
1304        let map_values = map.values().as_list::<i32>();
1305        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);
1306
1307        let k: Vec<_> = map_keys.iter().flatten().collect();
1308        assert_eq!(&k, &["a", "a", "b", "c", "a"]);
1309
1310        let list_values = map_values.values().as_string::<i32>();
1311        let lv: Vec<_> = list_values.iter().collect();
1312        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
1313        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
1314        assert_eq!(map_values.null_count(), 1);
1315        assert!(map_values.is_null(3));
1316
1317        let options = FormatOptions::default().with_null("null");
1318        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
1319        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
1320        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
1321        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
1322    }
1323
1324    #[test]
1325    fn test_not_coercing_primitive_into_string_without_flag() {
1326        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1327
1328        let buf = r#"{"a": 1}"#;
1329        let err = ReaderBuilder::new(schema.clone())
1330            .with_batch_size(1024)
1331            .build(Cursor::new(buf.as_bytes()))
1332            .unwrap()
1333            .read()
1334            .unwrap_err();
1335
1336        assert_eq!(
1337            err.to_string(),
1338            "Json error: whilst decoding field 'a': expected string got 1"
1339        );
1340
1341        let buf = r#"{"a": true}"#;
1342        let err = ReaderBuilder::new(schema)
1343            .with_batch_size(1024)
1344            .build(Cursor::new(buf.as_bytes()))
1345            .unwrap()
1346            .read()
1347            .unwrap_err();
1348
1349        assert_eq!(
1350            err.to_string(),
1351            "Json error: whilst decoding field 'a': expected string got true"
1352        );
1353    }
1354
1355    #[test]
1356    fn test_coercing_primitive_into_string() {
1357        let buf = r#"
1358        {"a": 1, "b": 2, "c": true}
1359        {"a": 2E0, "b": 4, "c": false}
1360
1361        {"b": 6, "a": 2.0}
1362        {"b": "5", "a": 2}
1363        {"b": 4e0}
1364        {"b": 7, "a": null}
1365        "#;
1366
1367        let schema = Arc::new(Schema::new(vec![
1368            Field::new("a", DataType::Utf8, true),
1369            Field::new("b", DataType::Utf8, true),
1370            Field::new("c", DataType::Utf8, true),
1371        ]));
1372
1373        let batches = do_read(buf, 1024, true, false, schema);
1374        assert_eq!(batches.len(), 1);
1375
1376        let col1 = batches[0].column(0).as_string::<i32>();
1377        assert_eq!(col1.null_count(), 2);
1378        assert_eq!(col1.value(0), "1");
1379        assert_eq!(col1.value(1), "2E0");
1380        assert_eq!(col1.value(2), "2.0");
1381        assert_eq!(col1.value(3), "2");
1382        assert!(col1.is_null(4));
1383        assert!(col1.is_null(5));
1384
1385        let col2 = batches[0].column(1).as_string::<i32>();
1386        assert_eq!(col2.null_count(), 0);
1387        assert_eq!(col2.value(0), "2");
1388        assert_eq!(col2.value(1), "4");
1389        assert_eq!(col2.value(2), "6");
1390        assert_eq!(col2.value(3), "5");
1391        assert_eq!(col2.value(4), "4e0");
1392        assert_eq!(col2.value(5), "7");
1393
1394        let col3 = batches[0].column(2).as_string::<i32>();
1395        assert_eq!(col3.null_count(), 4);
1396        assert_eq!(col3.value(0), "true");
1397        assert_eq!(col3.value(1), "false");
1398        assert!(col3.is_null(2));
1399        assert!(col3.is_null(3));
1400        assert!(col3.is_null(4));
1401        assert!(col3.is_null(5));
1402    }
1403
    /// Decodes JSON numbers and numeric strings into the decimal type `T`.
    ///
    /// Every caller (see `test_decimals`) passes a scale of 2, so the
    /// expected raw values below are the logical values scaled by 100;
    /// fractional digits beyond the scale are discarded (123.456 -> 12345,
    /// "2.0452" -> 204).
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        // Mix of numeric literals, quoted numeric strings, missing fields
        // and explicit nulls
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": absent in row 5 and explicitly null in row 6
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": present in every row, both as numbers and quoted strings
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only supplied in the first two rows
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1453
    /// Runs [`test_decimal`] against every supported decimal width, each
    /// with a scale of 2.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1461
    /// Decodes timestamps for the time unit of `T`: columns "a"-"c" are
    /// timezone-naive, column "d" carries a fixed "+08:00" offset.
    ///
    /// Expected values are written in nanoseconds and divided by
    /// `unit_in_nanos`, so the same constants cover all four units.
    fn test_timestamp<T: ArrowTimestampType>() {
        // Row 3 repeats the "b" key; the asserts below show the later value
        // ("2020-09-08T13:42:29Z") wins.
        // NOTE(review): that row supplies no "a" at all — the first "b" may
        // have been intended as "a"; confirm before tightening col1's asserts.
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        // coerce_primitive = true: bare JSON numbers are accepted as raw
        // timestamp values (fractions truncated: 123.456 -> 123)
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Scale factor from nanoseconds to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a": only the first two rows provide a value
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": RFC 3339 strings parse to epoch offsets; raw numbers (40,
        // 1234) pass through unchanged
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c": timezone-naive column; strings with an explicit offset are
        // normalized to UTC (row 6: 14:26:56-05:00 -> 19:26:56 UTC)
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d": "+08:00" column; offset-less strings are interpreted in that
        // timezone (8h behind the naive UTC reading), explicit offsets are
        // honored, and a bare date maps to local midnight
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1543
    /// Runs [`test_timestamp`] for every timestamp unit.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1551
    /// Decodes times-of-day for `T` (a `Time32`/`Time64` type).
    ///
    /// Expected values are written in nanoseconds since midnight and divided
    /// by `unit_in_nanos`, so the same constants cover all units.
    fn test_time<T: ArrowTemporalType>() {
        // Row 3 repeats the "b" key; the asserts below show the later value
        // ("6:00 pm" = 18:00) wins.
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the time unit from T's data type (Time32/Time64 only)
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Scale factor from nanoseconds to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        // coerce_primitive = true: bare JSON numbers are accepted as raw
        // values in the target unit (fractions truncated: 38.30 -> 38)
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": only the first two rows provide a value
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": both 12-hour ("AM"/"pm") and 24-hour strings parse; raw
        // numbers (40, 1234) pass through unchanged
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": fractional seconds are preserved up to the unit's precision
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1623
1624    #[test]
1625    fn test_times() {
1626        test_time::<Time32MillisecondType>();
1627        test_time::<Time32SecondType>();
1628        test_time::<Time64MicrosecondType>();
1629        test_time::<Time64NanosecondType>();
1630    }
1631
1632    fn test_duration<T: ArrowTemporalType>() {
1633        let buf = r#"
1634        {"a": 1, "b": "2"}
1635        {"a": 3, "b": null}
1636        "#;
1637
1638        let schema = Arc::new(Schema::new(vec![
1639            Field::new("a", T::DATA_TYPE, true),
1640            Field::new("b", T::DATA_TYPE, true),
1641        ]));
1642
1643        let batches = do_read(buf, 1024, true, false, schema);
1644        assert_eq!(batches.len(), 1);
1645
1646        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1647        assert_eq!(col_a.null_count(), 0);
1648        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1649
1650        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1651        assert_eq!(col2.null_count(), 1);
1652        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1653    }
1654
1655    #[test]
1656    fn test_durations() {
1657        test_duration::<DurationNanosecondType>();
1658        test_duration::<DurationMicrosecondType>();
1659        test_duration::<DurationMillisecondType>();
1660        test_duration::<DurationSecondType>();
1661    }
1662
    /// Reproduces reading a Delta Lake checkpoint line: a sparse record where
    /// only one of the top-level struct columns is present in the input.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            // "add" is absent from the input and must decode as a null struct
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the whole row to text and check both present and missing columns
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1700
1701    #[test]
1702    fn struct_nullability() {
1703        let do_test = |child: DataType| {
1704            // Test correctly enforced nullability
1705            let non_null = r#"{"foo": {}}"#;
1706            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1707                "foo",
1708                vec![Field::new("bar", child, false)],
1709                true,
1710            )]));
1711            let mut reader = ReaderBuilder::new(schema.clone())
1712                .build(Cursor::new(non_null.as_bytes()))
1713                .unwrap();
1714            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1715
1716            let null = r#"{"foo": {bar: null}}"#;
1717            let mut reader = ReaderBuilder::new(schema.clone())
1718                .build(Cursor::new(null.as_bytes()))
1719                .unwrap();
1720            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1721
1722            // Test nulls in nullable parent can mask nulls in non-nullable child
1723            let null = r#"{"foo": null}"#;
1724            let mut reader = ReaderBuilder::new(schema)
1725                .build(Cursor::new(null.as_bytes()))
1726                .unwrap();
1727            let batch = reader.next().unwrap().unwrap();
1728            assert_eq!(batch.num_columns(), 1);
1729            let foo = batch.column(0).as_struct();
1730            assert_eq!(foo.len(), 1);
1731            assert!(foo.is_null(0));
1732            assert_eq!(foo.num_columns(), 1);
1733
1734            let bar = foo.column(0);
1735            assert_eq!(bar.len(), 1);
1736            // Non-nullable child can still contain null as masked by parent
1737            assert!(bar.is_null(0));
1738        };
1739
1740        do_test(DataType::Boolean);
1741        do_test(DataType::Int32);
1742        do_test(DataType::Utf8);
1743        do_test(DataType::Decimal128(2, 1));
1744        do_test(DataType::Timestamp(
1745            TimeUnit::Microsecond,
1746            Some("+00:00".into()),
1747        ));
1748    }
1749
1750    #[test]
1751    fn test_truncation() {
1752        let buf = r#"
1753        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1754        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1755        {"i64": -9223372036854775808, "u64": 0 }
1756        {"i64": "-9223372036854775808", "u64": 0 }
1757        "#;
1758
1759        let schema = Arc::new(Schema::new(vec![
1760            Field::new("i64", DataType::Int64, true),
1761            Field::new("u64", DataType::UInt64, true),
1762        ]));
1763
1764        let batches = do_read(buf, 1024, true, false, schema);
1765        assert_eq!(batches.len(), 1);
1766
1767        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1768        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1769
1770        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1771        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1772    }
1773
1774    #[test]
1775    fn test_timestamp_truncation() {
1776        let buf = r#"
1777        {"time": 9223372036854775807 }
1778        {"time": -9223372036854775808 }
1779        {"time": 9e5 }
1780        "#;
1781
1782        let schema = Arc::new(Schema::new(vec![Field::new(
1783            "time",
1784            DataType::Timestamp(TimeUnit::Nanosecond, None),
1785            true,
1786        )]));
1787
1788        let batches = do_read(buf, 1024, true, false, schema);
1789        assert_eq!(batches.len(), 1);
1790
1791        let i64 = batches[0]
1792            .column(0)
1793            .as_primitive::<TimestampNanosecondType>();
1794        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1795    }
1796
    /// Strict mode succeeds when the schema covers every column in the input,
    /// both for flat records and for records with a nested struct.
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema lists all three input columns, so strict decoding must succeed
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        // Same check with a nested struct column fully described by the schema
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1834
    /// Strict mode rejects input columns that are absent from the schema,
    /// reporting the offending column at both the top level and inside a
    /// struct field.
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema omits "b" entirely
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        // Nested case: struct "c" omits its "b" child; the error message is
        // prefixed with the field being decoded
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1884
1885    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1886        let file = File::open(path).unwrap();
1887        let mut reader = BufReader::new(file);
1888        let schema = schema.unwrap_or_else(|| {
1889            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1890            reader.rewind().unwrap();
1891            schema
1892        });
1893        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1894        builder.build(reader).unwrap()
1895    }
1896
    /// Reads basic.json with an inferred schema and spot-checks column order,
    /// inferred types, and a handful of decoded values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the (inferred) schema
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Columns "a".."d" occupy indices 0..3 with the inferred types
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values per column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1935
1936    #[test]
1937    fn test_json_empty_projection() {
1938        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1939        let batch = reader.next().unwrap().unwrap();
1940
1941        assert_eq!(0, batch.num_columns());
1942        assert_eq!(12, batch.num_rows());
1943    }
1944
    /// Reads basic_nulls.json with an inferred schema and checks the validity
    /// bitmaps of each column at representative positions.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the (inferred) schema
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Nulls do not affect the inferred column types
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity per column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1984
    /// Reads basic.json with an explicit schema: the provided types (including
    /// Float32 for "b") take precedence over what inference would produce.
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader reports exactly the schema it was given
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Values decode into the requested (narrower) types
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
2020
2021    #[test]
2022    fn test_json_basic_schema_projection() {
2023        let schema = Schema::new(vec![
2024            Field::new("a", DataType::Int64, true),
2025            Field::new("c", DataType::Boolean, false),
2026        ]);
2027
2028        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
2029        let batch = reader.next().unwrap().unwrap();
2030
2031        assert_eq!(2, batch.num_columns());
2032        assert_eq!(2, batch.schema().fields().len());
2033        assert_eq!(12, batch.num_rows());
2034
2035        assert_eq!(batch.schema().as_ref(), &schema);
2036
2037        let a = schema.column_with_name("a").unwrap();
2038        assert_eq!(0, a.0);
2039        assert_eq!(&DataType::Int64, a.1.data_type());
2040        let c = schema.column_with_name("c").unwrap();
2041        assert_eq!(1, c.0);
2042        assert_eq!(&DataType::Boolean, c.1.data_type());
2043    }
2044
2045    #[test]
2046    fn test_json_arrays() {
2047        let mut reader = read_file("test/data/arrays.json", None);
2048        let batch = reader.next().unwrap().unwrap();
2049
2050        assert_eq!(4, batch.num_columns());
2051        assert_eq!(3, batch.num_rows());
2052
2053        let schema = batch.schema();
2054
2055        let a = schema.column_with_name("a").unwrap();
2056        assert_eq!(&DataType::Int64, a.1.data_type());
2057        let b = schema.column_with_name("b").unwrap();
2058        assert_eq!(
2059            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
2060            b.1.data_type()
2061        );
2062        let c = schema.column_with_name("c").unwrap();
2063        assert_eq!(
2064            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2065            c.1.data_type()
2066        );
2067        let d = schema.column_with_name("d").unwrap();
2068        assert_eq!(&DataType::Utf8, d.1.data_type());
2069
2070        let aa = batch.column(a.0).as_primitive::<Int64Type>();
2071        assert_eq!(1, aa.value(0));
2072        assert_eq!(-10, aa.value(1));
2073        assert_eq!(1627668684594000000, aa.value(2));
2074        let bb = batch.column(b.0).as_list::<i32>();
2075        let bb = bb.values().as_primitive::<Float64Type>();
2076        assert_eq!(9, bb.len());
2077        assert_eq!(2.0, bb.value(0));
2078        assert_eq!(-6.1, bb.value(5));
2079        assert!(!bb.is_valid(7));
2080
2081        let cc = batch
2082            .column(c.0)
2083            .as_any()
2084            .downcast_ref::<ListArray>()
2085            .unwrap();
2086        let cc = cc.values().as_boolean();
2087        assert_eq!(6, cc.len());
2088        assert!(!cc.value(0));
2089        assert!(!cc.value(4));
2090        assert!(!cc.is_valid(5));
2091    }
2092
    /// Distinguishes three cases for a nullable `List<Null>` column: an empty
    /// list decodes as a valid, zero-length value, while an explicit null and
    /// a missing field both decode as null entries.
    #[test]
    fn test_empty_json_arrays() {
        let json_content = r#"
            {"items": []}
            {"items": null}
            {}
            "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 2);
        // Row 0: valid empty list with a Null child type
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        // Rows 1 and 2: explicit null and missing field are both null
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }
2117
    /// Nested `List<List<Null>>`: empty inner lists and inner lists containing
    /// only nulls round-trip with the correct lengths and no outer nulls.
    #[test]
    fn test_nested_empty_json_arrays() {
        let json_content = r#"
            {"items": [[],[]]}
            {"items": [[null, null],[null]]}
            "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(
                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
                true,
            ))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 0);
        // Row 0: two empty inner lists
        assert_eq!(col1.value(0).len(), 2);
        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

        // Row 1: inner lists of lengths 2 and 1 (all-null elements)
        assert_eq!(col1.value(1).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
    }
2147
    /// Decodes a list of structs (with a nested struct child) and compares the
    /// result field-by-field against a hand-built expected [`ListArray`],
    /// covering nulls at every level: null list, empty list, null struct
    /// element, null nested struct, and null leaf values.
    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output: seven struct elements across the six rows
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        let c = StructArray::new(
            vec![Field::new("d", DataType::Utf8, true)].into(),
            vec![Arc::new(d.clone()) as ArrayRef],
            Some(NullBuffer::from(vec![
                true, true, false, true, true, true, false,
            ])),
        );
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = StructArray::new(
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()].into(),
            vec![
                Arc::new(b.clone()) as ArrayRef,
                Arc::new(c.clone()) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![
                true, true, true, true, true, true, false,
            ])),
        );
        let a_list = ListArray::new(
            Arc::new(a_struct_field.clone()),
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 6, 6, 6, 7])),
            Arc::new(a),
            Some(NullBuffer::from(vec![true, true, true, false, true, true])),
        );

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = &a_list;
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.as_struct(), &c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        // finally, a whole-array comparison as a safety net
        assert_eq!(read, expected);
    }
2243
2244    fn assert_read_list_view<O: OffsetSizeTrait>() {
2245        let field = Arc::new(Field::new("item", DataType::Int32, true));
2246        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
2247        let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));
2248
2249        let buf = r#"
2250        {"lv": [1, 2, 3]}
2251        {"lv": [4, null]}
2252        {"lv": null}
2253        {"lv": [6]}
2254        {"lv": []}
2255        "#;
2256
2257        let batches = do_read(buf, 1024, false, false, schema);
2258        assert_eq!(batches.len(), 1);
2259        let batch = &batches[0];
2260        let col = batch.column(0);
2261        let list_view = col
2262            .as_any()
2263            .downcast_ref::<GenericListViewArray<O>>()
2264            .unwrap();
2265
2266        assert_eq!(list_view.len(), 5);
2267
2268        // Check offsets and sizes
2269        let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
2270            .into_iter()
2271            .map(|v| O::usize_as(v))
2272            .collect();
2273        let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
2274            .into_iter()
2275            .map(|v| O::usize_as(v))
2276            .collect();
2277        assert_eq!(list_view.value_offsets(), &expected_offsets);
2278        assert_eq!(list_view.value_sizes(), &expected_sizes);
2279
2280        // Row 0: [1, 2, 3]
2281        assert!(list_view.is_valid(0));
2282        let vals = list_view.value(0);
2283        let ints = vals.as_primitive::<Int32Type>();
2284        assert_eq!(ints.values(), &[1, 2, 3]);
2285
2286        // Row 1: [4, null]
2287        assert!(list_view.is_valid(1));
2288        let vals = list_view.value(1);
2289        let ints = vals.as_primitive::<Int32Type>();
2290        assert_eq!(ints.len(), 2);
2291        assert_eq!(ints.value(0), 4);
2292        assert!(ints.is_null(1));
2293
2294        // Row 2: null
2295        assert!(list_view.is_null(2));
2296
2297        // Row 3: [6]
2298        assert!(list_view.is_valid(3));
2299        let vals = list_view.value(3);
2300        let ints = vals.as_primitive::<Int32Type>();
2301        assert_eq!(ints.values(), &[6]);
2302
2303        // Row 4: []
2304        assert!(list_view.is_valid(4));
2305        let vals = list_view.value(4);
2306        assert_eq!(vals.len(), 0);
2307    }
2308
2309    #[test]
2310    fn test_read_list_view() {
2311        assert_read_list_view::<i32>();
2312        assert_read_list_view::<i64>();
2313    }
2314
    /// FixedSizeList of length 3: three complete rows decode into a flat
    /// child array of nine values.
    #[test]
    fn test_fixed_size_list() {
        let buf = r#"
        {"a": [1, 2, 3]}
        {"a": [4, 5, 6]}
        {"a": [7, 8, 9]}
        "#;

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 3),
            false,
        )]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0).as_fixed_size_list();
        assert_eq!(col.len(), 3);
        assert_eq!(col.value_length(), 3);

        // The child array is the row values laid out contiguously
        let values = col.values().as_primitive::<Int32Type>();
        assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
2340
    /// Nullable FixedSizeList: a null row still reserves `value_length` child
    /// slots, and a null inside a valid list is preserved.
    #[test]
    fn test_fixed_size_list_nullable() {
        let buf = r#"
        {"a": [1, 2]}
        {"a": null}
        {"a": [3, null]}
        "#;

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 2),
            true,
        )]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0).as_fixed_size_list();
        assert_eq!(col.len(), 3);
        assert!(col.is_valid(0));
        assert!(col.is_null(1));
        assert!(col.is_valid(2));

        // Child slots 2 and 3 belong to the null row and are not asserted;
        // row 2 occupies slots 4 and 5
        let values = col.values().as_primitive::<Int32Type>();
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), 2);
        assert_eq!(values.value(4), 3);
        assert!(values.is_null(5));
    }
2371
    /// A list longer than the declared fixed size must produce a decode error
    /// mentioning the expected and actual lengths.
    #[test]
    fn test_fixed_size_list_wrong_size() {
        let buf = r#"{"a": [1, 2, 3]}"#;

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 2),
            false,
        )]));

        let err = ReaderBuilder::new(schema)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
            .unwrap_err();

        assert!(err.to_string().contains("expected 2 but got 3"), "{}", err);
    }
2392
    /// FixedSizeList nested inside FixedSizeList: two 2x2 rows flatten to
    /// four inner lists and eight leaf values.
    #[test]
    fn test_fixed_size_list_nested() {
        let buf = r#"
        {"a": [[1, 2], [3, 4]]}
        {"a": [[5, 6], [7, 8]]}
        "#;

        let inner_field = Field::new_list_field(DataType::Int32, true);
        let inner_type = DataType::FixedSizeList(Arc::new(inner_field), 2);
        let outer_field = Arc::new(Field::new_list_field(inner_type.clone(), true));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(outer_field, 2),
            false,
        )]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0).as_fixed_size_list();
        assert_eq!(col.len(), 2);
        assert_eq!(col.value_length(), 2);

        // Two rows x two inner lists each
        let inner = col.values().as_fixed_size_list();
        assert_eq!(inner.len(), 4);
        assert_eq!(inner.value_length(), 2);

        let values = inner.values().as_primitive::<Int32Type>();
        assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8]);
    }
2423
    /// With `ignore_type_conflicts` enabled, rows whose value is not a list
    /// (here a string and a number) decode as null entries instead of
    /// producing an error.
    #[test]
    fn test_fixed_size_list_ignore_type_conflicts() {
        let field = Field::new("item", DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::FixedSizeList(Arc::new(field), 2),
            true,
        )]));

        let json = vec![
            json!({"a": [1, 2]}),
            json!({"a": "not a list"}),
            json!({"a": 42}),
            json!({"a": [6, 7]}),
        ];

        let mut decoder = ReaderBuilder::new(schema)
            .with_ignore_type_conflicts(true)
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();

        let col = batch.column(0).as_fixed_size_list();
        assert_eq!(col.len(), 4);
        assert!(col.is_valid(0));
        assert!(col.is_null(1)); // string -> null
        assert!(col.is_null(2)); // number -> null
        assert!(col.is_valid(3));

        // Child slots 2..=5 belong to the null rows and are not asserted
        let values = col.values().as_primitive::<Int32Type>();
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), 2);
        assert_eq!(values.value(6), 6);
        assert_eq!(values.value(7), 7);
    }
2460
2461    #[test]
2462    fn test_skip_empty_lines() {
2463        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2464        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2465        let json_content = "
2466        {\"a\": 1}
2467        {\"a\": 2}
2468        {\"a\": 3}";
2469        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2470        let batch = reader.next().unwrap().unwrap();
2471
2472        assert_eq!(1, batch.num_columns());
2473        assert_eq!(3, batch.num_rows());
2474
2475        let schema = reader.schema();
2476        let c = schema.column_with_name("a").unwrap();
2477        assert_eq!(&DataType::Int64, c.1.data_type());
2478    }
2479
2480    #[test]
2481    fn test_with_multiple_batches() {
2482        let file = File::open("test/data/basic_nulls.json").unwrap();
2483        let mut reader = BufReader::new(file);
2484        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2485        reader.rewind().unwrap();
2486
2487        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2488        let mut reader = builder.build(reader).unwrap();
2489
2490        let mut num_records = Vec::new();
2491        while let Some(rb) = reader.next().transpose().unwrap() {
2492            num_records.push(rb.num_rows());
2493        }
2494
2495        assert_eq!(vec![5, 5, 2], num_records);
2496    }
2497
2498    #[test]
2499    fn test_timestamp_from_json_seconds() {
2500        let schema = Schema::new(vec![Field::new(
2501            "a",
2502            DataType::Timestamp(TimeUnit::Second, None),
2503            true,
2504        )]);
2505
2506        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2507        let batch = reader.next().unwrap().unwrap();
2508
2509        assert_eq!(1, batch.num_columns());
2510        assert_eq!(12, batch.num_rows());
2511
2512        let schema = reader.schema();
2513        let batch_schema = batch.schema();
2514        assert_eq!(schema, batch_schema);
2515
2516        let a = schema.column_with_name("a").unwrap();
2517        assert_eq!(
2518            &DataType::Timestamp(TimeUnit::Second, None),
2519            a.1.data_type()
2520        );
2521
2522        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2523        assert!(aa.is_valid(0));
2524        assert!(!aa.is_valid(1));
2525        assert!(!aa.is_valid(2));
2526        assert_eq!(1, aa.value(0));
2527        assert_eq!(1, aa.value(3));
2528        assert_eq!(5, aa.value(7));
2529    }
2530
2531    #[test]
2532    fn test_timestamp_from_json_milliseconds() {
2533        let schema = Schema::new(vec![Field::new(
2534            "a",
2535            DataType::Timestamp(TimeUnit::Millisecond, None),
2536            true,
2537        )]);
2538
2539        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2540        let batch = reader.next().unwrap().unwrap();
2541
2542        assert_eq!(1, batch.num_columns());
2543        assert_eq!(12, batch.num_rows());
2544
2545        let schema = reader.schema();
2546        let batch_schema = batch.schema();
2547        assert_eq!(schema, batch_schema);
2548
2549        let a = schema.column_with_name("a").unwrap();
2550        assert_eq!(
2551            &DataType::Timestamp(TimeUnit::Millisecond, None),
2552            a.1.data_type()
2553        );
2554
2555        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2556        assert!(aa.is_valid(0));
2557        assert!(!aa.is_valid(1));
2558        assert!(!aa.is_valid(2));
2559        assert_eq!(1, aa.value(0));
2560        assert_eq!(1, aa.value(3));
2561        assert_eq!(5, aa.value(7));
2562    }
2563
2564    #[test]
2565    fn test_date_from_json_milliseconds() {
2566        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2567
2568        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2569        let batch = reader.next().unwrap().unwrap();
2570
2571        assert_eq!(1, batch.num_columns());
2572        assert_eq!(12, batch.num_rows());
2573
2574        let schema = reader.schema();
2575        let batch_schema = batch.schema();
2576        assert_eq!(schema, batch_schema);
2577
2578        let a = schema.column_with_name("a").unwrap();
2579        assert_eq!(&DataType::Date64, a.1.data_type());
2580
2581        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2582        assert!(aa.is_valid(0));
2583        assert!(!aa.is_valid(1));
2584        assert!(!aa.is_valid(2));
2585        assert_eq!(1, aa.value(0));
2586        assert_eq!(1, aa.value(3));
2587        assert_eq!(5, aa.value(7));
2588    }
2589
2590    #[test]
2591    fn test_time_from_json_nanoseconds() {
2592        let schema = Schema::new(vec![Field::new(
2593            "a",
2594            DataType::Time64(TimeUnit::Nanosecond),
2595            true,
2596        )]);
2597
2598        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2599        let batch = reader.next().unwrap().unwrap();
2600
2601        assert_eq!(1, batch.num_columns());
2602        assert_eq!(12, batch.num_rows());
2603
2604        let schema = reader.schema();
2605        let batch_schema = batch.schema();
2606        assert_eq!(schema, batch_schema);
2607
2608        let a = schema.column_with_name("a").unwrap();
2609        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2610
2611        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2612        assert!(aa.is_valid(0));
2613        assert!(!aa.is_valid(1));
2614        assert!(!aa.is_valid(2));
2615        assert_eq!(1, aa.value(0));
2616        assert_eq!(1, aa.value(3));
2617        assert_eq!(5, aa.value(7));
2618    }
2619
2620    #[test]
2621    fn test_json_iterator() {
2622        let file = File::open("test/data/basic.json").unwrap();
2623        let mut reader = BufReader::new(file);
2624        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2625        reader.rewind().unwrap();
2626
2627        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2628        let reader = builder.build(reader).unwrap();
2629        let schema = reader.schema();
2630        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2631
2632        let mut sum_num_rows = 0;
2633        let mut num_batches = 0;
2634        let mut sum_a = 0;
2635        for batch in reader {
2636            let batch = batch.unwrap();
2637            assert_eq!(8, batch.num_columns());
2638            sum_num_rows += batch.num_rows();
2639            num_batches += 1;
2640            let batch_schema = batch.schema();
2641            assert_eq!(schema, batch_schema);
2642            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2643            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2644        }
2645        assert_eq!(12, sum_num_rows);
2646        assert_eq!(3, num_batches);
2647        assert_eq!(100000000000011, sum_a);
2648    }
2649
    #[test]
    fn test_decoder_error() {
        // Schema: one nullable struct column 'a' with a required Int32 child.
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feeding a truncated object leaves a partial row buffered in the
        // tape decoder; flush() must fail AND must not discard that state.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single row and return the error message it yields.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Non-object values for the struct column 'a': the error echoes the
        // offending JSON value verbatim.
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        // Deeply nested offending values are echoed in full.
        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Non-primitive values for the Int32 child: the error path nests the
        // field names ('a' then 'child').
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2748
2749    #[test]
2750    fn test_serialize_timestamp() {
2751        let json = vec![
2752            json!({"timestamp": 1681319393}),
2753            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2754        ];
2755        let schema = Schema::new(vec![Field::new(
2756            "timestamp",
2757            DataType::Timestamp(TimeUnit::Second, None),
2758            true,
2759        )]);
2760        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2761            .build_decoder()
2762            .unwrap();
2763        decoder.serialize(&json).unwrap();
2764        let batch = decoder.flush().unwrap().unwrap();
2765        assert_eq!(batch.num_rows(), 2);
2766        assert_eq!(batch.num_columns(), 1);
2767        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2768        assert_eq!(values.values(), &[1681319393, -7200]);
2769    }
2770
2771    #[test]
2772    fn test_serialize_decimal() {
2773        let json = vec![
2774            json!({"decimal": 1.234}),
2775            json!({"decimal": "1.234"}),
2776            json!({"decimal": 1234}),
2777            json!({"decimal": "1234"}),
2778        ];
2779        let schema = Schema::new(vec![Field::new(
2780            "decimal",
2781            DataType::Decimal128(10, 3),
2782            true,
2783        )]);
2784        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2785            .build_decoder()
2786            .unwrap();
2787        decoder.serialize(&json).unwrap();
2788        let batch = decoder.flush().unwrap().unwrap();
2789        assert_eq!(batch.num_rows(), 4);
2790        assert_eq!(batch.num_columns(), 1);
2791        let values = batch.column(0).as_primitive::<Decimal128Type>();
2792        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2793    }
2794
2795    #[test]
2796    fn test_serde_field() {
2797        let field = Field::new("int", DataType::Int32, true);
2798        let mut decoder = ReaderBuilder::new_with_field(field)
2799            .build_decoder()
2800            .unwrap();
2801        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2802        let b = decoder.flush().unwrap().unwrap();
2803        let values = b.column(0).as_primitive::<Int32Type>().values();
2804        assert_eq!(values, &[1, 2, 3, 4]);
2805    }
2806
2807    #[test]
2808    fn test_serde_large_numbers() {
2809        let field = Field::new("int", DataType::Int64, true);
2810        let mut decoder = ReaderBuilder::new_with_field(field)
2811            .build_decoder()
2812            .unwrap();
2813
2814        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2815        let b = decoder.flush().unwrap().unwrap();
2816        let values = b.column(0).as_primitive::<Int64Type>().values();
2817        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2818
2819        let field = Field::new(
2820            "int",
2821            DataType::Timestamp(TimeUnit::Microsecond, None),
2822            true,
2823        );
2824        let mut decoder = ReaderBuilder::new_with_field(field)
2825            .build_decoder()
2826            .unwrap();
2827
2828        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2829        let b = decoder.flush().unwrap().unwrap();
2830        let values = b
2831            .column(0)
2832            .as_primitive::<TimestampMicrosecondType>()
2833            .values();
2834        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2835    }
2836
2837    #[test]
2838    fn test_coercing_primitive_into_string_decoder() {
2839        let buf = &format!(
2840            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2841            (i32::MAX as i64 + 10),
2842            i64::MAX - 10
2843        );
2844        let schema = Schema::new(vec![
2845            Field::new("a", DataType::Float64, true),
2846            Field::new("b", DataType::Utf8, true),
2847            Field::new("c", DataType::Utf8, true),
2848        ]);
2849        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2850        let schema_ref = Arc::new(schema);
2851
2852        // read record batches
2853        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2854        let mut decoder = reader.build_decoder().unwrap();
2855        decoder.serialize(json_array.as_slice()).unwrap();
2856        let batch = decoder.flush().unwrap().unwrap();
2857        assert_eq!(
2858            batch,
2859            RecordBatch::try_new(
2860                schema_ref,
2861                vec![
2862                    Arc::new(Float64Array::from(vec![
2863                        1.0,
2864                        2.0,
2865                        (i32::MAX as i64 + 10) as f64,
2866                        (i64::MAX - 10) as f64
2867                    ])),
2868                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2869                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2870                ]
2871            )
2872            .unwrap()
2873        );
2874    }
2875
2876    // Parse the given `row` in `struct_mode` as a type given by fields.
2877    //
2878    // If as_struct == true, wrap the fields in a Struct field with name "r".
2879    // If as_struct == false, wrap the fields in a Schema.
2880    fn _parse_structs(
2881        row: &str,
2882        struct_mode: StructMode,
2883        fields: Fields,
2884        as_struct: bool,
2885    ) -> Result<RecordBatch, ArrowError> {
2886        let builder = if as_struct {
2887            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2888        } else {
2889            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2890        };
2891        builder
2892            .with_struct_mode(struct_mode)
2893            .build(Cursor::new(row.as_bytes()))
2894            .unwrap()
2895            .next()
2896            .unwrap()
2897    }
2898
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // A two-element row decoded in ListOnly mode against schemas with
        // one, two and three fields: only the two-field schema succeeds.
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // Expected decode of the two-field case: one struct row {a: 1, b: 2}.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // Fewer fields than columns: "extra columns" error, regardless of
        // whether the fields are wrapped in a struct or a top-level schema.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Exact field count: decodes to the expected batch in both shapes.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // More fields than columns: "found N columns for M fields" error.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2957
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical record {"a": {"b": [1, 2], "c": {"d": 3}}}
        // expressed in object form, list form, and a mix of the two.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Struct 'a' has a list child 'b' and a map child 'c'.
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected child arrays: b = [[1, 2]], c = {"d": 3}.
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object form and rejects any list-shaped
        // struct, echoing the offending JSON in the error.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly accepts the list form and rejects object-shaped structs.
        // NOTE: the echoed JSON is reconstructed from the decoder's tape and
        // omits the comma between object entries.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
3041
    // Empty lists in ListOnly mode always error when fields are expected:
    // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error
    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
    // [] -> StructArray row with no entries. Fields = [('a', Int32)] -> Error
    // [[]] -> RecordBatch row with an empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
    #[test]
    fn test_struct_decoding_empty_list() {
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        // Decode `row` in ListOnly mode against a single field, either as a
        // struct column ("r") or a top-level schema.
        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        // Missing fields: a top-level [] never satisfies a 1-field schema.
        assert_eq!(
            parse("[]", true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        // A nested empty list fails inside field 'r', so the error message is
        // prefixed with the field path.
        assert_eq!(
            parse("[[]]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
        );
    }
3091
    #[test]
    fn test_decode_list_struct_with_wrong_types() {
        // In ListOnly mode, type errors inside a row report the full field
        // path down to the value that failed to parse.
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        // Decode `row` in ListOnly mode against a single field, either as a
        // struct column ("r") or a top-level schema.
        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        // Wrong values: a string where Int32 is expected. The nested cases
        // include 'r' in the error path; the flat cases only mention 'a'.
        assert_eq!(
            parse(r#"[["a"]]"#, false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"[["a"]]"#, true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, true, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
    }
3136
3137    #[test]
3138    fn test_type_conflict_nulls() {
3139        let schema = Schema::new(vec![
3140            Field::new("null", DataType::Null, true),
3141            Field::new("bool", DataType::Boolean, true),
3142            Field::new("primitive", DataType::Int32, true),
3143            Field::new("numeric", DataType::Decimal128(10, 3), true),
3144            Field::new("string", DataType::Utf8, true),
3145            Field::new("string_view", DataType::Utf8View, true),
3146            Field::new(
3147                "timestamp",
3148                DataType::Timestamp(TimeUnit::Second, None),
3149                true,
3150            ),
3151            Field::new(
3152                "array",
3153                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3154                true,
3155            ),
3156            Field::new(
3157                "map",
3158                DataType::Map(
3159                    Arc::new(Field::new(
3160                        "entries",
3161                        DataType::Struct(Fields::from(vec![
3162                            Field::new("keys", DataType::Utf8, false),
3163                            Field::new("values", DataType::Utf8, true),
3164                        ])),
3165                        false, // not nullable
3166                    )),
3167                    false, // not sorted
3168                ),
3169                true, // nullable
3170            ),
3171            Field::new(
3172                "struct",
3173                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3174                true,
3175            ),
3176        ]);
3177
3178        // A compatible value for each schema field above, in schema order
3179        let json_values = vec![
3180            json!(null),
3181            json!(true),
3182            json!(42),
3183            json!(1.234),
3184            json!("hi"),
3185            json!("ho"),
3186            json!("1970-01-01T00:00:00+02:00"),
3187            json!([1, "ho", 3]),
3188            json!({"k": "value"}),
3189            json!({"a": 1}),
3190        ];
3191
3192        // Create a set of JSON rows that rotates each value past every field
3193        let json: Vec<_> = (0..json_values.len())
3194            .map(|i| {
3195                let pairs = json_values[i..]
3196                    .iter()
3197                    .chain(json_values[..i].iter())
3198                    .zip(&schema.fields)
3199                    .map(|(v, f)| (f.name().to_string(), v.clone()))
3200                    .collect();
3201                serde_json::Value::Object(pairs)
3202            })
3203            .collect();
3204        let mut decoder = ReaderBuilder::new(Arc::new(schema))
3205            .with_ignore_type_conflicts(true)
3206            .with_coerce_primitive(true)
3207            .build_decoder()
3208            .unwrap();
3209        decoder.serialize(&json).unwrap();
3210        let batch = decoder.flush().unwrap().unwrap();
3211        assert_eq!(batch.num_rows(), 10);
3212        assert_eq!(batch.num_columns(), 10);
3213
3214        // NOTE: NullArray doesn't materialize any values (they're all NULL by definition)
3215        let _ = batch
3216            .column(0)
3217            .as_any()
3218            .downcast_ref::<NullArray>()
3219            .unwrap();
3220
3221        assert!(
3222            batch
3223                .column(1)
3224                .as_any()
3225                .downcast_ref::<BooleanArray>()
3226                .unwrap()
3227                .iter()
3228                .eq([
3229                    Some(true),
3230                    None,
3231                    None,
3232                    None,
3233                    None,
3234                    None,
3235                    None,
3236                    None,
3237                    None,
3238                    None
3239                ])
3240        );
3241
3242        assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
3243            Some(42),
3244            Some(1),
3245            None,
3246            None,
3247            None,
3248            None,
3249            None,
3250            None,
3251            None,
3252            None
3253        ]));
3254
3255        assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
3256            Some(1234),
3257            None,
3258            None,
3259            None,
3260            None,
3261            None,
3262            None,
3263            None,
3264            None,
3265            Some(42000)
3266        ]));
3267
3268        assert!(
3269            batch
3270                .column(4)
3271                .as_any()
3272                .downcast_ref::<StringArray>()
3273                .unwrap()
3274                .iter()
3275                .eq([
3276                    Some("hi"),
3277                    Some("ho"),
3278                    Some("1970-01-01T00:00:00+02:00"),
3279                    None,
3280                    None,
3281                    None,
3282                    None,
3283                    Some("true"),
3284                    Some("42"),
3285                    Some("1.234"),
3286                ])
3287        );
3288
3289        assert!(
3290            batch
3291                .column(5)
3292                .as_any()
3293                .downcast_ref::<StringViewArray>()
3294                .unwrap()
3295                .iter()
3296                .eq([
3297                    Some("ho"),
3298                    Some("1970-01-01T00:00:00+02:00"),
3299                    None,
3300                    None,
3301                    None,
3302                    None,
3303                    Some("true"),
3304                    Some("42"),
3305                    Some("1.234"),
3306                    Some("hi"),
3307                ])
3308        );
3309
3310        assert!(
3311            batch
3312                .column(6)
3313                .as_primitive::<TimestampSecondType>()
3314                .iter()
3315                .eq([
3316                    Some(-7200),
3317                    None,
3318                    None,
3319                    None,
3320                    None,
3321                    None,
3322                    Some(42),
3323                    None,
3324                    None,
3325                    None,
3326                ])
3327        );
3328
3329        let arrays = batch
3330            .column(7)
3331            .as_any()
3332            .downcast_ref::<ListArray>()
3333            .unwrap();
3334        assert_eq!(
3335            arrays.nulls(),
3336            Some(&NullBuffer::from(
3337                &[
3338                    true, false, false, false, false, false, false, false, false, false
3339                ][..]
3340            ))
3341        );
3342        assert_eq!(arrays.offsets()[1], 3);
3343        let array_values = arrays
3344            .values()
3345            .as_any()
3346            .downcast_ref::<Int32Array>()
3347            .unwrap();
3348        assert!(array_values.iter().eq([Some(1), None, Some(3)]));
3349
3350        let maps = batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
3351        assert_eq!(
3352            maps.nulls(),
3353            Some(&NullBuffer::from(
3354                // Both map and struct can parse
3355                &[
3356                    true, true, false, false, false, false, false, false, false, false
3357                ][..]
3358            ))
3359        );
3360        let map_keys = maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
3361        assert!(map_keys.iter().eq([Some("k"), Some("a")]));
3362        let map_values = maps
3363            .values()
3364            .as_any()
3365            .downcast_ref::<StringArray>()
3366            .unwrap();
3367        assert!(map_values.iter().eq([Some("value"), Some("1")]));
3368
3369        let structs = batch
3370            .column(9)
3371            .as_any()
3372            .downcast_ref::<StructArray>()
3373            .unwrap();
3374        assert_eq!(
3375            structs.nulls(),
3376            Some(&NullBuffer::from(
3377                // Both map and struct can parse
3378                &[
3379                    true, false, false, false, false, false, false, false, false, true
3380                ][..]
3381            ))
3382        );
3383        let struct_fields = structs
3384            .column(0)
3385            .as_any()
3386            .downcast_ref::<Int32Array>()
3387            .unwrap();
3388        assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
3389    }
3390
3391    #[test]
3392    fn test_type_conflict_non_nullable() {
3393        let fields = [
3394            Field::new("bool", DataType::Boolean, false),
3395            Field::new("primitive", DataType::Int32, false),
3396            Field::new("numeric", DataType::Decimal128(10, 3), false),
3397            Field::new("string", DataType::Utf8, false),
3398            Field::new("string_view", DataType::Utf8View, false),
3399            Field::new(
3400                "timestamp",
3401                DataType::Timestamp(TimeUnit::Second, None),
3402                false,
3403            ),
3404            Field::new(
3405                "array",
3406                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3407                false,
3408            ),
3409            Field::new(
3410                "fixed_size_list",
3411                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3412                false,
3413            ),
3414            Field::new(
3415                "map",
3416                DataType::Map(
3417                    Arc::new(Field::new(
3418                        "entries",
3419                        DataType::Struct(Fields::from(vec![
3420                            Field::new("keys", DataType::Utf8, false),
3421                            Field::new("values", DataType::Utf8, true),
3422                        ])),
3423                        false, // not nullable
3424                    )),
3425                    false, // not sorted
3426                ),
3427                false, // not nullable
3428            ),
3429            Field::new(
3430                "struct",
3431                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3432                false,
3433            ),
3434        ];
3435
3436        // Every field above will have a type conflict with at least one of these values
3437        let json_values = vec![json!(true), json!({"a": 1})];
3438
3439        for field in fields {
3440            let mut decoder = ReaderBuilder::new_with_field(field)
3441                .with_ignore_type_conflicts(true)
3442                .build_decoder()
3443                .unwrap();
3444            decoder.serialize(&json_values).unwrap();
3445            decoder
3446                .flush()
3447                .expect_err("type conflict on non-nullable type");
3448        }
3449    }
3450
3451    #[test]
3452    fn test_ignore_type_conflicts_disabled() {
3453        let fields = [
3454            Field::new("null", DataType::Null, true),
3455            Field::new("bool", DataType::Boolean, true),
3456            Field::new("primitive", DataType::Int32, true),
3457            Field::new("numeric", DataType::Decimal128(10, 3), true),
3458            Field::new("string", DataType::Utf8, true),
3459            Field::new("string_view", DataType::Utf8View, true),
3460            Field::new(
3461                "timestamp",
3462                DataType::Timestamp(TimeUnit::Second, None),
3463                true,
3464            ),
3465            Field::new(
3466                "array",
3467                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3468                true,
3469            ),
3470            Field::new(
3471                "fixed_size_list",
3472                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3473                true,
3474            ),
3475            Field::new(
3476                "map",
3477                DataType::Map(
3478                    Arc::new(Field::new(
3479                        "entries",
3480                        DataType::Struct(Fields::from(vec![
3481                            Field::new("keys", DataType::Utf8, false),
3482                            Field::new("values", DataType::Utf8, true),
3483                        ])),
3484                        false, // not nullable
3485                    )),
3486                    false, // not sorted
3487                ),
3488                true, // not nullable
3489            ),
3490            Field::new(
3491                "struct",
3492                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3493                true,
3494            ),
3495        ];
3496
3497        // Every field above will have a type conflict with at least one of these values
3498        let json_values = vec![json!(true), json!({"a": 1})];
3499
3500        for field in fields {
3501            let mut decoder = ReaderBuilder::new_with_field(field)
3502                .build_decoder()
3503                .unwrap();
3504            decoder.serialize(&json_values).unwrap();
3505            decoder
3506                .flush()
3507                .expect_err("type conflict on non-nullable type");
3508        }
3509    }
3510
3511    #[test]
3512    fn test_read_run_end_encoded() {
3513        let buf = r#"
3514        {"a": "x"}
3515        {"a": "x"}
3516        {"a": "y"}
3517        {"a": "y"}
3518        {"a": "y"}
3519        "#;
3520
3521        let ree_type = DataType::RunEndEncoded(
3522            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3523            Arc::new(Field::new("values", DataType::Utf8, true)),
3524        );
3525        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3526        let batches = do_read(buf, 1024, false, false, schema);
3527        assert_eq!(batches.len(), 1);
3528
3529        let col = batches[0].column(0);
3530        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3531
3532        // 5 logical values compressed into 2 runs
3533        assert_eq!(run_array.len(), 5);
3534        assert_eq!(run_array.run_ends().values(), &[2, 5]);
3535
3536        let values = run_array.values().as_string::<i32>();
3537        assert_eq!(values.len(), 2);
3538        assert_eq!(values.value(0), "x");
3539        assert_eq!(values.value(1), "y");
3540    }
3541
3542    #[test]
3543    fn test_read_run_end_encoded_consecutive_nulls() {
3544        let buf = r#"
3545        {"a": "x"}
3546        {}
3547        {}
3548        {}
3549        {"a": "y"}
3550        "#;
3551
3552        let ree_type = DataType::RunEndEncoded(
3553            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3554            Arc::new(Field::new("values", DataType::Utf8, true)),
3555        );
3556        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3557        let batches = do_read(buf, 1024, false, false, schema);
3558        assert_eq!(batches.len(), 1);
3559
3560        let col = batches[0].column(0);
3561        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3562
3563        // 5 logical values: "x", null, null, null, "y" → 3 runs
3564        assert_eq!(run_array.len(), 5);
3565        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
3566
3567        let values = run_array.values().as_string::<i32>();
3568        assert_eq!(values.len(), 3);
3569        assert_eq!(values.value(0), "x");
3570        assert!(values.is_null(1));
3571        assert_eq!(values.value(2), "y");
3572    }
3573
3574    #[test]
3575    fn test_read_run_end_encoded_all_unique() {
3576        let buf = r#"
3577        {"a": 1}
3578        {"a": 2}
3579        {"a": 3}
3580        "#;
3581
3582        let ree_type = DataType::RunEndEncoded(
3583            Arc::new(Field::new("run_ends", DataType::Int32, false)),
3584            Arc::new(Field::new("values", DataType::Int32, true)),
3585        );
3586        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3587        let batches = do_read(buf, 1024, false, false, schema);
3588        assert_eq!(batches.len(), 1);
3589
3590        let col = batches[0].column(0);
3591        let run_array = col.as_run::<arrow_array::types::Int32Type>();
3592
3593        // No compression: 3 unique values → 3 runs
3594        assert_eq!(run_array.len(), 3);
3595        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
3596    }
3597
3598    #[test]
3599    fn test_read_run_end_encoded_int16_run_ends() {
3600        let buf = r#"
3601        {"a": "x"}
3602        {"a": "x"}
3603        {"a": "y"}
3604        "#;
3605
3606        let ree_type = DataType::RunEndEncoded(
3607            Arc::new(Field::new("run_ends", DataType::Int16, false)),
3608            Arc::new(Field::new("values", DataType::Utf8, true)),
3609        );
3610        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
3611        let batches = do_read(buf, 1024, false, false, schema);
3612        assert_eq!(batches.len(), 1);
3613
3614        let col = batches[0].column(0);
3615        let run_array = col.as_run::<arrow_array::types::Int16Type>();
3616
3617        assert_eq!(run_array.len(), 3);
3618        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
3619    }
3620
3621    #[test]
3622    fn test_read_nested_run_end_encoded() {
3623        let buf = r#"
3624        {"a": "x"}
3625        {"a": "x"}
3626        {"a": "y"}
3627        "#;
3628
3629        // The outer REE compresses whole rows, while the inner REE compresses the
3630        // repeated string values produced by decoding those rows.
3631        let inner_type = DataType::RunEndEncoded(
3632            Arc::new(Field::new("run_ends", DataType::Int64, false)),
3633            Arc::new(Field::new("values", DataType::Utf8, true)),
3634        );
3635        let outer_type = DataType::RunEndEncoded(
3636            Arc::new(Field::new("run_ends", DataType::Int64, false)),
3637            Arc::new(Field::new("values", inner_type, true)),
3638        );
3639        let schema = Arc::new(Schema::new(vec![Field::new("a", outer_type, true)]));
3640        let batches = do_read(buf, 1024, false, false, schema);
3641        assert_eq!(batches.len(), 1);
3642
3643        let col = batches[0].column(0);
3644        let outer = col.as_run::<arrow_array::types::Int64Type>();
3645        // Three logical rows compress to two outer runs: ["x", "x"] and ["y"].
3646        assert_eq!(outer.len(), 3);
3647        assert_eq!(outer.run_ends().values(), &[2, 3]);
3648
3649        let nested = outer.values().as_run::<arrow_array::types::Int64Type>();
3650        // The physical values of the outer REE are themselves a two-element REE.
3651        assert_eq!(nested.len(), 2);
3652        assert_eq!(nested.run_ends().values(), &[1, 2]);
3653
3654        let nested_values = nested.values().as_string::<i32>();
3655        assert_eq!(nested_values.len(), 2);
3656        assert_eq!(nested_values.value(0), "x");
3657        assert_eq!(nested_values.value(1), "y");
3658    }
3659}