use arrow::array::*;
use arrow::datatypes::DataType;
use tokio::io::AsyncWriteExt;
use super::ClickHouseArrowSerializer;
use crate::formats::SerializerState;
use crate::io::{ClickHouseBytesWrite, ClickHouseWrite};
use crate::{Error, Result, Type};
fn unwrap_array_data_type(dt: &DataType) -> Result<&DataType> {
match dt {
DataType::List(f)
| DataType::ListView(f)
| DataType::LargeList(f)
| DataType::LargeListView(f)
| DataType::FixedSizeList(f, _) => Ok(f.data_type()),
_ => Err(Error::ArrowSerialize(format!("Expected List or FixedSizeList, got {dt:?}"))),
}
}
pub(super) async fn serialize_async<W: ClickHouseWrite>(
type_hint: &Type,
writer: &mut W,
values: &ArrayRef,
data_type: &DataType,
state: &mut SerializerState,
) -> Result<()> {
let inner_type = type_hint.strip_null().unwrap_array()?;
macro_rules! write_list_array {
($( $array_ty:ty ),* $(,)?) => {{
$(
if let Some(array) = values.as_any().downcast_ref::<$array_ty>() {
let inner_dt = unwrap_array_data_type(data_type)?;
let offsets = array.value_offsets();
let values = array.values();
for offset in &offsets[1..] {
#[expect(clippy::cast_sign_loss)]
writer.write_u64_le(*offset as u64).await?;
}
inner_type.serialize_async(writer, values, inner_dt, state).await?;
return Ok(());
}
)*
}}
}
write_list_array!(ListArray, ListViewArray, LargeListArray, LargeListViewArray);
if let Some(array) = values.as_any().downcast_ref::<FixedSizeListArray>() {
let inner_dt = unwrap_array_data_type(data_type)?;
#[expect(clippy::cast_sign_loss)]
let value_len = array.value_length() as usize;
let num_rows = array.len();
for i in 1..=num_rows {
writer.write_u64_le((value_len * i) as u64).await?;
}
let values = array.values();
inner_type.serialize_async(writer, values, inner_dt, state).await?;
return Ok(());
}
Err(Error::ArrowSerialize(format!(
"Expected ListArray or FixedSizeListArray: type={inner_type:?}, data_type={data_type:?}"
)))
}
pub(super) fn serialize<W: ClickHouseBytesWrite>(
type_hint: &Type,
writer: &mut W,
values: &ArrayRef,
data_type: &DataType,
state: &mut SerializerState,
) -> Result<()> {
let inner_type = type_hint.strip_null().unwrap_array()?;
macro_rules! put_list_array {
($( $array_ty:ty ),* $(,)?) => {{
$(
if let Some(array) = values.as_any().downcast_ref::<$array_ty>() {
let inner_dt = unwrap_array_data_type(data_type)?;
let offsets = array.value_offsets();
let values = array.values();
for offset in &offsets[1..] {
#[expect(clippy::cast_sign_loss)]
writer.put_u64_le(*offset as u64);
}
inner_type.serialize(writer, values, inner_dt, state)?;
return Ok(());
}
)*
}}
}
put_list_array!(ListArray, ListViewArray, LargeListArray, LargeListViewArray);
if let Some(array) = values.as_any().downcast_ref::<FixedSizeListArray>() {
let inner_dt = unwrap_array_data_type(data_type)?;
#[expect(clippy::cast_sign_loss)]
let value_len = array.value_length() as usize;
let num_rows = array.len();
for i in 1..=num_rows {
writer.put_u64_le((value_len * i) as u64);
}
let values = array.values();
inner_type.serialize(writer, values, inner_dt, state)?;
return Ok(());
}
Err(Error::ArrowSerialize(format!(
"Expected ListArray or FixedSizeListArray: type={inner_type:?}, data_type={data_type:?}"
)))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::*;
    use arrow::buffer::OffsetBuffer;
    use arrow::datatypes::*;

    use super::*;
    use crate::ArrowOptions;
    use crate::arrow::types::LIST_ITEM_FIELD_NAME;
    use crate::formats::SerializerState;
    use crate::native::types::Type;

    type MockWriter = Vec<u8>;

    fn wrap_array(typ: Type) -> Type { Type::Array(Box::new(typ)) }

    /// Builds the child field of a list column using the shared item name.
    fn item_field(data_type: DataType, nullable: bool) -> FieldRef {
        Arc::new(Field::new(LIST_ITEM_FIELD_NAME, data_type, nullable))
    }

    /// Builds a non-nullable `List` column field named `name` wrapping `item`.
    fn list_field(name: &str, item: &FieldRef) -> Field {
        Field::new(name, DataType::List(Arc::clone(item)), false)
    }

    /// Encodes ClickHouse array offsets (or other `u64` headers) as little-endian bytes.
    fn u64s(values: &[u64]) -> Vec<u8> { values.iter().flat_map(|v| v.to_le_bytes()).collect() }

    /// Encodes `Int32` column values as little-endian bytes.
    fn i32s(values: &[i32]) -> Vec<u8> { values.iter().flat_map(|v| v.to_le_bytes()).collect() }

    /// Runs the async serializer on `array` and checks the produced bytes against `expected`.
    pub(crate) async fn test_type_serializer(
        expected: Vec<u8>,
        type_: &Type,
        field: &Field,
        array: &ArrayRef,
    ) {
        let options = ArrowOptions::default().with_strings_as_strings(true);
        let mut state = SerializerState::default().with_arrow_options(options);
        let mut writer = MockWriter::new();
        serialize_async(type_, &mut writer, array, field.data_type(), &mut state).await.unwrap();
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_list_int32() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
            None,
        ));
        let expected = [u64s(&[2, 3, 5]), i32s(&[1, 2, 3, 4, 5])].concat();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_list_nullable_int32() {
        let item = item_field(DataType::Int32, true);
        let field = list_field("list", &item);
        let values = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(values) as ArrayRef,
            None,
        ));
        // Offsets, then the null map, then the values (nulls written as zero).
        let expected =
            [u64s(&[2, 3, 5]), vec![0, 1, 0, 1, 0], i32s(&[1, 0, 3, 0, 5])].concat();
        let type_ = wrap_array(Type::Nullable(Box::new(Type::Int32)));
        test_type_serializer(expected, &type_, &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_list_nullable_string() {
        let item = item_field(DataType::Utf8, true);
        let field = list_field("list", &item);
        let values = StringArray::from(vec![Some("even"), Some("odd"), None, Some("odd")]);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 4].into()),
            Arc::new(values) as ArrayRef,
            None,
        ));
        let strings = vec![
            4, b'e', b'v', b'e', b'n', //
            3, b'o', b'd', b'd', //
            0, //
            3, b'o', b'd', b'd',
        ];
        let expected = [u64s(&[2, 3, 4]), vec![0, 0, 1, 0], strings].concat();
        let type_ = wrap_array(Type::Nullable(Box::new(Type::String)));
        test_type_serializer(expected, &type_, &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_fixed_size_list_int32() {
        let item = item_field(DataType::Int32, false);
        let field = Field::new("list", DataType::FixedSizeList(Arc::clone(&item), 2), false);
        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef;
        let array: ArrayRef = Arc::new(FixedSizeListArray::new(item, 2, values, None));
        let expected = [u64s(&[2, 4, 6]), i32s(&[1, 2, 3, 4, 5, 6])].concat();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_list_zero_rows() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let values = Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef;
        let array: ArrayRef =
            Arc::new(ListArray::new(item, OffsetBuffer::new(vec![0].into()), values, None));
        let expected: Vec<u8> = Vec::new();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_list_empty_inner() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let values = Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef;
        let array: ArrayRef =
            Arc::new(ListArray::new(item, OffsetBuffer::new(vec![0, 0].into()), values, None));
        test_type_serializer(u64s(&[0]), &wrap_array(Type::Int32), &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_nested_list_int32() {
        let leaf = item_field(DataType::Int32, false);
        let inner_field: FieldRef = Arc::new(list_field("inner_list", &leaf));
        let inner_list = ListArray::new(
            leaf,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
            None,
        );
        let field = list_field("list", &inner_field);
        let array: ArrayRef = Arc::new(ListArray::new(
            inner_field,
            OffsetBuffer::new(vec![0, 2, 3].into()),
            Arc::new(inner_list) as ArrayRef,
            None,
        ));
        // Outer offsets, inner offsets, then the leaf values.
        let expected = [u64s(&[2, 3]), u64s(&[2, 3, 5]), i32s(&[1, 2, 3, 4, 5])].concat();
        let type_ = wrap_array(wrap_array(Type::Int32));
        test_type_serializer(expected, &type_, &field, &array).await;
    }

    #[tokio::test]
    async fn test_serialize_array_nullable_low_cardinality_string() {
        let item: FieldRef = Arc::new(Field::new(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        ));
        let field = list_field("array_low_cardinality_string_col", &item);
        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(0), None]);
        let dictionary: ArrayRef = Arc::new(StringArray::from(vec!["low", "card", "test"]));
        let dict_array = DictionaryArray::<Int32Type>::try_new(keys, dictionary).unwrap();
        let array: ArrayRef = Arc::new(
            ListArray::try_new(
                item,
                OffsetBuffer::new(vec![0, 2, 2, 3, 4, 6].into()),
                Arc::new(dict_array) as ArrayRef,
                None,
            )
            .unwrap(),
        );
        let dictionary_bytes = vec![
            0, //
            3, b'l', b'o', b'w', //
            4, b'c', b'a', b'r', b'd', //
            4, b't', b'e', b's', b't',
        ];
        let expected = [
            u64s(&[2, 2, 3, 4, 6]), // array offsets
            u64s(&[512]),           // low-cardinality flags
            u64s(&[4]),             // dictionary size
            dictionary_bytes,
            u64s(&[6]), // key count
            vec![1, 2, 3, 0, 1, 0],
        ]
        .concat();
        let type_ =
            wrap_array(Type::LowCardinality(Box::new(Type::Nullable(Box::new(Type::String)))));
        test_type_serializer(expected, &type_, &field, &array).await;
    }
}
#[cfg(test)]
mod tests_sync {
    use std::sync::Arc;

    use arrow::array::*;
    use arrow::buffer::OffsetBuffer;
    use arrow::datatypes::*;

    use super::*;
    use crate::ArrowOptions;
    use crate::arrow::types::LIST_ITEM_FIELD_NAME;
    use crate::formats::SerializerState;
    use crate::native::types::Type;

    type MockWriter = Vec<u8>;

    fn wrap_array(typ: Type) -> Type { Type::Array(Box::new(typ)) }

    /// Builds the child field of a list column using the shared item name.
    fn item_field(data_type: DataType, nullable: bool) -> FieldRef {
        Arc::new(Field::new(LIST_ITEM_FIELD_NAME, data_type, nullable))
    }

    /// Builds a non-nullable `List` column field named `name` wrapping `item`.
    fn list_field(name: &str, item: &FieldRef) -> Field {
        Field::new(name, DataType::List(Arc::clone(item)), false)
    }

    /// Encodes ClickHouse array offsets (or other `u64` headers) as little-endian bytes.
    fn u64s(values: &[u64]) -> Vec<u8> { values.iter().flat_map(|v| v.to_le_bytes()).collect() }

    /// Encodes `Int32` column values as little-endian bytes.
    fn i32s(values: &[i32]) -> Vec<u8> { values.iter().flat_map(|v| v.to_le_bytes()).collect() }

    /// Runs the sync serializer on `array` and checks the produced bytes against `expected`.
    #[expect(clippy::needless_pass_by_value)]
    pub(crate) fn test_type_serializer(
        expected: Vec<u8>,
        type_: &Type,
        field: &Field,
        array: &ArrayRef,
    ) {
        let options = ArrowOptions::default().with_strings_as_strings(true);
        let mut state = SerializerState::default().with_arrow_options(options);
        let mut writer = MockWriter::new();
        serialize(type_, &mut writer, array, field.data_type(), &mut state).unwrap();
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_list_int32() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
            None,
        ));
        let expected = [u64s(&[2, 3, 5]), i32s(&[1, 2, 3, 4, 5])].concat();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array);
    }

    #[test]
    fn test_serialize_list_nullable_int32() {
        let item = item_field(DataType::Int32, true);
        let field = list_field("list", &item);
        let values = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(values) as ArrayRef,
            None,
        ));
        // Offsets, then the null map, then the values (nulls written as zero).
        let expected =
            [u64s(&[2, 3, 5]), vec![0, 1, 0, 1, 0], i32s(&[1, 0, 3, 0, 5])].concat();
        let type_ = wrap_array(Type::Nullable(Box::new(Type::Int32)));
        test_type_serializer(expected, &type_, &field, &array);
    }

    #[test]
    fn test_serialize_list_nullable_string() {
        let item = item_field(DataType::Utf8, true);
        let field = list_field("list", &item);
        let values = StringArray::from(vec![Some("even"), Some("odd"), None, Some("odd")]);
        let array: ArrayRef = Arc::new(ListArray::new(
            item,
            OffsetBuffer::new(vec![0, 2, 3, 4].into()),
            Arc::new(values) as ArrayRef,
            None,
        ));
        let strings = vec![
            4, b'e', b'v', b'e', b'n', //
            3, b'o', b'd', b'd', //
            0, //
            3, b'o', b'd', b'd',
        ];
        let expected = [u64s(&[2, 3, 4]), vec![0, 0, 1, 0], strings].concat();
        let type_ = wrap_array(Type::Nullable(Box::new(Type::String)));
        test_type_serializer(expected, &type_, &field, &array);
    }

    #[test]
    fn test_serialize_fixed_size_list_int32() {
        let item = item_field(DataType::Int32, false);
        let field = Field::new("list", DataType::FixedSizeList(Arc::clone(&item), 2), false);
        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef;
        let array: ArrayRef = Arc::new(FixedSizeListArray::new(item, 2, values, None));
        let expected = [u64s(&[2, 4, 6]), i32s(&[1, 2, 3, 4, 5, 6])].concat();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array);
    }

    #[test]
    fn test_serialize_list_zero_rows() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let values = Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef;
        let array: ArrayRef =
            Arc::new(ListArray::new(item, OffsetBuffer::new(vec![0].into()), values, None));
        let expected: Vec<u8> = Vec::new();
        test_type_serializer(expected, &wrap_array(Type::Int32), &field, &array);
    }

    #[test]
    fn test_serialize_list_empty_inner() {
        let item = item_field(DataType::Int32, false);
        let field = list_field("list", &item);
        let values = Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef;
        let array: ArrayRef =
            Arc::new(ListArray::new(item, OffsetBuffer::new(vec![0, 0].into()), values, None));
        test_type_serializer(u64s(&[0]), &wrap_array(Type::Int32), &field, &array);
    }

    #[test]
    fn test_serialize_nested_list_int32() {
        let leaf = item_field(DataType::Int32, false);
        let inner_field: FieldRef = Arc::new(list_field("inner_list", &leaf));
        let inner_list = ListArray::new(
            leaf,
            OffsetBuffer::new(vec![0, 2, 3, 5].into()),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
            None,
        );
        let field = list_field("list", &inner_field);
        let array: ArrayRef = Arc::new(ListArray::new(
            inner_field,
            OffsetBuffer::new(vec![0, 2, 3].into()),
            Arc::new(inner_list) as ArrayRef,
            None,
        ));
        // Outer offsets, inner offsets, then the leaf values.
        let expected = [u64s(&[2, 3]), u64s(&[2, 3, 5]), i32s(&[1, 2, 3, 4, 5])].concat();
        let type_ = wrap_array(wrap_array(Type::Int32));
        test_type_serializer(expected, &type_, &field, &array);
    }

    #[test]
    fn test_serialize_array_nullable_low_cardinality_string() {
        let item: FieldRef = Arc::new(Field::new(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        ));
        let field = list_field("array_low_cardinality_string_col", &item);
        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(0), None]);
        let dictionary: ArrayRef = Arc::new(StringArray::from(vec!["low", "card", "test"]));
        let dict_array = DictionaryArray::<Int32Type>::try_new(keys, dictionary).unwrap();
        let array: ArrayRef = Arc::new(
            ListArray::try_new(
                item,
                OffsetBuffer::new(vec![0, 2, 2, 3, 4, 6].into()),
                Arc::new(dict_array) as ArrayRef,
                None,
            )
            .unwrap(),
        );
        let dictionary_bytes = vec![
            0, //
            3, b'l', b'o', b'w', //
            4, b'c', b'a', b'r', b'd', //
            4, b't', b'e', b's', b't',
        ];
        let expected = [
            u64s(&[2, 2, 3, 4, 6]), // array offsets
            u64s(&[512]),           // low-cardinality flags
            u64s(&[4]),             // dictionary size
            dictionary_bytes,
            u64s(&[6]), // key count
            vec![1, 2, 3, 0, 1, 0],
        ]
        .concat();
        let type_ =
            wrap_array(Type::LowCardinality(Box::new(Type::Nullable(Box::new(Type::String)))));
        test_type_serializer(expected, &type_, &field, &array);
    }
}