1use std::io::Write;
18use std::sync::Arc;
19
20use crate::StructMode;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_array::*;
24use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow_cast::display::{ArrayFormatter, FormatOptions};
26use arrow_schema::{ArrowError, DataType, FieldRef};
27use half::f16;
28use lexical_core::FormattedSize;
29use serde_core::Serializer;
30
/// Configuration for the JSON encoders built by [`make_encoder`].
#[derive(Debug, Clone, Default)]
pub struct EncoderOptions {
    /// Whether nulls are written out as `null`.
    ///
    /// When `false` (the `Default`), null struct fields are omitted in object mode and
    /// null map values are omitted; in list mode struct fields are always written.
    explicit_nulls: bool,
    /// Whether struct arrays are rendered as JSON objects or as JSON lists.
    struct_mode: StructMode,
    /// Optional hook that [`make_encoder`] consults before the built-in encoders.
    encoder_factory: Option<Arc<dyn EncoderFactory>>,
    /// Forwarded to `FormatOptions::with_date_format`; `None` keeps its default.
    date_format: Option<String>,
    /// Forwarded to `FormatOptions::with_datetime_format`; `None` keeps its default.
    datetime_format: Option<String>,
    /// Forwarded to `FormatOptions::with_timestamp_format`; `None` keeps its default.
    timestamp_format: Option<String>,
    /// Forwarded to `FormatOptions::with_timestamp_tz_format`; `None` keeps its default.
    timestamp_tz_format: Option<String>,
    /// Forwarded to `FormatOptions::with_time_format`; `None` keeps its default.
    time_format: Option<String>,
}
51
52impl EncoderOptions {
53 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
55 self.explicit_nulls = explicit_nulls;
56 self
57 }
58
59 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
61 self.struct_mode = struct_mode;
62 self
63 }
64
65 pub fn with_encoder_factory(mut self, encoder_factory: Arc<dyn EncoderFactory>) -> Self {
67 self.encoder_factory = Some(encoder_factory);
68 self
69 }
70
71 pub fn explicit_nulls(&self) -> bool {
73 self.explicit_nulls
74 }
75
76 pub fn struct_mode(&self) -> StructMode {
78 self.struct_mode
79 }
80
81 pub fn encoder_factory(&self) -> Option<&Arc<dyn EncoderFactory>> {
83 self.encoder_factory.as_ref()
84 }
85
86 pub fn with_date_format(mut self, format: String) -> Self {
88 self.date_format = Some(format);
89 self
90 }
91
92 pub fn date_format(&self) -> Option<&str> {
94 self.date_format.as_deref()
95 }
96
97 pub fn with_datetime_format(mut self, format: String) -> Self {
99 self.datetime_format = Some(format);
100 self
101 }
102
103 pub fn datetime_format(&self) -> Option<&str> {
105 self.datetime_format.as_deref()
106 }
107
108 pub fn with_time_format(mut self, format: String) -> Self {
110 self.time_format = Some(format);
111 self
112 }
113
114 pub fn time_format(&self) -> Option<&str> {
116 self.time_format.as_deref()
117 }
118
119 pub fn with_timestamp_format(mut self, format: String) -> Self {
121 self.timestamp_format = Some(format);
122 self
123 }
124
125 pub fn timestamp_format(&self) -> Option<&str> {
127 self.timestamp_format.as_deref()
128 }
129
130 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
132 self.timestamp_tz_format = Some(tz_format);
133 self
134 }
135
136 pub fn timestamp_tz_format(&self) -> Option<&str> {
138 self.timestamp_tz_format.as_deref()
139 }
140}
141
/// Extension point that lets callers supply their own encoder for particular arrays.
pub trait EncoderFactory: std::fmt::Debug + Send + Sync {
    /// Attempts to build a custom encoder for `array`, described by `field`.
    ///
    /// [`make_encoder`] calls this before selecting a built-in encoder. Returning
    /// `Ok(None)` (the default) falls back to the built-in behaviour. Returning `Err`
    /// aborts encoder construction.
    ///
    /// NOTE(review): when building encoders for list, fixed-size-list, dictionary and map
    /// children, this module passes the *parent* field, not the child field.
    fn make_default_encoder<'a>(
        &self,
        _field: &'a FieldRef,
        _array: &'a dyn Array,
        _options: &'a EncoderOptions,
    ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
        Ok(None)
    }
}
255
/// An [`Encoder`] paired with the validity mask of the array it encodes.
///
/// The nested encoders in this module consult [`NullableEncoder::is_null`] and write
/// `null` themselves rather than calling `encode` for null slots.
pub struct NullableEncoder<'a> {
    // The encoder that renders non-null values.
    encoder: Box<dyn Encoder + 'a>,
    // Validity mask; `None` means no slot is null.
    nulls: Option<NullBuffer>,
}
262
263impl<'a> NullableEncoder<'a> {
264 pub fn new(encoder: Box<dyn Encoder + 'a>, nulls: Option<NullBuffer>) -> Self {
266 Self { encoder, nulls }
267 }
268
269 pub fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
271 self.encoder.encode(idx, out)
272 }
273
274 pub fn is_null(&self, idx: usize) -> bool {
276 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
277 }
278
279 pub fn has_nulls(&self) -> bool {
281 match self.nulls {
282 Some(ref nulls) => nulls.null_count() > 0,
283 None => false,
284 }
285 }
286}
287
288impl Encoder for NullableEncoder<'_> {
289 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
290 self.encoder.encode(idx, out)
291 }
292}
293
/// Writes the JSON representation of individual array elements.
pub trait Encoder {
    /// Appends the JSON encoding of the element at `idx` to `out`.
    ///
    /// NOTE(review): the encoders in this module do not check validity themselves;
    /// callers are expected to test [`NullableEncoder::is_null`] first and emit `null`.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
303
304pub fn make_encoder<'a>(
308 field: &'a FieldRef,
309 array: &'a dyn Array,
310 options: &'a EncoderOptions,
311) -> Result<NullableEncoder<'a>, ArrowError> {
312 macro_rules! primitive_helper {
313 ($t:ty) => {{
314 let array = array.as_primitive::<$t>();
315 let nulls = array.nulls().cloned();
316 NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls)
317 }};
318 }
319
320 if let Some(factory) = options.encoder_factory() {
321 if let Some(encoder) = factory.make_default_encoder(field, array, options)? {
322 return Ok(encoder);
323 }
324 }
325
326 let nulls = array.nulls().cloned();
327 let encoder = downcast_integer! {
328 array.data_type() => (primitive_helper),
329 DataType::Float16 => primitive_helper!(Float16Type),
330 DataType::Float32 => primitive_helper!(Float32Type),
331 DataType::Float64 => primitive_helper!(Float64Type),
332 DataType::Boolean => {
333 let array = array.as_boolean();
334 NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned())
335 }
336 DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()),
337 DataType::Utf8 => {
338 let array = array.as_string::<i32>();
339 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
340 }
341 DataType::LargeUtf8 => {
342 let array = array.as_string::<i64>();
343 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
344 }
345 DataType::Utf8View => {
346 let array = array.as_string_view();
347 NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned())
348 }
349 DataType::List(_) => {
350 let array = array.as_list::<i32>();
351 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
352 }
353 DataType::LargeList(_) => {
354 let array = array.as_list::<i64>();
355 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
356 }
357 DataType::FixedSizeList(_, _) => {
358 let array = array.as_fixed_size_list();
359 NullableEncoder::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned())
360 }
361
362 DataType::Dictionary(_, _) => downcast_dictionary_array! {
363 array => {
364 NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned())
365 },
366 _ => unreachable!()
367 }
368
369 DataType::Map(_, _) => {
370 let array = array.as_map();
371 NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned())
372 }
373
374 DataType::FixedSizeBinary(_) => {
375 let array = array.as_fixed_size_binary();
376 NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
377 }
378
379 DataType::Binary => {
380 let array: &BinaryArray = array.as_binary();
381 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
382 }
383
384 DataType::LargeBinary => {
385 let array: &LargeBinaryArray = array.as_binary();
386 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
387 }
388
389 DataType::Struct(fields) => {
390 let array = array.as_struct();
391 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
392 let encoder = make_encoder(field, array, options)?;
393 Ok(FieldEncoder{
394 field: field.clone(),
395 encoder,
396 })
397 }).collect::<Result<Vec<_>, ArrowError>>()?;
398
399 let encoder = StructArrayEncoder{
400 encoders,
401 explicit_nulls: options.explicit_nulls(),
402 struct_mode: options.struct_mode(),
403 };
404 let nulls = array.nulls().cloned();
405 NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
406 }
407 DataType::Decimal32(_, _) | DataType::Decimal64(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
408 let options = FormatOptions::new().with_display_error(true);
409 let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
410 NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)
411 }
412 d => match d.is_temporal() {
413 true => {
414 let fops = FormatOptions::new().with_display_error(true)
419 .with_date_format(options.date_format.as_deref())
420 .with_datetime_format(options.datetime_format.as_deref())
421 .with_timestamp_format(options.timestamp_format.as_deref())
422 .with_timestamp_tz_format(options.timestamp_tz_format.as_deref())
423 .with_time_format(options.time_format.as_deref());
424
425 let formatter = ArrayFormatter::try_new(array, &fops)?;
426 let formatter = JsonArrayFormatter::new(formatter);
427 NullableEncoder::new(Box::new(formatter) as Box<dyn Encoder + 'a>, nulls)
428 }
429 false => return Err(ArrowError::JsonError(format!(
430 "Unsupported data type for JSON encoding: {d:?}",
431 )))
432 }
433 };
434
435 Ok(encoder)
436}
437
438fn encode_string(s: &str, out: &mut Vec<u8>) {
439 let mut serializer = serde_json::Serializer::new(out);
440 serializer.serialize_str(s).unwrap();
441}
442
443struct FieldEncoder<'a> {
444 field: FieldRef,
445 encoder: NullableEncoder<'a>,
446}
447
448impl FieldEncoder<'_> {
449 fn is_null(&self, idx: usize) -> bool {
450 self.encoder.is_null(idx)
451 }
452}
453
454struct StructArrayEncoder<'a> {
455 encoders: Vec<FieldEncoder<'a>>,
456 explicit_nulls: bool,
457 struct_mode: StructMode,
458}
459
460impl Encoder for StructArrayEncoder<'_> {
461 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
462 match self.struct_mode {
463 StructMode::ObjectOnly => out.push(b'{'),
464 StructMode::ListOnly => out.push(b'['),
465 }
466 let mut is_first = true;
467 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
469
470 for field_encoder in self.encoders.iter_mut() {
471 let is_null = field_encoder.is_null(idx);
472 if is_null && drop_nulls {
473 continue;
474 }
475
476 if !is_first {
477 out.push(b',');
478 }
479 is_first = false;
480
481 if self.struct_mode == StructMode::ObjectOnly {
482 encode_string(field_encoder.field.name(), out);
483 out.push(b':');
484 }
485
486 if is_null {
487 out.extend_from_slice(b"null");
488 } else {
489 field_encoder.encoder.encode(idx, out);
490 }
491 }
492 match self.struct_mode {
493 StructMode::ObjectOnly => out.push(b'}'),
494 StructMode::ListOnly => out.push(b']'),
495 }
496 }
497}
498
/// Native numeric types that can be written as JSON numbers using a reusable scratch buffer.
trait PrimitiveEncode: ArrowNativeType {
    /// Scratch space the value is formatted into.
    type Buffer;

    /// Creates a fresh scratch buffer.
    fn init_buffer() -> Self::Buffer;

    /// Formats `self` and returns the bytes to emit.
    ///
    /// The returned slice either borrows from `buf` or is a static literal (the float
    /// implementations return `null` for non-finite values).
    fn encode(self, buf: &mut Self::Buffer) -> &[u8];
}
510
/// Implements [`PrimitiveEncode`] for integer types by formatting with `lexical_core`
/// into a buffer sized by the type's `FORMATTED_SIZE` constant.
macro_rules! integer_encode {
    ($($t:ty),*) => {
        $(
            impl PrimitiveEncode for $t {
                type Buffer = [u8; Self::FORMATTED_SIZE];

                fn init_buffer() -> Self::Buffer {
                    [0; Self::FORMATTED_SIZE]
                }

                fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
                    lexical_core::write(self, buf)
                }
            }
        )*
    };
}
integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
529
530macro_rules! float_encode {
531 ($($t:ty),*) => {
532 $(
533 impl PrimitiveEncode for $t {
534 type Buffer = [u8; Self::FORMATTED_SIZE];
535
536 fn init_buffer() -> Self::Buffer {
537 [0; Self::FORMATTED_SIZE]
538 }
539
540 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
541 if self.is_infinite() || self.is_nan() {
542 b"null"
543 } else {
544 lexical_core::write(self, buf)
545 }
546 }
547 }
548 )*
549 };
550}
551float_encode!(f32, f64);
552
553impl PrimitiveEncode for f16 {
554 type Buffer = <f32 as PrimitiveEncode>::Buffer;
555
556 fn init_buffer() -> Self::Buffer {
557 f32::init_buffer()
558 }
559
560 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
561 self.to_f32().encode(buf)
562 }
563}
564
565struct PrimitiveEncoder<N: PrimitiveEncode> {
566 values: ScalarBuffer<N>,
567 buffer: N::Buffer,
568}
569
570impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
571 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
572 Self {
573 values: array.values().clone(),
574 buffer: N::init_buffer(),
575 }
576 }
577}
578
579impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
580 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
581 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
582 }
583}
584
585struct BooleanEncoder<'a>(&'a BooleanArray);
586
587impl Encoder for BooleanEncoder<'_> {
588 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
589 match self.0.value(idx) {
590 true => out.extend_from_slice(b"true"),
591 false => out.extend_from_slice(b"false"),
592 }
593 }
594}
595
596struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
597
598impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
599 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
600 encode_string(self.0.value(idx), out);
601 }
602}
603
604struct StringViewEncoder<'a>(&'a StringViewArray);
605
606impl Encoder for StringViewEncoder<'_> {
607 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
608 encode_string(self.0.value(idx), out);
609 }
610}
611
612struct ListEncoder<'a, O: OffsetSizeTrait> {
613 offsets: OffsetBuffer<O>,
614 encoder: NullableEncoder<'a>,
615}
616
617impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
618 fn try_new(
619 field: &'a FieldRef,
620 array: &'a GenericListArray<O>,
621 options: &'a EncoderOptions,
622 ) -> Result<Self, ArrowError> {
623 let encoder = make_encoder(field, array.values().as_ref(), options)?;
624 Ok(Self {
625 offsets: array.offsets().clone(),
626 encoder,
627 })
628 }
629}
630
631impl<O: OffsetSizeTrait> Encoder for ListEncoder<'_, O> {
632 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
633 let end = self.offsets[idx + 1].as_usize();
634 let start = self.offsets[idx].as_usize();
635 out.push(b'[');
636
637 if self.encoder.has_nulls() {
638 for idx in start..end {
639 if idx != start {
640 out.push(b',')
641 }
642 if self.encoder.is_null(idx) {
643 out.extend_from_slice(b"null");
644 } else {
645 self.encoder.encode(idx, out);
646 }
647 }
648 } else {
649 for idx in start..end {
650 if idx != start {
651 out.push(b',')
652 }
653 self.encoder.encode(idx, out);
654 }
655 }
656 out.push(b']');
657 }
658}
659
660struct FixedSizeListEncoder<'a> {
661 value_length: usize,
662 encoder: NullableEncoder<'a>,
663}
664
665impl<'a> FixedSizeListEncoder<'a> {
666 fn try_new(
667 field: &'a FieldRef,
668 array: &'a FixedSizeListArray,
669 options: &'a EncoderOptions,
670 ) -> Result<Self, ArrowError> {
671 let encoder = make_encoder(field, array.values().as_ref(), options)?;
672 Ok(Self {
673 encoder,
674 value_length: array.value_length().as_usize(),
675 })
676 }
677}
678
679impl Encoder for FixedSizeListEncoder<'_> {
680 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
681 let start = idx * self.value_length;
682 let end = start + self.value_length;
683 out.push(b'[');
684 if self.encoder.has_nulls() {
685 for idx in start..end {
686 if idx != start {
687 out.push(b',')
688 }
689 if self.encoder.is_null(idx) {
690 out.extend_from_slice(b"null");
691 } else {
692 self.encoder.encode(idx, out);
693 }
694 }
695 } else {
696 for idx in start..end {
697 if idx != start {
698 out.push(b',')
699 }
700 self.encoder.encode(idx, out);
701 }
702 }
703 out.push(b']');
704 }
705}
706
707struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
708 keys: ScalarBuffer<K::Native>,
709 encoder: NullableEncoder<'a>,
710}
711
712impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
713 fn try_new(
714 field: &'a FieldRef,
715 array: &'a DictionaryArray<K>,
716 options: &'a EncoderOptions,
717 ) -> Result<Self, ArrowError> {
718 let encoder = make_encoder(field, array.values().as_ref(), options)?;
719
720 Ok(Self {
721 keys: array.keys().values().clone(),
722 encoder,
723 })
724 }
725}
726
727impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
728 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
729 self.encoder.encode(self.keys[idx].as_usize(), out)
730 }
731}
732
733struct JsonArrayFormatter<'a> {
735 formatter: ArrayFormatter<'a>,
736}
737
738impl<'a> JsonArrayFormatter<'a> {
739 fn new(formatter: ArrayFormatter<'a>) -> Self {
740 Self { formatter }
741 }
742}
743
744impl Encoder for JsonArrayFormatter<'_> {
745 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
746 out.push(b'"');
747 let _ = write!(out, "{}", self.formatter.value(idx));
750 out.push(b'"')
751 }
752}
753
754struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>);
756
757impl Encoder for RawArrayFormatter<'_> {
758 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
759 let _ = write!(out, "{}", self.0.formatter.value(idx));
760 }
761}
762
/// Encoder for `DataType::Null` arrays.
///
/// [`make_encoder`] pairs it with the array's logical nulls, which mark every slot null.
/// Callers are therefore expected to write `null` themselves and never call `encode`.
struct NullEncoder;

impl Encoder for NullEncoder {
    fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
        // Reaching this means a caller skipped the null check.
        unreachable!()
    }
}
770
771struct MapEncoder<'a> {
772 offsets: OffsetBuffer<i32>,
773 keys: NullableEncoder<'a>,
774 values: NullableEncoder<'a>,
775 explicit_nulls: bool,
776}
777
778impl<'a> MapEncoder<'a> {
779 fn try_new(
780 field: &'a FieldRef,
781 array: &'a MapArray,
782 options: &'a EncoderOptions,
783 ) -> Result<Self, ArrowError> {
784 let values = array.values();
785 let keys = array.keys();
786
787 if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
788 return Err(ArrowError::JsonError(format!(
789 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
790 keys.data_type()
791 )));
792 }
793
794 let keys = make_encoder(field, keys, options)?;
795 let values = make_encoder(field, values, options)?;
796
797 if keys.has_nulls() {
799 return Err(ArrowError::InvalidArgumentError(
800 "Encountered nulls in MapArray keys".to_string(),
801 ));
802 }
803
804 if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
805 return Err(ArrowError::InvalidArgumentError(
806 "Encountered nulls in MapArray entries".to_string(),
807 ));
808 }
809
810 Ok(Self {
811 offsets: array.offsets().clone(),
812 keys,
813 values,
814 explicit_nulls: options.explicit_nulls(),
815 })
816 }
817}
818
819impl Encoder for MapEncoder<'_> {
820 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
821 let end = self.offsets[idx + 1].as_usize();
822 let start = self.offsets[idx].as_usize();
823
824 let mut is_first = true;
825
826 out.push(b'{');
827
828 for idx in start..end {
829 let is_null = self.values.is_null(idx);
830 if is_null && !self.explicit_nulls {
831 continue;
832 }
833
834 if !is_first {
835 out.push(b',');
836 }
837 is_first = false;
838
839 self.keys.encode(idx, out);
840 out.push(b':');
841
842 if is_null {
843 out.extend_from_slice(b"null");
844 } else {
845 self.values.encode(idx, out);
846 }
847 }
848 out.push(b'}');
849 }
850}
851
852struct BinaryEncoder<B>(B);
855
856impl<'a, B> BinaryEncoder<B>
857where
858 B: ArrayAccessor<Item = &'a [u8]>,
859{
860 fn new(array: B) -> Self {
861 Self(array)
862 }
863}
864
865impl<'a, B> Encoder for BinaryEncoder<B>
866where
867 B: ArrayAccessor<Item = &'a [u8]>,
868{
869 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
870 out.push(b'"');
871 for byte in self.0.value(idx) {
872 write!(out, "{byte:02x}").unwrap();
874 }
875 out.push(b'"');
876 }
877}