// arrow-json: src/writer/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! # JSON Writer
//!
//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
//! JSON objects or JSON formatted byte streams.
//!
//! ## Writing JSON formatted byte streams
//!
//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
//! [`LineDelimitedWriter`]:
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_array::{Int32Array, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//!
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! // Write the record batch out as JSON
//! let buf = Vec::new();
//! let mut writer = arrow_json::LineDelimitedWriter::new(buf);
//! writer.write_batches(&vec![&batch]).unwrap();
//! writer.finish().unwrap();
//!
//! // Get the underlying buffer back,
//! let buf = writer.into_inner();
//! assert_eq!(r#"{"a":1}
//! {"a":2}
//! {"a":3}
//!"#, String::from_utf8(buf).unwrap())
//! ```
//!
//! To serialize [`RecordBatch`]es into a well formed JSON array, use
//! [`ArrayWriter`]:
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_array::{Int32Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//!
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! // Write the record batch out as a JSON array
//! let buf = Vec::new();
//! let mut writer = arrow_json::ArrayWriter::new(buf);
//! writer.write_batches(&vec![&batch]).unwrap();
//! writer.finish().unwrap();
//!
//! // Get the underlying buffer back,
//! let buf = writer.into_inner();
//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
//! ```
//!
//! [`LineDelimitedWriter`] and [`ArrayWriter`] will omit writing keys with null values.
//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
//! using a [`WriterBuilder`] to construct a [`Writer`].
//!
//! ## Writing to [serde_json] JSON Objects
//!
//! To serialize [`RecordBatch`]es into an array of
//! [JSON](https://docs.serde.rs/serde_json/) objects you can reparse the resulting JSON string.
//! Note that this is less efficient than using the `Writer` API.
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_array::{Int32Array, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! // Write the record batch out as json bytes (string)
//! let buf = Vec::new();
//! let mut writer = arrow_json::ArrayWriter::new(buf);
//! writer.write_batches(&vec![&batch]).unwrap();
//! writer.finish().unwrap();
//! let json_data = writer.into_inner();
//!
//! // Parse the string using serde_json
//! use serde_json::{Map, Value};
//! let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(json_data.as_slice()).unwrap();
//! assert_eq!(
//!     serde_json::Value::Object(json_rows[1].clone()),
//!     serde_json::json!({"a": 2}),
//! );
//! ```
mod encoder;

use std::{fmt::Debug, io::Write, sync::Arc};

use arrow_array::*;
use arrow_schema::*;

use crate::StructMode;

pub use encoder::{Encoder, EncoderFactory, EncoderOptions, NullableEncoder, make_encoder};

117/// This trait defines how to format a sequence of JSON objects to a
118/// byte stream.
119pub trait JsonFormat: Debug + Default {
120    #[inline]
121    /// write any bytes needed at the start of the file to the writer
122    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
123        Ok(())
124    }
125
126    #[inline]
127    /// write any bytes needed for the start of each row
128    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
129        Ok(())
130    }
131
132    #[inline]
133    /// write any bytes needed for the end of each row
134    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
135        Ok(())
136    }
137
138    /// write any bytes needed for the start of each row
139    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
140        Ok(())
141    }
142}
143
/// Produces JSON output with one record per line.
///
/// For example:
///
/// ```json
/// {"foo":1}
/// {"bar":1}
///
/// ```
#[derive(Debug, Default)]
pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158        writer.write_all(b"\n")?;
159        Ok(())
160    }
161}
162
/// Produces JSON output as a single JSON array.
///
/// For example:
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default)]
pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175        writer.write_all(b"[")?;
176        Ok(())
177    }
178
179    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180        if !is_first_row {
181            writer.write_all(b",")?;
182        }
183        Ok(())
184    }
185
186    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187        writer.write_all(b"]")?;
188        Ok(())
189    }
190}
191
/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects.
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// JSON writer builder.
///
/// Wraps [`EncoderOptions`] and exposes fluent `with_*` setters; call
/// [`WriterBuilder::build`] to obtain a configured [`Writer`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203    /// Create a new builder for configuring JSON writing options.
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// # use arrow_json::{Writer, WriterBuilder};
209    /// # use arrow_json::writer::LineDelimited;
210    /// # use std::fs::File;
211    ///
212    /// fn example() -> Writer<File, LineDelimited> {
213    ///     let file = File::create("target/out.json").unwrap();
214    ///
215    ///     // create a builder that keeps keys with null values
216    ///     let builder = WriterBuilder::new().with_explicit_nulls(true);
217    ///     let writer = builder.build::<_, LineDelimited>(file);
218    ///
219    ///     writer
220    /// }
221    /// ```
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Returns `true` if this writer is configured to keep keys with null values.
227    pub fn explicit_nulls(&self) -> bool {
228        self.0.explicit_nulls()
229    }
230
231    /// Set whether to keep keys with null values, or to omit writing them.
232    ///
233    /// For example, with [`LineDelimited`] format:
234    ///
235    /// Skip nulls (set to `false`):
236    ///
237    /// ```json
238    /// {"foo":1}
239    /// {"foo":1,"bar":2}
240    /// {}
241    /// ```
242    ///
243    /// Keep nulls (set to `true`):
244    ///
245    /// ```json
246    /// {"foo":1,"bar":null}
247    /// {"foo":1,"bar":2}
248    /// {"foo":null,"bar":null}
249    /// ```
250    ///
251    /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`,
252    /// nulls will be written explicitly regardless of this setting.
253    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254        self.0 = self.0.with_explicit_nulls(explicit_nulls);
255        self
256    }
257
258    /// Returns if this writer is configured to write structs as JSON Objects or Arrays.
259    pub fn struct_mode(&self) -> StructMode {
260        self.0.struct_mode()
261    }
262
263    /// Set the [`StructMode`] for the writer, which determines whether structs
264    /// are encoded to JSON as objects or lists. For more details refer to the
265    /// enum documentation. Default is to use `ObjectOnly`. If this is set to
266    /// `ListOnly`, nulls will be written explicitly regardless of the
267    /// `explicit_nulls` setting.
268    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269        self.0 = self.0.with_struct_mode(struct_mode);
270        self
271    }
272
273    /// Set an encoder factory to use when creating encoders for writing JSON.
274    ///
275    /// This can be used to override how some types are encoded or to provide
276    /// a fallback for types that are not supported by the default encoder.
277    pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278        self.0 = self.0.with_encoder_factory(factory);
279        self
280    }
281
282    /// Set the JSON file's date format
283    pub fn with_date_format(mut self, format: String) -> Self {
284        self.0 = self.0.with_date_format(format);
285        self
286    }
287
288    /// Set the JSON file's datetime format
289    pub fn with_datetime_format(mut self, format: String) -> Self {
290        self.0 = self.0.with_datetime_format(format);
291        self
292    }
293
294    /// Set the JSON file's time format
295    pub fn with_time_format(mut self, format: String) -> Self {
296        self.0 = self.0.with_time_format(format);
297        self
298    }
299
300    /// Set the JSON file's timestamp format
301    pub fn with_timestamp_format(mut self, format: String) -> Self {
302        self.0 = self.0.with_timestamp_format(format);
303        self
304    }
305
306    /// Set the JSON file's timestamp tz format
307    pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
308        self.0 = self.0.with_timestamp_tz_format(tz_format);
309        self
310    }
311
312    /// Create a new `Writer` with specified `JsonFormat` and builder options.
313    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
314    where
315        W: Write,
316        F: JsonFormat,
317    {
318        Writer {
319            writer,
320            started: false,
321            finished: false,
322            format: F::default(),
323            options: self.0,
324        }
325    }
326}
327
/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects.
///
/// See the module level documentation for detailed usage and examples.
/// The specific format of the stream is controlled by the [`JsonFormat`]
/// type parameter.
///
/// By default the writer will skip writing keys with null values for
/// backward compatibility. See [`WriterBuilder`] on how to customize
/// this behaviour when creating a new writer.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    started: bool,

    /// Is the writer finished?
    finished: bool,

    /// Determines how the byte stream is formatted
    format: F,

    /// Controls how JSON should be encoded, e.g. whether to write explicit nulls or skip them
    options: EncoderOptions,
}
359
360impl<W, F> Writer<W, F>
361where
362    W: Write,
363    F: JsonFormat,
364{
365    /// Construct a new writer
366    pub fn new(writer: W) -> Self {
367        Self {
368            writer,
369            started: false,
370            finished: false,
371            format: F::default(),
372            options: EncoderOptions::default(),
373        }
374    }
375
376    /// Serialize `batch` to JSON output
377    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
378        if batch.num_rows() == 0 {
379            return Ok(());
380        }
381
382        // BufWriter uses a buffer size of 8KB
383        // We therefore double this and flush once we have more than 8KB
384        let mut buffer = Vec::with_capacity(16 * 1024);
385
386        let mut is_first_row = !self.started;
387        if !self.started {
388            self.format.start_stream(&mut buffer)?;
389            self.started = true;
390        }
391
392        let array = StructArray::from(batch.clone());
393        let field = Arc::new(Field::new_struct(
394            "",
395            batch.schema().fields().clone(),
396            false,
397        ));
398
399        let mut encoder = make_encoder(&field, &array, &self.options)?;
400
401        // Validate that the root is not nullable
402        assert!(!encoder.has_nulls(), "root cannot be nullable");
403        for idx in 0..batch.num_rows() {
404            self.format.start_row(&mut buffer, is_first_row)?;
405            is_first_row = false;
406
407            encoder.encode(idx, &mut buffer);
408            if buffer.len() > 8 * 1024 {
409                self.writer.write_all(&buffer)?;
410                buffer.clear();
411            }
412            self.format.end_row(&mut buffer)?;
413        }
414
415        if !buffer.is_empty() {
416            self.writer.write_all(&buffer)?;
417        }
418
419        Ok(())
420    }
421
422    /// Serialize `batches` to JSON output
423    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
424        for b in batches {
425            self.write(b)?;
426        }
427        Ok(())
428    }
429
430    /// Finishes the output stream. This function must be called after
431    /// all record batches have been produced. (e.g. producing the final `']'` if writing
432    /// arrays.
433    pub fn finish(&mut self) -> Result<(), ArrowError> {
434        if !self.started {
435            self.format.start_stream(&mut self.writer)?;
436            self.started = true;
437        }
438        if !self.finished {
439            self.format.end_stream(&mut self.writer)?;
440            self.finished = true;
441        }
442
443        Ok(())
444    }
445
446    /// Gets a reference to the underlying writer.
447    pub fn get_ref(&self) -> &W {
448        &self.writer
449    }
450
451    /// Gets a mutable reference to the underlying writer.
452    ///
453    /// Writing to the underlying writer must be done with care
454    /// to avoid corrupting the output JSON.
455    pub fn get_mut(&mut self) -> &mut W {
456        &mut self.writer
457    }
458
459    /// Unwraps this `Writer<W>`, returning the underlying writer
460    pub fn into_inner(self) -> W {
461        self.writer
462    }
463}
464
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    // Delegates to the inherent `Writer::write` implementation.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    // Consumes the writer, terminating the JSON output stream via `finish`.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
478
479#[cfg(test)]
480mod tests {
481    use core::str;
482    use std::collections::HashMap;
483    use std::fs::{File, read_to_string};
484    use std::io::{BufReader, Seek};
485    use std::sync::Arc;
486
487    use arrow_array::cast::AsArray;
488    use serde_json::{Value, json};
489
490    use super::LineDelimited;
491    use super::{Encoder, WriterBuilder};
492    use arrow_array::builder::*;
493    use arrow_array::types::*;
494    use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, i256};
495
496    use crate::reader::*;
497
498    use super::*;
499
500    /// Asserts that the NDJSON `input` is semantically identical to `expected`
501    fn assert_json_eq(input: &[u8], expected: &str) {
502        let expected: Vec<Option<Value>> = expected
503            .split('\n')
504            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
505            .collect();
506
507        let actual: Vec<Option<Value>> = input
508            .split(|b| *b == b'\n')
509            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
510            .collect();
511
512        assert_eq!(actual, expected);
513    }
514
    #[test]
    fn write_simple_rows() {
        // Nullable Int32 and Utf8 columns: null entries are omitted from the
        // output objects entirely (default explicit_nulls = false).
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            // Scope the writer so its mutable borrow of `buf` ends before the assert.
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }
543
    #[test]
    fn write_large_utf8_and_utf8_view() {
        // All three string encodings (Utf8, LargeUtf8, Utf8View) should
        // serialize identically, omitting keys for null entries.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
578
    #[test]
    fn write_dictionary() {
        // Dictionary-encoded string columns (Int32 and Int8 keys) are
        // serialized as their resolved string values; null keys omit the field.
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
618
    #[test]
    fn write_list_of_dictionary() {
        // LargeList of dictionary-encoded strings: a null inner entry renders
        // as JSON null, while a null list omits the key entirely.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            // Third list entry is null (not merely empty).
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
658
    #[test]
    fn write_list_of_dictionary_large_values() {
        // Same shape as write_list_of_dictionary, but with LargeUtf8
        // dictionary values and explicitly constructed keys.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            // Third list entry is null (not merely empty).
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
705
    #[test]
    fn write_timestamps() {
        // One timestamp rendered at each precision (ns/us/ms/s) plus a null
        // row; also verifies a custom timestamp format via WriterBuilder.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_format("%m-%d-%Y".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018","micros":"11-13-2018","millis":"11-13-2018","secs":"11-13-2018","name":"a"}
{"name":"b"}
"#,
        );
    }
774
    #[test]
    fn write_timestamps_with_tz() {
        // Timestamps carrying a "+00:00" timezone render with a trailing "Z";
        // also verifies a custom tz-aware format via WriterBuilder.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_tz_format("%m-%d-%Y %Z".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018 +00:00","micros":"11-13-2018 +00:00","millis":"11-13-2018 +00:00","secs":"11-13-2018 +00:00","name":"a"}
{"name":"b"}
"#,
        );
    }
850
    #[test]
    fn write_dates() {
        // Date32 (days since epoch) and Date64 (ms since epoch) rendering,
        // plus custom date/datetime formats via WriterBuilder.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            // Convert milliseconds to whole days since the epoch.
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_date_format("%m-%d-%Y".to_string())
                .with_datetime_format("%m-%d-%Y %Mmin %Ssec %Hhour".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"11-13-2018","date64":"11-13-2018 11min 10sec 17hour","name":"a"}
{"name":"b"}
"#,
        );
    }
913
    #[test]
    fn write_times() {
        // Time32 (sec/ms) and Time64 (us/ns) rendering of the same raw value,
        // plus a custom time format via WriterBuilder.
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_time_format("%H-%M-%S %f".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00-02-00 000000000","time32msec":"00-00-00 120000000","time64usec":"00-00-00 000120000","time64nsec":"00-00-00 000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
971
    #[test]
    fn write_durations() {
        // The same value (120) in each duration unit, plus a null row, so the
        // scaling of each unit is visible in the serialized output.
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Durations serialize as ISO-8601-style duration strings ("PT…S");
        // null fields are omitted from the second row.
        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
1014
    #[test]
    fn write_nested_structs() {
        // Schema: c1 is a struct containing a nullable int (c11) and a nested
        // struct (c12 -> c121); c2 is a plain utf8 column.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        // Row 1 has c11 = null, so that key is omitted from the serialized
        // struct while the rest of the struct still appears.
        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
1069
1070    #[test]
1071    fn write_struct_with_list_field() {
1072        let field_c_list = Arc::new(Field::new("c_list", DataType::Utf8, false));
1073        let field_c1 = Field::new("c1", DataType::List(field_c_list.clone()), false);
1074        let field_c2 = Field::new("c2", DataType::Int32, false);
1075        let schema = Schema::new(vec![field_c1.clone(), field_c2]);
1076
1077        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
1078        // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"]
1079        let a = ListArray::new(
1080            field_c_list,
1081            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 4, 5, 6])),
1082            Arc::new(a_values),
1083            None,
1084        );
1085
1086        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
1087
1088        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1089
1090        let mut buf = Vec::new();
1091        {
1092            let mut writer = LineDelimitedWriter::new(&mut buf);
1093            writer.write_batches(&[&batch]).unwrap();
1094        }
1095
1096        assert_json_eq(
1097            &buf,
1098            r#"{"c1":["a","a1"],"c2":1}
1099{"c1":["b"],"c2":2}
1100{"c1":["c"],"c2":3}
1101{"c1":["d"],"c2":4}
1102{"c1":["e"],"c2":5}
1103"#,
1104        );
1105    }
1106
    #[test]
    fn write_nested_list() {
        // Schema: c1 is a list of lists of int32, c2 is a nullable utf8 column.
        let field_b = Arc::new(Field::new("b", DataType::Int32, false));
        let field_a = Arc::new(Field::new("a", DataType::List(field_b.clone()), false));
        let field_c1 = Field::new("c1", DataType::List(field_a.clone()), false);
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Inner lists: values[0..2], values[2..3], values[3..6]
        let a_list = ListArray::new(
            field_b,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 6])),
            Arc::new(a_values),
            None,
        );

        // Outer lists: the middle row is empty (offset 2..2).
        let c1 = ListArray::new(
            field_a,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3])),
            Arc::new(a_list),
            None,
        );
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null c2 in the last row is omitted; the empty outer list in the
        // middle row is still written as [].
        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1150
    #[test]
    fn write_list_of_struct() {
        // Schema: c1 is a nullable list of structs, where each struct has a
        // nullable int (c11) and a nested struct (c12 -> c121).
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // Three struct values shared across the list rows below; the middle
        // c11 is null so that key is omitted from its struct.
        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // list column rows (c1):
        // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}],
        // null,
        // [{"c11": 5, "c12": {"c121": "g"}}]
        let c1_inner = match field_c1.data_type() {
            DataType::List(f) => f.clone(),
            _ => unreachable!(),
        };
        let c1 = ListArray::new(
            c1_inner,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3])),
            Arc::new(struct_values),
            Some(NullBuffer::from(vec![true, false, true])),
        );

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Row 2's null list is omitted entirely rather than written as null.
        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1224
    // Round-trips a GenericListViewArray (offsets + per-row sizes instead of
    // monotonic offsets) through the line-delimited writer, parameterized over
    // the offset width `O` (i32 / i64).
    fn assert_write_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Schema::new(vec![Field::new("lv", data_type, true)]);

        // rows: [1, 2, 3], [4, null], null, [6]
        let values = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), None, Some(6)]);
        // Note the offsets are not monotonic ([0, 3, 0, 5]); unlike ListArray,
        // each list-view row is an independent (offset, size) pair:
        // row 0 -> values[0..3], row 1 -> values[3..5], row 3 -> values[5..6].
        let offsets = [0, 3, 0, 5]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let sizes = [3, 2, 0, 1]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let list_view = GenericListViewArray::<O>::try_new(
            field,
            ScalarBuffer::from(offsets),
            ScalarBuffer::from(sizes),
            Arc::new(values),
            // Row 2 is null (its offset/size pair is ignored).
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Inner nulls are written explicitly; the null top-level row becomes
        // an empty object because the column itself is omitted.
        assert_json_eq(
            &buf,
            r#"{"lv":[1,2,3]}
{"lv":[4,null]}
{}
{"lv":[6]}
"#,
        );
    }
1266
    #[test]
    fn write_list_view() {
        // Run the list-view round trip for both 32-bit and 64-bit offsets.
        assert_write_list_view::<i32>();
        assert_write_list_view::<i64>();
    }
1272
1273    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1274        let file = File::open(test_file).unwrap();
1275        let mut reader = BufReader::new(file);
1276        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1277        reader.rewind().unwrap();
1278
1279        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1280        let mut reader = builder.build(reader).unwrap();
1281        let batch = reader.next().unwrap().unwrap();
1282
1283        let mut buf = Vec::new();
1284        {
1285            if remove_nulls {
1286                let mut writer = LineDelimitedWriter::new(&mut buf);
1287                writer.write_batches(&[&batch]).unwrap();
1288            } else {
1289                let mut writer = WriterBuilder::new()
1290                    .with_explicit_nulls(true)
1291                    .build::<_, LineDelimited>(&mut buf);
1292                writer.write_batches(&[&batch]).unwrap();
1293            }
1294        }
1295
1296        let result = str::from_utf8(&buf).unwrap();
1297        let expected = read_to_string(test_file).unwrap();
1298        for (r, e) in result.lines().zip(expected.lines()) {
1299            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1300            if remove_nulls {
1301                // remove null value from object to make comparison consistent:
1302                if let Value::Object(obj) = expected_json {
1303                    expected_json =
1304                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1305                }
1306            }
1307            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1308        }
1309    }
1310
    #[test]
    fn write_basic_rows() {
        // Round-trip a file of flat primitive rows; nulls omitted.
        test_write_for_file("test/data/basic.json", true);
    }
1315
    #[test]
    fn write_arrays() {
        // Round-trip a file containing array-valued columns; nulls omitted.
        test_write_for_file("test/data/arrays.json", true);
    }
1320
    #[test]
    fn write_basic_nulls() {
        // Round-trip a file with many null values; nulls omitted on write.
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1325
    #[test]
    fn write_nested_with_nulls() {
        // Round-trip nested data with explicit nulls (remove_nulls = false).
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1330
1331    #[test]
1332    fn json_line_writer_empty() {
1333        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1334        writer.finish().unwrap();
1335        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1336    }
1337
1338    #[test]
1339    fn json_array_writer_empty() {
1340        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1341        writer.finish().unwrap();
1342        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1343    }
1344
1345    #[test]
1346    fn json_line_writer_empty_batch() {
1347        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1348
1349        let array = Int32Array::from(Vec::<i32>::new());
1350        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1351        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1352
1353        writer.write(&batch).unwrap();
1354        writer.finish().unwrap();
1355        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1356    }
1357
1358    #[test]
1359    fn json_array_writer_empty_batch() {
1360        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1361
1362        let array = Int32Array::from(Vec::<i32>::new());
1363        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1364        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1365
1366        writer.write(&batch).unwrap();
1367        writer.finish().unwrap();
1368        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1369    }
1370
    #[test]
    fn json_struct_array_nulls() {
        // The same list child is shared by two struct columns; column "a" adds
        // a struct-level null mask on top, column "b" does not.
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), // masked for a
            Some(vec![Some(4), Some(5)]),
            None, // masked for a
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // Validity bitmap 0b01010111: rows 0,1,2,4,6 valid; rows 3 and 5 null.
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // Null structs are omitted for "a" (rows 3 and 5); a null list child
        // inside a valid struct serializes as an empty object (rows 5 and 6).
        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1420
    // Builds a map column from `keys_array` (any string-typed array) paired
    // with fixed int64 values and checks its line-delimited serialization;
    // shared by `json_writer_map` for each supported key type.
    fn run_json_writer_map_with_keys(keys_array: ArrayRef) {
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        let keys_field = Arc::new(Field::new("keys", keys_array.data_type().clone(), false));
        let values_field = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys_field, keys_array.clone()),
            (values_field, Arc::new(values_array) as ArrayRef),
        ]);

        let entries_field = Arc::new(Field::new(
            "entries",
            entry_struct.data_type().clone(),
            false,
        ));

        // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}, {"quux": 50}, {}]
        let map = MapArray::new(
            entries_field.clone(),
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1, 1, 1, 4, 5, 5])),
            entry_struct,
            Some(NullBuffer::from(vec![true, false, true, true, true, true])),
            false,
        );

        let map_field = Field::new("map", DataType::Map(entries_field, false), true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // The null map row is omitted ({}), while empty-but-valid maps are
        // written as {"map":{}}.
        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1468
1469    #[test]
1470    fn json_writer_map() {
1471        // Utf8 (StringArray)
1472        let keys_utf8 = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1473        run_json_writer_map_with_keys(Arc::new(keys_utf8) as ArrayRef);
1474
1475        // LargeUtf8 (LargeStringArray)
1476        let keys_large = super::LargeStringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1477        run_json_writer_map_with_keys(Arc::new(keys_large) as ArrayRef);
1478
1479        // Utf8View (StringViewArray)
1480        let keys_view = super::StringViewArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1481        run_json_writer_map_with_keys(Arc::new(keys_view) as ArrayRef);
1482    }
1483
    #[test]
    fn test_write_single_batch() {
        // Same round-trip as `test_write_for_file`, but via `Writer::write`
        // (single batch) rather than `write_batches`.
        let test_file = "test/data/basic.json";
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write(&batch).unwrap();
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        // NOTE(review): `zip` stops at the shorter side, so a row-count
        // mismatch between output and fixture would pass silently here.
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            // remove null value from object to make comparison consistent:
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1514
    #[test]
    fn test_write_multi_batches() {
        // Writing [empty batch, batch, batch] must yield exactly the file's
        // contents twice: empty batches contribute nothing to the output.
        let test_file = "test/data/basic.json";

        let schema = SchemaRef::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Utf8, true),
            Field::new("e", DataType::Utf8, true),
            Field::new("f", DataType::Utf8, true),
            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("h", DataType::Float16, true),
        ]));

        let mut reader = ReaderBuilder::new(schema.clone())
            .build(BufReader::new(File::open(test_file).unwrap()))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();

        // test batches = an empty batch + 2 same batches, finally result should be eq to 2 same batches
        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&batches).unwrap();
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        // result is eq to 2 same batches
        let expected = format!("{expected}\n{expected}");
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            // remove null value from object to make comparison consistent:
            if let Value::Object(obj) = expected_json {
                expected_json =
                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1558
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // Verifies `with_explicit_nulls(true)`: every null slot — top-level,
        // inside structs, lists, dictionaries and maps — is written as `null`
        // instead of the key being omitted.

        // Nullable list-of-int32 column with null items, a null row, and
        // all-null rows.
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (array, field)
        }

        // Nullable dictionary-encoded string column with one null key.
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (array, field)
        }

        // Map column with a null row and an empty row built via MapBuilder.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            builder.append(false).unwrap();

            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level nullable list of structs (with null rows and null struct
        // members).
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let values_field =
                Arc::new(Field::new("struct", struct_array.data_type().clone(), true));
            let field = Field::new_list("list", values_field.as_ref().clone(), true);

            // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
            let array = Arc::new(ListArray::new(
                values_field,
                OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3, 3])),
                Arc::new(struct_array),
                Some(NullBuffer::from(vec![true, false, true, false])),
            ));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        // Schema: a date column, an all-null column, a struct wrapping the
        // nested list/dict/map fixtures, and the root list column.
        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            // [{"utf8":"a"},{"utf8":null},{"utf8":null},{"utf8":"b"}]
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (nested_list_field, nested_list_array as ArrayRef),
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (nested_dict_field, nested_dict_array as ArrayRef),
            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                // [{"date":"1970-01-01"},{"date":null},{"date":"1970-01-02"},{"date":null}]
                Arc::new(arr_date32),
                // [{"null":null},{"null":null},{"null":null},{"null":null}]
                Arc::new(arr_null),
                Arc::new(arr_struct),
                // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
                root_list_array,
            ],
        )?;

        // Write as a JSON array (not line-delimited) with explicit nulls.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // Compare as parsed JSON so key order does not matter.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
          {
            "date": "1970-01-01",
            "list": [
              {
                "int32": 1,
                "utf8": "a"
              },
              {
                "int32": null,
                "utf8": "b"
              }
            ],
            "null": null,
            "struct": {
              "dict": "cupcakes",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "foo": 10
              },
              "utf8": "a"
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": null,
              "list": [
                1,
                2,
                3
              ],
              "map": null,
              "utf8": null
            }
          },
          {
            "date": "1970-01-02",
            "list": [
              {
                "int32": 5,
                "utf8": null
              }
            ],
            "null": null,
            "struct": {
              "dict": "bear",
              "list": null,
              "map": {},
              "utf8": null
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": "kuma",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "bar": 20,
                "baz": 30,
                "qux": 40
              },
              "utf8": "b"
            }
          }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1782
1783    fn build_array_binary<O: OffsetSizeTrait>(values: &[Option<&[u8]>]) -> RecordBatch {
1784        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1785            "bytes",
1786            GenericBinaryType::<O>::DATA_TYPE,
1787            true,
1788        )]));
1789        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1790        for value in values {
1791            match value {
1792                Some(v) => builder.append_value(v),
1793                None => builder.append_null(),
1794            }
1795        }
1796        let array = Arc::new(builder.finish()) as ArrayRef;
1797        RecordBatch::try_new(schema, vec![array]).unwrap()
1798    }
1799
1800    fn build_array_binary_view(values: &[Option<&[u8]>]) -> RecordBatch {
1801        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1802            "bytes",
1803            DataType::BinaryView,
1804            true,
1805        )]));
1806        let mut builder = BinaryViewBuilder::new();
1807        for value in values {
1808            match value {
1809                Some(v) => builder.append_value(v),
1810                None => builder.append_null(),
1811            }
1812        }
1813        let array = Arc::new(builder.finish()) as ArrayRef;
1814        RecordBatch::try_new(schema, vec![array]).unwrap()
1815    }
1816
1817    fn assert_binary_json(batch: &RecordBatch) {
1818        // encode and check JSON with explicit nulls:
1819        {
1820            let mut buf = Vec::new();
1821            let json_value: Value = {
1822                let mut writer = WriterBuilder::new()
1823                    .with_explicit_nulls(true)
1824                    .build::<_, JsonArray>(&mut buf);
1825                writer.write(batch).unwrap();
1826                writer.close().unwrap();
1827                serde_json::from_slice(&buf).unwrap()
1828            };
1829
1830            assert_eq!(
1831                json!([
1832                    {
1833                        "bytes": "4e656420466c616e64657273"
1834                    },
1835                    {
1836                        "bytes": null // the explicit null
1837                    },
1838                    {
1839                        "bytes": "54726f79204d63436c757265"
1840                    }
1841                ]),
1842                json_value,
1843            );
1844        }
1845
1846        // encode and check JSON with no explicit nulls:
1847        {
1848            let mut buf = Vec::new();
1849            let json_value: Value = {
1850                // explicit nulls are off by default, so we don't need
1851                // to set that when creating the writer:
1852                let mut writer = ArrayWriter::new(&mut buf);
1853                writer.write(batch).unwrap();
1854                writer.close().unwrap();
1855                serde_json::from_slice(&buf).unwrap()
1856            };
1857
1858            assert_eq!(
1859                json!([
1860                    { "bytes": "4e656420466c616e64657273" },
1861                    {},
1862                    { "bytes": "54726f79204d63436c757265" }
1863                ]),
1864                json_value
1865            );
1866        }
1867    }
1868
1869    #[test]
1870    fn test_writer_binary() {
1871        let values: [Option<&[u8]>; 3] = [
1872            Some(b"Ned Flanders" as &[u8]),
1873            None,
1874            Some(b"Troy McClure" as &[u8]),
1875        ];
1876        // Binary:
1877        {
1878            let batch = build_array_binary::<i32>(&values);
1879            assert_binary_json(&batch);
1880        }
1881        // LargeBinary:
1882        {
1883            let batch = build_array_binary::<i64>(&values);
1884            assert_binary_json(&batch);
1885        }
1886        {
1887            let batch = build_array_binary_view(&values);
1888            assert_binary_json(&batch);
1889        }
1890    }
1891
1892    #[test]
1893    fn test_writer_fixed_size_binary() {
1894        // set up schema:
1895        let size = 11;
1896        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1897            "bytes",
1898            DataType::FixedSizeBinary(size),
1899            true,
1900        )]));
1901
1902        // build record batch:
1903        let mut builder = FixedSizeBinaryBuilder::new(size);
1904        let values = [Some(b"hello world"), None, Some(b"summer rain")];
1905        for value in values {
1906            match value {
1907                Some(v) => builder.append_value(v).unwrap(),
1908                None => builder.append_null(),
1909            }
1910        }
1911        let array = Arc::new(builder.finish()) as ArrayRef;
1912        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1913
1914        // encode and check JSON with explicit nulls:
1915        {
1916            let mut buf = Vec::new();
1917            let json_value: Value = {
1918                let mut writer = WriterBuilder::new()
1919                    .with_explicit_nulls(true)
1920                    .build::<_, JsonArray>(&mut buf);
1921                writer.write(&batch).unwrap();
1922                writer.close().unwrap();
1923                serde_json::from_slice(&buf).unwrap()
1924            };
1925
1926            assert_eq!(
1927                json!([
1928                    {
1929                        "bytes": "68656c6c6f20776f726c64"
1930                    },
1931                    {
1932                        "bytes": null // the explicit null
1933                    },
1934                    {
1935                        "bytes": "73756d6d6572207261696e"
1936                    }
1937                ]),
1938                json_value,
1939            );
1940        }
1941        // encode and check JSON with no explicit nulls:
1942        {
1943            let mut buf = Vec::new();
1944            let json_value: Value = {
1945                // explicit nulls are off by default, so we don't need
1946                // to set that when creating the writer:
1947                let mut writer = ArrayWriter::new(&mut buf);
1948                writer.write(&batch).unwrap();
1949                writer.close().unwrap();
1950                serde_json::from_slice(&buf).unwrap()
1951            };
1952
1953            assert_eq!(
1954                json!([
1955                    {
1956                        "bytes": "68656c6c6f20776f726c64"
1957                    },
1958                    {}, // empty because nulls are omitted
1959                    {
1960                        "bytes": "73756d6d6572207261696e"
1961                    }
1962                ]),
1963                json_value,
1964            );
1965        }
1966    }
1967
1968    #[test]
1969    fn test_writer_fixed_size_list() {
1970        let size = 3;
1971        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1972        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1973            "list",
1974            DataType::FixedSizeList(field, size),
1975            true,
1976        )]));
1977
1978        let values_builder = Int32Builder::new();
1979        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1980        let lists = [
1981            Some([Some(1), Some(2), None]),
1982            Some([Some(3), None, Some(4)]),
1983            Some([None, Some(5), Some(6)]),
1984            None,
1985        ];
1986        for list in lists {
1987            match list {
1988                Some(l) => {
1989                    for value in l {
1990                        match value {
1991                            Some(v) => list_builder.values().append_value(v),
1992                            None => list_builder.values().append_null(),
1993                        }
1994                    }
1995                    list_builder.append(true);
1996                }
1997                None => {
1998                    for _ in 0..size {
1999                        list_builder.values().append_null();
2000                    }
2001                    list_builder.append(false);
2002                }
2003            }
2004        }
2005        let array = Arc::new(list_builder.finish()) as ArrayRef;
2006        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2007
2008        //encode and check JSON with explicit nulls:
2009        {
2010            let json_value: Value = {
2011                let mut buf = Vec::new();
2012                let mut writer = WriterBuilder::new()
2013                    .with_explicit_nulls(true)
2014                    .build::<_, JsonArray>(&mut buf);
2015                writer.write(&batch).unwrap();
2016                writer.close().unwrap();
2017                serde_json::from_slice(&buf).unwrap()
2018            };
2019            assert_eq!(
2020                json!([
2021                    {"list": [1, 2, null]},
2022                    {"list": [3, null, 4]},
2023                    {"list": [null, 5, 6]},
2024                    {"list": null},
2025                ]),
2026                json_value
2027            );
2028        }
2029        // encode and check JSON with no explicit nulls:
2030        {
2031            let json_value: Value = {
2032                let mut buf = Vec::new();
2033                let mut writer = ArrayWriter::new(&mut buf);
2034                writer.write(&batch).unwrap();
2035                writer.close().unwrap();
2036                serde_json::from_slice(&buf).unwrap()
2037            };
2038            assert_eq!(
2039                json!([
2040                    {"list": [1, 2, null]},
2041                    {"list": [3, null, 4]},
2042                    {"list": [null, 5, 6]},
2043                    {}, // empty because nulls are omitted
2044                ]),
2045                json_value
2046            );
2047        }
2048    }
2049
2050    #[test]
2051    fn test_writer_null_dict() {
2052        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
2053        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
2054        let dict = DictionaryArray::new(keys, values);
2055
2056        let schema = SchemaRef::new(Schema::new(vec![Field::new(
2057            "my_dict",
2058            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
2059            true,
2060        )]));
2061
2062        let array = Arc::new(dict) as ArrayRef;
2063        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2064
2065        let mut json = Vec::new();
2066        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
2067        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
2068        writer.write(&batch).unwrap();
2069        writer.close().unwrap();
2070
2071        let json_str = str::from_utf8(&json).unwrap();
2072        assert_eq!(
2073            json_str,
2074            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
2075        )
2076    }
2077
2078    #[test]
2079    fn test_decimal32_encoder() {
2080        let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
2081            .with_precision_and_scale(8, 2)
2082            .unwrap();
2083        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2084        let schema = Schema::new(vec![field]);
2085        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2086
2087        let mut buf = Vec::new();
2088        {
2089            let mut writer = LineDelimitedWriter::new(&mut buf);
2090            writer.write_batches(&[&batch]).unwrap();
2091        }
2092
2093        assert_json_eq(
2094            &buf,
2095            r#"{"decimal":12.34}
2096{"decimal":56.78}
2097{"decimal":90.12}
2098"#,
2099        );
2100    }
2101
2102    #[test]
2103    fn test_decimal64_encoder() {
2104        let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
2105            .with_precision_and_scale(10, 2)
2106            .unwrap();
2107        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2108        let schema = Schema::new(vec![field]);
2109        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2110
2111        let mut buf = Vec::new();
2112        {
2113            let mut writer = LineDelimitedWriter::new(&mut buf);
2114            writer.write_batches(&[&batch]).unwrap();
2115        }
2116
2117        assert_json_eq(
2118            &buf,
2119            r#"{"decimal":12.34}
2120{"decimal":56.78}
2121{"decimal":90.12}
2122"#,
2123        );
2124    }
2125
2126    #[test]
2127    fn test_decimal128_encoder() {
2128        let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
2129            .with_precision_and_scale(10, 2)
2130            .unwrap();
2131        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2132        let schema = Schema::new(vec![field]);
2133        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2134
2135        let mut buf = Vec::new();
2136        {
2137            let mut writer = LineDelimitedWriter::new(&mut buf);
2138            writer.write_batches(&[&batch]).unwrap();
2139        }
2140
2141        assert_json_eq(
2142            &buf,
2143            r#"{"decimal":12.34}
2144{"decimal":56.78}
2145{"decimal":90.12}
2146"#,
2147        );
2148    }
2149
2150    #[test]
2151    fn test_decimal256_encoder() {
2152        let array = Decimal256Array::from_iter_values([
2153            i256::from(123400),
2154            i256::from(567800),
2155            i256::from(901200),
2156        ])
2157        .with_precision_and_scale(10, 4)
2158        .unwrap();
2159        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2160        let schema = Schema::new(vec![field]);
2161        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2162
2163        let mut buf = Vec::new();
2164        {
2165            let mut writer = LineDelimitedWriter::new(&mut buf);
2166            writer.write_batches(&[&batch]).unwrap();
2167        }
2168
2169        assert_json_eq(
2170            &buf,
2171            r#"{"decimal":12.3400}
2172{"decimal":56.7800}
2173{"decimal":90.1200}
2174"#,
2175        );
2176    }
2177
2178    #[test]
2179    fn test_decimal_encoder_with_nulls() {
2180        let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2181            .with_precision_and_scale(10, 2)
2182            .unwrap();
2183        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2184        let schema = Schema::new(vec![field]);
2185        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2186
2187        let mut buf = Vec::new();
2188        {
2189            let mut writer = LineDelimitedWriter::new(&mut buf);
2190            writer.write_batches(&[&batch]).unwrap();
2191        }
2192
2193        assert_json_eq(
2194            &buf,
2195            r#"{"decimal":12.34}
2196{}
2197{"decimal":56.78}
2198"#,
2199        );
2200    }
2201
2202    #[test]
2203    fn write_structs_as_list() {
2204        let schema = Schema::new(vec![
2205            Field::new(
2206                "c1",
2207                DataType::Struct(Fields::from(vec![
2208                    Field::new("c11", DataType::Int32, true),
2209                    Field::new(
2210                        "c12",
2211                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2212                        false,
2213                    ),
2214                ])),
2215                false,
2216            ),
2217            Field::new("c2", DataType::Utf8, false),
2218        ]);
2219
2220        let c1 = StructArray::from(vec![
2221            (
2222                Arc::new(Field::new("c11", DataType::Int32, true)),
2223                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
2224            ),
2225            (
2226                Arc::new(Field::new(
2227                    "c12",
2228                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2229                    false,
2230                )),
2231                Arc::new(StructArray::from(vec![(
2232                    Arc::new(Field::new("c121", DataType::Utf8, false)),
2233                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2234                )])) as ArrayRef,
2235            ),
2236        ]);
2237        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2238
2239        let batch =
2240            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2241
2242        let expected = r#"[[1,["e"]],"a"]
2243[[null,["f"]],"b"]
2244[[5,["g"]],"c"]
2245"#;
2246
2247        let mut buf = Vec::new();
2248        {
2249            let builder = WriterBuilder::new()
2250                .with_explicit_nulls(true)
2251                .with_struct_mode(StructMode::ListOnly);
2252            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2253            writer.write_batches(&[&batch]).unwrap();
2254        }
2255        assert_json_eq(&buf, expected);
2256
2257        let mut buf = Vec::new();
2258        {
2259            let builder = WriterBuilder::new()
2260                .with_explicit_nulls(false)
2261                .with_struct_mode(StructMode::ListOnly);
2262            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2263            writer.write_batches(&[&batch]).unwrap();
2264        }
2265        assert_json_eq(&buf, expected);
2266    }
2267
    /// Builds shared fixtures for the fallback-encoder tests: a two-column
    /// batch ("union": sparse union of Int32/Utf8/Null, "float": Float64)
    /// plus an `EncoderFactory` that custom-encodes the union column.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Note: this is not intended to be an efficient implementation.
        // Just a simple example to demonstrate how to implement a custom encoder.

        // Materialized value of a single union slot.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encoder over eagerly pre-decoded union values, one per row.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            // Writes row `idx` as `null`, a bare integer, or a quoted string.
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            // Returns a custom encoder only for sparse unions whose children
            // are all Null/Int32/Utf8; `Ok(None)` falls back to the default
            // encoder for every other array (e.g. the "float" column).
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                // check that the fields are supported
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Eagerly decode every row. In a sparse union each child
                // buffer has a slot for every row, so `idx` indexes directly
                // into the child selected by that row's type id.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        // Unsupported types were rejected above.
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Row 0 takes the Int32 child, row 1 the Utf8 child, row 2 the Null
        // child (one type id per row).
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // Second column handled by the default (non-custom) encoder.
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2391
2392    #[test]
2393    fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2394        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2395
2396        let mut buf = Vec::new();
2397        {
2398            let mut writer = WriterBuilder::new()
2399                .with_encoder_factory(encoder_factory)
2400                .with_explicit_nulls(false)
2401                .build::<_, LineDelimited>(&mut buf);
2402            writer.write_batches(&[&batch]).unwrap();
2403            writer.finish().unwrap();
2404        }
2405
2406        println!("{}", str::from_utf8(&buf).unwrap());
2407
2408        assert_json_eq(
2409            &buf,
2410            r#"{"union":1,"float":1.0}
2411{"union":"a"}
2412{"union":null,"float":3.4}
2413"#,
2414        );
2415    }
2416
2417    #[test]
2418    fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2419        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2420
2421        let mut buf = Vec::new();
2422        {
2423            let mut writer = WriterBuilder::new()
2424                .with_encoder_factory(encoder_factory)
2425                .with_explicit_nulls(true)
2426                .build::<_, LineDelimited>(&mut buf);
2427            writer.write_batches(&[&batch]).unwrap();
2428            writer.finish().unwrap();
2429        }
2430
2431        assert_json_eq(
2432            &buf,
2433            r#"{"union":1,"float":1.0}
2434{"union":"a","float":null}
2435{"union":null,"float":3.4}
2436"#,
2437        );
2438    }
2439
2440    #[test]
2441    fn test_fallback_encoder_factory_array_implicit_nulls() {
2442        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2443
2444        let json_value: Value = {
2445            let mut buf = Vec::new();
2446            let mut writer = WriterBuilder::new()
2447                .with_encoder_factory(encoder_factory)
2448                .build::<_, JsonArray>(&mut buf);
2449            writer.write_batches(&[&batch]).unwrap();
2450            writer.finish().unwrap();
2451            serde_json::from_slice(&buf).unwrap()
2452        };
2453
2454        let expected = json!([
2455            {"union":1,"float":1.0},
2456            {"union":"a"},
2457            {"float":3.4,"union":null},
2458        ]);
2459
2460        assert_eq!(json_value, expected);
2461    }
2462
2463    #[test]
2464    fn test_fallback_encoder_factory_array_explicit_nulls() {
2465        let (batch, encoder_factory) = make_fallback_encoder_test_data();
2466
2467        let json_value: Value = {
2468            let mut buf = Vec::new();
2469            let mut writer = WriterBuilder::new()
2470                .with_encoder_factory(encoder_factory)
2471                .with_explicit_nulls(true)
2472                .build::<_, JsonArray>(&mut buf);
2473            writer.write_batches(&[&batch]).unwrap();
2474            writer.finish().unwrap();
2475            serde_json::from_slice(&buf).unwrap()
2476        };
2477
2478        let expected = json!([
2479            {"union":1,"float":1.0},
2480            {"union":"a", "float": null},
2481            {"union":null,"float":3.4},
2482        ]);
2483
2484        assert_eq!(json_value, expected);
2485    }
2486
2487    #[test]
2488    fn test_default_encoder_byte_array() {
2489        struct IntArrayBinaryEncoder<B> {
2490            array: B,
2491        }
2492
2493        impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
2494        where
2495            B: ArrayAccessor<Item = &'a [u8]>,
2496        {
2497            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2498                out.push(b'[');
2499                let child = self.array.value(idx);
2500                for (idx, byte) in child.iter().enumerate() {
2501                    write!(out, "{byte}").unwrap();
2502                    if idx < child.len() - 1 {
2503                        out.push(b',');
2504                    }
2505                }
2506                out.push(b']');
2507            }
2508        }
2509
2510        #[derive(Debug)]
2511        struct IntArayBinaryEncoderFactory;
2512
2513        impl EncoderFactory for IntArayBinaryEncoderFactory {
2514            fn make_default_encoder<'a>(
2515                &self,
2516                _field: &'a FieldRef,
2517                array: &'a dyn Array,
2518                _options: &'a EncoderOptions,
2519            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2520                match array.data_type() {
2521                    DataType::Binary => {
2522                        let array = array.as_binary::<i32>();
2523                        let encoder = IntArrayBinaryEncoder { array };
2524                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2525                        let nulls = array.nulls().cloned();
2526                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2527                    }
2528                    _ => Ok(None),
2529                }
2530            }
2531        }
2532
2533        let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
2534        let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
2535        let fields = vec![
2536            Field::new("bytes", DataType::Binary, true),
2537            Field::new("float", DataType::Float64, true),
2538        ];
2539        let batch = RecordBatch::try_new(
2540            Arc::new(Schema::new(fields)),
2541            vec![
2542                Arc::new(binary_array) as Arc<dyn Array>,
2543                Arc::new(float_array) as Arc<dyn Array>,
2544            ],
2545        )
2546        .unwrap();
2547
2548        let json_value: Value = {
2549            let mut buf = Vec::new();
2550            let mut writer = WriterBuilder::new()
2551                .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
2552                .build::<_, JsonArray>(&mut buf);
2553            writer.write_batches(&[&batch]).unwrap();
2554            writer.finish().unwrap();
2555            serde_json::from_slice(&buf).unwrap()
2556        };
2557
2558        let expected = json!([
2559            {"bytes": [97], "float": 1.0},
2560            {"float": 2.3},
2561            {"bytes": [98]},
2562        ]);
2563
2564        assert_eq!(json_value, expected);
2565    }
2566
2567    #[test]
2568    fn test_encoder_factory_customize_dictionary() {
2569        // Test that we can customize the encoding of T even when it shows up as Dictionary<_, T>.
2570
2571        // No particular reason to choose this example.
2572        // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory.
2573        struct PaddedInt32Encoder {
2574            array: Int32Array,
2575        }
2576
2577        impl Encoder for PaddedInt32Encoder {
2578            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
2579                let value = self.array.value(idx);
2580                write!(out, "\"{value:0>8}\"").unwrap();
2581            }
2582        }
2583
2584        #[derive(Debug)]
2585        struct CustomEncoderFactory;
2586
2587        impl EncoderFactory for CustomEncoderFactory {
2588            fn make_default_encoder<'a>(
2589                &self,
2590                field: &'a FieldRef,
2591                array: &'a dyn Array,
2592                _options: &'a EncoderOptions,
2593            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
2594                // The point here is:
2595                // 1. You can use information from Field to determine how to do the encoding.
2596                // 2. For dictionary arrays the Field is always the outer field but the array may be the keys or values array
2597                //    and thus the data type of `field` may not match the data type of `array`.
2598                let padded = field
2599                    .metadata()
2600                    .get("padded")
2601                    .map(|v| v == "true")
2602                    .unwrap_or_default();
2603                match (array.data_type(), padded) {
2604                    (DataType::Int32, true) => {
2605                        let array = array.as_primitive::<Int32Type>();
2606                        let nulls = array.nulls().cloned();
2607                        let encoder = PaddedInt32Encoder {
2608                            array: array.clone(),
2609                        };
2610                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
2611                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
2612                    }
2613                    _ => Ok(None),
2614                }
2615            }
2616        }
2617
2618        let to_json = |batch| {
2619            let mut buf = Vec::new();
2620            let mut writer = WriterBuilder::new()
2621                .with_encoder_factory(Arc::new(CustomEncoderFactory))
2622                .build::<_, JsonArray>(&mut buf);
2623            writer.write_batches(&[batch]).unwrap();
2624            writer.finish().unwrap();
2625            serde_json::from_slice::<Value>(&buf).unwrap()
2626        };
2627
2628        // Control case: no dictionary wrapping works as expected.
2629        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
2630        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
2631            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
2632        ));
2633        let batch = RecordBatch::try_new(
2634            Arc::new(Schema::new(vec![field.clone()])),
2635            vec![Arc::new(array)],
2636        )
2637        .unwrap();
2638
2639        let json_value = to_json(&batch);
2640
2641        let expected = json!([
2642            {"int": "00000001"},
2643            {},
2644            {"int": "00000002"},
2645        ]);
2646
2647        assert_eq!(json_value, expected);
2648
2649        // Now make a dictionary batch
2650        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
2651        array_builder.append_value(1);
2652        array_builder.append_null();
2653        array_builder.append_value(1);
2654        let array = array_builder.finish();
2655        let field = Field::new(
2656            "int",
2657            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
2658            true,
2659        )
2660        .with_metadata(HashMap::from_iter(vec![(
2661            "padded".to_string(),
2662            "true".to_string(),
2663        )]));
2664        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
2665            .unwrap();
2666
2667        let json_value = to_json(&batch);
2668
2669        let expected = json!([
2670            {"int": "00000001"},
2671            {},
2672            {"int": "00000001"},
2673        ]);
2674
2675        assert_eq!(json_value, expected);
2676    }
2677
2678    #[test]
2679    fn test_write_run_end_encoded() {
2680        let run_ends = Int32Array::from(vec![2, 5, 6]);
2681        let values = StringArray::from(vec![Some("a"), Some("b"), None]);
2682        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2683
2684        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2685            "c1",
2686            ree.data_type().clone(),
2687            true,
2688        )]));
2689
2690        let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2691
2692        let mut buf = Vec::new();
2693        {
2694            let mut writer = LineDelimitedWriter::new(&mut buf);
2695            writer.write_batches(&[&batch]).unwrap();
2696        }
2697
2698        assert_json_eq(
2699            &buf,
2700            r#"{"c1":"a"}
2701{"c1":"a"}
2702{"c1":"b"}
2703{"c1":"b"}
2704{"c1":"b"}
2705{}
2706"#,
2707        );
2708    }
2709
2710    #[test]
2711    fn test_write_run_end_encoded_int_values() {
2712        let run_ends = Int32Array::from(vec![3, 5]);
2713        let values = Int32Array::from(vec![10, 20]);
2714        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2715
2716        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2717            "n",
2718            ree.data_type().clone(),
2719            true,
2720        )]));
2721
2722        let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2723
2724        let json_value: Value = {
2725            let mut buf = Vec::new();
2726            let mut writer = WriterBuilder::new().build::<_, JsonArray>(&mut buf);
2727            writer.write_batches(&[&batch]).unwrap();
2728            writer.finish().unwrap();
2729            serde_json::from_slice(&buf).unwrap()
2730        };
2731
2732        let expected = json!([
2733            {"n": 10},
2734            {"n": 10},
2735            {"n": 10},
2736            {"n": 20},
2737            {"n": 20},
2738        ]);
2739
2740        assert_eq!(json_value, expected);
2741    }
2742
2743    #[test]
2744    fn test_run_end_encoded_roundtrip() {
2745        let run_ends = Int32Array::from(vec![3, 5, 7]);
2746        let values = StringArray::from(vec![Some("a"), None, Some("b")]);
2747        let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2748
2749        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2750            "c",
2751            ree.data_type().clone(),
2752            true,
2753        )]));
2754        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree)]).unwrap();
2755
2756        let mut buf = Vec::new();
2757        {
2758            let mut writer = super::LineDelimitedWriter::new(&mut buf);
2759            writer.write_batches(&[&batch]).unwrap();
2760        }
2761
2762        let batches: Vec<RecordBatch> = ReaderBuilder::new(schema)
2763            .with_batch_size(1024)
2764            .build(std::io::Cursor::new(&buf))
2765            .unwrap()
2766            .collect::<Result<Vec<_>, _>>()
2767            .unwrap();
2768        assert_eq!(batches.len(), 1);
2769
2770        let col = batches[0].column(0);
2771        let run_array = col.as_run::<Int32Type>();
2772
2773        assert_eq!(run_array.len(), 7);
2774        assert_eq!(run_array.run_ends().values(), &[3, 5, 7]);
2775
2776        let values = run_array.values().as_string::<i32>();
2777        assert_eq!(values.len(), 3);
2778        assert_eq!(values.value(0), "a");
2779        assert!(values.is_null(1));
2780        assert_eq!(values.value(2), "b");
2781    }
2782}