use arrow::array::*;
use tokio::io::AsyncWriteExt;
use crate::io::{ClickHouseBytesWrite, ClickHouseWrite};
use crate::{Error, Result, Type};
pub(super) async fn serialize_async<W: ClickHouseWrite>(
type_hint: &Type,
writer: &mut W,
values: &ArrayRef,
) -> Result<()> {
match type_hint.strip_null() {
Type::String | Type::Object => write_string_values(values, writer).await?,
Type::Binary => write_binary_values(values, writer).await?,
Type::FixedSizedString(len) => write_fixed_string_values(values, writer, *len).await?,
Type::FixedSizedBinary(len) => write_fixed_binary_values(values, writer, *len).await?,
_ => {
return Err(Error::ArrowSerialize(format!("Unsupported data type: {type_hint:?}")));
}
}
Ok(())
}
pub(super) fn serialize<W: ClickHouseBytesWrite>(
type_hint: &Type,
writer: &mut W,
values: &ArrayRef,
) -> Result<()> {
match type_hint.strip_null() {
Type::String | Type::Object => put_string_values(values, writer)?,
Type::Binary => put_binary_values(values, writer)?,
Type::FixedSizedString(len) => put_fixed_string_values(values, writer, *len)?,
Type::FixedSizedBinary(len) => put_fixed_binary_values(values, writer, *len)?,
_ => {
return Err(Error::ArrowSerialize(format!("Unsupported data type: {type_hint:?}")));
}
}
Ok(())
}
/// Generates an async `fn $name(column, writer) -> Result<()>` for variable-length values.
///
/// Arguments:
/// - `$name`: name of the generated function.
/// - `varlen $write_fn`: writer method awaited once per row with that row's value.
/// - `$def`: value written in place of a null row.
/// - `[($at => $coerce), ...]`: concrete Arrow array types to try, in order, each paired with
///   the function applied to `array.value(i)` before it is written.
///
/// The first array type that `column` downcasts to handles the whole column. If none match,
/// the generated function returns `Error::ArrowSerialize` listing the accepted types.
macro_rules! write_variable_values {
    ($name:ident, varlen $write_fn:ident, $def:expr, [$(($at:ty => $coerce:expr)),* $(,)?]) => {
        async fn $name<W: ClickHouseWrite>(
            column: &::arrow::array::ArrayRef,
            writer: &mut W,
        ) -> Result<()> {
            $(
                if let Some(typed) = column.as_any().downcast_ref::<$at>() {
                    for row in 0..typed.len() {
                        // Nulls are replaced by the caller-supplied default value.
                        let payload = if typed.is_null(row) {
                            $def
                        } else {
                            $coerce(typed.value(row))
                        };
                        writer.$write_fn(payload).await?;
                    }
                    return Ok(());
                }
            )*
            // No candidate array type matched the column.
            Err($crate::Error::ArrowSerialize(
                concat!("Expected one of: ", $(stringify!($at), " "),*).into()
            ))
        }
    };
}
/// Generates a synchronous `fn $name(column, writer) -> Result<()>` for variable-length values.
///
/// Arguments:
/// - `$name`: name of the generated function.
/// - `varlen $write_fn`: writer method called once per row with that row's value.
/// - `$def`: value written in place of a null row.
/// - `[($at => $coerce), ...]`: concrete Arrow array types to try, in order, each paired with
///   the function applied to `array.value(i)` before it is written.
///
/// The first array type that `column` downcasts to handles the whole column. If none match,
/// the generated function returns `Error::ArrowSerialize` listing the accepted types.
macro_rules! put_variable_values {
    ($name:ident, varlen $write_fn:ident, $def:expr, [$(($at:ty => $coerce:expr)),* $(,)?]) => {
        fn $name<W: $crate::io::ClickHouseBytesWrite>(
            column: &::arrow::array::ArrayRef,
            writer: &mut W,
        ) -> Result<()> {
            let erased = column.as_any();
            $(
                if let Some(typed) = erased.downcast_ref::<$at>() {
                    for row in 0..typed.len() {
                        // Nulls are replaced by the caller-supplied default value.
                        let payload = if typed.is_null(row) {
                            $def
                        } else {
                            $coerce(typed.value(row))
                        };
                        writer.$write_fn(payload)?;
                    }
                    return Ok(());
                }
            )*
            // No candidate array type matched the column.
            Err($crate::Error::ArrowSerialize(
                concat!("Expected one of: ", $(stringify!($at), " "),*).into()
            ))
        }
    };
}
/// Generates an async `fn $name(column, writer, len) -> Result<()>` for fixed-width values.
///
/// Arguments:
/// - `$name`: name of the generated function.
/// - `[($at => $coerce), ...]`: concrete Arrow array types to try, in order, each paired with
///   the function applied to `array.value(i)` to obtain the row's bytes.
///
/// Every row is written as exactly `len` bytes via `write_all`:
/// - a null row becomes `len` zero bytes;
/// - a value shorter than `len` is right-padded with zero bytes;
/// - a value longer than `len` is truncated to its first `len` bytes;
/// - a value of exactly `len` bytes is written as-is.
///
/// The first array type that `column` downcasts to handles the whole column. If none match,
/// the generated function returns `Error::ArrowSerialize` listing the accepted types.
macro_rules! write_fixed_values {
    ($name:ident, [$(($at:ty => $coerce:expr)),* $(,)?]) => {
        async fn $name<W: ClickHouseWrite>(
            column: &::arrow::array::ArrayRef,
            writer: &mut W,
            len: usize
        ) -> Result<()> {
            // One row-sized scratch buffer reused for every null / padded / truncated row,
            // instead of allocating a fresh `Vec` per row.
            let mut scratch = vec![0u8; len];
            $(
                if let Some(array) = column.as_any().downcast_ref::<$at>() {
                    for i in 0..array.len() {
                        if array.is_null(i) {
                            // The buffer may hold bytes from an earlier row; reset it.
                            scratch.fill(0);
                            writer.write_all(&scratch).await?;
                            continue;
                        }
                        let value = $coerce(array.value(i));
                        if value.len() == len {
                            // Already the right width: write straight from the array.
                            writer.write_all(&value).await?;
                        } else {
                            // Copy what fits, then zero the tail so no stale bytes leak.
                            let copy_len = value.len().min(len);
                            scratch[..copy_len].copy_from_slice(&value[..copy_len]);
                            scratch[copy_len..].fill(0);
                            writer.write_all(&scratch).await?;
                        }
                    }
                    return Ok(());
                }
            )*
            // No candidate array type matched the column.
            Err($crate::Error::ArrowSerialize(
                concat!("Expected one of: ", $(stringify!($at), " "),*).into()
            ))
        }
    };
}
/// Generates a synchronous `fn $name(column, writer, len) -> Result<()>` for fixed-width values.
///
/// Arguments:
/// - `$name`: name of the generated function.
/// - `[($at => $coerce), ...]`: concrete Arrow array types to try, in order, each paired with
///   the function applied to `array.value(i)` to obtain the row's bytes.
///
/// Every row is written as exactly `len` bytes via `put_slice`:
/// - a null row becomes `len` zero bytes;
/// - a value shorter than `len` is right-padded with zero bytes;
/// - a value longer than `len` is truncated to its first `len` bytes;
/// - a value of exactly `len` bytes is written as-is.
///
/// The first array type that `column` downcasts to handles the whole column. If none match,
/// the generated function returns `Error::ArrowSerialize` listing the accepted types.
macro_rules! put_fixed_values {
    ($name:ident, [$(($at:ty => $coerce:expr)),* $(,)?]) => {
        fn $name<W: $crate::io::ClickHouseBytesWrite>(
            column: &::arrow::array::ArrayRef,
            writer: &mut W,
            len: usize
        ) -> Result<()> {
            // One row-sized scratch buffer reused for every null / padded / truncated row,
            // instead of allocating a fresh `Vec` per row.
            let mut scratch = vec![0u8; len];
            $(
                if let Some(array) = column.as_any().downcast_ref::<$at>() {
                    for i in 0..array.len() {
                        if array.is_null(i) {
                            // The buffer may hold bytes from an earlier row; reset it.
                            scratch.fill(0);
                            writer.put_slice(&scratch);
                            continue;
                        }
                        let value = $coerce(array.value(i));
                        if value.len() == len {
                            // Already the right width: write straight from the array.
                            writer.put_slice(&value);
                        } else {
                            // Copy what fits, then zero the tail so no stale bytes leak.
                            let copy_len = value.len().min(len);
                            scratch[..copy_len].copy_from_slice(&value[..copy_len]);
                            scratch[copy_len..].fill(0);
                            writer.put_slice(&scratch);
                        }
                    }
                    return Ok(());
                }
            )*
            // No candidate array type matched the column.
            Err($crate::Error::ArrowSerialize(
                concat!("Expected one of: ", $(stringify!($at), " "),*).into()
            ))
        }
    };
}
// Instantiations of the writer macros defined in this file. In each list the array types are
// tried in the order given; string arrays are converted with `as_bytes` and binary arrays
// are forwarded with `pass_through`.

// Async variable-length writer used for `Type::String` / `Type::Object`.
// Each row goes through `write_string`; null rows are written as the empty slice `&[]`.
write_variable_values!(write_string_values, varlen write_string, &[], [
(StringArray => as_bytes),
(BinaryArray => pass_through),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeStringArray => as_bytes),
(LargeBinaryArray => pass_through)
]);
// Async variable-length writer used for `Type::Binary`.
// Accepts the same array types as `write_string_values`, but tries `BinaryArray` first.
write_variable_values!(write_binary_values, varlen write_string, &[], [
(BinaryArray => pass_through),
(StringArray => as_bytes),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeBinaryArray => pass_through),
(LargeStringArray => as_bytes)
]);
// Sync variable-length writer used for `Type::String` / `Type::Object`.
// Each row goes through `put_string`; null rows are written as the empty slice `&[]`.
put_variable_values!(put_string_values, varlen put_string, &[], [
(StringArray => as_bytes),
(BinaryArray => pass_through),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeStringArray => as_bytes),
(LargeBinaryArray => pass_through)
]);
// Sync variable-length writer used for `Type::Binary`.
// Accepts the same array types as `put_string_values`, but tries `BinaryArray` first.
put_variable_values!(put_binary_values, varlen put_string, &[], [
(BinaryArray => pass_through),
(StringArray => as_bytes),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeBinaryArray => pass_through),
(LargeStringArray => as_bytes)
]);
// Async fixed-width writer used for `Type::FixedSizedString(len)`.
write_fixed_values!(write_fixed_string_values, [
(StringArray => as_bytes),
(FixedSizeBinaryArray => pass_through),
(BinaryArray => pass_through),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeStringArray => as_bytes),
(LargeBinaryArray => pass_through)
]);
// Async fixed-width writer used for `Type::FixedSizedBinary(len)`; binary arrays are tried
// before string arrays.
write_fixed_values!(write_fixed_binary_values, [
(FixedSizeBinaryArray => pass_through),
(BinaryArray => pass_through),
(LargeBinaryArray => pass_through),
(BinaryViewArray => pass_through),
(StringArray => as_bytes),
(StringViewArray => as_bytes),
(LargeStringArray => as_bytes)
]);
// Sync fixed-width writer used for `Type::FixedSizedString(len)`.
put_fixed_values!(put_fixed_string_values, [
(StringArray => as_bytes),
(FixedSizeBinaryArray => pass_through),
(BinaryArray => pass_through),
(StringViewArray => as_bytes),
(BinaryViewArray => pass_through),
(LargeStringArray => as_bytes),
(LargeBinaryArray => pass_through)
]);
// Sync fixed-width writer used for `Type::FixedSizedBinary(len)`; binary arrays are tried
// before string arrays.
put_fixed_values!(put_fixed_binary_values, [
(FixedSizeBinaryArray => pass_through),
(BinaryArray => pass_through),
(LargeBinaryArray => pass_through),
(BinaryViewArray => pass_through),
(StringArray => as_bytes),
(StringViewArray => as_bytes),
(LargeStringArray => as_bytes)
]);
/// Identity coercion: returns the given byte slice unchanged.
///
/// Paired in the writer macros with array types whose `value(i)` is already a byte slice.
fn pass_through(v: &[u8]) -> &[u8] {
    v
}
/// Coerces a string slice to its UTF-8 bytes without copying.
///
/// Paired in the writer macros with array types whose `value(i)` is a `&str`.
fn as_bytes(v: &str) -> &[u8] {
    str::as_bytes(v)
}
// Async-path tests: each case feeds an Arrow array through `serialize_async` into an
// in-memory `Vec<u8>` and compares the exact bytes written.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{BinaryArray, FixedSizeBinaryArray, Int32Array, StringArray};

    use super::*;

    type MockWriter = Vec<u8>;

    #[tokio::test]
    async fn test_serialize_string() {
        let column =
            Arc::new(StringArray::from(vec![Some("hello"), None, Some("world")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::String, &mut writer, &column).await.unwrap();
        // Length-prefixed "hello", the null as an empty string, then length-prefixed "world".
        let expected = vec![5, 104, 101, 108, 108, 111, 0, 5, 119, 111, 114, 108, 100];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_string_empty_and_large() {
        let large_string = "x".repeat(128);
        let column = Arc::new(StringArray::from(vec![Some(""), Some(&large_string), Some("abc")]))
            as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::String, &mut writer, &column).await.unwrap();
        // Empty string, then the 128-byte value (length prefix encoded as [128, 1]), then "abc".
        let mut expected = vec![0];
        expected.extend(vec![128, 1]);
        expected.extend(vec![120; 128]);
        expected.extend(vec![3, 97, 98, 99]);
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_string_unicode() {
        let column = Arc::new(StringArray::from(vec![Some("こんにちは"), Some("")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::String, &mut writer, &column).await.unwrap();
        // The prefix counts UTF-8 bytes (15), not characters (5).
        let expected = vec![
            15, 227, 129, 147, 227, 130, 147, 227, 129, 171, 227, 129, 161, 227, 129, 175, 0,
        ];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_binary() {
        let column =
            Arc::new(BinaryArray::from(vec![Some(b"abc".as_ref()), None, Some(b"def".as_ref())]))
                as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::Binary, &mut writer, &column).await.unwrap();
        let expected = vec![3, 97, 98, 99, 0, 3, 100, 101, 102];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_binary_empty_and_large() {
        let large_binary = vec![255; 128];
        let column = Arc::new(BinaryArray::from(vec![
            Some(b"".as_ref()),
            Some(large_binary.as_slice()),
            Some(b"abc".as_ref()),
        ])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::Binary, &mut writer, &column).await.unwrap();
        let mut expected = vec![0];
        expected.extend(vec![128, 1]);
        expected.extend(vec![255; 128]);
        expected.extend(vec![3, 97, 98, 99]);
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_fixed_string() {
        let column = Arc::new(StringArray::from(vec!["abc", "de", "fghij"])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::FixedSizedString(5), &mut writer, &column).await.unwrap();
        // Short values are zero-padded to 5 bytes; no length prefix is written.
        let expected = vec![97, 98, 99, 0, 0, 100, 101, 0, 0, 0, 102, 103, 104, 105, 106];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_fixed_string_short_and_null() {
        let column = Arc::new(StringArray::from(vec![Some("a"), None, Some("bc")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::FixedSizedString(3), &mut writer, &column).await.unwrap();
        // The null row is written as three zero bytes.
        let expected = vec![97, 0, 0, 0, 0, 0, 98, 99, 0];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_fixed_string_oversized() {
        let column = Arc::new(StringArray::from(vec!["abcdef"])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize_async(&Type::FixedSizedString(3), &mut writer, &column).await;
        assert!(result.is_ok(), "Expected truncated string");
        // Only the first three bytes ("abc") may reach the writer.
        assert_eq!(writer, vec![97, 98, 99]);
    }

    #[tokio::test]
    async fn test_serialize_fixed_binary() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_iter(
                vec![b"abc".as_ref(), b"def".as_ref(), b"ghi".as_ref()].into_iter(),
            )
            .unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::FixedSizedBinary(5), &mut writer, &column).await.unwrap();
        // 3-byte array values are zero-padded up to the requested width of 5.
        let expected = vec![97, 98, 99, 0, 0, 100, 101, 102, 0, 0, 103, 104, 105, 0, 0];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_fixed_binary_null() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![Some(b"ab".as_ref()), None, Some(b"cd".as_ref())].into_iter(),
                2,
            )
            .unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::FixedSizedBinary(3), &mut writer, &column).await.unwrap();
        let expected = vec![97, 98, 0, 0, 0, 0, 99, 100, 0];
        assert_eq!(writer, expected);
    }

    #[tokio::test]
    async fn test_serialize_fixed_binary_oversized() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_iter(vec![b"abcd".as_ref()].into_iter()).unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize_async(&Type::FixedSizedBinary(3), &mut writer, &column).await;
        assert!(result.is_ok(), "Expected truncated string");
        // Only the first three bytes ("abc") may reach the writer.
        assert_eq!(writer, vec![97, 98, 99]);
    }

    #[tokio::test]
    async fn test_serialize_empty_string() {
        let column = Arc::new(StringArray::from(Vec::<String>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::String, &mut writer, &column).await.unwrap();
        assert!(writer.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_empty_binary() {
        let column = Arc::new(BinaryArray::from(Vec::<Option<&[u8]>>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::Binary, &mut writer, &column).await.unwrap();
        assert!(writer.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_empty_fixed_string() {
        let column = Arc::new(StringArray::from(Vec::<String>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::FixedSizedString(3), &mut writer, &column).await.unwrap();
        assert!(writer.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_null_only_string() {
        let column =
            Arc::new(StringArray::from(Vec::<Option<String>>::from([None, None]))) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize_async(&Type::String, &mut writer, &column).await.unwrap();
        // Each null is written as an empty string.
        let expected = vec![0, 0];
        assert_eq!(writer, expected);
    }

    // NOTE(review): this body is identical to `test_serialize_invalid_array_type`. Despite the
    // name it exercises the array-type mismatch path ("Expected one of"), not the
    // "Unsupported data type" branch of `serialize_async`.
    #[tokio::test]
    async fn test_serialize_unsupported_type() {
        let column = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize_async(&Type::String, &mut writer, &column).await;
        assert!(matches!(
            result,
            Err(Error::ArrowSerialize(msg))
            if msg.contains("Expected one of")
        ));
    }

    #[tokio::test]
    async fn test_serialize_invalid_array_type() {
        let column = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize_async(&Type::String, &mut writer, &column).await;
        assert!(matches!(
            result,
            Err(Error::ArrowSerialize(msg))
            if msg.contains("Expected one of")
        ));
    }
}
// Sync-path tests: each case feeds an Arrow array through `serialize` into an in-memory
// `Vec<u8>` and compares the exact bytes written.
#[cfg(test)]
mod tests_sync {
    use std::sync::Arc;

    use arrow::array::{BinaryArray, FixedSizeBinaryArray, Int32Array, StringArray};

    use super::*;

    type MockWriter = Vec<u8>;

    #[test]
    fn test_serialize_string() {
        let column =
            Arc::new(StringArray::from(vec![Some("hello"), None, Some("world")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::String, &mut writer, &column).unwrap();
        // Length-prefixed "hello", the null as an empty string, then length-prefixed "world".
        let expected = vec![5, 104, 101, 108, 108, 111, 0, 5, 119, 111, 114, 108, 100];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_string_empty_and_large() {
        let large_string = "x".repeat(128);
        let column = Arc::new(StringArray::from(vec![Some(""), Some(&large_string), Some("abc")]))
            as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::String, &mut writer, &column).unwrap();
        // Empty string, then the 128-byte value (length prefix encoded as [128, 1]), then "abc".
        let mut expected = vec![0];
        expected.extend(vec![128, 1]);
        expected.extend(vec![120; 128]);
        expected.extend(vec![3, 97, 98, 99]);
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_string_unicode() {
        let column = Arc::new(StringArray::from(vec![Some("こんにちは"), Some("")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::String, &mut writer, &column).unwrap();
        // The prefix counts UTF-8 bytes (15), not characters (5).
        let expected = vec![
            15, 227, 129, 147, 227, 130, 147, 227, 129, 171, 227, 129, 161, 227, 129, 175, 0,
        ];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_binary() {
        let column =
            Arc::new(BinaryArray::from(vec![Some(b"abc".as_ref()), None, Some(b"def".as_ref())]))
                as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::Binary, &mut writer, &column).unwrap();
        let expected = vec![3, 97, 98, 99, 0, 3, 100, 101, 102];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_binary_empty_and_large() {
        let large_binary = vec![255; 128];
        let column = Arc::new(BinaryArray::from(vec![
            Some(b"".as_ref()),
            Some(large_binary.as_slice()),
            Some(b"abc".as_ref()),
        ])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::Binary, &mut writer, &column).unwrap();
        let mut expected = vec![0];
        expected.extend(vec![128, 1]);
        expected.extend(vec![255; 128]);
        expected.extend(vec![3, 97, 98, 99]);
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_fixed_string() {
        let column = Arc::new(StringArray::from(vec!["abc", "de", "fghij"])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::FixedSizedString(5), &mut writer, &column).unwrap();
        // Short values are zero-padded to 5 bytes; no length prefix is written.
        let expected = vec![97, 98, 99, 0, 0, 100, 101, 0, 0, 0, 102, 103, 104, 105, 106];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_fixed_string_short_and_null() {
        let column = Arc::new(StringArray::from(vec![Some("a"), None, Some("bc")])) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::FixedSizedString(3), &mut writer, &column).unwrap();
        // The null row is written as three zero bytes.
        let expected = vec![97, 0, 0, 0, 0, 0, 98, 99, 0];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_fixed_string_oversized() {
        let column = Arc::new(StringArray::from(vec!["abcdef"])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize(&Type::FixedSizedString(3), &mut writer, &column);
        assert!(result.is_ok(), "Expected truncated string");
        // Only the first three bytes ("abc") may reach the writer.
        assert_eq!(writer, vec![97, 98, 99]);
    }

    #[test]
    fn test_serialize_fixed_binary() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_iter(
                vec![b"abc".as_ref(), b"def".as_ref(), b"ghi".as_ref()].into_iter(),
            )
            .unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::FixedSizedBinary(5), &mut writer, &column).unwrap();
        // 3-byte array values are zero-padded up to the requested width of 5.
        let expected = vec![97, 98, 99, 0, 0, 100, 101, 102, 0, 0, 103, 104, 105, 0, 0];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_fixed_binary_null() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![Some(b"ab".as_ref()), None, Some(b"cd".as_ref())].into_iter(),
                2,
            )
            .unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::FixedSizedBinary(3), &mut writer, &column).unwrap();
        let expected = vec![97, 98, 0, 0, 0, 0, 99, 100, 0];
        assert_eq!(writer, expected);
    }

    #[test]
    fn test_serialize_fixed_binary_oversized() {
        let column = Arc::new(
            FixedSizeBinaryArray::try_from_iter(vec![b"abcd".as_ref()].into_iter()).unwrap(),
        ) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize(&Type::FixedSizedBinary(3), &mut writer, &column);
        assert!(result.is_ok(), "Expected truncated string");
        // Only the first three bytes ("abc") may reach the writer.
        assert_eq!(writer, vec![97, 98, 99]);
    }

    #[test]
    fn test_serialize_empty_string() {
        let column = Arc::new(StringArray::from(Vec::<String>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::String, &mut writer, &column).unwrap();
        assert!(writer.is_empty());
    }

    #[test]
    fn test_serialize_empty_binary() {
        let column = Arc::new(BinaryArray::from(Vec::<Option<&[u8]>>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::Binary, &mut writer, &column).unwrap();
        assert!(writer.is_empty());
    }

    #[test]
    fn test_serialize_empty_fixed_string() {
        let column = Arc::new(StringArray::from(Vec::<String>::new())) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::FixedSizedString(3), &mut writer, &column).unwrap();
        assert!(writer.is_empty());
    }

    #[test]
    fn test_serialize_null_only_string() {
        let column =
            Arc::new(StringArray::from(Vec::<Option<String>>::from([None, None]))) as ArrayRef;
        let mut writer = MockWriter::new();
        serialize(&Type::String, &mut writer, &column).unwrap();
        // Each null is written as an empty string.
        let expected = vec![0, 0];
        assert_eq!(writer, expected);
    }

    // NOTE(review): this body is identical to `test_serialize_invalid_array_type`. Despite the
    // name it exercises the array-type mismatch path ("Expected one of"), not the
    // "Unsupported data type" branch of `serialize`.
    #[test]
    fn test_serialize_unsupported_type() {
        let column = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize(&Type::String, &mut writer, &column);
        assert!(matches!(
            result,
            Err(Error::ArrowSerialize(msg))
            if msg.contains("Expected one of")
        ));
    }

    #[test]
    fn test_serialize_invalid_array_type() {
        let column = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let mut writer = MockWriter::new();
        let result = serialize(&Type::String, &mut writer, &column);
        assert!(matches!(
            result,
            Err(Error::ArrowSerialize(msg))
            if msg.contains("Expected one of")
        ));
    }
}