use chrono::{NaiveDate, NaiveDateTime};
use lexical_core::ToLexical;
use std::io::Write;
use streaming_iterator::StreamingIterator;
use crate::bitmap::utils::zip_validity;
use crate::datatypes::TimeUnit;
use crate::io::iterator::BufStreamingIterator;
use crate::temporal_conversions::{
date32_to_date, date64_to_date, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
timestamp_s_to_datetime, timestamp_us_to_datetime,
};
use crate::util::lexical_to_bytes_mut;
use crate::{array::*, datatypes::DataType, types::NativeType};
fn boolean_serializer<'a>(
array: &'a BooleanArray,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| match x {
Some(true) => buf.extend_from_slice(b"true"),
Some(false) => buf.extend_from_slice(b"false"),
None => buf.extend_from_slice(b"null"),
},
vec![],
))
}
fn primitive_serializer<'a, T: NativeType + ToLexical>(
array: &'a PrimitiveArray<T>,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
lexical_to_bytes_mut(*x, buf)
} else {
buf.extend(b"null")
}
},
vec![],
))
}
fn utf8_serializer<'a, O: Offset>(
array: &'a Utf8Array<O>,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
serde_json::to_writer(buf, x).unwrap();
} else {
buf.extend_from_slice(b"null")
}
},
vec![],
))
}
fn struct_serializer<'a>(
array: &'a StructArray,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
let mut serializers = array
.values()
.iter()
.map(|x| x.as_ref())
.map(new_serializer)
.collect::<Vec<_>>();
let names = array.fields().iter().map(|f| f.name.as_str());
Box::new(BufStreamingIterator::new(
zip_validity(0..array.len(), array.validity().map(|x| x.iter())),
move |maybe, buf| {
if maybe.is_some() {
let names = names.clone();
let mut record: Vec<(&str, &[u8])> = Default::default();
serializers
.iter_mut()
.zip(names)
.for_each(|(iter, name)| {
let item = iter.next().unwrap();
record.push((name, item));
});
serialize_item(buf, &record, true);
} else {
serializers.iter_mut().for_each(|iter| {
let _ = iter.next();
});
buf.extend(b"null");
}
},
vec![],
))
}
fn list_serializer<'a, O: Offset>(
array: &'a ListArray<O>,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
let mut serializer = new_serializer(array.values().as_ref());
Box::new(BufStreamingIterator::new(
zip_validity(
array.offsets().windows(2),
array.validity().map(|x| x.iter()),
),
move |offset, buf| {
if let Some(offset) = offset {
let length = (offset[1] - offset[0]).to_usize();
buf.push(b'[');
let mut is_first_row = true;
for _ in 0..length {
if !is_first_row {
buf.push(b',');
}
is_first_row = false;
buf.extend(serializer.next().unwrap());
}
buf.push(b']');
} else {
buf.extend(b"null");
}
},
vec![],
))
}
fn date_serializer<'a, T, F>(
array: &'a PrimitiveArray<T>,
convert: F,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
T: NativeType,
F: Fn(T) -> NaiveDate + 'static + Send + Sync,
{
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let nd = convert(*x);
write!(buf, "\"{}\"", nd).unwrap();
} else {
buf.extend_from_slice(b"null")
}
},
vec![],
))
}
fn timestamp_serializer<'a, F>(
array: &'a PrimitiveArray<i64>,
convert: F,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,
{
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let ndt = convert(*x);
write!(buf, "\"{}\"", ndt).unwrap();
} else {
buf.extend_from_slice(b"null")
}
},
vec![],
))
}
pub(crate) fn new_serializer<'a>(
array: &'a dyn Array,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
match array.data_type().to_logical_type() {
DataType::Boolean => boolean_serializer(array.as_any().downcast_ref().unwrap()),
DataType::Int8 => primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap()),
DataType::Int16 => primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap()),
DataType::Int32 => primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
DataType::Int64 => primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
DataType::UInt8 => primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap()),
DataType::UInt16 => primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap()),
DataType::UInt32 => primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap()),
DataType::UInt64 => primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap()),
DataType::Float32 => primitive_serializer::<f32>(array.as_any().downcast_ref().unwrap()),
DataType::Float64 => primitive_serializer::<f64>(array.as_any().downcast_ref().unwrap()),
DataType::Utf8 => utf8_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
DataType::LargeUtf8 => utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
DataType::Struct(_) => struct_serializer(array.as_any().downcast_ref().unwrap()),
DataType::List(_) => list_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
DataType::LargeList(_) => list_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
DataType::Date32 => date_serializer(array.as_any().downcast_ref().unwrap(), date32_to_date),
DataType::Date64 => date_serializer(array.as_any().downcast_ref().unwrap(), date64_to_date),
DataType::Timestamp(tu, tz) => {
if tz.is_some() {
todo!("still have to implement timezone")
} else {
let convert = match tu {
TimeUnit::Nanosecond => timestamp_ns_to_datetime,
TimeUnit::Microsecond => timestamp_us_to_datetime,
TimeUnit::Millisecond => timestamp_ms_to_datetime,
TimeUnit::Second => timestamp_s_to_datetime,
};
timestamp_serializer(array.as_any().downcast_ref().unwrap(), convert)
}
}
other => todo!("Writing {:?} to JSON", other),
}
}
fn serialize_item(buffer: &mut Vec<u8>, record: &[(&str, &[u8])], is_first_row: bool) {
if !is_first_row {
buffer.push(b',');
}
buffer.push(b'{');
let mut first_item = true;
for (key, value) in record {
if !first_item {
buffer.push(b',');
}
first_item = false;
serde_json::to_writer(&mut *buffer, key).unwrap();
buffer.push(b':');
buffer.extend(*value);
}
buffer.push(b'}');
}
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
let mut serializer = new_serializer(array);
(0..array.len()).for_each(|i| {
if i != 0 {
buffer.push(b',');
}
buffer.extend_from_slice(serializer.next().unwrap());
});
}