use chrono::{Duration, NaiveDate, NaiveDateTime};
use lexical_core::ToLexical;
use std::io::Write;
use streaming_iterator::StreamingIterator;
use crate::bitmap::utils::ZipValidity;
use crate::datatypes::{IntegerType, TimeUnit};
use crate::io::iterator::BufStreamingIterator;
use crate::offset::Offset;
#[cfg(feature = "chrono-tz")]
use crate::temporal_conversions::parse_offset_tz;
use crate::temporal_conversions::{
date32_to_date, date64_to_date, duration_ms_to_duration, duration_ns_to_duration,
duration_s_to_duration, duration_us_to_duration, parse_offset, timestamp_ms_to_datetime,
timestamp_ns_to_datetime, timestamp_s_to_datetime, timestamp_to_datetime,
timestamp_us_to_datetime,
};
use crate::util::lexical_to_bytes_mut;
use crate::{array::*, datatypes::DataType, types::NativeType};
use super::utf8;
/// Boxes `iterator` together with the per-item writer `f` into a
/// [`BufStreamingIterator`] that starts from an empty buffer.
///
/// When `offset` is non-zero or `take` is smaller than `usize::MAX`, the
/// iterator is first restricted with `skip(offset).take(take)`; otherwise it
/// is used as-is.
fn materialize_serializer<'a, I, F, T>(
    f: F,
    iterator: I,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
    T: 'a,
    I: Iterator<Item = T> + Send + Sync + 'a,
    F: FnMut(T, &mut Vec<u8>) + Send + Sync + 'a,
{
    // Nothing to trim: hand the iterator over without the skip/take adaptors.
    let covers_everything = offset == 0 && take == usize::MAX;
    if covers_everything {
        return Box::new(BufStreamingIterator::new(iterator, f, Vec::new()));
    }

    let window = iterator.skip(offset).take(take);
    Box::new(BufStreamingIterator::new(window, f, Vec::new()))
}
/// Builds a serializer that writes each slot of `array` as the JSON literal
/// `true`, `false` or `null`, restricted to the `offset`/`take` window.
fn boolean_serializer<'a>(
    array: &'a BooleanArray,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let write_cell = |value: Option<bool>, out: &mut Vec<u8>| {
        let literal: &[u8] = match value {
            Some(true) => b"true",
            Some(false) => b"false",
            None => b"null",
        };
        out.extend_from_slice(literal);
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer over `len` unit items that writes `null` for each of
/// them, restricted to the `offset`/`take` window.
fn null_serializer(
    len: usize,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + Send + Sync> {
    let units = std::iter::repeat(()).take(len);
    let write_null = |_unit: (), out: &mut Vec<u8>| out.extend_from_slice(b"null");
    materialize_serializer(write_null, units, offset, take)
}
/// Builds a serializer that writes each valid value of `array` through
/// `lexical_to_bytes_mut` and each missing value as `null`, restricted to the
/// `offset`/`take` window.
fn primitive_serializer<'a, T: NativeType + ToLexical>(
    array: &'a PrimitiveArray<T>,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let write_cell = |value: Option<&T>, out: &mut Vec<u8>| match value {
        Some(number) => lexical_to_bytes_mut(*number, out),
        None => out.extend(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer for floating point arrays, restricted to the
/// `offset`/`take` window.
///
/// Values that are neither NaN nor infinite are written through
/// `lexical_to_bytes_mut`; NaN, infinite and missing values are all written
/// as `null`.
fn float_serializer<'a, T>(
    array: &'a PrimitiveArray<T>,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
    T: num_traits::Float + NativeType + ToLexical,
{
    let write_cell = |value: Option<&T>, out: &mut Vec<u8>| match value {
        Some(number) if !T::is_nan(*number) && !T::is_infinite(*number) => {
            lexical_to_bytes_mut(*number, out)
        }
        // Covers both missing slots and NaN / infinite values.
        _ => out.extend(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer for a dictionary array whose values are a
/// `Utf8Array<O>`, writing each valid entry through `utf8::write_str` and each
/// missing entry as `null`.
///
/// Only the rows in the `offset`/`take` window are produced.
///
/// # Panics
///
/// Panics if the dictionary values cannot be iterated as `Utf8Array<O>`.
fn dictionary_utf8_serializer<'a, K: DictionaryKey, O: Offset>(
    array: &'a DictionaryArray<K>,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    // `materialize_serializer` already applies `offset`; skipping here as well
    // would drop `2 * offset` rows instead of `offset`.
    let iter = array
        .iter_typed::<Utf8Array<O>>()
        .expect("dictionary values must be a Utf8Array with the requested offset type");
    let f = |x: Option<&str>, buf: &mut Vec<u8>| match x {
        Some(text) => utf8::write_str(buf, text).unwrap(),
        None => buf.extend_from_slice(b"null"),
    };
    materialize_serializer(f, iter, offset, take)
}
/// Builds a serializer that writes each valid string of `array` through
/// `utf8::write_str` and each missing string as `null`, restricted to the
/// `offset`/`take` window.
fn utf8_serializer<'a, O: Offset>(
    array: &'a Utf8Array<O>,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let write_cell = |value: Option<&str>, out: &mut Vec<u8>| match value {
        Some(text) => {
            utf8::write_str(out, text).unwrap();
        }
        None => out.extend_from_slice(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer that writes each row of a struct array as a JSON
/// object (`{"field":value,...}`), or `null` when the row itself is null.
///
/// One child serializer is created per field, each restricted to the same
/// `offset`/`take` window. The struct's own validity is restricted to that
/// window as well, so that row `i` of the output pairs the validity bit and
/// the child values of the same source row.
fn struct_serializer<'a>(
    array: &'a StructArray,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let mut serializers = array
        .values()
        .iter()
        .map(|x| x.as_ref())
        .map(|arr| new_serializer(arr, offset, take))
        .collect::<Vec<_>>();
    let names = array.fields().iter().map(|f| f.name.as_str());

    // The children above start at `offset`; the validity must start there too,
    // otherwise null flags of rows `0..` would be applied to values of rows
    // `offset..`. `take` bounds the row count to what the children can yield.
    let rows = ZipValidity::new_with_validity(0..array.len(), array.validity())
        .skip(offset)
        .take(take);

    Box::new(BufStreamingIterator::new(
        rows,
        move |maybe, buf| {
            if maybe.is_some() {
                let names = names.clone();
                let mut record: Vec<(&str, &[u8])> = Default::default();
                serializers
                    .iter_mut()
                    .zip(names)
                    .for_each(|(iter, name)| {
                        let item = iter.next().unwrap();
                        record.push((name, item));
                    });
                serialize_item(buf, &record, true);
            } else {
                // Keep every child aligned with the parent row even though
                // its value is not written.
                serializers.iter_mut().for_each(|iter| {
                    let _ = iter.next();
                });
                buf.extend(b"null");
            }
        },
        vec![],
    ))
}
/// Builds a serializer that writes each row of a list array as a JSON array
/// (`[v0,v1,...]`), or `null` when the row is null.
///
/// Only the rows in the `offset`/`take` window are produced. The child
/// serializer is created over exactly the child values that belong to those
/// rows, and every row (null or not) consumes its own segment of child
/// values so that later rows stay aligned.
fn list_serializer<'a, O: Offset>(
    array: &'a ListArray<O>,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let offsets = array.offsets().as_slice();

    // `offsets` holds one more entry than there are rows. Clamp the requested
    // window to the available rows and start the child serializer at the first
    // *emitted* row rather than at row 0.
    let rows = offsets.len().saturating_sub(1);
    let first = offset.min(rows);
    let last = first + take.min(rows - first);
    let start = offsets[first].to_usize();
    let end = offsets[last].to_usize();
    let mut serializer = new_serializer(array.values().as_ref(), start, end - start);

    let f = move |(row, window): (usize, &[O]), buf: &mut Vec<u8>| {
        let length = (window[1] - window[0]).to_usize();
        if array.is_valid(row) {
            buf.push(b'[');
            for position in 0..length {
                if position != 0 {
                    buf.push(b',');
                }
                buf.extend(serializer.next().unwrap());
            }
            buf.push(b']');
        } else {
            // A null row may still own a non-empty child segment; consume it
            // so the following rows read their own values.
            for _ in 0..length {
                let _ = serializer.next();
            }
            buf.extend(b"null");
        }
    };

    // The row index travels with each offsets window so that the validity
    // lookup stays correct after `materialize_serializer` skips rows.
    let iter = offsets.windows(2).enumerate();
    materialize_serializer(f, iter, offset, take)
}
/// Builds a serializer that writes each row of a fixed-size list array as a
/// JSON array of `array.size()` values, or `null` when the row is null.
///
/// Only the rows in the `offset`/`take` window are produced. Because every
/// row owns `array.size()` child values, the child window is expressed in
/// value units (`offset * size`, `take * size`), and null rows still consume
/// their child slots.
fn fixed_size_list_serializer<'a>(
    array: &'a FixedSizeListArray,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let size = array.size();

    // Saturating so that `take == usize::MAX` ("everything") stays unbounded
    // instead of overflowing.
    let mut serializer = new_serializer(
        array.values().as_ref(),
        offset.saturating_mul(size),
        take.saturating_mul(size),
    );

    // Restrict the validity to the same row window as the child values.
    let rows = ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter()))
        .skip(offset)
        .take(take);

    Box::new(BufStreamingIterator::new(
        rows,
        move |ix, buf| {
            if ix.is_some() {
                buf.push(b'[');
                for position in 0..size {
                    if position != 0 {
                        buf.push(b',');
                    }
                    buf.extend(serializer.next().unwrap());
                }
                buf.push(b']');
            } else {
                // The null row still occupies `size` child slots; skip them so
                // the next row reads its own values.
                for _ in 0..size {
                    let _ = serializer.next();
                }
                buf.extend(b"null");
            }
        },
        vec![],
    ))
}
/// Builds a serializer that converts each valid value of `array` to a
/// `NaiveDate` with `convert` and writes its `Display` form inside double
/// quotes; missing values are written as `null`.
///
/// Only the rows in the `offset`/`take` window are produced.
fn date_serializer<'a, T, F>(
    array: &'a PrimitiveArray<T>,
    convert: F,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
    T: NativeType,
    F: Fn(T) -> NaiveDate + 'static + Send + Sync,
{
    let write_cell = move |value: Option<&T>, out: &mut Vec<u8>| match value {
        Some(raw) => {
            let date = convert(*raw);
            write!(out, "\"{date}\"").unwrap();
        }
        None => out.extend_from_slice(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer that converts each valid value of `array` to a
/// `Duration` with `convert` and writes its `Display` form inside double
/// quotes; missing values are written as `null`.
///
/// Only the rows in the `offset`/`take` window are produced.
fn duration_serializer<'a, T, F>(
    array: &'a PrimitiveArray<T>,
    convert: F,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
    T: NativeType,
    F: Fn(T) -> Duration + 'static + Send + Sync,
{
    let write_cell = move |value: Option<&T>, out: &mut Vec<u8>| match value {
        Some(raw) => {
            let elapsed = convert(*raw);
            write!(out, "\"{elapsed}\"").unwrap();
        }
        None => out.extend_from_slice(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer for timezone-less timestamps: each valid value is
/// converted to a `NaiveDateTime` with `convert` and its `Display` form is
/// written inside double quotes; missing values are written as `null`.
///
/// Only the rows in the `offset`/`take` window are produced.
fn timestamp_serializer<'a, F>(
    array: &'a PrimitiveArray<i64>,
    convert: F,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
    F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,
{
    let write_cell = move |value: Option<&i64>, out: &mut Vec<u8>| match value {
        Some(raw) => {
            let moment = convert(*raw);
            write!(out, "\"{moment}\"").unwrap();
        }
        None => out.extend_from_slice(b"null"),
    };
    materialize_serializer(write_cell, array.iter(), offset, take)
}
/// Builds a serializer for timestamps carrying a timezone: each valid value
/// is converted with `timestamp_to_datetime` and written as a double-quoted
/// RFC 3339 string; missing values are written as `null`.
///
/// `tz` is first tried with `parse_offset`. If that fails and the `chrono-tz`
/// feature is enabled, it is tried with `parse_offset_tz`.
///
/// Only the rows in the `offset`/`take` window are produced.
///
/// # Panics
///
/// Panics when `tz` cannot be parsed by any of the available parsers.
fn timestamp_tz_serializer<'a>(
    array: &'a PrimitiveArray<i64>,
    time_unit: TimeUnit,
    tz: &str,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    match parse_offset(tz) {
        Ok(fixed_offset) => {
            let write_cell = move |value: Option<&i64>, out: &mut Vec<u8>| match value {
                Some(raw) => {
                    let rendered =
                        timestamp_to_datetime(*raw, time_unit, &fixed_offset).to_rfc3339();
                    write!(out, "\"{rendered}\"").unwrap();
                }
                None => out.extend_from_slice(b"null"),
            };
            materialize_serializer(write_cell, array.iter(), offset, take)
        }
        // Not a fixed offset: fall back to the second parser when available.
        #[cfg(feature = "chrono-tz")]
        _ => match parse_offset_tz(tz) {
            Ok(named_zone) => {
                let write_cell = move |value: Option<&i64>, out: &mut Vec<u8>| match value {
                    Some(raw) => {
                        let rendered =
                            timestamp_to_datetime(*raw, time_unit, &named_zone).to_rfc3339();
                        write!(out, "\"{rendered}\"").unwrap();
                    }
                    None => out.extend_from_slice(b"null"),
                };
                materialize_serializer(write_cell, array.iter(), offset, take)
            }
            _ => {
                panic!("Timezone {} is invalid or not supported", tz);
            }
        },
        #[cfg(not(feature = "chrono-tz"))]
        _ => {
            panic!("Invalid Offset format (must be [-]00:00) or chrono-tz feature not active");
        }
    }
}
/// Creates the JSON serializer matching the logical data type of `array`,
/// restricted to the `offset`/`take` window.
///
/// The array is downcast to the concrete array type expected for its logical
/// type and handed to the corresponding `*_serializer` function.
///
/// # Panics
///
/// Panics if a downcast fails, and hits `todo!` for data types (including
/// dictionary key/value combinations other than `u32` keys with `LargeUtf8`
/// values) that have no serializer here.
pub(crate) fn new_serializer<'a>(
    array: &'a dyn Array,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    // Every typed arm downcasts from the same `Any` view of the array.
    let any = array.as_any();

    match array.data_type().to_logical_type() {
        DataType::Null => null_serializer(array.len(), offset, take),
        DataType::Boolean => boolean_serializer(any.downcast_ref().unwrap(), offset, take),

        // Integers.
        DataType::Int8 => primitive_serializer::<i8>(any.downcast_ref().unwrap(), offset, take),
        DataType::Int16 => primitive_serializer::<i16>(any.downcast_ref().unwrap(), offset, take),
        DataType::Int32 => primitive_serializer::<i32>(any.downcast_ref().unwrap(), offset, take),
        DataType::Int64 => primitive_serializer::<i64>(any.downcast_ref().unwrap(), offset, take),
        DataType::UInt8 => primitive_serializer::<u8>(any.downcast_ref().unwrap(), offset, take),
        DataType::UInt16 => primitive_serializer::<u16>(any.downcast_ref().unwrap(), offset, take),
        DataType::UInt32 => primitive_serializer::<u32>(any.downcast_ref().unwrap(), offset, take),
        DataType::UInt64 => primitive_serializer::<u64>(any.downcast_ref().unwrap(), offset, take),

        // Floats.
        DataType::Float32 => float_serializer::<f32>(any.downcast_ref().unwrap(), offset, take),
        DataType::Float64 => float_serializer::<f64>(any.downcast_ref().unwrap(), offset, take),

        // Strings.
        DataType::Utf8 => utf8_serializer::<i32>(any.downcast_ref().unwrap(), offset, take),
        DataType::LargeUtf8 => utf8_serializer::<i64>(any.downcast_ref().unwrap(), offset, take),

        // Nested types.
        DataType::Struct(_) => struct_serializer(any.downcast_ref().unwrap(), offset, take),
        DataType::FixedSizeList(_, _) => {
            fixed_size_list_serializer(any.downcast_ref().unwrap(), offset, take)
        }
        DataType::List(_) => list_serializer::<i32>(any.downcast_ref().unwrap(), offset, take),
        DataType::LargeList(_) => {
            list_serializer::<i64>(any.downcast_ref().unwrap(), offset, take)
        }

        // Temporal types.
        DataType::Date32 => {
            date_serializer(any.downcast_ref().unwrap(), date32_to_date, offset, take)
        }
        DataType::Date64 => {
            date_serializer(any.downcast_ref().unwrap(), date64_to_date, offset, take)
        }
        DataType::Timestamp(unit, None) => {
            let to_datetime = match unit {
                TimeUnit::Second => timestamp_s_to_datetime,
                TimeUnit::Millisecond => timestamp_ms_to_datetime,
                TimeUnit::Microsecond => timestamp_us_to_datetime,
                TimeUnit::Nanosecond => timestamp_ns_to_datetime,
            };
            timestamp_serializer(any.downcast_ref().unwrap(), to_datetime, offset, take)
        }
        DataType::Timestamp(unit, Some(tz)) => {
            timestamp_tz_serializer(any.downcast_ref().unwrap(), *unit, tz, offset, take)
        }
        DataType::Duration(unit) => {
            let to_duration = match unit {
                TimeUnit::Second => duration_s_to_duration,
                TimeUnit::Millisecond => duration_ms_to_duration,
                TimeUnit::Microsecond => duration_us_to_duration,
                TimeUnit::Nanosecond => duration_ns_to_duration,
            };
            duration_serializer(any.downcast_ref().unwrap(), to_duration, offset, take)
        }

        // Dictionaries: only one key/value combination is handled.
        other @ DataType::Dictionary(k, v, _) => match (k, &**v) {
            (IntegerType::UInt32, DataType::LargeUtf8) => {
                let dictionary = any.downcast_ref::<DictionaryArray<u32>>().unwrap();
                dictionary_utf8_serializer::<u32, i64>(dictionary, offset, take)
            }
            _ => {
                todo!("Writing {:?} to JSON", other)
            }
        },

        other => todo!("Writing {:?} to JSON", other),
    }
}
/// Appends one JSON object built from `record` to `buffer`.
///
/// Each `(key, value)` pair is written as `key:value`, with the key going
/// through `utf8::write_str` and the value bytes copied verbatim; pairs are
/// separated by commas. When `is_first_row` is `false`, a comma is written
/// before the opening brace.
fn serialize_item(buffer: &mut Vec<u8>, record: &[(&str, &[u8])], is_first_row: bool) {
    if !is_first_row {
        buffer.push(b',');
    }

    buffer.push(b'{');
    for (position, &(key, value)) in record.iter().enumerate() {
        if position > 0 {
            buffer.push(b',');
        }
        utf8::write_str(buffer, key).unwrap();
        buffer.push(b':');
        buffer.extend_from_slice(value);
    }
    buffer.push(b'}');
}
/// Appends the JSON form of every row of `array` to `buffer`, separating
/// consecutive rows with a comma.
///
/// No enclosing brackets are written.
///
/// # Panics
///
/// Panics if the serializer yields fewer than `array.len()` items.
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
    let mut rows = new_serializer(array, 0, usize::MAX);
    for row in 0..array.len() {
        if row > 0 {
            buffer.push(b',');
        }
        let bytes = rows.next().unwrap();
        buffer.extend_from_slice(bytes);
    }
}