1mod encoder;
108
109use std::{fmt::Debug, io::Write, sync::Arc};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115pub use encoder::{Encoder, EncoderFactory, EncoderOptions, NullableEncoder, make_encoder};
116
/// Controls the framing written around and between rows of JSON output,
/// e.g. newline-delimited objects ([`LineDelimited`]) vs. a single JSON
/// array ([`JsonArray`]). All hooks default to writing nothing.
pub trait JsonFormat: Debug + Default {
    /// Write any bytes needed at the very start of the output stream.
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed before a row; `_is_first_row` is `true` only
    /// for the first row of the entire stream (not of each batch).
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed after a row.
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// Write any bytes needed at the very end of the output stream.
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
144#[derive(Debug, Default)]
154pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157 fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158 writer.write_all(b"\n")?;
159 Ok(())
160 }
161}
162
163#[derive(Debug, Default)]
171pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174 fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175 writer.write_all(b"[")?;
176 Ok(())
177 }
178
179 fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180 if !is_first_row {
181 writer.write_all(b",")?;
182 }
183 Ok(())
184 }
185
186 fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187 writer.write_all(b"]")?;
188 Ok(())
189 }
190}
191
/// A [`Writer`] producing newline-delimited JSON (one object per line).
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A [`Writer`] producing a single top-level JSON array of objects.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// Builder that configures [`EncoderOptions`] and constructs a [`Writer`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn explicit_nulls(&self) -> bool {
228 self.0.explicit_nulls()
229 }
230
231 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254 self.0 = self.0.with_explicit_nulls(explicit_nulls);
255 self
256 }
257
258 pub fn struct_mode(&self) -> StructMode {
260 self.0.struct_mode()
261 }
262
263 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269 self.0 = self.0.with_struct_mode(struct_mode);
270 self
271 }
272
273 pub fn with_encoder_factory(mut self, factory: Arc<dyn EncoderFactory>) -> Self {
278 self.0 = self.0.with_encoder_factory(factory);
279 self
280 }
281
282 pub fn with_date_format(mut self, format: String) -> Self {
284 self.0 = self.0.with_date_format(format);
285 self
286 }
287
288 pub fn with_datetime_format(mut self, format: String) -> Self {
290 self.0 = self.0.with_datetime_format(format);
291 self
292 }
293
294 pub fn with_time_format(mut self, format: String) -> Self {
296 self.0 = self.0.with_time_format(format);
297 self
298 }
299
300 pub fn with_timestamp_format(mut self, format: String) -> Self {
302 self.0 = self.0.with_timestamp_format(format);
303 self
304 }
305
306 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
308 self.0 = self.0.with_timestamp_tz_format(tz_format);
309 self
310 }
311
312 pub fn build<W, F>(self, writer: W) -> Writer<W, F>
314 where
315 W: Write,
316 F: JsonFormat,
317 {
318 Writer {
319 writer,
320 started: false,
321 finished: false,
322 format: F::default(),
323 options: self.0,
324 }
325 }
326}
327
/// A JSON writer serializing [`RecordBatch`]es to a [`Write`] sink, with the
/// framing between/around rows determined by the [`JsonFormat`] `F`.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying sink that receives the serialized bytes.
    writer: W,

    /// Whether the stream preamble has been written (set on first non-empty
    /// batch, or by `finish`).
    started: bool,

    /// Whether the stream trailer has been written by `finish`.
    finished: bool,

    /// Determines the delimiters written between and around rows.
    format: F,

    /// Controls value encoding (explicit nulls, struct mode, date/time
    /// formats, custom encoder factory).
    options: EncoderOptions,
}
359
impl<W, F> Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Construct a new writer over `writer` with default [`EncoderOptions`].
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            started: false,
            finished: false,
            format: F::default(),
            options: EncoderOptions::default(),
        }
    }

    /// Serialize `batch` to the underlying writer.
    ///
    /// Rows are encoded into an in-memory buffer which is flushed to the
    /// sink whenever it grows past ~8KiB, bounding memory use without a
    /// write syscall per row.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Empty batches contribute nothing and must not emit the stream
        // preamble (so e.g. an ArrayWriter fed only empty batches still
        // produces "[]" via `finish`).
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let mut buffer = Vec::with_capacity(16 * 1024);

        // "First row" is relative to the whole stream, not this batch, so
        // separators between batches are placed correctly.
        let mut is_first_row = !self.started;
        if !self.started {
            self.format.start_stream(&mut buffer)?;
            self.started = true;
        }

        // Encode the batch as a single anonymous, non-nullable struct so one
        // root encoder covers all columns.
        let array = StructArray::from(batch.clone());
        let field = Arc::new(Field::new_struct(
            "",
            batch.schema().fields().clone(),
            false,
        ));

        let mut encoder = make_encoder(&field, &array, &self.options)?;

        // The root field was declared non-nullable above, so the encoder
        // must not report nulls; a violation is a bug, not a user error.
        assert!(!encoder.has_nulls(), "root cannot be nullable");
        for idx in 0..batch.num_rows() {
            self.format.start_row(&mut buffer, is_first_row)?;
            is_first_row = false;

            encoder.encode(idx, &mut buffer);
            // Flush once past 8KiB; done before the (tiny) row terminator
            // is appended.
            if buffer.len() > 8 * 1024 {
                self.writer.write_all(&buffer)?;
                buffer.clear();
            }
            self.format.end_row(&mut buffer)?;
        }

        // Flush whatever remains after the last row.
        if !buffer.is_empty() {
            self.writer.write_all(&buffer)?;
        }

        Ok(())
    }

    /// Serialize `batches` to the underlying writer, in order.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Finalize the stream, writing any trailing delimiters (e.g. the
    /// closing `]` for [`JsonArray`]). Idempotent. If no rows were ever
    /// written, the preamble is emitted first so an empty [`ArrayWriter`]
    /// still produces valid output (`[]`).
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if !self.started {
            self.format.start_stream(&mut self.writer)?;
            self.started = true;
        }
        if !self.finished {
            self.format.end_stream(&mut self.writer)?;
            self.finished = true;
        }

        Ok(())
    }

    /// Get a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Get a mutable reference to the underlying writer.
    ///
    /// NOTE(review): writing to the sink directly can corrupt the JSON
    /// framing — callers are presumably expected to use this for flushing
    /// or inspection only.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Consume this writer, returning the underlying sink. Does not call
    /// [`Writer::finish`] first.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
464
/// Adapter so any [`Writer`] can be used through the generic
/// [`RecordBatchWriter`] interface; both methods delegate to the inherent
/// implementations above.
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    // `close` finalizes the stream framing; the underlying sink is dropped.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
478
479#[cfg(test)]
480mod tests {
481 use core::str;
482 use std::collections::HashMap;
483 use std::fs::{File, read_to_string};
484 use std::io::{BufReader, Seek};
485 use std::sync::Arc;
486
487 use arrow_array::cast::AsArray;
488 use serde_json::{Value, json};
489
490 use super::LineDelimited;
491 use super::{Encoder, WriterBuilder};
492 use arrow_array::builder::*;
493 use arrow_array::types::*;
494 use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, i256};
495
496 use crate::reader::*;
497
498 use super::*;
499
500 fn assert_json_eq(input: &[u8], expected: &str) {
502 let expected: Vec<Option<Value>> = expected
503 .split('\n')
504 .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
505 .collect();
506
507 let actual: Vec<Option<Value>> = input
508 .split(|b| *b == b'\n')
509 .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
510 .collect();
511
512 assert_eq!(actual, expected);
513 }
514
    // Nullable Int32/Utf8 columns round-trip to line-delimited JSON;
    // null entries have their keys omitted by default.
    #[test]
    fn write_simple_rows() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }
543
    // All three string encodings (Utf8, LargeUtf8, Utf8View) serialize
    // identically, including rows where every column is null ("{}").
    #[test]
    fn write_large_utf8_and_utf8_view() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
578
    // Dictionary-encoded string columns (with different key widths) are
    // written as their dictionary values, not their keys.
    #[test]
    fn write_dictionary() {
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
618
    // A LargeList of dictionary-encoded strings: null list entries render as
    // `null` inside the array, while a null list omits the key entirely.
    #[test]
    fn write_list_of_dictionary() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
658
    // Same as write_list_of_dictionary, but with LargeUtf8 dictionary values
    // and explicitly constructed keys (including a null key).
    #[test]
    fn write_list_of_dictionary_large_values() {
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
705
    // All four timestamp resolutions render as ISO-8601 by default, and
    // `with_timestamp_format` overrides the rendering for all of them.
    #[test]
    fn write_timestamps() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );

        // Re-serialize with a custom timestamp format string.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_format("%m-%d-%Y".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018","micros":"11-13-2018","millis":"11-13-2018","secs":"11-13-2018","name":"a"}
{"name":"b"}
"#,
        );
    }
774
    // Timezone-aware timestamps get a trailing "Z" (UTC) by default, and
    // `with_timestamp_tz_format` overrides the rendering including the zone.
    #[test]
    fn write_timestamps_with_tz() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );

        // Re-serialize with a custom timestamp-with-timezone format string.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_timestamp_tz_format("%m-%d-%Y %Z".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"nanos":"11-13-2018 +00:00","micros":"11-13-2018 +00:00","millis":"11-13-2018 +00:00","secs":"11-13-2018 +00:00","name":"a"}
{"name":"b"}
"#,
        );
    }
850
    // Date32 renders as a date, Date64 as a datetime; `with_date_format` /
    // `with_datetime_format` override each independently.
    #[test]
    fn write_dates() {
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );

        // Re-serialize with custom date and datetime format strings.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_date_format("%m-%d-%Y".to_string())
                .with_datetime_format("%m-%d-%Y %Mmin %Ssec %Hhour".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"11-13-2018","date64":"11-13-2018 11min 10sec 17hour","name":"a"}
{"name":"b"}
"#,
        );
    }
913
    // All four time resolutions render as HH:MM:SS[.frac] by default, and
    // `with_time_format` overrides them uniformly.
    #[test]
    fn write_times() {
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );

        // Re-serialize with a custom time format string.
        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_time_format("%H-%M-%S %f".to_string())
                .build::<_, LineDelimited>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00-02-00 000000000","time32msec":"00-00-00 120000000","time64usec":"00-00-00 000120000","time64nsec":"00-00-00 000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
971
    // Durations of every resolution render as ISO-8601 duration strings
    // (e.g. "PT120S").
    #[test]
    fn write_durations() {
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
1014
    // Structs nested two levels deep serialize as nested JSON objects; a
    // null field inside a struct is omitted like any other null.
    #[test]
    fn write_nested_structs() {
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
1069
    // A list-of-strings column next to a scalar column serializes to a JSON
    // array per row.
    #[test]
    fn write_struct_with_list_field() {
        let field_c_list = Arc::new(Field::new("c_list", DataType::Utf8, false));
        let field_c1 = Field::new("c1", DataType::List(field_c_list.clone()), false);
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        let a = ListArray::new(
            field_c_list,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 4, 5, 6])),
            Arc::new(a_values),
            None,
        );

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }
1106
    // A list-of-list column serializes to nested JSON arrays; an empty
    // inner list renders as "[]".
    #[test]
    fn write_nested_list() {
        let field_b = Arc::new(Field::new("b", DataType::Int32, false));
        let field_a = Arc::new(Field::new("a", DataType::List(field_b.clone()), false));
        let field_c1 = Field::new("c1", DataType::List(field_a.clone()), false);
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        let a_list = ListArray::new(
            field_b,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 6])),
            Arc::new(a_values),
            None,
        );

        let c1 = ListArray::new(
            field_a,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3])),
            Arc::new(a_list),
            None,
        );
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1150
    // A nullable list of (nested) structs: a null list row omits its key,
    // while null struct fields inside list entries are omitted per entry.
    #[test]
    fn write_list_of_struct() {
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // Reuse the exact element field from the schema so the list array's
        // type matches the declared column type.
        let c1_inner = match field_c1.data_type() {
            DataType::List(f) => f.clone(),
            _ => unreachable!(),
        };
        let c1 = ListArray::new(
            c1_inner,
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3])),
            Arc::new(struct_values),
            Some(NullBuffer::from(vec![true, false, true])),
        );

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1224
    // Shared body for write_list_view: ListView/LargeListView (generic over
    // offset size) serialize identically to the equivalent List, including
    // out-of-order offsets and a null list row.
    fn assert_write_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Schema::new(vec![Field::new("lv", data_type, true)]);

        let values = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), None, Some(6)]);
        let offsets = [0, 3, 0, 5]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let sizes = [3, 2, 0, 1]
            .iter()
            .map(|&v| O::from_usize(v).unwrap())
            .collect::<Vec<_>>();
        let list_view = GenericListViewArray::<O>::try_new(
            field,
            ScalarBuffer::from(offsets),
            ScalarBuffer::from(sizes),
            Arc::new(values),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"lv":[1,2,3]}
{"lv":[4,null]}
{}
{"lv":[6]}
"#,
        );
    }
1266
    // Exercise both 32-bit and 64-bit offset list views.
    #[test]
    fn write_list_view() {
        assert_write_list_view::<i32>();
        assert_write_list_view::<i64>();
    }
1272
    // Round-trip helper: read a JSON test file (inferring its schema), write
    // the batch back out, and compare line-by-line against the original.
    // `remove_nulls = true` uses default options (null keys omitted), so null
    // entries are stripped from the expected JSON before comparing;
    // `false` writes with explicit nulls and compares verbatim.
    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
        let file = File::open(test_file).unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
        let mut reader = builder.build(reader).unwrap();
        let batch = reader.next().unwrap().unwrap();

        let mut buf = Vec::new();
        {
            if remove_nulls {
                let mut writer = LineDelimitedWriter::new(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            } else {
                let mut writer = WriterBuilder::new()
                    .with_explicit_nulls(true)
                    .build::<_, LineDelimited>(&mut buf);
                writer.write_batches(&[&batch]).unwrap();
            }
        }

        let result = str::from_utf8(&buf).unwrap();
        let expected = read_to_string(test_file).unwrap();
        for (r, e) in result.lines().zip(expected.lines()) {
            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
            if remove_nulls {
                // Default output omits null keys, so drop them from the
                // expectation too before comparing.
                if let Value::Object(obj) = expected_json {
                    expected_json =
                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
                }
            }
            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
        }
    }
1310
    // Round-trip the basic scalar-types fixture.
    #[test]
    fn write_basic_rows() {
        test_write_for_file("test/data/basic.json", true);
    }
1315
    // Round-trip the array-columns fixture.
    #[test]
    fn write_arrays() {
        test_write_for_file("test/data/arrays.json", true);
    }
1320
    // Round-trip the fixture containing nulls (null keys omitted).
    #[test]
    fn write_basic_nulls() {
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1325
    // Round-trip the nested-nulls fixture with explicit nulls enabled.
    #[test]
    fn write_nested_with_nulls() {
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1330
    // Finishing a line-delimited writer with no rows produces empty output.
    #[test]
    fn json_line_writer_empty() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }
1337
    // Finishing an array writer with no rows still produces valid JSON: "[]".
    #[test]
    fn json_array_writer_empty() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }
1344
    // Writing a zero-row batch contributes no bytes to line-delimited output.
    #[test]
    fn json_line_writer_empty_batch() {
        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
    }
1357
    // Writing a zero-row batch to an array writer still yields "[]" after
    // finish (the batch itself must not emit the "[" preamble).
    #[test]
    fn json_array_writer_empty_batch() {
        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);

        let array = Int32Array::from(Vec::<i32>::new());
        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
    }
1370
    // Struct-level nulls vs. child-level values: column "a" has a struct
    // validity mask while "b" shares the same child data with no mask, so a
    // row null in "a" omits the key even though the child list has data.
    #[test]
    fn json_struct_array_nulls() {
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
            None, None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            // Validity bitmap: rows 3 and 5 (0-indexed) are struct-null in "a".
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1420
1421 fn run_json_writer_map_with_keys(keys_array: ArrayRef) {
1422 let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);
1423
1424 let keys_field = Arc::new(Field::new("keys", keys_array.data_type().clone(), false));
1425 let values_field = Arc::new(Field::new("values", DataType::Int64, false));
1426 let entry_struct = StructArray::from(vec![
1427 (keys_field, keys_array.clone()),
1428 (values_field, Arc::new(values_array) as ArrayRef),
1429 ]);
1430
1431 let entries_field = Arc::new(Field::new(
1432 "entries",
1433 entry_struct.data_type().clone(),
1434 false,
1435 ));
1436
1437 let map = MapArray::new(
1439 entries_field.clone(),
1440 OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 1, 1, 1, 4, 5, 5])),
1441 entry_struct,
1442 Some(NullBuffer::from(vec![true, false, true, true, true, true])),
1443 false,
1444 );
1445
1446 let map_field = Field::new("map", DataType::Map(entries_field, false), true);
1447 let schema = Arc::new(Schema::new(vec![map_field]));
1448
1449 let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
1450
1451 let mut buf = Vec::new();
1452 {
1453 let mut writer = LineDelimitedWriter::new(&mut buf);
1454 writer.write_batches(&[&batch]).unwrap();
1455 }
1456
1457 assert_json_eq(
1458 &buf,
1459 r#"{"map":{"foo":10}}
1460{}
1461{"map":{}}
1462{"map":{"bar":20,"baz":30,"qux":40}}
1463{"map":{"quux":50}}
1464{"map":{}}
1465"#,
1466 );
1467 }
1468
1469 #[test]
1470 fn json_writer_map() {
1471 let keys_utf8 = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1473 run_json_writer_map_with_keys(Arc::new(keys_utf8) as ArrayRef);
1474
1475 let keys_large = super::LargeStringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1477 run_json_writer_map_with_keys(Arc::new(keys_large) as ArrayRef);
1478
1479 let keys_view = super::StringViewArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1481 run_json_writer_map_with_keys(Arc::new(keys_view) as ArrayRef);
1482 }
1483
1484 #[test]
1485 fn test_write_single_batch() {
1486 let test_file = "test/data/basic.json";
1487 let file = File::open(test_file).unwrap();
1488 let mut reader = BufReader::new(file);
1489 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1490 reader.rewind().unwrap();
1491
1492 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1493 let mut reader = builder.build(reader).unwrap();
1494 let batch = reader.next().unwrap().unwrap();
1495
1496 let mut buf = Vec::new();
1497 {
1498 let mut writer = LineDelimitedWriter::new(&mut buf);
1499 writer.write(&batch).unwrap();
1500 }
1501
1502 let result = str::from_utf8(&buf).unwrap();
1503 let expected = read_to_string(test_file).unwrap();
1504 for (r, e) in result.lines().zip(expected.lines()) {
1505 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1506 if let Value::Object(obj) = expected_json {
1508 expected_json =
1509 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1510 }
1511 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1512 }
1513 }
1514
1515 #[test]
1516 fn test_write_multi_batches() {
1517 let test_file = "test/data/basic.json";
1518
1519 let schema = SchemaRef::new(Schema::new(vec![
1520 Field::new("a", DataType::Int64, true),
1521 Field::new("b", DataType::Float64, true),
1522 Field::new("c", DataType::Boolean, true),
1523 Field::new("d", DataType::Utf8, true),
1524 Field::new("e", DataType::Utf8, true),
1525 Field::new("f", DataType::Utf8, true),
1526 Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1527 Field::new("h", DataType::Float16, true),
1528 ]));
1529
1530 let mut reader = ReaderBuilder::new(schema.clone())
1531 .build(BufReader::new(File::open(test_file).unwrap()))
1532 .unwrap();
1533 let batch = reader.next().unwrap().unwrap();
1534
1535 let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1537
1538 let mut buf = Vec::new();
1539 {
1540 let mut writer = LineDelimitedWriter::new(&mut buf);
1541 writer.write_batches(&batches).unwrap();
1542 }
1543
1544 let result = str::from_utf8(&buf).unwrap();
1545 let expected = read_to_string(test_file).unwrap();
1546 let expected = format!("{expected}\n{expected}");
1548 for (r, e) in result.lines().zip(expected.lines()) {
1549 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1550 if let Value::Object(obj) = expected_json {
1552 expected_json =
1553 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1554 }
1555 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1556 }
1557 }
1558
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // Builds a batch mixing dates, a Null column, a struct nesting a
        // list/dictionary/map, and a root-level list of structs, then checks
        // that `with_explicit_nulls(true)` renders every null as JSON `null`.

        // List column with a null row and rows of all-null items.
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            (array, field)
        }

        // Dictionary column with one null key (row 1).
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            (array, field)
        }

        // Map column: {"foo":10}, null, {} (empty), {"bar":20,"baz":30,"qux":40}.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            // `append(false)` produces a null map row.
            builder.append(false).unwrap();

            // `append(true)` with no entries produces an empty map row.
            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Root-level list-of-struct column: rows 1 and 3 are null lists.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let values_field =
                Arc::new(Field::new("struct", struct_array.data_type().clone(), true));
            let field = Field::new_list("list", values_field.as_ref().clone(), true);

            // Offsets [0,2,2,3,3]: row 0 has two structs, row 2 has one.
            let array = Arc::new(ListArray::new(
                values_field,
                OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3, 3])),
                Arc::new(struct_array),
                Some(NullBuffer::from(vec![true, false, true, false])),
            ));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            (nested_list_field, nested_list_array as ArrayRef),
            (nested_dict_field, nested_dict_array as ArrayRef),
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_null),
                Arc::new(arr_struct),
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // With explicit nulls, every field appears in every row — including
        // the always-null "null" column and null nested containers.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
            {
                "date": "1970-01-01",
                "list": [
                    {
                        "int32": 1,
                        "utf8": "a"
                    },
                    {
                        "int32": null,
                        "utf8": "b"
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "cupcakes",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "foo": 10
                    },
                    "utf8": "a"
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": null,
                    "list": [
                        1,
                        2,
                        3
                    ],
                    "map": null,
                    "utf8": null
                }
            },
            {
                "date": "1970-01-02",
                "list": [
                    {
                        "int32": 5,
                        "utf8": null
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "bear",
                    "list": null,
                    "map": {},
                    "utf8": null
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": "kuma",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "bar": 20,
                        "baz": 30,
                        "qux": 40
                    },
                    "utf8": "b"
                }
            }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1782
1783 fn build_array_binary<O: OffsetSizeTrait>(values: &[Option<&[u8]>]) -> RecordBatch {
1784 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1785 "bytes",
1786 GenericBinaryType::<O>::DATA_TYPE,
1787 true,
1788 )]));
1789 let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1790 for value in values {
1791 match value {
1792 Some(v) => builder.append_value(v),
1793 None => builder.append_null(),
1794 }
1795 }
1796 let array = Arc::new(builder.finish()) as ArrayRef;
1797 RecordBatch::try_new(schema, vec![array]).unwrap()
1798 }
1799
1800 fn build_array_binary_view(values: &[Option<&[u8]>]) -> RecordBatch {
1801 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1802 "bytes",
1803 DataType::BinaryView,
1804 true,
1805 )]));
1806 let mut builder = BinaryViewBuilder::new();
1807 for value in values {
1808 match value {
1809 Some(v) => builder.append_value(v),
1810 None => builder.append_null(),
1811 }
1812 }
1813 let array = Arc::new(builder.finish()) as ArrayRef;
1814 RecordBatch::try_new(schema, vec![array]).unwrap()
1815 }
1816
1817 fn assert_binary_json(batch: &RecordBatch) {
1818 {
1820 let mut buf = Vec::new();
1821 let json_value: Value = {
1822 let mut writer = WriterBuilder::new()
1823 .with_explicit_nulls(true)
1824 .build::<_, JsonArray>(&mut buf);
1825 writer.write(batch).unwrap();
1826 writer.close().unwrap();
1827 serde_json::from_slice(&buf).unwrap()
1828 };
1829
1830 assert_eq!(
1831 json!([
1832 {
1833 "bytes": "4e656420466c616e64657273"
1834 },
1835 {
1836 "bytes": null },
1838 {
1839 "bytes": "54726f79204d63436c757265"
1840 }
1841 ]),
1842 json_value,
1843 );
1844 }
1845
1846 {
1848 let mut buf = Vec::new();
1849 let json_value: Value = {
1850 let mut writer = ArrayWriter::new(&mut buf);
1853 writer.write(batch).unwrap();
1854 writer.close().unwrap();
1855 serde_json::from_slice(&buf).unwrap()
1856 };
1857
1858 assert_eq!(
1859 json!([
1860 { "bytes": "4e656420466c616e64657273" },
1861 {},
1862 { "bytes": "54726f79204d63436c757265" }
1863 ]),
1864 json_value
1865 );
1866 }
1867 }
1868
1869 #[test]
1870 fn test_writer_binary() {
1871 let values: [Option<&[u8]>; 3] = [
1872 Some(b"Ned Flanders" as &[u8]),
1873 None,
1874 Some(b"Troy McClure" as &[u8]),
1875 ];
1876 {
1878 let batch = build_array_binary::<i32>(&values);
1879 assert_binary_json(&batch);
1880 }
1881 {
1883 let batch = build_array_binary::<i64>(&values);
1884 assert_binary_json(&batch);
1885 }
1886 {
1887 let batch = build_array_binary_view(&values);
1888 assert_binary_json(&batch);
1889 }
1890 }
1891
1892 #[test]
1893 fn test_writer_fixed_size_binary() {
1894 let size = 11;
1896 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1897 "bytes",
1898 DataType::FixedSizeBinary(size),
1899 true,
1900 )]));
1901
1902 let mut builder = FixedSizeBinaryBuilder::new(size);
1904 let values = [Some(b"hello world"), None, Some(b"summer rain")];
1905 for value in values {
1906 match value {
1907 Some(v) => builder.append_value(v).unwrap(),
1908 None => builder.append_null(),
1909 }
1910 }
1911 let array = Arc::new(builder.finish()) as ArrayRef;
1912 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1913
1914 {
1916 let mut buf = Vec::new();
1917 let json_value: Value = {
1918 let mut writer = WriterBuilder::new()
1919 .with_explicit_nulls(true)
1920 .build::<_, JsonArray>(&mut buf);
1921 writer.write(&batch).unwrap();
1922 writer.close().unwrap();
1923 serde_json::from_slice(&buf).unwrap()
1924 };
1925
1926 assert_eq!(
1927 json!([
1928 {
1929 "bytes": "68656c6c6f20776f726c64"
1930 },
1931 {
1932 "bytes": null },
1934 {
1935 "bytes": "73756d6d6572207261696e"
1936 }
1937 ]),
1938 json_value,
1939 );
1940 }
1941 {
1943 let mut buf = Vec::new();
1944 let json_value: Value = {
1945 let mut writer = ArrayWriter::new(&mut buf);
1948 writer.write(&batch).unwrap();
1949 writer.close().unwrap();
1950 serde_json::from_slice(&buf).unwrap()
1951 };
1952
1953 assert_eq!(
1954 json!([
1955 {
1956 "bytes": "68656c6c6f20776f726c64"
1957 },
1958 {}, {
1960 "bytes": "73756d6d6572207261696e"
1961 }
1962 ]),
1963 json_value,
1964 );
1965 }
1966 }
1967
1968 #[test]
1969 fn test_writer_fixed_size_list() {
1970 let size = 3;
1971 let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1972 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1973 "list",
1974 DataType::FixedSizeList(field, size),
1975 true,
1976 )]));
1977
1978 let values_builder = Int32Builder::new();
1979 let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1980 let lists = [
1981 Some([Some(1), Some(2), None]),
1982 Some([Some(3), None, Some(4)]),
1983 Some([None, Some(5), Some(6)]),
1984 None,
1985 ];
1986 for list in lists {
1987 match list {
1988 Some(l) => {
1989 for value in l {
1990 match value {
1991 Some(v) => list_builder.values().append_value(v),
1992 None => list_builder.values().append_null(),
1993 }
1994 }
1995 list_builder.append(true);
1996 }
1997 None => {
1998 for _ in 0..size {
1999 list_builder.values().append_null();
2000 }
2001 list_builder.append(false);
2002 }
2003 }
2004 }
2005 let array = Arc::new(list_builder.finish()) as ArrayRef;
2006 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2007
2008 {
2010 let json_value: Value = {
2011 let mut buf = Vec::new();
2012 let mut writer = WriterBuilder::new()
2013 .with_explicit_nulls(true)
2014 .build::<_, JsonArray>(&mut buf);
2015 writer.write(&batch).unwrap();
2016 writer.close().unwrap();
2017 serde_json::from_slice(&buf).unwrap()
2018 };
2019 assert_eq!(
2020 json!([
2021 {"list": [1, 2, null]},
2022 {"list": [3, null, 4]},
2023 {"list": [null, 5, 6]},
2024 {"list": null},
2025 ]),
2026 json_value
2027 );
2028 }
2029 {
2031 let json_value: Value = {
2032 let mut buf = Vec::new();
2033 let mut writer = ArrayWriter::new(&mut buf);
2034 writer.write(&batch).unwrap();
2035 writer.close().unwrap();
2036 serde_json::from_slice(&buf).unwrap()
2037 };
2038 assert_eq!(
2039 json!([
2040 {"list": [1, 2, null]},
2041 {"list": [3, null, 4]},
2042 {"list": [null, 5, 6]},
2043 {}, ]),
2045 json_value
2046 );
2047 }
2048 }
2049
2050 #[test]
2051 fn test_writer_null_dict() {
2052 let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
2053 let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
2054 let dict = DictionaryArray::new(keys, values);
2055
2056 let schema = SchemaRef::new(Schema::new(vec![Field::new(
2057 "my_dict",
2058 DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
2059 true,
2060 )]));
2061
2062 let array = Arc::new(dict) as ArrayRef;
2063 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2064
2065 let mut json = Vec::new();
2066 let write_builder = WriterBuilder::new().with_explicit_nulls(true);
2067 let mut writer = write_builder.build::<_, JsonArray>(&mut json);
2068 writer.write(&batch).unwrap();
2069 writer.close().unwrap();
2070
2071 let json_str = str::from_utf8(&json).unwrap();
2072 assert_eq!(
2073 json_str,
2074 r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"#
2075 )
2076 }
2077
2078 #[test]
2079 fn test_decimal32_encoder() {
2080 let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
2081 .with_precision_and_scale(8, 2)
2082 .unwrap();
2083 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2084 let schema = Schema::new(vec![field]);
2085 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2086
2087 let mut buf = Vec::new();
2088 {
2089 let mut writer = LineDelimitedWriter::new(&mut buf);
2090 writer.write_batches(&[&batch]).unwrap();
2091 }
2092
2093 assert_json_eq(
2094 &buf,
2095 r#"{"decimal":12.34}
2096{"decimal":56.78}
2097{"decimal":90.12}
2098"#,
2099 );
2100 }
2101
2102 #[test]
2103 fn test_decimal64_encoder() {
2104 let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
2105 .with_precision_and_scale(10, 2)
2106 .unwrap();
2107 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2108 let schema = Schema::new(vec![field]);
2109 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2110
2111 let mut buf = Vec::new();
2112 {
2113 let mut writer = LineDelimitedWriter::new(&mut buf);
2114 writer.write_batches(&[&batch]).unwrap();
2115 }
2116
2117 assert_json_eq(
2118 &buf,
2119 r#"{"decimal":12.34}
2120{"decimal":56.78}
2121{"decimal":90.12}
2122"#,
2123 );
2124 }
2125
2126 #[test]
2127 fn test_decimal128_encoder() {
2128 let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
2129 .with_precision_and_scale(10, 2)
2130 .unwrap();
2131 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2132 let schema = Schema::new(vec![field]);
2133 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2134
2135 let mut buf = Vec::new();
2136 {
2137 let mut writer = LineDelimitedWriter::new(&mut buf);
2138 writer.write_batches(&[&batch]).unwrap();
2139 }
2140
2141 assert_json_eq(
2142 &buf,
2143 r#"{"decimal":12.34}
2144{"decimal":56.78}
2145{"decimal":90.12}
2146"#,
2147 );
2148 }
2149
2150 #[test]
2151 fn test_decimal256_encoder() {
2152 let array = Decimal256Array::from_iter_values([
2153 i256::from(123400),
2154 i256::from(567800),
2155 i256::from(901200),
2156 ])
2157 .with_precision_and_scale(10, 4)
2158 .unwrap();
2159 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2160 let schema = Schema::new(vec![field]);
2161 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2162
2163 let mut buf = Vec::new();
2164 {
2165 let mut writer = LineDelimitedWriter::new(&mut buf);
2166 writer.write_batches(&[&batch]).unwrap();
2167 }
2168
2169 assert_json_eq(
2170 &buf,
2171 r#"{"decimal":12.3400}
2172{"decimal":56.7800}
2173{"decimal":90.1200}
2174"#,
2175 );
2176 }
2177
2178 #[test]
2179 fn test_decimal_encoder_with_nulls() {
2180 let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
2181 .with_precision_and_scale(10, 2)
2182 .unwrap();
2183 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
2184 let schema = Schema::new(vec![field]);
2185 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
2186
2187 let mut buf = Vec::new();
2188 {
2189 let mut writer = LineDelimitedWriter::new(&mut buf);
2190 writer.write_batches(&[&batch]).unwrap();
2191 }
2192
2193 assert_json_eq(
2194 &buf,
2195 r#"{"decimal":12.34}
2196{}
2197{"decimal":56.78}
2198"#,
2199 );
2200 }
2201
2202 #[test]
2203 fn write_structs_as_list() {
2204 let schema = Schema::new(vec![
2205 Field::new(
2206 "c1",
2207 DataType::Struct(Fields::from(vec![
2208 Field::new("c11", DataType::Int32, true),
2209 Field::new(
2210 "c12",
2211 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2212 false,
2213 ),
2214 ])),
2215 false,
2216 ),
2217 Field::new("c2", DataType::Utf8, false),
2218 ]);
2219
2220 let c1 = StructArray::from(vec![
2221 (
2222 Arc::new(Field::new("c11", DataType::Int32, true)),
2223 Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
2224 ),
2225 (
2226 Arc::new(Field::new(
2227 "c12",
2228 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2229 false,
2230 )),
2231 Arc::new(StructArray::from(vec![(
2232 Arc::new(Field::new("c121", DataType::Utf8, false)),
2233 Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2234 )])) as ArrayRef,
2235 ),
2236 ]);
2237 let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2238
2239 let batch =
2240 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2241
2242 let expected = r#"[[1,["e"]],"a"]
2243[[null,["f"]],"b"]
2244[[5,["g"]],"c"]
2245"#;
2246
2247 let mut buf = Vec::new();
2248 {
2249 let builder = WriterBuilder::new()
2250 .with_explicit_nulls(true)
2251 .with_struct_mode(StructMode::ListOnly);
2252 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2253 writer.write_batches(&[&batch]).unwrap();
2254 }
2255 assert_json_eq(&buf, expected);
2256
2257 let mut buf = Vec::new();
2258 {
2259 let builder = WriterBuilder::new()
2260 .with_explicit_nulls(false)
2261 .with_struct_mode(StructMode::ListOnly);
2262 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2263 writer.write_batches(&[&batch]).unwrap();
2264 }
2265 assert_json_eq(&buf, expected);
2266 }
2267
    /// Builds a batch with a sparse union column (Int32 / Utf8 / Null children)
    /// plus a Float64 column, together with an `EncoderFactory` whose encoder
    /// handles the union type — the writer has no built-in union encoder, so
    /// the factory fallback is required for these rows to be written at all.
    fn make_fallback_encoder_test_data() -> (RecordBatch, Arc<dyn EncoderFactory>) {
        // Pre-decoded row value produced by the factory below.
        #[derive(Debug)]
        enum UnionValue {
            Int32(i32),
            String(String),
        }

        // Encoder over eagerly-materialized rows; `None` encodes as JSON null.
        #[derive(Debug)]
        struct UnionEncoder {
            array: Vec<Option<UnionValue>>,
        }

        impl Encoder for UnionEncoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                match &self.array[idx] {
                    None => out.extend_from_slice(b"null"),
                    Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()),
                    Some(UnionValue::String(v)) => {
                        out.extend_from_slice(format!("\"{v}\"").as_bytes())
                    }
                }
            }
        }

        #[derive(Debug)]
        struct UnionEncoderFactory;

        impl EncoderFactory for UnionEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                let data_type = array.data_type();
                // Returning Ok(None) defers to the writer's built-in encoders;
                // only sparse unions are handled here.
                let fields = match data_type {
                    DataType::Union(fields, UnionMode::Sparse) => fields,
                    _ => return Ok(None),
                };
                let fields = fields.iter().map(|(_, f)| f).collect::<Vec<_>>();
                // Also defer unless every child type is one we can encode.
                for f in fields.iter() {
                    match f.data_type() {
                        DataType::Null => {}
                        DataType::Int32 => {}
                        DataType::Utf8 => {}
                        _ => return Ok(None),
                    }
                }
                // Materialize each row by reading the child array selected by
                // its type id. NOTE(review): this indexes `fields`/`buffers`
                // directly by type id, which assumes type ids equal child
                // positions — true for the union constructed in this test.
                let (_, type_ids, _, buffers) = array.as_union().clone().into_parts();
                let mut values = Vec::with_capacity(type_ids.len());
                for idx in 0..type_ids.len() {
                    let type_id = type_ids[idx];
                    let field = &fields[type_id as usize];
                    let value = match field.data_type() {
                        DataType::Null => None,
                        DataType::Int32 => Some(UnionValue::Int32(
                            buffers[type_id as usize]
                                .as_primitive::<Int32Type>()
                                .value(idx),
                        )),
                        DataType::Utf8 => Some(UnionValue::String(
                            buffers[type_id as usize]
                                .as_string::<i32>()
                                .value(idx)
                                .to_string(),
                        )),
                        _ => unreachable!(),
                    };
                    values.push(value);
                }
                let array_encoder =
                    Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>;
                let nulls = array.nulls().cloned();
                Ok(Some(NullableEncoder::new(array_encoder, nulls)))
            }
        }

        // Sparse union rows: 1 (Int32 child), "a" (Utf8 child), null (Null child).
        let int_array = Int32Array::from(vec![Some(1), None, None]);
        let string_array = StringArray::from(vec![None, Some("a"), None]);
        let null_array = NullArray::new(3);
        let type_ids = [0_i8, 1, 2].into_iter().collect::<ScalarBuffer<i8>>();

        let union_fields = [
            (0, Arc::new(Field::new("A", DataType::Int32, false))),
            (1, Arc::new(Field::new("B", DataType::Utf8, false))),
            (2, Arc::new(Field::new("C", DataType::Null, false))),
        ]
        .into_iter()
        .collect::<UnionFields>();

        let children = vec![
            Arc::new(int_array) as Arc<dyn Array>,
            Arc::new(string_array),
            Arc::new(null_array),
        ];

        let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();

        // A second column with a null (row 1) so the tests can observe the
        // writer's own null handling next to the custom encoder's output.
        let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]);

        let fields = vec![
            Field::new(
                "union",
                DataType::Union(union_fields, UnionMode::Sparse),
                true,
            ),
            Field::new("float", DataType::Float64, true),
        ];

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        (batch, Arc::new(UnionEncoderFactory))
    }
2391
2392 #[test]
2393 fn test_fallback_encoder_factory_line_delimited_implicit_nulls() {
2394 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2395
2396 let mut buf = Vec::new();
2397 {
2398 let mut writer = WriterBuilder::new()
2399 .with_encoder_factory(encoder_factory)
2400 .with_explicit_nulls(false)
2401 .build::<_, LineDelimited>(&mut buf);
2402 writer.write_batches(&[&batch]).unwrap();
2403 writer.finish().unwrap();
2404 }
2405
2406 println!("{}", str::from_utf8(&buf).unwrap());
2407
2408 assert_json_eq(
2409 &buf,
2410 r#"{"union":1,"float":1.0}
2411{"union":"a"}
2412{"union":null,"float":3.4}
2413"#,
2414 );
2415 }
2416
2417 #[test]
2418 fn test_fallback_encoder_factory_line_delimited_explicit_nulls() {
2419 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2420
2421 let mut buf = Vec::new();
2422 {
2423 let mut writer = WriterBuilder::new()
2424 .with_encoder_factory(encoder_factory)
2425 .with_explicit_nulls(true)
2426 .build::<_, LineDelimited>(&mut buf);
2427 writer.write_batches(&[&batch]).unwrap();
2428 writer.finish().unwrap();
2429 }
2430
2431 assert_json_eq(
2432 &buf,
2433 r#"{"union":1,"float":1.0}
2434{"union":"a","float":null}
2435{"union":null,"float":3.4}
2436"#,
2437 );
2438 }
2439
2440 #[test]
2441 fn test_fallback_encoder_factory_array_implicit_nulls() {
2442 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2443
2444 let json_value: Value = {
2445 let mut buf = Vec::new();
2446 let mut writer = WriterBuilder::new()
2447 .with_encoder_factory(encoder_factory)
2448 .build::<_, JsonArray>(&mut buf);
2449 writer.write_batches(&[&batch]).unwrap();
2450 writer.finish().unwrap();
2451 serde_json::from_slice(&buf).unwrap()
2452 };
2453
2454 let expected = json!([
2455 {"union":1,"float":1.0},
2456 {"union":"a"},
2457 {"float":3.4,"union":null},
2458 ]);
2459
2460 assert_eq!(json_value, expected);
2461 }
2462
2463 #[test]
2464 fn test_fallback_encoder_factory_array_explicit_nulls() {
2465 let (batch, encoder_factory) = make_fallback_encoder_test_data();
2466
2467 let json_value: Value = {
2468 let mut buf = Vec::new();
2469 let mut writer = WriterBuilder::new()
2470 .with_encoder_factory(encoder_factory)
2471 .with_explicit_nulls(true)
2472 .build::<_, JsonArray>(&mut buf);
2473 writer.write_batches(&[&batch]).unwrap();
2474 writer.finish().unwrap();
2475 serde_json::from_slice(&buf).unwrap()
2476 };
2477
2478 let expected = json!([
2479 {"union":1,"float":1.0},
2480 {"union":"a", "float": null},
2481 {"union":null,"float":3.4},
2482 ]);
2483
2484 assert_eq!(json_value, expected);
2485 }
2486
    #[test]
    fn test_default_encoder_byte_array() {
        // Custom encoder that renders binary values as a JSON array of byte
        // numbers (e.g. b"a" -> [97]) instead of the writer's default encoding.
        struct IntArrayBinaryEncoder<B> {
            array: B,
        }

        impl<'a, B> Encoder for IntArrayBinaryEncoder<B>
        where
            B: ArrayAccessor<Item = &'a [u8]>,
        {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                out.push(b'[');
                let child = self.array.value(idx);
                // Comma-separate all but the last byte; an empty slice
                // renders as "[]" since the loop body never runs.
                for (idx, byte) in child.iter().enumerate() {
                    write!(out, "{byte}").unwrap();
                    if idx < child.len() - 1 {
                        out.push(b',');
                    }
                }
                out.push(b']');
            }
        }

        #[derive(Debug)]
        struct IntArayBinaryEncoderFactory;

        impl EncoderFactory for IntArayBinaryEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                _field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                match array.data_type() {
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let encoder = IntArrayBinaryEncoder { array };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        let nulls = array.nulls().cloned();
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    // All other types keep the writer's built-in encoders.
                    _ => Ok(None),
                }
            }
        }

        // Row layout: bytes null in row 1, float null in row 2, so both the
        // custom and the default encoder exercise implicit-null omission.
        let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]);
        let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]);
        let fields = vec![
            Field::new("bytes", DataType::Binary, true),
            Field::new("float", DataType::Float64, true),
        ];
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(binary_array) as Arc<dyn Array>,
                Arc::new(float_array) as Arc<dyn Array>,
            ],
        )
        .unwrap();

        let json_value: Value = {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice(&buf).unwrap()
        };

        let expected = json!([
            {"bytes": [97], "float": 1.0},
            {"float": 2.3},
            {"bytes": [98]},
        ]);

        assert_eq!(json_value, expected);
    }
2566
    #[test]
    fn test_encoder_factory_customize_dictionary() {
        // Verifies that a custom `EncoderFactory` applies both to a plain
        // Int32 column and to the *values* of a dictionary-encoded Int32
        // column, selected via field metadata.

        // Encoder that renders each Int32 value as a JSON string,
        // zero-padded to 8 characters (e.g. 1 -> "00000001").
        struct PaddedInt32Encoder {
            array: Int32Array,
        }

        impl Encoder for PaddedInt32Encoder {
            fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
                let value = self.array.value(idx);
                // Writing into a Vec<u8> cannot fail, hence the unwrap.
                write!(out, "\"{value:0>8}\"").unwrap();
            }
        }

        #[derive(Debug)]
        struct CustomEncoderFactory;

        impl EncoderFactory for CustomEncoderFactory {
            fn make_default_encoder<'a>(
                &self,
                field: &'a FieldRef,
                array: &'a dyn Array,
                _options: &'a EncoderOptions,
            ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
                // Only fields tagged with metadata `padded == "true"` opt in
                // to the custom encoding; anything else falls through.
                let padded = field
                    .metadata()
                    .get("padded")
                    .map(|v| v == "true")
                    .unwrap_or_default();
                match (array.data_type(), padded) {
                    (DataType::Int32, true) => {
                        let array = array.as_primitive::<Int32Type>();
                        // Nulls are handled by NullableEncoder, so the inner
                        // encoder only ever sees valid indices.
                        let nulls = array.nulls().cloned();
                        let encoder = PaddedInt32Encoder {
                            array: array.clone(),
                        };
                        let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>;
                        Ok(Some(NullableEncoder::new(array_encoder, nulls)))
                    }
                    // Returning None delegates to the built-in encoders.
                    _ => Ok(None),
                }
            }
        }

        // Helper: serialize one batch as a JSON array using the factory.
        // NOTE: the closure's parameter type (&RecordBatch) is inferred from
        // the call sites below.
        let to_json = |batch| {
            let mut buf = Vec::new();
            let mut writer = WriterBuilder::new()
                .with_encoder_factory(Arc::new(CustomEncoderFactory))
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[batch]).unwrap();
            writer.finish().unwrap();
            serde_json::from_slice::<Value>(&buf).unwrap()
        };

        // Case 1: plain Int32 column with the "padded" metadata flag.
        let array = Int32Array::from(vec![Some(1), None, Some(2)]);
        let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata(
            HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]),
        ));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![field.clone()])),
            vec![Arc::new(array)],
        )
        .unwrap();

        let json_value = to_json(&batch);

        // Null rows are omitted entirely (explicit_nulls defaults to false).
        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000002"},
        ]);

        assert_eq!(json_value, expected);

        // Case 2: the same customization reaches through a
        // Dictionary(UInt16 -> Int32) column's values.
        let mut array_builder = PrimitiveDictionaryBuilder::<UInt16Type, Int32Type>::new();
        array_builder.append_value(1);
        array_builder.append_null();
        array_builder.append_value(1);
        let array = array_builder.finish();
        let field = Field::new(
            "int",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "padded".to_string(),
            "true".to_string(),
        )]));
        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)])
            .unwrap();

        let json_value = to_json(&batch);

        let expected = json!([
            {"int": "00000001"},
            {},
            {"int": "00000001"},
        ]);

        assert_eq!(json_value, expected);
    }
2677
2678 #[test]
2679 fn test_write_run_end_encoded() {
2680 let run_ends = Int32Array::from(vec![2, 5, 6]);
2681 let values = StringArray::from(vec![Some("a"), Some("b"), None]);
2682 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2683
2684 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2685 "c1",
2686 ree.data_type().clone(),
2687 true,
2688 )]));
2689
2690 let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2691
2692 let mut buf = Vec::new();
2693 {
2694 let mut writer = LineDelimitedWriter::new(&mut buf);
2695 writer.write_batches(&[&batch]).unwrap();
2696 }
2697
2698 assert_json_eq(
2699 &buf,
2700 r#"{"c1":"a"}
2701{"c1":"a"}
2702{"c1":"b"}
2703{"c1":"b"}
2704{"c1":"b"}
2705{}
2706"#,
2707 );
2708 }
2709
2710 #[test]
2711 fn test_write_run_end_encoded_int_values() {
2712 let run_ends = Int32Array::from(vec![3, 5]);
2713 let values = Int32Array::from(vec![10, 20]);
2714 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2715
2716 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2717 "n",
2718 ree.data_type().clone(),
2719 true,
2720 )]));
2721
2722 let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)]).unwrap();
2723
2724 let json_value: Value = {
2725 let mut buf = Vec::new();
2726 let mut writer = WriterBuilder::new().build::<_, JsonArray>(&mut buf);
2727 writer.write_batches(&[&batch]).unwrap();
2728 writer.finish().unwrap();
2729 serde_json::from_slice(&buf).unwrap()
2730 };
2731
2732 let expected = json!([
2733 {"n": 10},
2734 {"n": 10},
2735 {"n": 10},
2736 {"n": 20},
2737 {"n": 20},
2738 ]);
2739
2740 assert_eq!(json_value, expected);
2741 }
2742
2743 #[test]
2744 fn test_run_end_encoded_roundtrip() {
2745 let run_ends = Int32Array::from(vec![3, 5, 7]);
2746 let values = StringArray::from(vec![Some("a"), None, Some("b")]);
2747 let ree = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
2748
2749 let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
2750 "c",
2751 ree.data_type().clone(),
2752 true,
2753 )]));
2754 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree)]).unwrap();
2755
2756 let mut buf = Vec::new();
2757 {
2758 let mut writer = super::LineDelimitedWriter::new(&mut buf);
2759 writer.write_batches(&[&batch]).unwrap();
2760 }
2761
2762 let batches: Vec<RecordBatch> = ReaderBuilder::new(schema)
2763 .with_batch_size(1024)
2764 .build(std::io::Cursor::new(&buf))
2765 .unwrap()
2766 .collect::<Result<Vec<_>, _>>()
2767 .unwrap();
2768 assert_eq!(batches.len(), 1);
2769
2770 let col = batches[0].column(0);
2771 let run_array = col.as_run::<Int32Type>();
2772
2773 assert_eq!(run_array.len(), 7);
2774 assert_eq!(run_array.run_ends().values(), &[3, 5, 7]);
2775
2776 let values = run_array.values().as_string::<i32>();
2777 assert_eq!(values.len(), 3);
2778 assert_eq!(values.value(0), "a");
2779 assert!(values.is_null(1));
2780 assert_eq!(values.value(2), "b");
2781 }
2782}