use std::sync::Arc;
use arrow::array::*;
use tokio::io::AsyncReadExt;
use crate::arrow::builder::TypedBuilder;
use crate::io::ClickHouseRead;
use crate::{Error, Result, Type};
pub(super) async fn deserialize_async<R: ClickHouseRead>(
type_hint: &Type,
builder: &mut TypedBuilder,
reader: &mut R,
rows: usize,
nulls: &[u8],
) -> Result<ArrayRef> {
super::deser!(() => builder => {
TypedBuilder::Enum8(b) => {{
let Type::Enum8(pairs) = type_hint else {
return Err(Error::UnexpectedType(type_hint.clone()));
};
for i in 0..rows {
let idx = super::primitive::primitive_async!(Int8 => reader);
if nulls.is_empty() || nulls[i] == 0 {
b.append_value(&pairs.iter().find(|(_, key)| *key == idx).ok_or(
Error::ArrowDeserialize(format!(
"Invalid Enum8 index: {idx} not found in pairs"
))
)?.0);
} else {
b.append_null();
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}},
TypedBuilder::Enum16(b) => {{
let Type::Enum16(pairs) = type_hint else {
return Err(Error::UnexpectedType(type_hint.clone()));
};
for i in 0..rows {
let idx = super::primitive::primitive_async!(Int16 => reader);
if nulls.is_empty() || nulls[i] == 0 {
b.append_value(&pairs.iter().find(|(_, key)| *key == idx).ok_or(
Error::ArrowDeserialize(format!(
"Invalid Enum16 index: {idx} not found in pairs"
))
)?.0);
} else {
b.append_null();
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}}
}
_ => { Err(Error::ArrowDeserialize(format!(
"Unexpected builder type for enum: {type_hint:?}"
)))})
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use std::sync::Arc;

    use arrow::array::{DictionaryArray, Int8Array, Int16Array, StringArray};
    use arrow::datatypes::{DataType, Int8Type, Int16Type};

    use super::*;

    type MockReader = Cursor<Vec<u8>>;

    /// Arrow dictionary type with the given key type and UTF-8 values, the target for enums.
    fn enum_dict_type(key: DataType) -> DataType {
        DataType::Dictionary(Box::new(key), Box::new(DataType::Utf8))
    }

    #[tokio::test]
    async fn test_deserialize_enum8() {
        let pairs = vec![("a".to_string(), 1_i8), ("b".to_string(), 2_i8)];
        let mut reader = MockReader::new(vec![1, 2, 1]);
        let type_ = Type::Enum8(pairs);
        let data_type = enum_dict_type(DataType::Int8);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let array = deserialize_async(&type_, &mut builder, &mut reader, 3, &[]).await.unwrap();
        let values = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
        let expected = Arc::new(
            DictionaryArray::<Int8Type>::try_new(Int8Array::from(vec![0, 1, 0]), values).unwrap(),
        ) as ArrayRef;
        assert_eq!(array.as_ref(), expected.as_ref());
    }

    #[tokio::test]
    async fn test_deserialize_enum16() {
        let pairs = vec![("x".to_string(), 10_i16), ("y".to_string(), 20_i16)];
        // Little-endian i16 discriminants: 10, 20, 10.
        let mut reader = MockReader::new(vec![10, 0, 20, 0, 10, 0]);
        let type_ = Type::Enum16(pairs);
        let data_type = enum_dict_type(DataType::Int16);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let array = deserialize_async(&type_, &mut builder, &mut reader, 3, &[]).await.unwrap();
        let values = Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef;
        let expected = Arc::new(
            DictionaryArray::<Int16Type>::try_new(Int16Array::from(vec![0, 1, 0]), values)
                .unwrap(),
        ) as ArrayRef;
        assert_eq!(array.as_ref(), expected.as_ref());
    }

    #[tokio::test]
    async fn test_deserialize_enum8_nullable() {
        let pairs = vec![("a".to_string(), 1_i8), ("b".to_string(), 2_i8)];
        let nulls = vec![0, 1, 0];
        let mut reader = MockReader::new(vec![1, 2, 1]);
        let type_ = Type::Enum8(pairs);
        let data_type = enum_dict_type(DataType::Int8);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let array =
            deserialize_async(&type_, &mut builder, &mut reader, 3, &nulls).await.unwrap();
        let values = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
        let expected = Arc::new(
            DictionaryArray::<Int8Type>::try_new(
                Int8Array::from(vec![Some(0), None, Some(0)]),
                values,
            )
            .unwrap(),
        ) as ArrayRef;
        assert_eq!(array.as_ref(), expected.as_ref());
    }

    #[tokio::test]
    async fn test_deserialize_enum8_empty() {
        let pairs = vec![("a".to_string(), 1_i8), ("b".to_string(), 2_i8)];
        let mut reader = MockReader::new(vec![]);
        let type_ = Type::Enum8(pairs);
        let data_type = enum_dict_type(DataType::Int8);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let array = deserialize_async(&type_, &mut builder, &mut reader, 0, &[]).await.unwrap();
        let values = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
        let expected = Arc::new(
            DictionaryArray::<Int8Type>::try_new(Int8Array::from(Vec::<i8>::new()), values)
                .unwrap(),
        ) as ArrayRef;
        assert_eq!(array.as_ref(), expected.as_ref());
    }

    #[tokio::test]
    async fn test_deserialize_enum8_negative_discriminant() {
        let pairs = vec![("neg".to_string(), -1_i8), ("pos".to_string(), 1_i8)];
        // 0xFF is -1 as a two's-complement i8.
        let mut reader = MockReader::new(vec![0xFF, 1]);
        let type_ = Type::Enum8(pairs);
        let data_type = enum_dict_type(DataType::Int8);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let array = deserialize_async(&type_, &mut builder, &mut reader, 2, &[]).await.unwrap();
        let values = Arc::new(StringArray::from(vec!["neg", "pos"])) as ArrayRef;
        let expected = Arc::new(
            DictionaryArray::<Int8Type>::try_new(Int8Array::from(vec![0, 1]), values).unwrap(),
        ) as ArrayRef;
        assert_eq!(array.as_ref(), expected.as_ref());
    }

    #[tokio::test]
    async fn test_deserialize_enum8_invalid_index() {
        let pairs = vec![("a".to_string(), 1_i8), ("b".to_string(), 2_i8)];
        let mut reader = MockReader::new(vec![3]);
        let type_ = Type::Enum8(pairs);
        let data_type = enum_dict_type(DataType::Int8);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let result = deserialize_async(&type_, &mut builder, &mut reader, 1, &[]).await;
        assert!(matches!(
            result,
            Err(Error::ArrowDeserialize(msg))
            if msg.contains("Invalid")
        ));
    }

    #[tokio::test]
    async fn test_deserialize_enum16_invalid_index() {
        let pairs = vec![("x".to_string(), 10_i16), ("y".to_string(), 20_i16)];
        // Little-endian i16 discriminant 30, which is not among the pairs.
        let mut reader = MockReader::new(vec![30, 0]);
        let type_ = Type::Enum16(pairs);
        let data_type = enum_dict_type(DataType::Int16);
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let result = deserialize_async(&type_, &mut builder, &mut reader, 1, &[]).await;
        assert!(matches!(
            result,
            Err(Error::ArrowDeserialize(msg))
            if msg.contains("Invalid Enum16")
        ));
    }

    #[tokio::test]
    async fn test_deserialize_invalid_type() {
        let mut reader = MockReader::new(vec![]);
        let type_ = Type::Int32;
        let data_type = DataType::Int32;
        let mut builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
        let result = deserialize_async(&type_, &mut builder, &mut reader, 0, &[]).await;
        assert!(matches!(
            result,
            Err(Error::ArrowDeserialize(msg))
            if msg.contains("Unexpected builder")
        ));
    }
}