pub(crate) mod dictionary;
pub(crate) mod list;
pub(crate) mod map;
use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::*;
use strum::AsRefStr;
use crate::constants::CLICKHOUSE_DEFAULT_CHUNK_ROWS;
use crate::prelude::*;
pub(crate) type TypedBuilderMap<'a> = Vec<(&'a str, (&'a Type, TypedBuilder))>;
#[expect(unused)]
pub(crate) fn create_typed_builder_map(
definitions: &[(String, Type)],
get_field: impl Fn(&str) -> Result<&Field>,
) -> Result<TypedBuilderMap<'_>> {
let mut builders = Vec::with_capacity(definitions.len());
for (name, type_) in definitions {
let field = get_field(name)?;
let builder = TypedBuilder::try_new(type_, field.data_type())?;
builders.push((name.as_str(), (type_, builder)));
}
Ok(builders)
}
macro_rules! typed_arrow_build {
($typed:ident, $data_type:expr, { $(
$typ:pat => ($var:ident, $builder:expr $(,)?)
),+ $(,)? }) => {
match $data_type {
$(
$typ => $typed::$var($builder),
)*
_ => return Err(Error::ArrowDeserialize(format!("Unexpected type: {}", $data_type))),
}
}
}
pub(super) use typed_arrow_build;
use self::dictionary::LowCardinalityBuilder;
use self::list::TypedListBuilder;
macro_rules! typed_build {
($type_hint:expr, { $(
$typ:pat => ($var:ident, $builder:expr $(,)?)
),+ $(,)? }) => {
match $type_hint {
$(
$typ => TypedBuilder::$var($builder),
)*
Type::DateTime64(10.., _) => return Err(Error::ArrowDeserialize(
"Invalid DateTime64".into()
)),
Type::Point
| Type::Polygon
| Type::MultiPolygon
| Type::Ring => {
unimplemented!()
}
_ => return Err(Error::UnexpectedType($type_hint.clone())),
}
}
}
#[derive(AsRefStr)]
pub(crate) enum TypedBuilder {
Int8(PrimitiveBuilder<Int8Type>),
Int16(PrimitiveBuilder<Int16Type>),
Int32(PrimitiveBuilder<Int32Type>),
Int64(PrimitiveBuilder<Int64Type>),
UInt8(PrimitiveBuilder<UInt8Type>),
UInt16(PrimitiveBuilder<UInt16Type>),
UInt32(PrimitiveBuilder<UInt32Type>),
UInt64(PrimitiveBuilder<UInt64Type>),
Float32(PrimitiveBuilder<Float32Type>),
Float64(PrimitiveBuilder<Float64Type>),
Decimal32(Decimal128Builder),
Decimal64(Decimal128Builder),
Decimal128(Decimal128Builder),
Decimal256(Decimal256Builder),
Date(Date32Builder),
Date32(Date32Builder),
DateTime(TimestampSecondBuilder),
DateTimeS(TimestampSecondBuilder),
DateTimeMs(TimestampMillisecondBuilder),
DateTimeMu(TimestampMicrosecondBuilder),
DateTimeNano(TimestampNanosecondBuilder),
String(StringBuilder),
Object(StringBuilder),
Binary(BinaryBuilder),
FixedSizeBinary(FixedSizeBinaryBuilder),
Enum8(StringDictionaryBuilder<Int8Type>),
Enum16(StringDictionaryBuilder<Int16Type>),
List(TypedListBuilder),
LowCardinality(LowCardinalityBuilder),
Map((Box<TypedBuilder>, Box<TypedBuilder>)),
Tuple(Vec<TypedBuilder>),
}
impl TypedBuilder {
#[expect(clippy::too_many_lines)]
#[expect(clippy::cast_possible_wrap)]
#[expect(clippy::cast_possible_truncation)]
pub(crate) fn try_new(type_: &Type, data_type: &DataType) -> Result<Self> {
const ROWS: usize = CLICKHOUSE_DEFAULT_CHUNK_ROWS;
let tz_some = matches!(data_type, DataType::Timestamp(_, tz) if tz.is_some());
let type_ = type_.strip_null();
if let Type::Array(inner) = type_ {
return Ok(Self::List(TypedListBuilder::try_new(inner, data_type)?));
}
if let Type::LowCardinality(inner) = type_ {
return Ok(Self::LowCardinality(LowCardinalityBuilder::try_new(inner, data_type)?));
}
if let Type::Tuple(inner) = type_ {
let DataType::Struct(fields) = data_type else {
return Err(Error::ArrowDeserialize(format!(
"Unexpected datatype for tuple: {data_type:?}",
)));
};
if inner.len() != fields.len() {
return Err(Error::ArrowDeserialize(format!(
"Tuple length mismatch: {inner:?} != {fields:?}",
)));
}
return Ok(Self::Tuple(
inner
.iter()
.zip(fields.iter())
.map(|(t, f)| TypedBuilder::try_new(t, f.data_type()))
.collect::<Result<Vec<_>, _>>()?,
));
}
if let Type::Map(key, value) = type_ {
let (kfield, vfield) = map::get_map_fields(data_type)?;
let kbuilder = Box::new(TypedBuilder::try_new(key, kfield.data_type())?);
let vbuilder = Box::new(TypedBuilder::try_new(value, vfield.data_type())?);
return Ok(Self::Map((kbuilder, vbuilder)));
}
Ok(typed_build!(type_, {
Type::Int8 => (Int8, PrimitiveBuilder::<Int8Type>::with_capacity(ROWS)),
Type::Int16 => (Int16, PrimitiveBuilder::<Int16Type>::with_capacity(ROWS)),
Type::Int32 => ( Int32, PrimitiveBuilder::<Int32Type>::with_capacity(ROWS) ),
Type::Int64 => ( Int64, PrimitiveBuilder::<Int64Type>::with_capacity(ROWS) ),
Type::UInt8 => ( UInt8, PrimitiveBuilder::<UInt8Type>::with_capacity(ROWS)),
Type::UInt16 => ( UInt16, PrimitiveBuilder::<UInt16Type>::with_capacity(ROWS)),
Type::UInt32 => ( UInt32, PrimitiveBuilder::<UInt32Type>::with_capacity(ROWS)),
Type::UInt64 => ( UInt64, PrimitiveBuilder::<UInt64Type>::with_capacity(ROWS)),
Type::Float32 => ( Float32, PrimitiveBuilder::<Float32Type>::with_capacity(ROWS)),
Type::Float64 => ( Float64, PrimitiveBuilder::<Float64Type>::with_capacity(ROWS)),
Type::Decimal32(s) => (
Decimal32,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(9, *s as i8)?
),
Type::Decimal64(s) => (
Decimal64,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(18, *s as i8)?
),
Type::Decimal128(s) => (
Decimal128,
Decimal128Builder::with_capacity(ROWS).with_precision_and_scale(38, *s as i8)?
),
Type::Decimal256(s) => (
Decimal256,
Decimal256Builder::with_capacity(ROWS).with_precision_and_scale(76, *s as i8)?
),
Type::Date => (Date, Date32Builder::with_capacity(ROWS)),
Type::Date32 => (Date32, Date32Builder::with_capacity(ROWS)),
Type::DateTime(tz) => (
DateTime,
TimestampSecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(0, tz) => (
DateTimeS,
TimestampSecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(1..=3, tz) => (
DateTimeMs,
TimestampMillisecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(4..=6, tz) => (
DateTimeMu,
TimestampMicrosecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::DateTime64(7..=9, tz) => (
DateTimeNano,
TimestampNanosecondBuilder::with_capacity(ROWS)
.with_timezone_opt(tz_some.then_some(Arc::from(tz.name())))
),
Type::String => (
String, StringBuilder::with_capacity(ROWS, ROWS * 64)
),
Type::Object => (
Object, StringBuilder::with_capacity(ROWS, ROWS * 1024)
),
Type::FixedSizedString(n) => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, *n as i32)
),
Type::Binary => (
Binary, BinaryBuilder::with_capacity(ROWS, ROWS * 64)
),
Type::FixedSizedBinary(n) => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, *n as i32)
),
Type::Ipv4 => (
FixedSizeBinary, FixedSizeBinaryBuilder::with_capacity(ROWS, 4)
),
Type::Ipv6 => (
FixedSizeBinary, FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Uuid => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Int128 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::Int256 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 32)
),
Type::UInt128 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 16)
),
Type::UInt256 => (
FixedSizeBinary,
FixedSizeBinaryBuilder::with_capacity(ROWS, 32)
),
Type::Enum8(p) => (
Enum8,
StringDictionaryBuilder::<Int8Type>::with_capacity(ROWS, p.len(), ROWS * p.len() * 4)
),
Type::Enum16(p) => (
Enum16,
StringDictionaryBuilder::<Int16Type>::with_capacity(ROWS, p.len(), ROWS * p.len() * 4)
),
}))
}
}
impl std::fmt::Debug for TypedBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::List(l) => write!(f, "TypedBuilder::List({l:?})"),
Self::LowCardinality(l) => write!(f, "TypedBuilder::LowCardinality({l:?})"),
Self::Map((k, v)) => write!(f, "TypedBuilder::Map({k:?}, {v:?})"),
Self::Tuple(t) => write!(f, "TypedBuilder::Tuple({t:?})"),
Self::String(_) => write!(f, "TypedBuilder::String"),
b => write!(f, "TypedBuilder::{}", b.as_ref()),
}
}
}