// arrow_json/reader/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use crate::reader::binary_array::{
138    BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153
154use crate::reader::boolean_array::BooleanArrayDecoder;
155use crate::reader::decimal_array::DecimalArrayDecoder;
156use crate::reader::list_array::ListArrayDecoder;
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
161use crate::reader::string_array::StringArrayDecoder;
162use crate::reader::string_view_array::StringViewArrayDecoder;
163use crate::reader::struct_array::StructArrayDecoder;
164use crate::reader::tape::{Tape, TapeDecoder};
165use crate::reader::timestamp_array::TimestampArrayDecoder;
166
167mod binary_array;
168mod boolean_array;
169mod decimal_array;
170mod list_array;
171mod map_array;
172mod null_array;
173mod primitive_array;
174mod run_end_array;
175mod schema;
176mod serializer;
177mod string_array;
178mod string_view_array;
179mod struct_array;
180mod tape;
181mod timestamp_array;
182
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows decoded per emitted RecordBatch (default 1024)
    batch_size: usize,
    // If true, JSON numbers/booleans are coerced to strings for Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // If true, error on JSON columns not present in `schema`
    strict_mode: bool,
    // If true, `schema` wraps a single field and the JSON root need not be an object
    is_field: bool,
    // Whether structs are decoded from JSON objects, lists, or either
    struct_mode: StructMode,

    // Schema of the record batches this builder's reader/decoder will produce
    schema: SchemaRef,
}
193
194impl ReaderBuilder {
195    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
196    ///
197    /// This could be obtained using [`infer_json_schema`] if not known
198    ///
199    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
200    /// In this case, an error is returned when a column is missing from `schema`.
201    ///
202    /// [`infer_json_schema`]: crate::reader::infer_json_schema
203    pub fn new(schema: SchemaRef) -> Self {
204        Self {
205            batch_size: 1024,
206            coerce_primitive: false,
207            strict_mode: false,
208            is_field: false,
209            struct_mode: Default::default(),
210            schema,
211        }
212    }
213
214    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
215    ///
216    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
217    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
218    ///
219    /// ```
220    /// # use std::sync::Arc;
221    /// # use arrow_array::cast::AsArray;
222    /// # use arrow_array::types::Int32Type;
223    /// # use arrow_json::ReaderBuilder;
224    /// # use arrow_schema::{DataType, Field};
225    /// // Root of JSON schema is a numeric type
226    /// let data = "1\n2\n3\n";
227    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
228    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
229    /// let b = reader.next().unwrap().unwrap();
230    /// let values = b.column(0).as_primitive::<Int32Type>().values();
231    /// assert_eq!(values, &[1, 2, 3]);
232    ///
233    /// // Root of JSON schema is a list type
234    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
235    /// let field = Field::new_list("int", field.clone(), true);
236    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
237    /// let b = reader.next().unwrap().unwrap();
238    /// let list = b.column(0).as_list::<i32>();
239    ///
240    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
241    /// let list_values = list.values().as_primitive::<Int32Type>();
242    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
243    /// ```
244    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
245        Self {
246            batch_size: 1024,
247            coerce_primitive: false,
248            strict_mode: false,
249            is_field: true,
250            struct_mode: Default::default(),
251            schema: Arc::new(Schema::new([field.into()])),
252        }
253    }
254
255    /// Sets the batch size in rows to read
256    pub fn with_batch_size(self, batch_size: usize) -> Self {
257        Self { batch_size, ..self }
258    }
259
260    /// Sets if the decoder should coerce primitive values (bool and number) into string
261    /// when the Schema's column is Utf8 or LargeUtf8.
262    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
263        Self {
264            coerce_primitive,
265            ..self
266        }
267    }
268
269    /// Sets if the decoder should return an error if it encounters a column not
270    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
271    /// `strict_mode` is effectively `true`. It is required for all fields of
272    /// the struct to be in the list: without field names, there is no way to
273    /// determine which field is missing.
274    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
275        Self {
276            strict_mode,
277            ..self
278        }
279    }
280
281    /// Set the [`StructMode`] for the reader, which determines whether structs
282    /// can be decoded from JSON as objects or lists. For more details refer to
283    /// the enum documentation. Default is to use `ObjectOnly`.
284    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
285        Self {
286            struct_mode,
287            ..self
288        }
289    }
290
291    /// Create a [`Reader`] with the provided [`BufRead`]
292    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
293        Ok(Reader {
294            reader,
295            decoder: self.build_decoder()?,
296        })
297    }
298
299    /// Create a [`Decoder`]
300    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
301        let (data_type, nullable) = if self.is_field {
302            let field = &self.schema.fields[0];
303            let data_type = Cow::Borrowed(field.data_type());
304            (data_type, field.is_nullable())
305        } else {
306            let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
307            (data_type, false)
308        };
309
310        let ctx = DecoderContext {
311            coerce_primitive: self.coerce_primitive,
312            strict_mode: self.strict_mode,
313            struct_mode: self.struct_mode,
314        };
315        let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
316
317        let num_fields = self.schema.flattened_fields().len();
318
319        Ok(Decoder {
320            decoder,
321            is_field: self.is_field,
322            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
323            batch_size: self.batch_size,
324            schema: self.schema,
325        })
326    }
327}
328
329/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
330///
331/// Lines consisting solely of ASCII whitespace are ignored
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Buffered byte source the JSON is pulled from
    reader: R,
    // Push-based decoder the bytes are fed into
    decoder: Decoder,
}
336
337impl<R> std::fmt::Debug for Reader<R> {
338    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339        f.debug_struct("Reader")
340            .field("decoder", &self.decoder)
341            .finish()
342    }
343}
344
345impl<R: BufRead> Reader<R> {
346    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
347    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
348        loop {
349            let buf = self.reader.fill_buf()?;
350            if buf.is_empty() {
351                break;
352            }
353            let read = buf.len();
354
355            let decoded = self.decoder.decode(buf)?;
356            self.reader.consume(decoded);
357            if decoded != read {
358                break;
359            }
360        }
361        self.decoder.flush()
362    }
363}
364
365impl<R: BufRead> Iterator for Reader<R> {
366    type Item = Result<RecordBatch, ArrowError>;
367
368    fn next(&mut self) -> Option<Self::Item> {
369        self.read().transpose()
370    }
371}
372
373impl<R: BufRead> RecordBatchReader for Reader<R> {
374    fn schema(&self) -> SchemaRef {
375        self.decoder.schema.clone()
376    }
377}
378
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Incrementally parses raw JSON bytes into a tape representation
    tape_decoder: TapeDecoder,
    // Root array decoder that materializes the tape into ArrayData
    decoder: Box<dyn ArrayDecoder>,
    // Maximum number of rows per flushed RecordBatch
    batch_size: usize,
    // If true, the JSON root is a bare value (single field) rather than an object
    is_field: bool,
    // Schema of the produced record batches
    schema: SchemaRef,
}
426
427impl std::fmt::Debug for Decoder {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        f.debug_struct("Decoder")
430            .field("schema", &self.schema)
431            .field("batch_size", &self.batch_size)
432            .finish()
433    }
434}
435
impl Decoder {
    /// Read JSON objects from `buf`, returning the number of bytes read
    ///
    /// This method returns once `batch_size` objects have been parsed since the
    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
    /// should be included in the next call to [`Self::decode`]
    ///
    /// There is no requirement that `buf` contains a whole number of records, facilitating
    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serialize `rows` to this [`Decoder`]
    ///
    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
    /// [`RecordBatch`].
    ///
    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
    /// especially where the schema is known at compile-time, however, this provides a mechanism
    /// to get something up and running quickly
    ///
    /// It can be used with [`serde_json::Value`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use serde_json::{Value, json};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Float32Type;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
    ///
    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    ///
    /// decoder.serialize(&json).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 2);
    /// assert_eq!(batch.num_columns(), 1);
    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
    /// assert_eq!(values, &[2.3, 5.7])
    /// ```
    ///
    /// Or with arbitrary [`Serialize`] types
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use serde::Serialize;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::{Float32Type, Int32Type};
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     float: f32,
    /// }
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new("int32", DataType::Int32, false),
    ///     Field::new("float", DataType::Float32, false),
    /// ]);
    ///
    /// let rows = vec![
    ///     MyStruct{ int32: 0, float: 3. },
    ///     MyStruct{ int32: 4, float: 67.53 },
    /// ];
    ///
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&rows).unwrap();
    ///
    /// let batch = decoder.flush().unwrap().unwrap();
    ///
    /// // Expect batch containing two columns
    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
    /// assert_eq!(int32.values(), &[0, 4]);
    ///
    /// let float = batch.column(1).as_primitive::<Float32Type>();
    /// assert_eq!(float.values(), &[3., 67.53]);
    /// ```
    ///
    /// Or even complex nested types
    ///
    /// ```
    /// # use std::collections::BTreeMap;
    /// # use std::sync::Arc;
    /// # use arrow_array::StructArray;
    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
    /// # use arrow_json::ReaderBuilder;
    /// # use arrow_schema::{DataType, Field, Fields, Schema};
    /// # use serde::Serialize;
    /// #
    /// #[derive(Serialize)]
    /// struct MyStruct {
    ///     int32: i32,
    ///     list: Vec<f64>,
    ///     nested: Vec<Option<Nested>>,
    /// }
    ///
    /// impl MyStruct {
    ///     /// Returns the [`Fields`] for [`MyStruct`]
    ///     fn fields() -> Fields {
    ///         let nested = DataType::Struct(Nested::fields());
    ///         Fields::from([
    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
    ///             Arc::new(Field::new_list(
    ///                 "list",
    ///                 Field::new("element", DataType::Float64, false),
    ///                 false,
    ///             )),
    ///             Arc::new(Field::new_list(
    ///                 "nested",
    ///                 Field::new("element", nested, true),
    ///                 true,
    ///             )),
    ///         ])
    ///     }
    /// }
    ///
    /// #[derive(Serialize)]
    /// struct Nested {
    ///     map: BTreeMap<String, Vec<String>>
    /// }
    ///
    /// impl Nested {
    ///     /// Returns the [`Fields`] for [`Nested`]
    ///     fn fields() -> Fields {
    ///         let element = Field::new("element", DataType::Utf8, false);
    ///         Fields::from([
    ///             Arc::new(Field::new_map(
    ///                 "map",
    ///                 "entries",
    ///                 Field::new("key", DataType::Utf8, false),
    ///                 Field::new_list("value", element, false),
    ///                 false, // sorted
    ///                 false, // nullable
    ///             ))
    ///         ])
    ///     }
    /// }
    ///
    /// let data = vec![
    ///     MyStruct {
    ///         int32: 34,
    ///         list: vec![1., 2., 34.],
    ///         nested: vec![
    ///             None,
    ///             Some(Nested {
    ///                 map: vec![
    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
    ///                     ("key2".to_string(), vec!["baz".to_string()])
    ///                 ].into_iter().collect()
    ///             })
    ///         ]
    ///     },
    ///     MyStruct {
    ///         int32: 56,
    ///         list: vec![],
    ///         nested: vec![]
    ///     },
    ///     MyStruct {
    ///         int32: 24,
    ///         list: vec![-1., 245.],
    ///         nested: vec![None]
    ///     }
    /// ];
    ///
    /// let schema = Schema::new(MyStruct::fields());
    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
    /// decoder.serialize(&data).unwrap();
    /// let batch = decoder.flush().unwrap().unwrap();
    /// assert_eq!(batch.num_rows(), 3);
    /// assert_eq!(batch.num_columns(), 3);
    ///
    /// // Convert to StructArray to format
    /// let s = StructArray::from(batch);
    /// let options = FormatOptions::default().with_null("null");
    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
    ///
    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
    /// ```
    ///
    /// Note: this ignores any batch size setting, and always decodes all rows
    ///
    /// [serde]: https://docs.rs/serde/latest/serde/
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// True if the decoder is currently part way through decoding a record.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// The number of unflushed records, including the partially decoded record (if any).
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the currently buffered data to a [`RecordBatch`]
    ///
    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
    ///
    /// Note: This will return an error if called part way through decoding a record,
    /// i.e. [`Self::has_partial_record`] is true.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Collect the tape offset of each row's start element.
        // First offset is null sentinel
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                // `next` returns the offset past the current row; record the
                // current start and advance to it
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        // Reset the tape so subsequent decode calls start a fresh batch
        self.tape_decoder.clear();

        // A bare-field root becomes a single-column batch; an object root is
        // converted via StructArray, re-attaching the original schema metadata
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
679
/// Type-erased decoder that materializes values from a decoded [`Tape`]
/// into [`ArrayData`]; implemented per arrow [`DataType`]
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
684
/// Context for decoder creation, containing configuration.
///
/// This context is passed through the decoder creation process and contains
/// all the configuration needed to create decoders recursively.
pub struct DecoderContext {
    /// Whether to coerce primitives (numbers, booleans) to strings
    coerce_primitive: bool,
    /// Whether to error on JSON fields absent from the schema
    strict_mode: bool,
    /// How to decode struct fields (from JSON objects and/or lists)
    struct_mode: StructMode,
}
697
698impl DecoderContext {
699    /// Returns whether to coerce primitive types (e.g., number to string)
700    pub fn coerce_primitive(&self) -> bool {
701        self.coerce_primitive
702    }
703
704    /// Returns whether to validate struct fields strictly
705    pub fn strict_mode(&self) -> bool {
706        self.strict_mode
707    }
708
709    /// Returns how to decode struct fields
710    pub fn struct_mode(&self) -> StructMode {
711        self.struct_mode
712    }
713
714    /// Create a decoder for a type.
715    ///
716    /// This is the standard way to create child decoders from within a decoder
717    /// implementation.
718    fn make_decoder(
719        &self,
720        data_type: &DataType,
721        is_nullable: bool,
722    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
723        make_decoder(self, data_type, is_nullable)
724    }
725}
726
// Expands to a boxed PrimitiveArrayDecoder for arrow primitive type `$t`
// targeting `$data_type`; used by the dispatch table in `make_decoder`
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
732
/// Creates the [`ArrayDecoder`] for `data_type`, recursing into nested types
/// via `ctx`; returns `NotYetImplemented` for unsupported types
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    // downcast_integer! expands the first arm into one match arm per integer
    // type, invoking primitive_decoder! for each; remaining arms are literal
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are interpreted as UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timezone-aware timestamps parse the tz string, failing fast on an
        // invalid timezone rather than at decode time
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse through `ctx` to build their child decoders
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        // Dispatch on the run-end index width; other widths are rejected by
        // the DataType constructor, hence unreachable here
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
808
809#[cfg(test)]
810mod tests {
811    use serde_json::json;
812    use std::fs::File;
813    use std::io::{BufReader, Cursor, Seek};
814
815    use arrow_array::cast::AsArray;
816    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
817    use arrow_buffer::{ArrowNativeType, Buffer};
818    use arrow_cast::display::{ArrayFormatter, FormatOptions};
819    use arrow_data::ArrayDataBuilder;
820    use arrow_schema::{Field, Fields};
821
822    use super::*;
823
824    fn do_read(
825        buf: &str,
826        batch_size: usize,
827        coerce_primitive: bool,
828        strict_mode: bool,
829        schema: SchemaRef,
830    ) -> Vec<RecordBatch> {
831        let mut unbuffered = vec![];
832
833        // Test with different batch sizes to test for boundary conditions
834        for batch_size in [1, 3, 100, batch_size] {
835            unbuffered = ReaderBuilder::new(schema.clone())
836                .with_batch_size(batch_size)
837                .with_coerce_primitive(coerce_primitive)
838                .build(Cursor::new(buf.as_bytes()))
839                .unwrap()
840                .collect::<Result<Vec<_>, _>>()
841                .unwrap();
842
843            for b in unbuffered.iter().take(unbuffered.len() - 1) {
844                assert_eq!(b.num_rows(), batch_size)
845            }
846
847            // Test with different buffer sizes to test for boundary conditions
848            for b in [1, 3, 5] {
849                let buffered = ReaderBuilder::new(schema.clone())
850                    .with_batch_size(batch_size)
851                    .with_coerce_primitive(coerce_primitive)
852                    .with_strict_mode(strict_mode)
853                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
854                    .unwrap()
855                    .collect::<Result<Vec<_>, _>>()
856                    .unwrap();
857                assert_eq!(unbuffered, buffered);
858            }
859        }
860
861        unbuffered
862    }
863
    /// End-to-end smoke test: mixed Int64/Int32/Boolean/Date32/Date64 columns,
    /// missing fields, explicit nulls, scientific-notation numbers and a blank
    /// line between records. Exercises both the push-based `Decoder` and the
    /// pull-based reader (via `do_read`).
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the Decoder directly: decode() returns a byte count (221 for
        // this buffer) and buffers all six records; flush() emits them.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // After flush the decoder is back to its pristine state.
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": integral floats ("2E0", "2.0") decode into Int64; the missing
        // field (row 4) and explicit null (row 5) are nulls backed by 0 filler
        // in values().
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": present in every row; quoted "5" and "4e0" also parse to Int32.
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c": only the first two rows supply a boolean.
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": bare integers stored as raw Date32 values.
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": only row 2 supplies a value for the Date64 column.
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
930
931    #[test]
932    fn test_string() {
933        let buf = r#"
934        {"a": "1", "b": "2"}
935        {"a": "hello", "b": "shoo"}
936        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
937
938        {"b": null}
939        {"b": "", "a": null}
940
941        "#;
942        let schema = Arc::new(Schema::new(vec![
943            Field::new("a", DataType::Utf8, true),
944            Field::new("b", DataType::LargeUtf8, true),
945        ]));
946
947        let batches = do_read(buf, 1024, false, false, schema);
948        assert_eq!(batches.len(), 1);
949
950        let col1 = batches[0].column(0).as_string::<i32>();
951        assert_eq!(col1.null_count(), 2);
952        assert_eq!(col1.value(0), "1");
953        assert_eq!(col1.value(1), "hello");
954        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
955        assert!(col1.is_null(3));
956        assert!(col1.is_null(4));
957
958        let col2 = batches[0].column(1).as_string::<i64>();
959        assert_eq!(col2.null_count(), 1);
960        assert_eq!(col2.value(0), "2");
961        assert_eq!(col2.value(1), "shoo");
962        assert_eq!(col2.value(2), "\t😁foo");
963        assert!(col2.is_null(3));
964        assert_eq!(col2.value(4), "");
965    }
966
967    #[test]
968    fn test_long_string_view_allocation() {
969        // The JSON input contains field "a" with different string lengths.
970        // According to the implementation in the decoder:
971        // - For a string, capacity is only increased if its length > 12 bytes.
972        // Therefore, for:
973        // Row 1: "short" (5 bytes) -> capacity += 0
974        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
975        // Row 3: "hello" (5 bytes) -> capacity += 0
976        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
977        // Expected total capacity = 24 + 17 = 41
978        let expected_capacity: usize = 41;
979
980        let buf = r#"
981        {"a": "short", "b": "dummy"}
982        {"a": "this is definitely long", "b": "dummy"}
983        {"a": "hello", "b": "dummy"}
984        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
985        "#;
986
987        let schema = Arc::new(Schema::new(vec![
988            Field::new("a", DataType::Utf8View, true),
989            Field::new("b", DataType::LargeUtf8, true),
990        ]));
991
992        let batches = do_read(buf, 1024, false, false, schema);
993        assert_eq!(batches.len(), 1, "Expected one record batch");
994
995        // Get the first column ("a") as a StringViewArray.
996        let col_a = batches[0].column(0);
997        let string_view_array = col_a
998            .as_any()
999            .downcast_ref::<StringViewArray>()
1000            .expect("Column should be a StringViewArray");
1001
1002        // Retrieve the underlying data buffer from the array.
1003        // The builder pre-allocates capacity based on the sum of lengths for long strings.
1004        let data_buffer = string_view_array.to_data().buffers()[0].len();
1005
1006        // Check that the allocated capacity is at least what we expected.
1007        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
1008        assert!(
1009            data_buffer >= expected_capacity,
1010            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
1011        );
1012
1013        // Additionally, verify that the decoded values are correct.
1014        assert_eq!(string_view_array.value(0), "short");
1015        assert_eq!(string_view_array.value(1), "this is definitely long");
1016        assert_eq!(string_view_array.value(2), "hello");
1017        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
1018    }
1019
    /// Test the memory capacity allocation logic when converting numeric types to strings.
    ///
    /// Requires `coerce_primitive` (third argument to `do_read`) so the
    /// numeric JSON values are decoded into the `Utf8View` column at all.
    #[test]
    fn test_numeric_view_allocation() {
        // For numeric types, the expected capacity calculation is as follows:
        // Row 1: 123456789  -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
        //                        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
        let expected_capacity: usize = 33;

        let buf = r#"
    {"n": 123456789}
    {"n": 1000000000000}
    {"n": 3.1415}
    {"n": 2.718281828459045}
    "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Check that the underlying data buffer capacity is at least the expected value.
        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Verify that the converted string values are correct: each keeps the
        // exact lexical form of the original JSON number.
        // Note: The format of the number converted to a string should match the actual implementation.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1064
1065    #[test]
1066    fn test_string_with_uft8view() {
1067        let buf = r#"
1068        {"a": "1", "b": "2"}
1069        {"a": "hello", "b": "shoo"}
1070        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1071
1072        {"b": null}
1073        {"b": "", "a": null}
1074
1075        "#;
1076        let schema = Arc::new(Schema::new(vec![
1077            Field::new("a", DataType::Utf8View, true),
1078            Field::new("b", DataType::LargeUtf8, true),
1079        ]));
1080
1081        let batches = do_read(buf, 1024, false, false, schema);
1082        assert_eq!(batches.len(), 1);
1083
1084        let col1 = batches[0].column(0).as_string_view();
1085        assert_eq!(col1.null_count(), 2);
1086        assert_eq!(col1.value(0), "1");
1087        assert_eq!(col1.value(1), "hello");
1088        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1089        assert!(col1.is_null(3));
1090        assert!(col1.is_null(4));
1091        assert_eq!(col1.data_type(), &DataType::Utf8View);
1092
1093        let col2 = batches[0].column(1).as_string::<i64>();
1094        assert_eq!(col2.null_count(), 1);
1095        assert_eq!(col2.value(0), "2");
1096        assert_eq!(col2.value(1), "shoo");
1097        assert_eq!(col2.value(2), "\t😁foo");
1098        assert!(col2.is_null(3));
1099        assert_eq!(col2.value(4), "");
1100    }
1101
1102    #[test]
1103    fn test_complex() {
1104        let buf = r#"
1105           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1106           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1107           {"list": null, "nested": {"a": null}}
1108        "#;
1109
1110        let schema = Arc::new(Schema::new(vec![
1111            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1112            Field::new_struct(
1113                "nested",
1114                vec![
1115                    Field::new("a", DataType::Int32, true),
1116                    Field::new("b", DataType::Int32, true),
1117                ],
1118                true,
1119            ),
1120            Field::new_struct(
1121                "nested_list",
1122                vec![Field::new_list(
1123                    "list2",
1124                    Field::new_struct(
1125                        "element",
1126                        vec![Field::new("c", DataType::Int32, false)],
1127                        false,
1128                    ),
1129                    true,
1130                )],
1131                true,
1132            ),
1133        ]));
1134
1135        let batches = do_read(buf, 1024, false, false, schema);
1136        assert_eq!(batches.len(), 1);
1137
1138        let list = batches[0].column(0).as_list::<i32>();
1139        assert_eq!(list.len(), 3);
1140        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1141        assert_eq!(list.null_count(), 1);
1142        assert!(list.is_null(2));
1143        let list_values = list.values().as_primitive::<Int32Type>();
1144        assert_eq!(list_values.values(), &[5, 6]);
1145
1146        let nested = batches[0].column(1).as_struct();
1147        let a = nested.column(0).as_primitive::<Int32Type>();
1148        assert_eq!(list.null_count(), 1);
1149        assert_eq!(a.values(), &[1, 7, 0]);
1150        assert!(list.is_null(2));
1151
1152        let b = nested.column(1).as_primitive::<Int32Type>();
1153        assert_eq!(b.null_count(), 2);
1154        assert_eq!(b.len(), 3);
1155        assert_eq!(b.value(0), 2);
1156        assert!(b.is_null(1));
1157        assert!(b.is_null(2));
1158
1159        let nested_list = batches[0].column(2).as_struct();
1160        assert_eq!(nested_list.len(), 3);
1161        assert_eq!(nested_list.null_count(), 1);
1162        assert!(nested_list.is_null(2));
1163
1164        let list2 = nested_list.column(0).as_list::<i32>();
1165        assert_eq!(list2.len(), 3);
1166        assert_eq!(list2.null_count(), 1);
1167        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1168        assert!(list2.is_null(2));
1169
1170        let list2_values = list2.values().as_struct();
1171
1172        let c = list2_values.column(0).as_primitive::<Int32Type>();
1173        assert_eq!(c.values(), &[3, 4]);
1174    }
1175
1176    #[test]
1177    fn test_projection() {
1178        let buf = r#"
1179           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
1180           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1181        "#;
1182
1183        let schema = Arc::new(Schema::new(vec![
1184            Field::new_struct(
1185                "nested",
1186                vec![Field::new("a", DataType::Int32, false)],
1187                true,
1188            ),
1189            Field::new_struct(
1190                "nested_list",
1191                vec![Field::new_list(
1192                    "list2",
1193                    Field::new_struct(
1194                        "element",
1195                        vec![Field::new("d", DataType::Int32, true)],
1196                        false,
1197                    ),
1198                    true,
1199                )],
1200                true,
1201            ),
1202        ]));
1203
1204        let batches = do_read(buf, 1024, false, false, schema);
1205        assert_eq!(batches.len(), 1);
1206
1207        let nested = batches[0].column(0).as_struct();
1208        assert_eq!(nested.num_columns(), 1);
1209        let a = nested.column(0).as_primitive::<Int32Type>();
1210        assert_eq!(a.null_count(), 0);
1211        assert_eq!(a.values(), &[1, 7]);
1212
1213        let nested_list = batches[0].column(1).as_struct();
1214        assert_eq!(nested_list.num_columns(), 1);
1215        assert_eq!(nested_list.null_count(), 0);
1216
1217        let list2 = nested_list.column(0).as_list::<i32>();
1218        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
1219        assert_eq!(list2.null_count(), 0);
1220
1221        let child = list2.values().as_struct();
1222        assert_eq!(child.num_columns(), 1);
1223        assert_eq!(child.len(), 2);
1224        assert_eq!(child.null_count(), 0);
1225
1226        let c = child.column(0).as_primitive::<Int32Type>();
1227        assert_eq!(c.values(), &[5, 0]);
1228        assert_eq!(c.null_count(), 1);
1229        assert!(c.is_null(1));
1230    }
1231
    /// Decodes a `Map<Utf8, List<Utf8>>` column covering empty lists, null
    /// list elements, null map values and per-row varying key sets, checking
    /// both raw offsets/null buffers and the formatted rendering.
    #[test]
    fn test_map() {
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contain 1, 2 and 2 entries respectively.
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        // Keys in encounter order across all three rows.
        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened list elements: ["foo", null] ++ [null] ++ [] ++ ["baz"];
        // the entry for key "c" is a null list (its offset stays at 3).
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // The formatted output should round-trip the original structure.
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1274
1275    #[test]
1276    fn test_not_coercing_primitive_into_string_without_flag() {
1277        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1278
1279        let buf = r#"{"a": 1}"#;
1280        let err = ReaderBuilder::new(schema.clone())
1281            .with_batch_size(1024)
1282            .build(Cursor::new(buf.as_bytes()))
1283            .unwrap()
1284            .read()
1285            .unwrap_err();
1286
1287        assert_eq!(
1288            err.to_string(),
1289            "Json error: whilst decoding field 'a': expected string got 1"
1290        );
1291
1292        let buf = r#"{"a": true}"#;
1293        let err = ReaderBuilder::new(schema)
1294            .with_batch_size(1024)
1295            .build(Cursor::new(buf.as_bytes()))
1296            .unwrap()
1297            .read()
1298            .unwrap_err();
1299
1300        assert_eq!(
1301            err.to_string(),
1302            "Json error: whilst decoding field 'a': expected string got true"
1303        );
1304    }
1305
    /// With `coerce_primitive` enabled, numbers and booleans decode into Utf8
    /// columns keeping their original JSON lexical form (e.g. "2E0", "4e0").
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": numeric literals keep their exact source spelling; the missing
        // field (row 5) and explicit null (row 6) decode to null.
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": mixes coerced numbers with a genuine JSON string ("5").
        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // "c": booleans coerce to "true"/"false"; absent from rows 3-6.
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1354
    /// Decimal decoding for one decimal width. All columns use scale 2, so a
    /// bare 1 is stored as 100 and 38.30 as 3830; digits beyond the scale are
    /// truncated (123.456 -> 12345, 2.0452 -> 204). Both bare numbers and
    /// quoted strings are accepted.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": missing (row 5) and explicit null (row 6) entries are null,
        // backed by 0 filler in values().
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": present in every row; quoted "5" parses the same as a bare 5.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only supplied in the first two rows.
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1404
    /// Runs the decimal decoding test against all four decimal widths.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1412
    /// Timestamp decoding for one timestamp unit.
    ///
    /// Bare numbers pass through unscaled; timestamp strings (with or without
    /// an offset, 'T' or space separator, compact "hhmmss" time, date-only)
    /// parse to nanoseconds and are divided down to the unit under test.
    /// Column "d" carries a "+08:00" zone: zoneless strings there decode 8h
    /// earlier than the same strings in the zoneless column "c" (compare rows
    /// 1 and 3 of "c" vs "d").
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and scaled down to
        // the unit under test.
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a": bare numbers taken as-is; the remaining rows are null.
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": row 3 repeats the key "b" — the later value
        // ("2020-09-08T13:42:29Z") wins. Bare numbers (40, 1234) are unscaled.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c": zoneless column; floats truncate to their integer part (38, 123).
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d": "+08:00" column; zoneless strings are interpreted in that zone
        // while explicit offsets ("Z", "-05:00") are honoured, and a bare date
        // (row 6) decodes as midnight.
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1494
    /// Runs the timestamp decoding test for all four timestamp units.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1502
    /// Time decoding for one Time32/Time64 type: bare numbers pass through
    /// unscaled, while time-of-day strings ("HH:MM:SS.fff", 12-hour clock
    /// with AM/PM) parse to nanoseconds-of-day and are divided down to the
    /// unit of the type under test.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Derive the unit from the type so expected values (written in
        // nanoseconds-of-day) can be scaled down.
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": bare numbers taken as-is; rows without "a" are null.
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": row 3 repeats the key "b" — the later value "6:00 pm"
        // (18:00 = 64_800s) wins. Bare numbers (40, 1234) are unscaled.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": floats truncate to their integer part (38, 123); strings parse
        // with optional fractional seconds.
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1574
    /// Runs the time decoding test for all four Time32/Time64 units.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1582
1583    fn test_duration<T: ArrowTemporalType>() {
1584        let buf = r#"
1585        {"a": 1, "b": "2"}
1586        {"a": 3, "b": null}
1587        "#;
1588
1589        let schema = Arc::new(Schema::new(vec![
1590            Field::new("a", T::DATA_TYPE, true),
1591            Field::new("b", T::DATA_TYPE, true),
1592        ]));
1593
1594        let batches = do_read(buf, 1024, true, false, schema);
1595        assert_eq!(batches.len(), 1);
1596
1597        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1598        assert_eq!(col_a.null_count(), 0);
1599        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1600
1601        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1602        assert_eq!(col2.null_count(), 1);
1603        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1604    }
1605
1606    #[test]
1607    fn test_durations() {
1608        test_duration::<DurationNanosecondType>();
1609        test_duration::<DurationMicrosecondType>();
1610        test_duration::<DurationMillisecondType>();
1611        test_duration::<DurationSecondType>();
1612    }
1613
    #[test]
    fn test_delta_checkpoint() {
        // Mimics a Delta Lake checkpoint record: the input line carries only
        // the "protocol" struct, so the absent "add" struct must decode as null.
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            // "add" never appears in the input; being nullable, it should
            // surface as a null struct rather than an error
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the single row via the array formatter to check both the
        // decoded values and the null "add" struct in one assertion
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1651
1652    #[test]
1653    fn struct_nullability() {
1654        let do_test = |child: DataType| {
1655            // Test correctly enforced nullability
1656            let non_null = r#"{"foo": {}}"#;
1657            let schema = Arc::new(Schema::new(vec![Field::new_struct(
1658                "foo",
1659                vec![Field::new("bar", child, false)],
1660                true,
1661            )]));
1662            let mut reader = ReaderBuilder::new(schema.clone())
1663                .build(Cursor::new(non_null.as_bytes()))
1664                .unwrap();
1665            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1666
1667            let null = r#"{"foo": {bar: null}}"#;
1668            let mut reader = ReaderBuilder::new(schema.clone())
1669                .build(Cursor::new(null.as_bytes()))
1670                .unwrap();
1671            assert!(reader.next().unwrap().is_err()); // Should error as not nullable
1672
1673            // Test nulls in nullable parent can mask nulls in non-nullable child
1674            let null = r#"{"foo": null}"#;
1675            let mut reader = ReaderBuilder::new(schema)
1676                .build(Cursor::new(null.as_bytes()))
1677                .unwrap();
1678            let batch = reader.next().unwrap().unwrap();
1679            assert_eq!(batch.num_columns(), 1);
1680            let foo = batch.column(0).as_struct();
1681            assert_eq!(foo.len(), 1);
1682            assert!(foo.is_null(0));
1683            assert_eq!(foo.num_columns(), 1);
1684
1685            let bar = foo.column(0);
1686            assert_eq!(bar.len(), 1);
1687            // Non-nullable child can still contain null as masked by parent
1688            assert!(bar.is_null(0));
1689        };
1690
1691        do_test(DataType::Boolean);
1692        do_test(DataType::Int32);
1693        do_test(DataType::Utf8);
1694        do_test(DataType::Decimal128(2, 1));
1695        do_test(DataType::Timestamp(
1696            TimeUnit::Microsecond,
1697            Some("+00:00".into()),
1698        ));
1699    }
1700
1701    #[test]
1702    fn test_truncation() {
1703        let buf = r#"
1704        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1705        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1706        {"i64": -9223372036854775808, "u64": 0 }
1707        {"i64": "-9223372036854775808", "u64": 0 }
1708        "#;
1709
1710        let schema = Arc::new(Schema::new(vec![
1711            Field::new("i64", DataType::Int64, true),
1712            Field::new("u64", DataType::UInt64, true),
1713        ]));
1714
1715        let batches = do_read(buf, 1024, true, false, schema);
1716        assert_eq!(batches.len(), 1);
1717
1718        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1719        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1720
1721        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1722        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1723    }
1724
1725    #[test]
1726    fn test_timestamp_truncation() {
1727        let buf = r#"
1728        {"time": 9223372036854775807 }
1729        {"time": -9223372036854775808 }
1730        {"time": 9e5 }
1731        "#;
1732
1733        let schema = Arc::new(Schema::new(vec![Field::new(
1734            "time",
1735            DataType::Timestamp(TimeUnit::Nanosecond, None),
1736            true,
1737        )]));
1738
1739        let batches = do_read(buf, 1024, true, false, schema);
1740        assert_eq!(batches.len(), 1);
1741
1742        let i64 = batches[0]
1743            .column(0)
1744            .as_primitive::<TimestampNanosecondType>();
1745        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1746    }
1747
1748    #[test]
1749    fn test_strict_mode_no_missing_columns_in_schema() {
1750        let buf = r#"
1751        {"a": 1, "b": "2", "c": true}
1752        {"a": 2E0, "b": "4", "c": false}
1753        "#;
1754
1755        let schema = Arc::new(Schema::new(vec![
1756            Field::new("a", DataType::Int16, false),
1757            Field::new("b", DataType::Utf8, false),
1758            Field::new("c", DataType::Boolean, false),
1759        ]));
1760
1761        let batches = do_read(buf, 1024, true, true, schema);
1762        assert_eq!(batches.len(), 1);
1763
1764        let buf = r#"
1765        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1766        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1767        "#;
1768
1769        let schema = Arc::new(Schema::new(vec![
1770            Field::new("a", DataType::Int16, false),
1771            Field::new("b", DataType::Utf8, false),
1772            Field::new_struct(
1773                "c",
1774                vec![
1775                    Field::new("a", DataType::Boolean, false),
1776                    Field::new("b", DataType::Int16, false),
1777                ],
1778                false,
1779            ),
1780        ]));
1781
1782        let batches = do_read(buf, 1024, true, true, schema);
1783        assert_eq!(batches.len(), 1);
1784    }
1785
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        // In strict mode a JSON key absent from the schema is an error,
        // rather than being silently dropped.
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema deliberately omits column "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        // The check applies recursively inside nested structs; the error is
        // then prefixed with the path of the offending field.
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Struct "c" deliberately omits its child "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1835
1836    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1837        let file = File::open(path).unwrap();
1838        let mut reader = BufReader::new(file);
1839        let schema = schema.unwrap_or_else(|| {
1840            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1841            reader.rewind().unwrap();
1842            schema
1843        });
1844        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1845        builder.build(reader).unwrap()
1846    }
1847
    #[test]
    fn test_json_basic() {
        // End-to-end read of test/data/basic.json with an inferred schema:
        // checks column order, inferred types, and sampled values.
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred column positions and data types
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values from each typed column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1886
1887    #[test]
1888    fn test_json_empty_projection() {
1889        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1890        let batch = reader.next().unwrap().unwrap();
1891
1892        assert_eq!(0, batch.num_columns());
1893        assert_eq!(12, batch.num_rows());
1894    }
1895
    #[test]
    fn test_json_basic_with_nulls() {
        // Reads test/data/basic_nulls.json with inferred schema and verifies
        // the validity (null) bitmap of each column against the fixture.
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader schema and batch schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Types are inferred despite the interleaved nulls
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at positions known from the fixture file
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1935
    #[test]
    fn test_json_basic_schema() {
        // A user-supplied schema overrides inference; note "b" is requested as
        // Float32 here even though inference would produce Float64.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader must report exactly the schema it was given
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Values decode into the requested (narrower) types
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1971
1972    #[test]
1973    fn test_json_basic_schema_projection() {
1974        let schema = Schema::new(vec![
1975            Field::new("a", DataType::Int64, true),
1976            Field::new("c", DataType::Boolean, false),
1977        ]);
1978
1979        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1980        let batch = reader.next().unwrap().unwrap();
1981
1982        assert_eq!(2, batch.num_columns());
1983        assert_eq!(2, batch.schema().fields().len());
1984        assert_eq!(12, batch.num_rows());
1985
1986        assert_eq!(batch.schema().as_ref(), &schema);
1987
1988        let a = schema.column_with_name("a").unwrap();
1989        assert_eq!(0, a.0);
1990        assert_eq!(&DataType::Int64, a.1.data_type());
1991        let c = schema.column_with_name("c").unwrap();
1992        assert_eq!(1, c.0);
1993        assert_eq!(&DataType::Boolean, c.1.data_type());
1994    }
1995
1996    #[test]
1997    fn test_json_arrays() {
1998        let mut reader = read_file("test/data/arrays.json", None);
1999        let batch = reader.next().unwrap().unwrap();
2000
2001        assert_eq!(4, batch.num_columns());
2002        assert_eq!(3, batch.num_rows());
2003
2004        let schema = batch.schema();
2005
2006        let a = schema.column_with_name("a").unwrap();
2007        assert_eq!(&DataType::Int64, a.1.data_type());
2008        let b = schema.column_with_name("b").unwrap();
2009        assert_eq!(
2010            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
2011            b.1.data_type()
2012        );
2013        let c = schema.column_with_name("c").unwrap();
2014        assert_eq!(
2015            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2016            c.1.data_type()
2017        );
2018        let d = schema.column_with_name("d").unwrap();
2019        assert_eq!(&DataType::Utf8, d.1.data_type());
2020
2021        let aa = batch.column(a.0).as_primitive::<Int64Type>();
2022        assert_eq!(1, aa.value(0));
2023        assert_eq!(-10, aa.value(1));
2024        assert_eq!(1627668684594000000, aa.value(2));
2025        let bb = batch.column(b.0).as_list::<i32>();
2026        let bb = bb.values().as_primitive::<Float64Type>();
2027        assert_eq!(9, bb.len());
2028        assert_eq!(2.0, bb.value(0));
2029        assert_eq!(-6.1, bb.value(5));
2030        assert!(!bb.is_valid(7));
2031
2032        let cc = batch
2033            .column(c.0)
2034            .as_any()
2035            .downcast_ref::<ListArray>()
2036            .unwrap();
2037        let cc = cc.values().as_boolean();
2038        assert_eq!(6, cc.len());
2039        assert!(!cc.value(0));
2040        assert!(!cc.value(4));
2041        assert!(!cc.is_valid(5));
2042    }
2043
2044    #[test]
2045    fn test_empty_json_arrays() {
2046        let json_content = r#"
2047            {"items": []}
2048            {"items": null}
2049            {}
2050            "#;
2051
2052        let schema = Arc::new(Schema::new(vec![Field::new(
2053            "items",
2054            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2055            true,
2056        )]));
2057
2058        let batches = do_read(json_content, 1024, false, false, schema);
2059        assert_eq!(batches.len(), 1);
2060
2061        let col1 = batches[0].column(0).as_list::<i32>();
2062        assert_eq!(col1.null_count(), 2);
2063        assert!(col1.value(0).is_empty());
2064        assert_eq!(col1.value(0).data_type(), &DataType::Null);
2065        assert!(col1.is_null(1));
2066        assert!(col1.is_null(2));
2067    }
2068
2069    #[test]
2070    fn test_nested_empty_json_arrays() {
2071        let json_content = r#"
2072            {"items": [[],[]]}
2073            {"items": [[null, null],[null]]}
2074            "#;
2075
2076        let schema = Arc::new(Schema::new(vec![Field::new(
2077            "items",
2078            DataType::List(FieldRef::new(Field::new_list_field(
2079                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2080                true,
2081            ))),
2082            true,
2083        )]));
2084
2085        let batches = do_read(json_content, 1024, false, false, schema);
2086        assert_eq!(batches.len(), 1);
2087
2088        let col1 = batches[0].column(0).as_list::<i32>();
2089        assert_eq!(col1.null_count(), 0);
2090        assert_eq!(col1.value(0).len(), 2);
2091        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2092        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2093
2094        assert_eq!(col1.value(1).len(), 2);
2095        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2096        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2097    }
2098
    #[test]
    fn test_nested_list_json_arrays() {
        // List<Struct<b: Boolean, c: Struct<d: Utf8>>> round-trip: the expected
        // output is constructed by hand via ArrayDataBuilder (offsets and null
        // bitmaps included) and compared piecewise against the decoded batch.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // 6 rows covering: multiple structs, null struct field, null list,
        // empty list, and a list containing a null struct
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // 7 flattened struct entries across the 6 rows (2 + 1 + 3 + 0 + 0 + 1)
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // Null bitmaps are little-endian: bit i corresponds to flattened slot i
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // Offsets delimit each row's slice of the flattened struct children
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        // finally, the wholesale comparison
        assert_eq!(read, expected);
    }
2192
2193    #[test]
2194    fn test_skip_empty_lines() {
2195        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2196        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2197        let json_content = "
2198        {\"a\": 1}
2199        {\"a\": 2}
2200        {\"a\": 3}";
2201        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2202        let batch = reader.next().unwrap().unwrap();
2203
2204        assert_eq!(1, batch.num_columns());
2205        assert_eq!(3, batch.num_rows());
2206
2207        let schema = reader.schema();
2208        let c = schema.column_with_name("a").unwrap();
2209        assert_eq!(&DataType::Int64, c.1.data_type());
2210    }
2211
2212    #[test]
2213    fn test_with_multiple_batches() {
2214        let file = File::open("test/data/basic_nulls.json").unwrap();
2215        let mut reader = BufReader::new(file);
2216        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2217        reader.rewind().unwrap();
2218
2219        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2220        let mut reader = builder.build(reader).unwrap();
2221
2222        let mut num_records = Vec::new();
2223        while let Some(rb) = reader.next().transpose().unwrap() {
2224            num_records.push(rb.num_rows());
2225        }
2226
2227        assert_eq!(vec![5, 5, 2], num_records);
2228    }
2229
    #[test]
    fn test_timestamp_from_json_seconds() {
        // Integer column "a" from basic_nulls.json is read as a
        // seconds-resolution timestamp via an explicit schema.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader schema and batch schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        // Validity and raw values match the fixture's null pattern
        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2262
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        // Same fixture as the seconds test, but read at millisecond resolution;
        // the raw integer values are interpreted in the requested unit.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader schema and batch schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        // Validity and raw values match the fixture's null pattern
        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2295
2296    #[test]
2297    fn test_date_from_json_milliseconds() {
2298        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2299
2300        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2301        let batch = reader.next().unwrap().unwrap();
2302
2303        assert_eq!(1, batch.num_columns());
2304        assert_eq!(12, batch.num_rows());
2305
2306        let schema = reader.schema();
2307        let batch_schema = batch.schema();
2308        assert_eq!(schema, batch_schema);
2309
2310        let a = schema.column_with_name("a").unwrap();
2311        assert_eq!(&DataType::Date64, a.1.data_type());
2312
2313        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2314        assert!(aa.is_valid(0));
2315        assert!(!aa.is_valid(1));
2316        assert!(!aa.is_valid(2));
2317        assert_eq!(1, aa.value(0));
2318        assert_eq!(1, aa.value(3));
2319        assert_eq!(5, aa.value(7));
2320    }
2321
    #[test]
    fn test_time_from_json_nanoseconds() {
        // Integer column "a" from basic_nulls.json is read as
        // nanosecond-resolution time-of-day via an explicit schema.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader schema and batch schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        // Validity and raw values match the fixture's null pattern
        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2351
    #[test]
    fn test_json_iterator() {
        // Exercises the `Iterator` impl on `Reader`: 12 rows at batch size 5
        // must yield 3 batches, with column "a" summed across all of them.
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        // Inference consumed the stream; seek back before building the reader
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            // Sum every slot of "a" (the fixture has no nulls in this column)
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }
2381
2382    #[test]
2383    fn test_decoder_error() {
2384        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2385            "a",
2386            vec![Field::new("child", DataType::Int32, false)],
2387            true,
2388        )]));
2389
2390        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2391        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2392        assert!(decoder.tape_decoder.has_partial_row());
2393        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2394        let _ = decoder.flush().unwrap_err();
2395        assert!(decoder.tape_decoder.has_partial_row());
2396        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2397
2398        let parse_err = |s: &str| {
2399            ReaderBuilder::new(schema.clone())
2400                .build(Cursor::new(s.as_bytes()))
2401                .unwrap()
2402                .next()
2403                .unwrap()
2404                .unwrap_err()
2405                .to_string()
2406        };
2407
2408        let err = parse_err(r#"{"a": 123}"#);
2409        assert_eq!(
2410            err,
2411            "Json error: whilst decoding field 'a': expected { got 123"
2412        );
2413
2414        let err = parse_err(r#"{"a": ["bar"]}"#);
2415        assert_eq!(
2416            err,
2417            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2418        );
2419
2420        let err = parse_err(r#"{"a": []}"#);
2421        assert_eq!(
2422            err,
2423            "Json error: whilst decoding field 'a': expected { got []"
2424        );
2425
2426        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2427        assert_eq!(
2428            err,
2429            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2430        );
2431
2432        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2433        assert_eq!(
2434            err,
2435            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2436        );
2437
2438        let err = parse_err(r#"{"a": true}"#);
2439        assert_eq!(
2440            err,
2441            "Json error: whilst decoding field 'a': expected { got true"
2442        );
2443
2444        let err = parse_err(r#"{"a": false}"#);
2445        assert_eq!(
2446            err,
2447            "Json error: whilst decoding field 'a': expected { got false"
2448        );
2449
2450        let err = parse_err(r#"{"a": "foo"}"#);
2451        assert_eq!(
2452            err,
2453            "Json error: whilst decoding field 'a': expected { got \"foo\""
2454        );
2455
2456        let err = parse_err(r#"{"a": {"child": false}}"#);
2457        assert_eq!(
2458            err,
2459            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2460        );
2461
2462        let err = parse_err(r#"{"a": {"child": []}}"#);
2463        assert_eq!(
2464            err,
2465            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2466        );
2467
2468        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2469        assert_eq!(
2470            err,
2471            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2472        );
2473
2474        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2475        assert_eq!(
2476            err,
2477            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2478        );
2479    }
2480
2481    #[test]
2482    fn test_serialize_timestamp() {
2483        let json = vec![
2484            json!({"timestamp": 1681319393}),
2485            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2486        ];
2487        let schema = Schema::new(vec![Field::new(
2488            "timestamp",
2489            DataType::Timestamp(TimeUnit::Second, None),
2490            true,
2491        )]);
2492        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2493            .build_decoder()
2494            .unwrap();
2495        decoder.serialize(&json).unwrap();
2496        let batch = decoder.flush().unwrap().unwrap();
2497        assert_eq!(batch.num_rows(), 2);
2498        assert_eq!(batch.num_columns(), 1);
2499        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2500        assert_eq!(values.values(), &[1681319393, -7200]);
2501    }
2502
2503    #[test]
2504    fn test_serialize_decimal() {
2505        let json = vec![
2506            json!({"decimal": 1.234}),
2507            json!({"decimal": "1.234"}),
2508            json!({"decimal": 1234}),
2509            json!({"decimal": "1234"}),
2510        ];
2511        let schema = Schema::new(vec![Field::new(
2512            "decimal",
2513            DataType::Decimal128(10, 3),
2514            true,
2515        )]);
2516        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2517            .build_decoder()
2518            .unwrap();
2519        decoder.serialize(&json).unwrap();
2520        let batch = decoder.flush().unwrap().unwrap();
2521        assert_eq!(batch.num_rows(), 4);
2522        assert_eq!(batch.num_columns(), 1);
2523        let values = batch.column(0).as_primitive::<Decimal128Type>();
2524        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2525    }
2526
2527    #[test]
2528    fn test_serde_field() {
2529        let field = Field::new("int", DataType::Int32, true);
2530        let mut decoder = ReaderBuilder::new_with_field(field)
2531            .build_decoder()
2532            .unwrap();
2533        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2534        let b = decoder.flush().unwrap().unwrap();
2535        let values = b.column(0).as_primitive::<Int32Type>().values();
2536        assert_eq!(values, &[1, 2, 3, 4]);
2537    }
2538
2539    #[test]
2540    fn test_serde_large_numbers() {
2541        let field = Field::new("int", DataType::Int64, true);
2542        let mut decoder = ReaderBuilder::new_with_field(field)
2543            .build_decoder()
2544            .unwrap();
2545
2546        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2547        let b = decoder.flush().unwrap().unwrap();
2548        let values = b.column(0).as_primitive::<Int64Type>().values();
2549        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2550
2551        let field = Field::new(
2552            "int",
2553            DataType::Timestamp(TimeUnit::Microsecond, None),
2554            true,
2555        );
2556        let mut decoder = ReaderBuilder::new_with_field(field)
2557            .build_decoder()
2558            .unwrap();
2559
2560        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2561        let b = decoder.flush().unwrap().unwrap();
2562        let values = b
2563            .column(0)
2564            .as_primitive::<TimestampMicrosecondType>()
2565            .values();
2566        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2567    }
2568
2569    #[test]
2570    fn test_coercing_primitive_into_string_decoder() {
2571        let buf = &format!(
2572            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2573            (i32::MAX as i64 + 10),
2574            i64::MAX - 10
2575        );
2576        let schema = Schema::new(vec![
2577            Field::new("a", DataType::Float64, true),
2578            Field::new("b", DataType::Utf8, true),
2579            Field::new("c", DataType::Utf8, true),
2580        ]);
2581        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2582        let schema_ref = Arc::new(schema);
2583
2584        // read record batches
2585        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2586        let mut decoder = reader.build_decoder().unwrap();
2587        decoder.serialize(json_array.as_slice()).unwrap();
2588        let batch = decoder.flush().unwrap().unwrap();
2589        assert_eq!(
2590            batch,
2591            RecordBatch::try_new(
2592                schema_ref,
2593                vec![
2594                    Arc::new(Float64Array::from(vec![
2595                        1.0,
2596                        2.0,
2597                        (i32::MAX as i64 + 10) as f64,
2598                        (i64::MAX - 10) as f64
2599                    ])),
2600                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2601                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2602                ]
2603            )
2604            .unwrap()
2605        );
2606    }
2607
2608    // Parse the given `row` in `struct_mode` as a type given by fields.
2609    //
2610    // If as_struct == true, wrap the fields in a Struct field with name "r".
2611    // If as_struct == false, wrap the fields in a Schema.
2612    fn _parse_structs(
2613        row: &str,
2614        struct_mode: StructMode,
2615        fields: Fields,
2616        as_struct: bool,
2617    ) -> Result<RecordBatch, ArrowError> {
2618        let builder = if as_struct {
2619            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2620        } else {
2621            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2622        };
2623        builder
2624            .with_struct_mode(struct_mode)
2625            .build(Cursor::new(row.as_bytes()))
2626            .unwrap()
2627            .next()
2628            .unwrap()
2629    }
2630
2631    #[test]
2632    fn test_struct_decoding_list_length() {
2633        use arrow_array::array;
2634
2635        let row = "[1, 2]";
2636
2637        let mut fields = vec![Field::new("a", DataType::Int32, true)];
2638        let too_few_fields = Fields::from(fields.clone());
2639        fields.push(Field::new("b", DataType::Int32, true));
2640        let correct_fields = Fields::from(fields.clone());
2641        fields.push(Field::new("c", DataType::Int32, true));
2642        let too_many_fields = Fields::from(fields.clone());
2643
2644        let parse = |fields: Fields, as_struct: bool| {
2645            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2646        };
2647
2648        let expected_row = StructArray::new(
2649            correct_fields.clone(),
2650            vec![
2651                Arc::new(array::Int32Array::from(vec![1])),
2652                Arc::new(array::Int32Array::from(vec![2])),
2653            ],
2654            None,
2655        );
2656        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2657
2658        assert_eq!(
2659            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2660            "Json error: found extra columns for 1 fields".to_string()
2661        );
2662        assert_eq!(
2663            parse(too_few_fields, false).unwrap_err().to_string(),
2664            "Json error: found extra columns for 1 fields".to_string()
2665        );
2666        assert_eq!(
2667            parse(correct_fields.clone(), true).unwrap(),
2668            RecordBatch::try_new(
2669                Arc::new(Schema::new(vec![row_field])),
2670                vec![Arc::new(expected_row.clone())]
2671            )
2672            .unwrap()
2673        );
2674        assert_eq!(
2675            parse(correct_fields, false).unwrap(),
2676            RecordBatch::from(expected_row)
2677        );
2678        assert_eq!(
2679            parse(too_many_fields.clone(), true)
2680                .unwrap_err()
2681                .to_string(),
2682            "Json error: found 2 columns for 3 fields".to_string()
2683        );
2684        assert_eq!(
2685            parse(too_many_fields, false).unwrap_err().to_string(),
2686            "Json error: found 2 columns for 3 fields".to_string()
2687        );
2688    }
2689
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical record in three spellings: a pure object, a pure
        // list, and an object whose struct value is spelled as a list.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Target type: struct { b: List<Int32>, c: Map<Utf8, Int32> }
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column data: b = [1, 2]
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected column data: c = {"d": 3}
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object spelling and rejects any list form,
        // echoing the offending value back in the error message.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly accepts the list spelling and rejects any object form.
        // NOTE(review): the echoed JSON below omits the comma between
        // "[1, 2]" and "\"c\"" — presumably an artifact of how the decoder
        // reconstructs the value for the message; confirm before changing.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2773
2774    // Test cases:
2775    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2776    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2777    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2778    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2779    #[test]
2780    fn test_struct_decoding_empty_list() {
2781        let int_field = Field::new("a", DataType::Int32, true);
2782        let struct_field = Field::new(
2783            "r",
2784            DataType::Struct(Fields::from(vec![int_field.clone()])),
2785            true,
2786        );
2787
2788        let parse = |row: &str, as_struct: bool, field: Field| {
2789            _parse_structs(
2790                row,
2791                StructMode::ListOnly,
2792                Fields::from(vec![field]),
2793                as_struct,
2794            )
2795        };
2796
2797        // Missing fields
2798        assert_eq!(
2799            parse("[]", true, struct_field.clone())
2800                .unwrap_err()
2801                .to_string(),
2802            "Json error: found 0 columns for 1 fields".to_owned()
2803        );
2804        assert_eq!(
2805            parse("[]", false, int_field.clone())
2806                .unwrap_err()
2807                .to_string(),
2808            "Json error: found 0 columns for 1 fields".to_owned()
2809        );
2810        assert_eq!(
2811            parse("[]", false, struct_field.clone())
2812                .unwrap_err()
2813                .to_string(),
2814            "Json error: found 0 columns for 1 fields".to_owned()
2815        );
2816        assert_eq!(
2817            parse("[[]]", false, struct_field.clone())
2818                .unwrap_err()
2819                .to_string(),
2820            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2821        );
2822    }
2823
2824    #[test]
2825    fn test_decode_list_struct_with_wrong_types() {
2826        let int_field = Field::new("a", DataType::Int32, true);
2827        let struct_field = Field::new(
2828            "r",
2829            DataType::Struct(Fields::from(vec![int_field.clone()])),
2830            true,
2831        );
2832
2833        let parse = |row: &str, as_struct: bool, field: Field| {
2834            _parse_structs(
2835                row,
2836                StructMode::ListOnly,
2837                Fields::from(vec![field]),
2838                as_struct,
2839            )
2840        };
2841
2842        // Wrong values
2843        assert_eq!(
2844            parse(r#"[["a"]]"#, false, struct_field.clone())
2845                .unwrap_err()
2846                .to_string(),
2847            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2848        );
2849        assert_eq!(
2850            parse(r#"[["a"]]"#, true, struct_field.clone())
2851                .unwrap_err()
2852                .to_string(),
2853            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2854        );
2855        assert_eq!(
2856            parse(r#"["a"]"#, true, int_field.clone())
2857                .unwrap_err()
2858                .to_string(),
2859            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2860        );
2861        assert_eq!(
2862            parse(r#"["a"]"#, false, int_field.clone())
2863                .unwrap_err()
2864                .to_string(),
2865            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2866        );
2867    }
2868
2869    #[test]
2870    fn test_read_run_end_encoded() {
2871        let buf = r#"
2872        {"a": "x"}
2873        {"a": "x"}
2874        {"a": "y"}
2875        {"a": "y"}
2876        {"a": "y"}
2877        "#;
2878
2879        let ree_type = DataType::RunEndEncoded(
2880            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2881            Arc::new(Field::new("values", DataType::Utf8, true)),
2882        );
2883        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2884        let batches = do_read(buf, 1024, false, false, schema);
2885        assert_eq!(batches.len(), 1);
2886
2887        let col = batches[0].column(0);
2888        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2889
2890        // 5 logical values compressed into 2 runs
2891        assert_eq!(run_array.len(), 5);
2892        assert_eq!(run_array.run_ends().values(), &[2, 5]);
2893
2894        let values = run_array.values().as_string::<i32>();
2895        assert_eq!(values.len(), 2);
2896        assert_eq!(values.value(0), "x");
2897        assert_eq!(values.value(1), "y");
2898    }
2899
2900    #[test]
2901    fn test_read_run_end_encoded_consecutive_nulls() {
2902        let buf = r#"
2903        {"a": "x"}
2904        {}
2905        {}
2906        {}
2907        {"a": "y"}
2908        "#;
2909
2910        let ree_type = DataType::RunEndEncoded(
2911            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2912            Arc::new(Field::new("values", DataType::Utf8, true)),
2913        );
2914        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2915        let batches = do_read(buf, 1024, false, false, schema);
2916        assert_eq!(batches.len(), 1);
2917
2918        let col = batches[0].column(0);
2919        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2920
2921        // 5 logical values: "x", null, null, null, "y" → 3 runs
2922        assert_eq!(run_array.len(), 5);
2923        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
2924
2925        let values = run_array.values().as_string::<i32>();
2926        assert_eq!(values.len(), 3);
2927        assert_eq!(values.value(0), "x");
2928        assert!(values.is_null(1));
2929        assert_eq!(values.value(2), "y");
2930    }
2931
2932    #[test]
2933    fn test_read_run_end_encoded_all_unique() {
2934        let buf = r#"
2935        {"a": 1}
2936        {"a": 2}
2937        {"a": 3}
2938        "#;
2939
2940        let ree_type = DataType::RunEndEncoded(
2941            Arc::new(Field::new("run_ends", DataType::Int32, false)),
2942            Arc::new(Field::new("values", DataType::Int32, true)),
2943        );
2944        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2945        let batches = do_read(buf, 1024, false, false, schema);
2946        assert_eq!(batches.len(), 1);
2947
2948        let col = batches[0].column(0);
2949        let run_array = col.as_run::<arrow_array::types::Int32Type>();
2950
2951        // No compression: 3 unique values → 3 runs
2952        assert_eq!(run_array.len(), 3);
2953        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
2954    }
2955
2956    #[test]
2957    fn test_read_run_end_encoded_int16_run_ends() {
2958        let buf = r#"
2959        {"a": "x"}
2960        {"a": "x"}
2961        {"a": "y"}
2962        "#;
2963
2964        let ree_type = DataType::RunEndEncoded(
2965            Arc::new(Field::new("run_ends", DataType::Int16, false)),
2966            Arc::new(Field::new("values", DataType::Utf8, true)),
2967        );
2968        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2969        let batches = do_read(buf, 1024, false, false, schema);
2970        assert_eq!(batches.len(), 1);
2971
2972        let col = batches[0].column(0);
2973        let run_array = col.as_run::<arrow_array::types::Int16Type>();
2974
2975        assert_eq!(run_array.len(), 3);
2976        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
2977    }
2978}