use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use tokio::io::AsyncReadExt;
use super::ClickHouseArrowDeserializer;
use crate::arrow::builder::TypedBuilder;
use crate::io::{ClickHouseBytesRead, ClickHouseRead};
use crate::{Error, Result, Type};
/// Deserializes a `Nullable(T)` column into an Arrow array from an async reader.
///
/// The layout consumed here is a null mask of exactly `rows` bytes followed by the inner
/// column's data. The mask is read in full and then passed, together with the reader, to
/// `inner.deserialize_arrow_async`, which decodes the inner values and applies the mask.
/// In this module's tests a mask byte of `1` yields a null row and `0` a present row.
///
/// # Arguments
/// * `inner` - the wrapped type `T` (the tests pass the result of `Type::strip_null`).
/// * `builder` - typed builder forwarded to the inner deserializer.
/// * `data_type` - Arrow type of the resulting array, forwarded to the inner deserializer.
/// * `reader` - source positioned at the start of the null mask.
/// * `rows` - number of rows in the column; also the exact size of the null mask in bytes.
/// * `rbuffer` - byte buffer forwarded as-is to the inner deserializer.
///
/// # Errors
/// Returns an error if the reader yields fewer than `rows` bytes for the null mask
/// (propagated from `AsyncReadExt::read_exact`), or any error produced by the inner
/// deserializer.
pub(crate) async fn deserialize_async<R: ClickHouseRead>(
    inner: &Type,
    builder: &mut TypedBuilder,
    data_type: &DataType,
    reader: &mut R,
    rows: usize,
    rbuffer: &mut Vec<u8>,
) -> Result<ArrayRef> {
    // `read_exact` either fills the entire buffer or fails (e.g. on a short read), so after
    // it succeeds the mask holds exactly `rows` bytes. No separate length check is needed:
    // the mask is allocated with length `rows` and that length never changes. With zero
    // rows the mask is empty and nothing is read.
    let mut nulls = vec![0u8; rows];
    if rows > 0 {
        reader.read_exact(&mut nulls).await?;
    }
    inner.deserialize_arrow_async(builder, reader, data_type, rows, &nulls, rbuffer).await
}
/// Synchronous counterpart of [`deserialize_async`] for readers implementing
/// `ClickHouseBytesRead`.
///
/// Reads a null mask of exactly `rows` bytes via `reader.try_copy_to_slice`, then delegates
/// to `inner.deserialize_arrow` with the mask, which decodes the inner column's data.
///
/// Note that the argument order differs from [`deserialize_async`]: here `reader` comes
/// before `data_type`.
///
/// # Arguments
/// * `inner` - the wrapped type `T` of the `Nullable(T)` column.
/// * `builder` - typed builder forwarded to the inner deserializer.
/// * `reader` - source positioned at the start of the null mask.
/// * `data_type` - Arrow type of the resulting array, forwarded to the inner deserializer.
/// * `rows` - number of rows in the column; also the exact size of the null mask in bytes.
/// * `rbuffer` - byte buffer forwarded as-is to the inner deserializer.
///
/// # Errors
/// Propagates any error from `reader.try_copy_to_slice` (presumably raised when fewer than
/// `rows` bytes remain; confirm against `ClickHouseBytesRead`) and any error produced by
/// the inner deserializer.
// The lint is silenced because this sync path may have no callers yet.
// TODO(review): confirm and drop the allow once it is used.
#[allow(dead_code)]
pub(crate) fn deserialize<R: ClickHouseBytesRead>(
    inner: &Type,
    builder: &mut TypedBuilder,
    reader: &mut R,
    data_type: &DataType,
    rows: usize,
    rbuffer: &mut Vec<u8>,
) -> Result<ArrayRef> {
    // The mask is allocated with length `rows`, and copying into it never changes that
    // length, so a `mask.len() != rows` check could never fire. A short input has to be
    // reported by `try_copy_to_slice` itself. The read is skipped for zero rows.
    let mut nulls = vec![0u8; rows];
    if rows > 0 {
        reader.try_copy_to_slice(&mut nulls)?;
    }
    inner.deserialize_arrow(builder, reader, data_type, rows, &nulls, rbuffer)
}
// Tests for the Nullable deserializer. Each input is a hand-built byte stream: a null mask
// of `rows` bytes (the assertions show 1 = NULL, 0 = value) followed by the inner column's
// encoding, including a placeholder value for every null row.
#[cfg(test)]
mod tests {
use std::io::Cursor;
use arrow::array::{Array, Int32Array, ListArray, StringArray};
use super::*;
use crate::ArrowOptions;
use crate::arrow::ch_to_arrow_type;
use crate::native::types::Type;
#[tokio::test]
async fn test_deserialize_nullable_int32() {
let type_ = Type::Nullable(Box::new(Type::Int32));
let inner_type = type_.strip_null();
let rows = 3;
// Null mask [0, 1, 0], then three 4-byte little-endian Int32 values:
// 1, 0 (placeholder for the null row), 3.
let input = vec![
0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, ];
let mut reader = Cursor::new(input);
let data_type = ch_to_arrow_type(inner_type, None).unwrap().0;
let mut builder = TypedBuilder::try_new(inner_type, &data_type).unwrap();
let result =
deserialize_async(inner_type, &mut builder, &data_type, &mut reader, rows, &mut vec![])
.await
.expect("Failed to deserialize Nullable(Int32)");
let array = result.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(array, &Int32Array::from(vec![Some(1), None, Some(3)]));
// Arrow validity bitmap: true = valid, false = null (the inverse of the wire mask).
assert_eq!(array.nulls().unwrap().iter().collect::<Vec<bool>>(), vec![true, false, true]);
}
#[tokio::test]
async fn test_deserialize_nullable_string() {
let type_ = Type::Nullable(Box::new(Type::String));
let inner_type = type_.strip_null();
let rows = 3;
// Null mask [0, 1, 0], then three strings, each a length byte followed by its bytes:
// "a", "" (empty placeholder for the null row), "c".
let input = vec![
0, 1, 0, 1, b'a', 0, 1, b'c', ];
let mut reader = Cursor::new(input);
// Request Utf8 output so the result can be downcast to `StringArray`.
let opts = Some(ArrowOptions::default().with_strings_as_strings(true));
let data_type = ch_to_arrow_type(inner_type, opts).unwrap().0;
let mut builder = TypedBuilder::try_new(inner_type, &data_type).unwrap();
let result =
deserialize_async(inner_type, &mut builder, &data_type, &mut reader, rows, &mut vec![])
.await
.expect("Failed to deserialize Nullable(String)");
let array = result.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(array, &StringArray::from(vec![Some("a"), None, Some("c")]));
assert_eq!(array.nulls().unwrap().iter().collect::<Vec<bool>>(), vec![true, false, true]);
}
#[tokio::test]
async fn test_deserialize_nullable_array_int32() {
let type_ = Type::Nullable(Box::new(Type::Array(Box::new(Type::Int32))));
let inner_type = type_.strip_null();
let rows = 3;
// Null mask [0, 1, 0]; then three 8-byte little-endian cumulative end offsets (2, 3, 5);
// then the five flattened 4-byte Int32 values 1..=5. The null row still occupies a
// slot in the offsets (values index 2..3), as the offsets assertion below shows.
let input = vec![
0, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0, ];
let mut reader = Cursor::new(input);
let data_type = ch_to_arrow_type(inner_type, None).unwrap().0;
let mut builder = TypedBuilder::try_new(inner_type, &data_type).unwrap();
let result =
deserialize_async(inner_type, &mut builder, &data_type, &mut reader, rows, &mut vec![])
.await
.expect("Failed to deserialize Nullable(Array(Int32))");
let list_array = result.as_any().downcast_ref::<ListArray>().unwrap();
let values = list_array.values().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(list_array.len(), 3);
assert_eq!(values, &Int32Array::from(vec![1, 2, 3, 4, 5]));
// Arrow offsets start with an explicit leading 0 that is absent from the wire format.
assert_eq!(list_array.offsets().iter().copied().collect::<Vec<i32>>(), vec![0, 2, 3, 5]);
assert_eq!(
list_array.nulls().unwrap().iter().collect::<Vec<bool>>(),
vec![true, false, true] );
}
#[tokio::test]
async fn test_deserialize_nullable_int32_zero_rows() {
let type_ = Type::Nullable(Box::new(Type::Int32));
let inner_type = type_.strip_null();
let rows = 0;
// With zero rows nothing should be read: an empty stream must still succeed.
let input = vec![]; let mut reader = Cursor::new(input);
let data_type = ch_to_arrow_type(inner_type, None).unwrap().0;
let mut builder = TypedBuilder::try_new(inner_type, &data_type).unwrap();
let result =
deserialize_async(inner_type, &mut builder, &data_type, &mut reader, rows, &mut vec![])
.await
.expect("Failed to deserialize Nullable(Int32) with zero rows");
let array = result.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(array.len(), 0);
assert_eq!(array, &Int32Array::from(Vec::<i32>::new()));
// An empty result carries no validity buffer at all.
assert_eq!(array.nulls(), None);
}
#[tokio::test]
async fn test_null_mask_length() {
let type_ = Type::Nullable(Box::new(Type::String));
let inner_type = type_.strip_null();
let data_type = ch_to_arrow_type(inner_type, None).unwrap().0;
let mut builder = TypedBuilder::try_new(inner_type, &data_type).unwrap();
// Only 50 bytes are available but 100 rows need a 100-byte null mask, so reading the
// mask itself must fail rather than yield a truncated column.
assert!(
deserialize_async(
inner_type,
&mut builder,
&data_type,
&mut Cursor::new(vec![0_u8; 50]),
100,
&mut vec![]
)
.await
.is_err()
);
}
}