pub(crate) mod dictionary;
pub(crate) mod list;
pub(crate) mod map;
use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::*;
use strum::AsRefStr;
use crate::constants::CLICKHOUSE_DEFAULT_CHUNK_ROWS;
use crate::prelude::*;
pub(crate) type TypedBuilderMap<'a> = Vec<(&'a str, (&'a Type, TypedBuilder))>;
#[expect(unused)]
pub(crate) fn create_typed_builder_map(
definitions: &[(String, Type)],
get_field: impl Fn(&str) -> Result<&Field>,
) -> Result<TypedBuilderMap<'_>> {
let mut builders = Vec::with_capacity(definitions.len());
for (name, type_) in definitions {
let field = get_field(name)?;
let builder = TypedBuilder::try_new(type_, field.data_type())?;
builders.push((name.as_str(), (type_, builder)));
}
Ok(builders)
}
macro_rules! typed_arrow_build {
($typed:ident, $data_type:expr, { $(
$typ:pat => ($var:ident, $builder:expr $(,)?)
),+ $(,)? }) => {
match $data_type {
$(
$typ => $typed::$var($builder),
)*
_ => return Err(Error::ArrowDeserialize(format!("Unexpected type: {}", $data_type))),
}
}
}
pub(super) use typed_arrow_build;
use self::dictionary::LowCardinalityBuilder;
use self::list::TypedListBuilder;
macro_rules! typed_build {
($type_hint:expr, { $(
$typ:pat => ($var:ident, $builder:expr $(,)?)
),+ $(,)? }) => {
match $type_hint {
$(
$typ => TypedBuilder::$var($builder),
)*
Type::DateTime64(10.., _) => return Err(Error::ArrowDeserialize(
"Invalid DateTime64".into()
)),
Type::Point
| Type::Polygon
| Type::MultiPolygon
| Type::Ring => {
unimplemented!()
}
_ => return Err(Error::UnexpectedType($type_hint.clone())),
}
}
}
#[derive(AsRefStr)]
pub(crate) enum TypedBuilder {
Int8(PrimitiveBuilder<Int8Type>),
Int16(PrimitiveBuilder<Int16Type>),
Int32(PrimitiveBuilder<Int32Type>),
Int64(PrimitiveBuilder<Int64Type>),
UInt8(PrimitiveBuilder<UInt8Type>),
UInt16(PrimitiveBuilder<UInt16Type>),
UInt32(PrimitiveBuilder<UInt32Type>),
UInt64(PrimitiveBuilder<UInt64Type>),
Float32(PrimitiveBuilder<Float32Type>),
Float64(PrimitiveBuilder<Float64Type>),
Decimal32(Decimal128Builder),
Decimal64(Decimal128Builder),
Decimal128(Decimal128Builder),
Decimal256(Decimal256Builder),
Date(Date32Builder),
Date32(Date32Builder),
DateTime(TimestampSecondBuilder),
DateTimeS(TimestampSecondBuilder),
DateTimeMs(TimestampMillisecondBuilder),
DateTimeMu(TimestampMicrosecondBuilder),
DateTimeNano(TimestampNanosecondBuilder),
String(StringBuilder),
Object(StringBuilder),
Binary(BinaryBuilder),
FixedSizeBinary(FixedSizeBinaryBuilder),
Enum8(StringDictionaryBuilder<Int8Type>),
Enum16(StringDictionaryBuilder<Int16Type>),
List(TypedListBuilder),
LowCardinality(LowCardinalityBuilder),
Map((Box<TypedBuilder>, Box<TypedBuilder>)),
Tuple(Vec<TypedBuilder>),
}
impl TypedBuilder {
#[expect(clippy::too_many_lines)]
#[expect(clippy::cast_possible_wrap)]
#[expect(clippy::cast_possible_truncation)]
pub(crate) fn try_new(type_: &Type, data_type: &DataType) -> Result<Self> {
const ROWS: usize = CLICKHOUSE_DEFAULT_CHUNK_ROWS;
let tz_some = matches!(data_type, DataType::Timestamp(_, tz) if tz.is_some());
let type_ = type_.strip_null();
if let Type::Array(inner) = type_ {
return Ok(Self::List(TypedListBuilder::try_new(inner, data_type)?));
}
if let Type::LowCardinality(inner) = type_ {
return Ok(Self::LowCardinality(LowCardinalityBuilder::try_new(inner, data_type)?));
}
if let Type::Tuple(inner) = type_ {
let DataType::Struct(fields) = data_type else {
return Err(Error::ArrowDeserialize(format!(
"Unexpected datatype for tuple: {data_type:?}",
)));
};
if inner.len() != fields.len() {
return Err(Error::ArrowDeserialize(format!(
"Tuple length mismatch: {inner:?} != {fields:?}",
)));
}
return Ok(Self::Tuple(
inner
.iter()
.zip(fields.iter())
.map(|(t, f)| TypedBuilder::try_new(t, f.data_type()))
.collect::<Result<Vec<_>, _>>()?,
));
}
if let Type::Map(key, value) = type_ {
let (kfield, vfield) = map::get_map_fields(data_type)?;
let kbuilder = Box::new(TypedBuilder::try_new(key, kfield.data_type())?);
let vbuilder = Box::new(TypedBuilder::try_new(value, vfield.data_type())?);
return Ok(Self::Map((kbuilder, vbuilder)));
}
Ok(typed_build!(type_, {
Type::Int8 => (Int8, PrimitiveBuilder::<Int8Type>::with_capacity(ROWS)),
Type::Int16 => (Int16, PrimitiveBuilder::<Int16Type>::with_capacity(ROWS)),
Type::Int32 => ( Int32, PrimitiveBuilder::<Int32Type>::with_capacity(ROWS) ),
Type::Int64 => ( Int64, PrimitiveBuilder::<Int64Type>::with_capacity(ROWS) ),
Type::UInt8 => ( UInt8, PrimitiveBuilder::<UInt8Type>::with_capacity(ROWS)),
Type::UInt16 => ( UInt16, PrimitiveBuilder::<UInt16Type>::with_capacity(ROWS)),
Type::UInt32 => ( UInt32, PrimitiveBuilder::<UInt32Type>::with_capacity(ROWS)),
Type::UInt64 => ( UInt64, PrimitiveBuilder::<UInt64Type>::with_capacity(ROWS)),
Type::Float32 => ( Float32, PrimitiveBuilder::<Float32Type>::with_capacity(ROWS)),
Type::Float64 => ( Float64, PrimitiveBuilder::<Float64Type>::with_capacity(ROWS)),
Type::Decimal32(s) => (
Decimal32,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(9, *s as i8)?
),
Type::Decimal64(s) => (
Decimal64,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(18, *s as i8)?
),
Type::Decimal128(s) => (
Decimal128,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(38, *s as i8)?
),
Type::Decimal256(s) => (
Decimal256,
Decimal256Builder::with_capacity(ROWS).with_precision_and_scale(76, *s as i8)?
),
Type::Date => (Date, Date32Builder::with_capacity(ROWS)),
Type::Date32 => (Date32, Date32Builder::with_capacity(ROWS)),
Type::DateTime(tz) => (
DateTime,
TimestampSecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(0, tz) => (
DateTimeS,
TimestampSecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(1..=3, tz) => (
DateTimeMs,
TimestampMillisecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(4..=6, tz) => (
DateTimeMu,
TimestampMicrosecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(7..=9, tz) => (
DateTimeNano,
TimestampNanosecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::String => (
String, StringBuilder::with_capacity(ROWS, ROWS * 64)
),
Type::Object => (
Object, StringBuilder::with_capacity(ROWS, ROWS * 1024)
),
Type::FixedSizedString(n) => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, *n as i32)
),
Type::Binary => (
Binary, BinaryBuilder::with_capacity(ROWS, ROWS * 64)
),
Type::FixedSizedBinary(n) => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, *n as i32)
),
Type::Ipv4 => (
FixedSizeBinary, FixedSizeBinaryBuilder::with_capacity(ROWS, 4)
),
Type::Ipv6 => (
FixedSizeBinary, FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Uuid => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Int128 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Int256 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 32)
),
Type::UInt128 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::UInt256 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 32)
),
Type::Enum8(p) => (
Enum8,
StringDictionaryBuilder::<Int8Type>::with_capacity(ROWS, p.len(), ROWS * p.len() * 4)
),
Type::Enum16(p) => (
Enum16,
StringDictionaryBuilder::<Int16Type>::with_capacity(ROWS, p.len(), ROWS * p.len() * 4)
),
}))
}
}
impl std::fmt::Debug for TypedBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::List(l) => write!(f, "TypedBuilder::List({l:?})"),
Self::LowCardinality(l) => write!(f, "TypedBuilder::LowCardinality({l:?})"),
Self::Map((k, v)) => write!(f, "TypedBuilder::Map({k:?}, {v:?})"),
Self::Tuple(t) => write!(f, "TypedBuilder::Tuple({t:?})"),
Self::String(_) => write!(f, "TypedBuilder::String"),
b => write!(f, "TypedBuilder::{}", b.as_ref()),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, TimeUnit};
use chrono_tz::UTC;
use super::*;
#[test]
fn test_create_typed_builder_map() {
let _definitions = [("col1".to_string(), Type::Int32), ("col2".to_string(), Type::String)];
let builder1 = TypedBuilder::try_new(&Type::Int32, &DataType::Int32).unwrap();
let builder2 = TypedBuilder::try_new(&Type::String, &DataType::Utf8).unwrap();
assert!(matches!(builder1, TypedBuilder::Int32(_)));
assert!(matches!(builder2, TypedBuilder::String(_)));
}
#[test]
fn test_typed_builder_primitive_types() {
let test_cases = vec![
(Type::Int8, DataType::Int8),
(Type::Int16, DataType::Int16),
(Type::Int32, DataType::Int32),
(Type::Int64, DataType::Int64),
(Type::UInt8, DataType::UInt8),
(Type::UInt16, DataType::UInt16),
(Type::UInt32, DataType::UInt32),
(Type::UInt64, DataType::UInt64),
(Type::Float32, DataType::Float32),
(Type::Float64, DataType::Float64),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match (&type_, &builder) {
(Type::UInt8, TypedBuilder::UInt8(_))
| (Type::UInt16, TypedBuilder::UInt16(_))
| (Type::UInt32, TypedBuilder::UInt32(_))
| (Type::UInt64, TypedBuilder::UInt64(_))
| (Type::Int8, TypedBuilder::Int8(_))
| (Type::Int16, TypedBuilder::Int16(_))
| (Type::Int32, TypedBuilder::Int32(_))
| (Type::Int64, TypedBuilder::Int64(_))
| (Type::Float32, TypedBuilder::Float32(_))
| (Type::Float64, TypedBuilder::Float64(_)) => {}
_ => panic!("Unexpected builder type for {type_:?}"),
}
}
}
#[test]
fn test_typed_builder_decimal_types() {
let test_cases = vec![
(Type::Decimal32(2), DataType::Decimal128(9, 2)),
(Type::Decimal64(4), DataType::Decimal128(18, 4)),
(Type::Decimal128(6), DataType::Decimal128(38, 6)),
(Type::Decimal256(8), DataType::Decimal256(76, 8)),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match (&type_, &builder) {
(Type::Decimal32(_), TypedBuilder::Decimal32(_))
| (Type::Decimal64(_), TypedBuilder::Decimal64(_))
| (Type::Decimal128(_), TypedBuilder::Decimal128(_))
| (Type::Decimal256(_), TypedBuilder::Decimal256(_)) => {}
_ => panic!("Unexpected builder type for {type_:?}"),
}
}
}
#[test]
fn test_typed_builder_date_time_types() {
let test_cases = vec![
(Type::Date, DataType::Date32),
(Type::Date32, DataType::Date32),
(Type::DateTime(UTC), DataType::Timestamp(TimeUnit::Second, None)),
(Type::DateTime64(0, UTC), DataType::Timestamp(TimeUnit::Second, None)),
(Type::DateTime64(3, UTC), DataType::Timestamp(TimeUnit::Millisecond, None)),
(Type::DateTime64(6, UTC), DataType::Timestamp(TimeUnit::Microsecond, None)),
(Type::DateTime64(9, UTC), DataType::Timestamp(TimeUnit::Nanosecond, None)),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match (&type_, &builder) {
(Type::Date, TypedBuilder::Date(_))
| (Type::Date32, TypedBuilder::Date32(_))
| (Type::DateTime(_), TypedBuilder::DateTime(_))
| (Type::DateTime64(0, _), TypedBuilder::DateTimeS(_))
| (Type::DateTime64(1..=3, _), TypedBuilder::DateTimeMs(_))
| (Type::DateTime64(4..=6, _), TypedBuilder::DateTimeMu(_))
| (Type::DateTime64(7..=9, _), TypedBuilder::DateTimeNano(_)) => {}
_ => panic!("Unexpected builder type for {type_:?}"),
}
}
}
#[test]
fn test_typed_builder_string_binary_types() {
let test_cases = vec![
(Type::String, DataType::Utf8),
(Type::Object, DataType::Utf8),
(Type::Binary, DataType::Binary),
(Type::FixedSizedString(10), DataType::FixedSizeBinary(10)),
(Type::FixedSizedBinary(16), DataType::FixedSizeBinary(16)),
(Type::Ipv4, DataType::FixedSizeBinary(4)),
(Type::Ipv6, DataType::FixedSizeBinary(16)),
(Type::Uuid, DataType::FixedSizeBinary(16)),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match (&type_, &builder) {
(Type::String, TypedBuilder::String(_))
| (Type::Object, TypedBuilder::Object(_))
| (Type::Binary, TypedBuilder::Binary(_))
| (
Type::FixedSizedString(_)
| Type::FixedSizedBinary(_)
| Type::Ipv4
| Type::Ipv6
| Type::Uuid,
TypedBuilder::FixedSizeBinary(_),
) => {}
_ => panic!("Unexpected builder type for {type_:?}"),
}
}
}
#[test]
fn test_typed_builder_large_int_types() {
let test_cases = vec![
(Type::Int128, DataType::FixedSizeBinary(16)),
(Type::Int256, DataType::FixedSizeBinary(32)),
(Type::UInt128, DataType::FixedSizeBinary(16)),
(Type::UInt256, DataType::FixedSizeBinary(32)),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::FixedSizeBinary(_)));
}
}
#[test]
fn test_typed_builder_enum_types() {
let enum8_values = vec![("a".to_string(), 1i8), ("b".to_string(), 2i8)];
let enum16_values = vec![("x".to_string(), 10i16), ("y".to_string(), 20i16)];
let test_cases = vec![
(
Type::Enum8(enum8_values),
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
),
(
Type::Enum16(enum16_values),
DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
),
];
for (type_, data_type) in test_cases {
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match (&type_, &builder) {
(Type::Enum8(_), TypedBuilder::Enum8(_))
| (Type::Enum16(_), TypedBuilder::Enum16(_)) => {}
_ => panic!("Unexpected builder type for {type_:?}"),
}
}
}
#[test]
fn test_typed_builder_array_type() {
let inner_field = Arc::new(Field::new("item", DataType::Int32, false));
let data_type = DataType::List(inner_field);
let type_ = Type::Array(Box::new(Type::Int32));
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::List(_)));
}
#[test]
fn test_typed_builder_low_cardinality_type() {
let value_type = Box::new(DataType::Utf8);
let key_type = Box::new(DataType::UInt8);
let data_type = DataType::Dictionary(key_type, value_type);
let type_ = Type::LowCardinality(Box::new(Type::String));
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::LowCardinality(_)));
}
#[test]
fn test_typed_builder_tuple_type() {
let fields = vec![
Arc::new(Field::new("0", DataType::Int32, false)),
Arc::new(Field::new("1", DataType::Utf8, false)),
];
let data_type = DataType::Struct(fields.into());
let type_ = Type::Tuple(vec![Type::Int32, Type::String]);
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
match builder {
TypedBuilder::Tuple(builders) => {
assert_eq!(builders.len(), 2);
assert!(matches!(builders[0], TypedBuilder::Int32(_)));
assert!(matches!(builders[1], TypedBuilder::String(_)));
}
_ => panic!("Expected Tuple builder"),
}
}
#[test]
fn test_typed_builder_map_type() {
let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
let value_field = Arc::new(Field::new("value", DataType::Int32, false));
let inner_fields = vec![key_field, value_field];
let struct_type = DataType::Struct(inner_fields.into());
let entries_field = Arc::new(Field::new("entries", struct_type, false));
let data_type = DataType::Map(entries_field, false);
let type_ = Type::Map(Box::new(Type::String), Box::new(Type::Int32));
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::Map(_)));
}
#[test]
fn test_typed_builder_nullable_type() {
let type_ = Type::Nullable(Box::new(Type::Int32));
let data_type = DataType::Int32;
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::Int32(_)));
}
#[test]
fn test_typed_builder_invalid_datetime64() {
let type_ = Type::DateTime64(15, UTC); let data_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
let result = TypedBuilder::try_new(&type_, &data_type);
assert!(result.is_err());
if let Err(Error::ArrowDeserialize(msg)) = result {
assert_eq!(msg, "Invalid DateTime64");
} else {
panic!("Expected ArrowDeserialize error");
}
}
#[test]
fn test_typed_builder_tuple_length_mismatch() {
let fields = vec![Arc::new(Field::new("0", DataType::Int32, false))];
let data_type = DataType::Struct(fields.into());
let type_ = Type::Tuple(vec![Type::Int32, Type::String]);
let result = TypedBuilder::try_new(&type_, &data_type);
assert!(result.is_err());
if let Err(Error::ArrowDeserialize(msg)) = result {
assert!(msg.contains("Tuple length mismatch"));
} else {
panic!("Expected ArrowDeserialize error");
}
}
#[test]
fn test_typed_builder_tuple_invalid_data_type() {
let data_type = DataType::Int32; let type_ = Type::Tuple(vec![Type::Int32]);
let result = TypedBuilder::try_new(&type_, &data_type);
assert!(result.is_err());
if let Err(Error::ArrowDeserialize(msg)) = result {
assert!(msg.contains("Unexpected datatype for tuple"));
} else {
panic!("Expected ArrowDeserialize error");
}
}
#[test]
fn test_typed_builder_debug_formatting() {
let type_ = Type::String;
let data_type = DataType::Utf8;
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
let debug_str = format!("{builder:?}");
assert_eq!(debug_str, "TypedBuilder::String");
}
#[test]
fn test_typed_builder_debug_complex_types() {
let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
let value_field = Arc::new(Field::new("value", DataType::Int32, false));
let inner_fields = vec![key_field, value_field];
let struct_type = DataType::Struct(inner_fields.into());
let entries_field = Arc::new(Field::new("entries", struct_type, false));
let data_type = DataType::Map(entries_field, false);
let type_ = Type::Map(Box::new(Type::String), Box::new(Type::Int32));
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
let debug_str = format!("{builder:?}");
assert!(debug_str.contains("TypedBuilder::Map"));
}
#[test]
fn test_typed_builder_datetime_with_timezone() {
let tz_name = Some(Arc::from("UTC"));
let data_type = DataType::Timestamp(TimeUnit::Second, tz_name);
let type_ = Type::DateTime(UTC);
let builder = TypedBuilder::try_new(&type_, &data_type).unwrap();
assert!(matches!(builder, TypedBuilder::DateTime(_)));
}
}