mod binary;
mod enums;
mod list;
mod low_cardinality;
mod map;
mod null;
mod primitive;
mod tuple;
use arrow::array::*;
use arrow::datatypes::*;
use crate::formats::SerializerState;
use crate::geo::normalize_geo_type;
use crate::io::{ClickHouseBytesWrite, ClickHouseWrite};
use crate::{Result, Type};
/// Writes Arrow arrays in the ClickHouse native column format.
///
/// The implementing value describes the target ClickHouse column type. Each
/// method writes `column` (whose Arrow type is `data_type`) to `writer`,
/// threading `state` through any nested serializers it delegates to.
pub(crate) trait ClickHouseArrowSerializer {
    /// Async entry point: serializes `column` into a `ClickHouseWrite` writer.
    ///
    /// # Errors
    /// Returns an error if the column cannot be encoded or the write fails.
    async fn serialize_async<W>(
        &self,
        writer: &mut W,
        column: &ArrayRef,
        data_type: &DataType,
        state: &mut SerializerState,
    ) -> Result<()>
    where
        W: ClickHouseWrite;

    /// Synchronous counterpart of `serialize_async`, writing into a
    /// `ClickHouseBytesWrite` writer.
    ///
    /// # Errors
    /// Returns an error if the column cannot be encoded.
    fn serialize<W>(
        &self,
        writer: &mut W,
        column: &ArrayRef,
        data_type: &DataType,
        state: &mut SerializerState,
    ) -> Result<()>
    where
        W: ClickHouseBytesWrite;
}
impl ClickHouseArrowSerializer for Type {
async fn serialize_async<W: ClickHouseWrite>(
&self,
writer: &mut W,
column: &ArrayRef,
data_type: &DataType,
state: &mut SerializerState,
) -> Result<()> {
let base_type = self.strip_null();
if self.is_nullable() {
null::serialize_nulls_async(self, writer, column, state).await?;
}
match base_type {
Type::Int8
| Type::Int16
| Type::Int32
| Type::Int64
| Type::Int128
| Type::Int256
| Type::UInt8
| Type::UInt16
| Type::UInt32
| Type::UInt64
| Type::UInt128
| Type::UInt256
| Type::Float32
| Type::Float64
| Type::Decimal32(_)
| Type::Decimal64(_)
| Type::Decimal128(_)
| Type::Decimal256(_)
| Type::Date
| Type::Date32
| Type::DateTime(_)
| Type::DateTime64(_, _)
| Type::Ipv4
| Type::Ipv6
| Type::Uuid => {
primitive::serialize_async(self, writer, column, data_type).await?;
}
Type::String
| Type::Binary
| Type::FixedSizedString(_)
| Type::FixedSizedBinary(_)
| Type::Object => {
binary::serialize_async(self, writer, column).await?;
}
Type::Enum8(_) | Type::Enum16(_) => {
enums::serialize_async(self, writer, column).await?;
}
Type::LowCardinality(_) => {
Box::pin(low_cardinality::serialize_async(self, writer, column, data_type, state))
.await?;
}
Type::Array(_) => {
Box::pin(list::serialize_async(self, writer, column, data_type, state)).await?;
}
Type::Map(_, _) => {
Box::pin(map::serialize_async(self, writer, column, data_type, state)).await?;
}
Type::Tuple(_) => {
Box::pin(tuple::serialize_async(self, writer, column, state)).await?;
}
Type::Ring | Type::Polygon | Type::Point | Type::MultiPolygon => {
let normalized = normalize_geo_type(base_type).unwrap();
Box::pin(normalized.serialize_async(writer, column, data_type, state)).await?;
}
Type::Nullable(_) => unreachable!(),
}
Ok(())
}
fn serialize<W: ClickHouseBytesWrite>(
&self,
writer: &mut W,
column: &ArrayRef,
data_type: &DataType,
state: &mut SerializerState,
) -> Result<()> {
let base_type = self.strip_null();
if self.is_nullable() {
null::serialize_nulls(self, writer, column, state);
}
match base_type {
Type::Int8
| Type::Int16
| Type::Int32
| Type::Int64
| Type::Int128
| Type::Int256
| Type::UInt8
| Type::UInt16
| Type::UInt32
| Type::UInt64
| Type::UInt128
| Type::UInt256
| Type::Float32
| Type::Float64
| Type::Decimal32(_)
| Type::Decimal64(_)
| Type::Decimal128(_)
| Type::Decimal256(_)
| Type::Date
| Type::Date32
| Type::DateTime(_)
| Type::DateTime64(_, _)
| Type::Ipv4
| Type::Ipv6
| Type::Uuid => {
primitive::serialize(self, writer, column, data_type)?;
}
Type::String
| Type::Binary
| Type::FixedSizedString(_)
| Type::FixedSizedBinary(_)
| Type::Object => {
binary::serialize(self, writer, column)?;
}
Type::Enum8(_) | Type::Enum16(_) => enums::serialize(self, writer, column)?,
Type::LowCardinality(_) => {
low_cardinality::serialize(self, writer, column, data_type, state)?;
}
Type::Array(_) => {
list::serialize(self, writer, column, data_type, state)?;
}
Type::Map(_, _) => {
map::serialize(self, writer, column, data_type, state)?;
}
Type::Tuple(_) => {
tuple::serialize(self, writer, column, state)?;
}
Type::Ring | Type::Polygon | Type::Point | Type::MultiPolygon => {
let normalized = normalize_geo_type(base_type).unwrap();
normalized.serialize(writer, column, data_type, state)?;
}
Type::Nullable(_) => unreachable!(),
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use std::sync::Arc;

    use arrow::array::*;
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{DataType, Field, Fields};

    use super::*;
    use crate::ArrowOptions;
    use crate::arrow::deserialize::ClickHouseArrowDeserializer;
    use crate::arrow::types::{
        LIST_ITEM_FIELD_NAME, MAP_FIELD_NAME, STRUCT_KEY_FIELD_NAME, STRUCT_VALUE_FIELD_NAME,
    };
    use crate::native::types::Type;

    /// Runs the async serializer for `type_` and returns every byte it wrote.
    async fn encode(
        type_: &Type,
        column: &ArrayRef,
        data_type: &DataType,
        state: &mut SerializerState,
    ) -> Vec<u8> {
        let mut sink = Cursor::new(Vec::<u8>::new());
        type_.serialize_async(&mut sink, column, data_type, state).await.unwrap();
        sink.into_inner()
    }

    /// Little-endian bytes of each `i32`, in order.
    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Little-endian bytes of each `u64`, in order (the width used for offsets below).
    fn u64_bytes(values: &[u64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Length-prefixed string bytes. Only valid for strings shorter than 128
    /// bytes, where the length prefix fits in a single byte.
    fn short_strings_bytes(values: &[&str]) -> Vec<u8> {
        let mut out = Vec::new();
        for value in values {
            assert!(value.len() < 128, "helper only supports single-byte length prefixes");
            out.push(value.len() as u8);
            out.extend_from_slice(value.as_bytes());
        }
        out
    }

    /// Returns the `index`-th 8-byte word of `bytes` (panics if out of range).
    fn word_at(bytes: &[u8], index: usize) -> [u8; 8] {
        let start = index * 8;
        let mut word = [0u8; 8];
        word.copy_from_slice(&bytes[start..start + 8]);
        word
    }

    /// Non-null struct column of `(x: Float64, y: Float64)` points.
    fn xy_points(xs: Vec<f64>, ys: Vec<f64>) -> ArrayRef {
        let fields = Fields::from(vec![
            Field::new("x", DataType::Float64, false),
            Field::new("y", DataType::Float64, false),
        ]);
        let children = vec![
            Arc::new(Float64Array::from(xs)) as ArrayRef,
            Arc::new(Float64Array::from(ys)) as ArrayRef,
        ];
        Arc::new(StructArray::new(fields, children, None)) as ArrayRef
    }

    /// Wraps `child` in a null-free `List` with a non-nullable `"item"` field.
    fn list_of(child: ArrayRef, offsets: Vec<i32>) -> ArrayRef {
        let item = Arc::new(Field::new("item", child.data_type().clone(), false));
        Arc::new(ListArray::new(item, OffsetBuffer::new(offsets.into()), child, None)) as ArrayRef
    }

    /// One closed ring of four points: (0,0) -> (1,0) -> (0.5,1) -> (0,0).
    fn closed_ring() -> ArrayRef {
        list_of(xy_points(vec![0.0, 1.0, 0.5, 0.0], vec![0.0, 0.0, 1.0, 0.0]), vec![0, 4])
    }

    #[tokio::test]
    async fn test_serialize_int32() {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let mut state = SerializerState::default();
        let output = encode(&Type::Int32, &column, &DataType::Int32, &mut state).await;
        assert_eq!(output, i32_bytes(&[1, 2, 3]));
    }

    #[tokio::test]
    async fn test_serialize_nullable_int32() {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let nullable = Type::Nullable(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&nullable, &column, &DataType::Int32, &mut state).await;
        // Null map (one byte per row, 1 = NULL) precedes the values; the NULL slot is zeroed.
        let expected = [vec![0, 1, 0], i32_bytes(&[1, 0, 3])].concat();
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_serialize_string() {
        let column: ArrayRef = Arc::new(StringArray::from(vec!["hello", "", "world"]));
        let mut state = SerializerState::default();
        let output = encode(&Type::String, &column, &DataType::Utf8, &mut state).await;
        assert_eq!(output, short_strings_bytes(&["hello", "", "world"]));
    }

    #[tokio::test]
    async fn test_serialize_nullable_string() {
        let column: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), None, Some("c")]));
        let nullable = Type::Nullable(Box::new(Type::String));
        let mut state = SerializerState::default();
        let output = encode(&nullable, &column, &DataType::Utf8, &mut state).await;
        // After the null map, the NULL row is written as an empty string.
        let expected = [vec![0, 1, 0], short_strings_bytes(&["a", "", "c"])].concat();
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_serialize_array_int32() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(vec![0, 2, 3, 5].into());
        let column: ArrayRef =
            Arc::new(ListArray::new(Arc::clone(&item), offsets, values, None));
        let array_type = Type::Array(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&array_type, &column, &DataType::List(item), &mut state).await;
        // Cumulative end offsets as 8-byte words (leading 0 omitted), then the flat values.
        let expected = [u64_bytes(&[2, 3, 5]), i32_bytes(&[1, 2, 3, 4, 5])].concat();
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_serialize_nullable_array_int32() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let field = Field::new("col", DataType::List(Arc::clone(&item)), true);
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(vec![0, 2, 2, 5].into());
        let validity = Some(NullBuffer::from(vec![true, false, true]));
        let column: ArrayRef = Arc::new(ListArray::new(item, offsets, values, validity));
        let nullable_array = Type::Nullable(Box::new(Type::Array(Box::new(Type::Int32))));
        let mut state = SerializerState::default();
        let output = encode(&nullable_array, &column, field.data_type(), &mut state).await;
        // The expected bytes contain no null map: only offsets, then values.
        let expected = [u64_bytes(&[2, 2, 5]), i32_bytes(&[1, 2, 3, 4, 5])].concat();
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_serialize_map_string_int32() {
        let key_field = Field::new(STRUCT_KEY_FIELD_NAME, DataType::Utf8, false);
        let value_field = Field::new(STRUCT_VALUE_FIELD_NAME, DataType::Int32, false);
        let entries_field = Arc::new(Field::new(
            MAP_FIELD_NAME,
            DataType::Struct(Fields::from(vec![key_field.clone(), value_field.clone()])),
            false,
        ));
        let field = Field::new("col", DataType::Map(Arc::clone(&entries_field), false), false);
        let keys: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let vals: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let entries =
            StructArray::from(vec![(Arc::new(key_field), keys), (Arc::new(value_field), vals)]);
        let offsets = OffsetBuffer::new(vec![0, 2, 3, 5].into());
        let column: ArrayRef =
            Arc::new(MapArray::new(entries_field, offsets, entries, None, false));
        let map_type = Type::Map(Box::new(Type::String), Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&map_type, &column, field.data_type(), &mut state).await;
        // Offsets, then all keys, then all values (columnar, not interleaved).
        let expected = [
            u64_bytes(&[2, 3, 5]),
            short_strings_bytes(&["a", "b", "c", "d", "e"]),
            i32_bytes(&[1, 2, 3, 4, 5]),
        ]
        .concat();
        assert_eq!(output, expected);
    }

    #[tokio::test]
    async fn test_serialize_int32_zero_rows() {
        let field = Field::new("col", DataType::Int32, false);
        let column: ArrayRef = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mut state = SerializerState::default();
        let output = encode(&Type::Int32, &column, field.data_type(), &mut state).await;
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_list_zero_rows() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let data_type = DataType::List(Arc::clone(&item));
        let values: ArrayRef = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let offsets = OffsetBuffer::new(vec![0].into());
        let column: ArrayRef = Arc::new(ListArray::new(item, offsets, values, None));
        let array_type = Type::Array(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&array_type, &column, &data_type, &mut state).await;
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_lowcard_zero_rows() {
        let lowcard = Type::LowCardinality(Box::new(Type::String));
        let data_type = DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into());
        let dictionary = DictionaryArray::<Int32Type>::try_new(
            Int32Array::from(Vec::<i32>::new()),
            Arc::new(StringArray::from(Vec::<&str>::new())),
        )
        .unwrap();
        let column: ArrayRef = Arc::new(dictionary);
        let mut state = SerializerState::default()
            .with_arrow_options(ArrowOptions::default().with_strings_as_strings(true));
        let output = encode(&lowcard, &column, &data_type, &mut state).await;
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn test_serialize_point() {
        let point = Type::Point;
        let (data_type, _) = point.arrow_type(None).unwrap();
        let column = xy_points(vec![1.0, 2.5, 3.7], vec![10.0, 20.5, 30.7]);
        let mut state = SerializerState::default();
        let output = encode(&point, &column, &data_type, &mut state).await;
        // All x coordinates first, then all y coordinates.
        let expected = [1.0, 2.5, 3.7, 10.0, 20.5, 30.7];
        assert_eq!(output.len(), expected.len() * 8);
        for (index, &want) in expected.iter().enumerate() {
            let got = f64::from_le_bytes(word_at(&output, index));
            assert!((got - want).abs() < f64::EPSILON);
        }
    }

    #[tokio::test]
    async fn test_serialize_ring() {
        let ring = Type::Ring;
        let (data_type, _) = ring.arrow_type(None).unwrap();
        let column = closed_ring();
        let mut state = SerializerState::default();
        let output = encode(&ring, &column, &data_type, &mut state).await;
        // One 8-byte offset, then 4 points x 2 coordinates x 8 bytes.
        assert_eq!(output.len(), 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 4);
    }

    #[tokio::test]
    async fn test_serialize_polygon() {
        let polygon = Type::Polygon;
        let (data_type, _) = polygon.arrow_type(None).unwrap();
        let column = list_of(closed_ring(), vec![0, 1]);
        let mut state = SerializerState::default();
        let output = encode(&polygon, &column, &data_type, &mut state).await;
        assert_eq!(output.len(), 8 + 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 1)), 4);
    }

    #[tokio::test]
    async fn test_serialize_multipolygon() {
        let multipolygon = Type::MultiPolygon;
        let (data_type, _) = multipolygon.arrow_type(None).unwrap();
        let column = list_of(list_of(closed_ring(), vec![0, 1]), vec![0, 1]);
        let mut state = SerializerState::default();
        let output = encode(&multipolygon, &column, &data_type, &mut state).await;
        assert_eq!(output.len(), 8 + 8 + 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 1)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 2)), 4);
    }
}
#[cfg(test)]
mod tests_sync {
    use std::sync::Arc;

    use arrow::array::*;
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{DataType, Field, Fields};

    use super::*;
    use crate::ArrowOptions;
    use crate::arrow::deserialize::ClickHouseArrowDeserializer;
    use crate::arrow::types::{
        LIST_ITEM_FIELD_NAME, MAP_FIELD_NAME, STRUCT_KEY_FIELD_NAME, STRUCT_VALUE_FIELD_NAME,
    };
    use crate::native::types::Type;

    /// Runs the synchronous serializer for `type_` and returns every byte it wrote.
    fn encode(
        type_: &Type,
        column: &ArrayRef,
        data_type: &DataType,
        state: &mut SerializerState,
    ) -> Vec<u8> {
        let mut sink: Vec<u8> = Vec::new();
        type_.serialize(&mut sink, column, data_type, state).unwrap();
        sink
    }

    /// Little-endian bytes of each `i32`, in order.
    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Little-endian bytes of each `u64`, in order (the width used for offsets below).
    fn u64_bytes(values: &[u64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Length-prefixed string bytes. Only valid for strings shorter than 128
    /// bytes, where the length prefix fits in a single byte.
    fn short_strings_bytes(values: &[&str]) -> Vec<u8> {
        let mut out = Vec::new();
        for value in values {
            assert!(value.len() < 128, "helper only supports single-byte length prefixes");
            out.push(value.len() as u8);
            out.extend_from_slice(value.as_bytes());
        }
        out
    }

    /// Returns the `index`-th 8-byte word of `bytes` (panics if out of range).
    fn word_at(bytes: &[u8], index: usize) -> [u8; 8] {
        let start = index * 8;
        let mut word = [0u8; 8];
        word.copy_from_slice(&bytes[start..start + 8]);
        word
    }

    /// Non-null struct column of `(x: Float64, y: Float64)` points.
    fn xy_points(xs: Vec<f64>, ys: Vec<f64>) -> ArrayRef {
        let fields = Fields::from(vec![
            Field::new("x", DataType::Float64, false),
            Field::new("y", DataType::Float64, false),
        ]);
        let children = vec![
            Arc::new(Float64Array::from(xs)) as ArrayRef,
            Arc::new(Float64Array::from(ys)) as ArrayRef,
        ];
        Arc::new(StructArray::new(fields, children, None)) as ArrayRef
    }

    /// Wraps `child` in a null-free `List` with a non-nullable `"item"` field.
    fn list_of(child: ArrayRef, offsets: Vec<i32>) -> ArrayRef {
        let item = Arc::new(Field::new("item", child.data_type().clone(), false));
        Arc::new(ListArray::new(item, OffsetBuffer::new(offsets.into()), child, None)) as ArrayRef
    }

    /// One closed ring of four points: (0,0) -> (1,0) -> (0.5,1) -> (0,0).
    fn closed_ring() -> ArrayRef {
        list_of(xy_points(vec![0.0, 1.0, 0.5, 0.0], vec![0.0, 0.0, 1.0, 0.0]), vec![0, 4])
    }

    #[test]
    fn test_serialize_int32() {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let mut state = SerializerState::default();
        let output = encode(&Type::Int32, &column, &DataType::Int32, &mut state);
        assert_eq!(output, i32_bytes(&[1, 2, 3]));
    }

    #[test]
    fn test_serialize_nullable_int32() {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let nullable = Type::Nullable(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&nullable, &column, &DataType::Int32, &mut state);
        // Null map (one byte per row, 1 = NULL) precedes the values; the NULL slot is zeroed.
        let expected = [vec![0, 1, 0], i32_bytes(&[1, 0, 3])].concat();
        assert_eq!(output, expected);
    }

    #[test]
    fn test_serialize_string() {
        let column: ArrayRef = Arc::new(StringArray::from(vec!["hello", "", "world"]));
        let mut state = SerializerState::default();
        let output = encode(&Type::String, &column, &DataType::Utf8, &mut state);
        assert_eq!(output, short_strings_bytes(&["hello", "", "world"]));
    }

    #[test]
    fn test_serialize_nullable_string() {
        let column: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), None, Some("c")]));
        let nullable = Type::Nullable(Box::new(Type::String));
        let mut state = SerializerState::default();
        let output = encode(&nullable, &column, &DataType::Utf8, &mut state);
        // After the null map, the NULL row is written as an empty string.
        let expected = [vec![0, 1, 0], short_strings_bytes(&["a", "", "c"])].concat();
        assert_eq!(output, expected);
    }

    #[test]
    fn test_serialize_array_int32() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(vec![0, 2, 3, 5].into());
        let column: ArrayRef =
            Arc::new(ListArray::new(Arc::clone(&item), offsets, values, None));
        let array_type = Type::Array(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&array_type, &column, &DataType::List(item), &mut state);
        // Cumulative end offsets as 8-byte words (leading 0 omitted), then the flat values.
        let expected = [u64_bytes(&[2, 3, 5]), i32_bytes(&[1, 2, 3, 4, 5])].concat();
        assert_eq!(output, expected);
    }

    #[test]
    fn test_serialize_nullable_array_int32() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let field = Field::new("col", DataType::List(Arc::clone(&item)), true);
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(vec![0, 2, 2, 5].into());
        let validity = Some(NullBuffer::from(vec![true, false, true]));
        let column: ArrayRef = Arc::new(ListArray::new(item, offsets, values, validity));
        let nullable_array = Type::Nullable(Box::new(Type::Array(Box::new(Type::Int32))));
        let mut state = SerializerState::default();
        let output = encode(&nullable_array, &column, field.data_type(), &mut state);
        // The expected bytes contain no null map: only offsets, then values.
        let expected = [u64_bytes(&[2, 2, 5]), i32_bytes(&[1, 2, 3, 4, 5])].concat();
        assert_eq!(output, expected);
    }

    #[test]
    fn test_serialize_map_string_int32() {
        let key_field = Field::new(STRUCT_KEY_FIELD_NAME, DataType::Utf8, false);
        let value_field = Field::new(STRUCT_VALUE_FIELD_NAME, DataType::Int32, false);
        let entries_field = Arc::new(Field::new(
            MAP_FIELD_NAME,
            DataType::Struct(Fields::from(vec![key_field.clone(), value_field.clone()])),
            false,
        ));
        let field = Field::new("col", DataType::Map(Arc::clone(&entries_field), false), false);
        let keys: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let vals: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let entries =
            StructArray::from(vec![(Arc::new(key_field), keys), (Arc::new(value_field), vals)]);
        let offsets = OffsetBuffer::new(vec![0, 2, 3, 5].into());
        let column: ArrayRef =
            Arc::new(MapArray::new(entries_field, offsets, entries, None, false));
        let map_type = Type::Map(Box::new(Type::String), Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&map_type, &column, field.data_type(), &mut state);
        // Offsets, then all keys, then all values (columnar, not interleaved).
        let expected = [
            u64_bytes(&[2, 3, 5]),
            short_strings_bytes(&["a", "b", "c", "d", "e"]),
            i32_bytes(&[1, 2, 3, 4, 5]),
        ]
        .concat();
        assert_eq!(output, expected);
    }

    #[test]
    fn test_serialize_int32_zero_rows() {
        let field = Field::new("col", DataType::Int32, false);
        let column: ArrayRef = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let mut state = SerializerState::default();
        let output = encode(&Type::Int32, &column, field.data_type(), &mut state);
        assert!(output.is_empty());
    }

    #[test]
    fn test_serialize_list_zero_rows() {
        let item = Arc::new(Field::new(LIST_ITEM_FIELD_NAME, DataType::Int32, false));
        let data_type = DataType::List(Arc::clone(&item));
        let values: ArrayRef = Arc::new(Int32Array::from(Vec::<i32>::new()));
        let offsets = OffsetBuffer::new(vec![0].into());
        let column: ArrayRef = Arc::new(ListArray::new(item, offsets, values, None));
        let array_type = Type::Array(Box::new(Type::Int32));
        let mut state = SerializerState::default();
        let output = encode(&array_type, &column, &data_type, &mut state);
        assert!(output.is_empty());
    }

    #[test]
    fn test_serialize_lowcard_zero_rows() {
        let lowcard = Type::LowCardinality(Box::new(Type::String));
        let data_type = DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into());
        let dictionary = DictionaryArray::<Int32Type>::try_new(
            Int32Array::from(Vec::<i32>::new()),
            Arc::new(StringArray::from(Vec::<&str>::new())),
        )
        .unwrap();
        let column: ArrayRef = Arc::new(dictionary);
        let mut state = SerializerState::default()
            .with_arrow_options(ArrowOptions::default().with_strings_as_strings(true));
        let output = encode(&lowcard, &column, &data_type, &mut state);
        assert!(output.is_empty());
    }

    #[test]
    fn test_serialize_point() {
        let point = Type::Point;
        let (data_type, _) = point.arrow_type(None).unwrap();
        let column = xy_points(vec![1.0, 2.5, 3.7], vec![10.0, 20.5, 30.7]);
        let mut state = SerializerState::default();
        let output = encode(&point, &column, &data_type, &mut state);
        // All x coordinates first, then all y coordinates.
        let expected = [1.0, 2.5, 3.7, 10.0, 20.5, 30.7];
        assert_eq!(output.len(), expected.len() * 8);
        for (index, &want) in expected.iter().enumerate() {
            let got = f64::from_le_bytes(word_at(&output, index));
            assert!((got - want).abs() < f64::EPSILON);
        }
    }

    #[test]
    fn test_serialize_ring() {
        let ring = Type::Ring;
        let column = closed_ring();
        let (data_type, _) = ring.arrow_type(None).unwrap();
        let mut state = SerializerState::default();
        let output = encode(&ring, &column, &data_type, &mut state);
        // One 8-byte offset, then 4 points x 2 coordinates x 8 bytes.
        assert_eq!(output.len(), 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 4);
    }

    #[test]
    fn test_serialize_polygon() {
        let polygon = Type::Polygon;
        let column = list_of(closed_ring(), vec![0, 1]);
        let (data_type, _) = polygon.arrow_type(None).unwrap();
        let mut state = SerializerState::default();
        let output = encode(&polygon, &column, &data_type, &mut state);
        assert_eq!(output.len(), 8 + 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 1)), 4);
    }

    #[test]
    fn test_serialize_multipolygon() {
        let multipolygon = Type::MultiPolygon;
        let column = list_of(list_of(closed_ring(), vec![0, 1]), vec![0, 1]);
        let (data_type, _) = multipolygon.arrow_type(None).unwrap();
        let mut state = SerializerState::default();
        let output = encode(&multipolygon, &column, &data_type, &mut state);
        assert_eq!(output.len(), 8 + 8 + 8 + 4 * 2 * 8);
        assert_eq!(u64::from_le_bytes(word_at(&output, 0)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 1)), 1);
        assert_eq!(u64::from_le_bytes(word_at(&output, 2)), 4);
    }
}