clickhouse_arrow/native/
types.rs

1pub(crate) mod deserialize;
2pub mod geo;
3pub(crate) mod low_cardinality;
4pub mod map;
5pub(crate) mod serialize;
6#[cfg(test)]
7mod tests;
8
9use std::fmt::Display;
10use std::str::FromStr;
11
12use chrono_tz::Tz;
13use futures_util::FutureExt;
14use tokio::io::AsyncWriteExt;
15use uuid::Uuid;
16
17use super::protocol::MAX_STRING_SIZE;
18use super::values::{
19    Date, DateTime, DynDateTime64, Ipv4, Ipv6, MultiPolygon, Point, Polygon, Ring, Value, i256,
20    u256,
21};
22use crate::formats::{DeserializerState, SerializerState};
23use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
24use crate::{Date32, Error, Result};
25
26/// A raw `ClickHouse` type.
27#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
29pub enum Type {
30    Int8,
31    Int16,
32    Int32,
33    Int64,
34    Int128,
35    Int256,
36
37    UInt8,
38    UInt16,
39    UInt32,
40    UInt64,
41    UInt128,
42    UInt256,
43
44    Float32,
45    Float64,
46
47    // Inner value is SCALE
48    Decimal32(usize),
49    Decimal64(usize),
50    Decimal128(usize),
51    Decimal256(usize),
52
53    String,
54    FixedSizedString(usize),
55    Binary,
56    FixedSizedBinary(usize),
57
58    Uuid,
59
60    Date,
61    Date32, // NOTE: This is i32 days since 1900-01-01
62    DateTime(Tz),
63    DateTime64(usize, Tz),
64
65    Ipv4,
66    Ipv6,
67
68    // Geo types, see
69    // https://clickhouse.com/docs/en/sql-reference/data-types/geo
70    // These are just aliases of primitive types.
71    Point,
72    Ring,
73    Polygon,
74    MultiPolygon,
75
76    Nullable(Box<Type>),
77
78    Enum8(Vec<(String, i8)>),
79    Enum16(Vec<(String, i16)>),
80    LowCardinality(Box<Type>),
81    Array(Box<Type>),
82    Tuple(Vec<Type>),
83    Map(Box<Type>, Box<Type>),
84
85    Object,
86}
87
88impl Type {
89    /// # Errors
90    ///
91    /// Errors if the type is not an array
92    pub fn unwrap_array(&self) -> Result<&Type> {
93        match self {
94            Type::Array(x) => Ok(x),
95            _ => Err(Error::UnexpectedType(self.clone())),
96        }
97    }
98
99    pub fn unarray(&self) -> Option<&Type> {
100        match self {
101            Type::Array(x) => Some(&**x),
102            _ => None,
103        }
104    }
105
106    /// # Errors
107    ///
108    /// Errors if the type is not a map
109    pub fn unwrap_map(&self) -> Result<(&Type, &Type)> {
110        match self {
111            Type::Map(key, value) => Ok((&**key, &**value)),
112            _ => Err(Error::UnexpectedType(self.clone())),
113        }
114    }
115
116    pub fn unmap(&self) -> Option<(&Type, &Type)> {
117        match self {
118            Type::Map(key, value) => Some((&**key, &**value)),
119            _ => None,
120        }
121    }
122
123    /// # Errors
124    ///
125    /// Errors if the type is not a tuple
126    pub fn unwrap_tuple(&self) -> Result<&[Type]> {
127        match self {
128            Type::Tuple(x) => Ok(&x[..]),
129            _ => Err(Error::UnexpectedType(self.clone())),
130        }
131    }
132
133    pub fn untuple(&self) -> Option<&[Type]> {
134        match self {
135            Type::Tuple(x) => Some(&x[..]),
136            _ => None,
137        }
138    }
139
140    pub fn unnull(&self) -> Option<&Type> {
141        match self {
142            Type::Nullable(x) => Some(&**x),
143            _ => None,
144        }
145    }
146
147    pub fn strip_null(&self) -> &Type {
148        match self {
149            Type::Nullable(x) => x,
150            _ => self,
151        }
152    }
153
154    pub fn is_nullable(&self) -> bool { matches!(self, Type::Nullable(_)) }
155
156    pub fn strip_low_cardinality(&self) -> &Type {
157        match self {
158            Type::LowCardinality(x) => x,
159            _ => self,
160        }
161    }
162
163    #[must_use]
164    pub fn into_nullable(self) -> Type {
165        match self {
166            t @ Type::Nullable(_) => t,
167            // LowCardinality pushes nullability down
168            Type::LowCardinality(inner) => Type::LowCardinality(Box::new(inner.into_nullable())),
169            t => Type::Nullable(Box::new(t)),
170        }
171    }
172
173    pub fn default_value(&self) -> Value {
174        match self {
175            Type::Int8 => Value::Int8(0),
176            Type::Int16 => Value::Int16(0),
177            Type::Int32 => Value::Int32(0),
178            Type::Int64 => Value::Int64(0),
179            Type::Int128 => Value::Int128(0),
180            Type::Int256 => Value::Int256(i256::default()),
181            Type::UInt8 => Value::UInt8(0),
182            Type::UInt16 => Value::UInt16(0),
183            Type::UInt32 => Value::UInt32(0),
184            Type::UInt64 => Value::UInt64(0),
185            Type::UInt128 => Value::UInt128(0),
186            Type::UInt256 => Value::UInt256(u256::default()),
187            Type::Float32 => Value::Float32(0.0),
188            Type::Float64 => Value::Float64(0.0),
189            Type::Decimal32(s) => Value::Decimal32(*s, 0),
190            Type::Decimal64(s) => Value::Decimal64(*s, 0),
191            Type::Decimal128(s) => Value::Decimal128(*s, 0),
192            Type::Decimal256(s) => Value::Decimal256(*s, i256::default()),
193            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
194                Value::String(vec![])
195            }
196            Type::Date => Value::Date(Date(0)),
197            Type::Date32 => Value::Date32(Date32(0)),
198            Type::DateTime(tz) => Value::DateTime(DateTime(*tz, 0)),
199            Type::DateTime64(precision, tz) => Value::DateTime64(DynDateTime64(*tz, 0, *precision)),
200            Type::Ipv4 => Value::Ipv4(Ipv4::default()),
201            Type::Ipv6 => Value::Ipv6(Ipv6::default()),
202            Type::Enum8(_) => Value::Enum8(String::new(), 0),
203            Type::Enum16(_) => Value::Enum16(String::new(), 0),
204            Type::LowCardinality(x) => x.default_value(),
205            Type::Array(_) => Value::Array(vec![]),
206            Type::Tuple(types) => Value::Tuple(types.iter().map(Type::default_value).collect()),
207            Type::Nullable(_) => Value::Null,
208            Type::Map(_, _) => Value::Map(vec![], vec![]),
209            Type::Point => Value::Point(Point::default()),
210            Type::Ring => Value::Ring(Ring::default()),
211            Type::Polygon => Value::Polygon(Polygon::default()),
212            Type::MultiPolygon => Value::MultiPolygon(MultiPolygon::default()),
213            Type::Uuid => Value::Uuid(Uuid::from_u128(0)),
214            Type::Object => Value::Object("{}".as_bytes().to_vec()),
215        }
216    }
217}
218
219impl Display for Type {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        match self {
222            Type::Int8 => write!(f, "Int8"),
223            Type::Int16 => write!(f, "Int16"),
224            Type::Int32 => write!(f, "Int32"),
225            Type::Int64 => write!(f, "Int64"),
226            Type::Int128 => write!(f, "Int128"),
227            Type::Int256 => write!(f, "Int256"),
228            Type::UInt8 => write!(f, "UInt8"),
229            Type::UInt16 => write!(f, "UInt16"),
230            Type::UInt32 => write!(f, "UInt32"),
231            Type::UInt64 => write!(f, "UInt64"),
232            Type::UInt128 => write!(f, "UInt128"),
233            Type::UInt256 => write!(f, "UInt256"),
234            Type::Float32 => write!(f, "Float32"),
235            Type::Float64 => write!(f, "Float64"),
236            Type::Decimal32(s) => write!(f, "Decimal32({s})"),
237            Type::Decimal64(s) => write!(f, "Decimal64({s})"),
238            Type::Decimal128(s) => write!(f, "Decimal128({s})"),
239            Type::Decimal256(s) => write!(f, "Decimal256({s})"),
240            Type::String | Type::Binary => write!(f, "String"),
241            Type::FixedSizedBinary(s) | Type::FixedSizedString(s) => write!(f, "FixedString({s})"),
242            Type::Uuid => write!(f, "UUID"),
243            Type::Date => write!(f, "Date"),
244            Type::Date32 => write!(f, "Date32"),
245            Type::DateTime(tz) => write!(f, "DateTime('{tz}')"),
246            Type::DateTime64(precision, tz) => write!(f, "DateTime64({precision},'{tz}')"),
247            Type::Ipv4 => write!(f, "IPv4"),
248            Type::Ipv6 => write!(f, "IPv6"),
249            Type::Point => write!(f, "Point"),
250            Type::Ring => write!(f, "Ring"),
251            Type::Polygon => write!(f, "Polygon"),
252            Type::MultiPolygon => write!(f, "MultiPolygon"),
253            Type::Enum8(items) => {
254                write!(f, "Enum8(")?;
255                if !items.is_empty() {
256                    let last_index = items.len() - 1;
257                    for (i, (name, value)) in items.iter().enumerate() {
258                        write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
259                        if i < last_index {
260                            write!(f, ",")?;
261                        }
262                    }
263                }
264                write!(f, ")")
265            }
266            Type::Enum16(items) => {
267                write!(f, "Enum16(")?;
268                if !items.is_empty() {
269                    let last_index = items.len() - 1;
270                    for (i, (name, value)) in items.iter().enumerate() {
271                        write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
272                        if i < last_index {
273                            write!(f, ",")?;
274                        }
275                    }
276                }
277                write!(f, ")")
278            }
279            Type::LowCardinality(inner) => write!(f, "LowCardinality({inner})"),
280            Type::Array(inner) => write!(f, "Array({inner})"),
281            Type::Tuple(items) => write!(
282                f,
283                "Tuple({})",
284                items.iter().map(ToString::to_string).collect::<Vec<_>>().join(",")
285            ),
286            Type::Nullable(inner) => write!(f, "Nullable({inner})"),
287            Type::Map(key, value) => write!(f, "Map({key},{value})"),
288            Type::Object => write!(f, "JSON"),
289        }
290    }
291}
292
293impl Type {
294    pub(crate) fn deserialize_column<'a, R: ClickHouseRead>(
295        &'a self,
296        reader: &'a mut R,
297        rows: usize,
298        state: &'a mut DeserializerState,
299    ) -> impl Future<Output = Result<Vec<Value>>> + Send + 'a {
300        use deserialize::*;
301        async move {
302            if rows > MAX_STRING_SIZE {
303                return Err(Error::Protocol(format!(
304                    "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
305                )));
306            }
307
308            Ok(match self {
309                Type::Int8
310                | Type::Int16
311                | Type::Int32
312                | Type::Int64
313                | Type::Int128
314                | Type::Int256
315                | Type::UInt8
316                | Type::UInt16
317                | Type::UInt32
318                | Type::UInt64
319                | Type::UInt128
320                | Type::UInt256
321                | Type::Float32
322                | Type::Float64
323                | Type::Decimal32(_)
324                | Type::Decimal64(_)
325                | Type::Decimal128(_)
326                | Type::Decimal256(_)
327                | Type::Uuid
328                | Type::Date
329                | Type::Date32
330                | Type::DateTime(_)
331                | Type::DateTime64(_, _)
332                | Type::Ipv4
333                | Type::Ipv6
334                | Type::Enum8(_)
335                | Type::Enum16(_) => {
336                    sized::SizedDeserializer::read(self, reader, rows, state).await?
337                }
338                Type::String
339                | Type::FixedSizedString(_)
340                | Type::Binary
341                | Type::FixedSizedBinary(_) => {
342                    string::StringDeserializer::read(self, reader, rows, state).await?
343                }
344                Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?,
345                Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?,
346                Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?,
347                Type::MultiPolygon => {
348                    geo::MultiPolygonDeserializer::read(self, reader, rows, state).await?
349                }
350                Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?,
351                Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?,
352                Type::Nullable(_) => {
353                    nullable::NullableDeserializer::read(self, reader, rows, state).await?
354                }
355                Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?,
356                Type::LowCardinality(_) => {
357                    low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state)
358                        .await?
359                }
360                Type::Object => object::ObjectDeserializer::read(self, reader, rows, state).await?,
361            })
362        }
363        .boxed()
364    }
365
366    pub(crate) fn deserialize_column_sync(
367        &self,
368        reader: &mut impl ClickHouseBytesRead,
369        rows: usize,
370        state: &mut DeserializerState,
371    ) -> Result<Vec<Value>> {
372        use deserialize::*;
373
374        if rows > MAX_STRING_SIZE {
375            return Err(Error::Protocol(format!(
376                "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
377            )));
378        }
379
380        Ok(match self {
381            Type::Int8
382            | Type::Int16
383            | Type::Int32
384            | Type::Int64
385            | Type::Int128
386            | Type::Int256
387            | Type::UInt8
388            | Type::UInt16
389            | Type::UInt32
390            | Type::UInt64
391            | Type::UInt128
392            | Type::UInt256
393            | Type::Float32
394            | Type::Float64
395            | Type::Decimal32(_)
396            | Type::Decimal64(_)
397            | Type::Decimal128(_)
398            | Type::Decimal256(_)
399            | Type::Uuid
400            | Type::Date
401            | Type::Date32
402            | Type::DateTime(_)
403            | Type::DateTime64(_, _)
404            | Type::Ipv4
405            | Type::Ipv6
406            | Type::Enum8(_)
407            | Type::Enum16(_) => sized::SizedDeserializer::read_sync(self, reader, rows, state)?,
408            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
409                string::StringDeserializer::read_sync(self, reader, rows, state)?
410            }
411            Type::Array(_) => array::ArrayDeserializer::read_sync(self, reader, rows, state)?,
412            Type::Ring => geo::RingDeserializer::read_sync(self, reader, rows, state)?,
413            Type::Polygon => geo::PolygonDeserializer::read_sync(self, reader, rows, state)?,
414            Type::MultiPolygon => {
415                geo::MultiPolygonDeserializer::read_sync(self, reader, rows, state)?
416            }
417            Type::Tuple(_) => tuple::TupleDeserializer::read_sync(self, reader, rows, state)?,
418            Type::Point => geo::PointDeserializer::read_sync(self, reader, rows, state)?,
419            Type::Nullable(_) => {
420                nullable::NullableDeserializer::read_sync(self, reader, rows, state)?
421            }
422            Type::Map(_, _) => map::MapDeserializer::read_sync(self, reader, rows, state)?,
423            Type::LowCardinality(_) => {
424                low_cardinality::LowCardinalityDeserializer::read_sync(self, reader, rows, state)?
425            }
426            Type::Object => object::ObjectDeserializer::read_sync(self, reader, rows, state)?,
427        })
428    }
429
430    pub(crate) fn serialize_column<'a, W: ClickHouseWrite>(
431        &'a self,
432        values: Vec<Value>,
433        writer: &'a mut W,
434        state: &'a mut SerializerState,
435    ) -> impl Future<Output = Result<()>> + Send + 'a {
436        use serialize::*;
437        async move {
438            match self {
439                Type::Int8
440                | Type::Int16
441                | Type::Int32
442                | Type::Int64
443                | Type::Int128
444                | Type::Int256
445                | Type::UInt8
446                | Type::UInt16
447                | Type::UInt32
448                | Type::UInt64
449                | Type::UInt128
450                | Type::UInt256
451                | Type::Float32
452                | Type::Float64
453                | Type::Decimal32(_)
454                | Type::Decimal64(_)
455                | Type::Decimal128(_)
456                | Type::Decimal256(_)
457                | Type::Uuid
458                | Type::Date
459                | Type::Date32
460                | Type::DateTime(_)
461                | Type::DateTime64(_, _)
462                | Type::Ipv4
463                | Type::Ipv6
464                | Type::Enum8(_)
465                | Type::Enum16(_) => {
466                    sized::SizedSerializer::write(self, values, writer, state).await?;
467                }
468
469                Type::String
470                | Type::FixedSizedString(_)
471                | Type::Binary
472                | Type::FixedSizedBinary(_) => {
473                    string::StringSerializer::write(self, values, writer, state).await?;
474                }
475
476                Type::Array(_) => {
477                    array::ArraySerializer::write(self, values, writer, state).await?;
478                }
479                Type::Tuple(_) => {
480                    tuple::TupleSerializer::write(self, values, writer, state).await?;
481                }
482                Type::Point => geo::PointSerializer::write(self, values, writer, state).await?,
483                Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?,
484                Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?,
485                Type::MultiPolygon => {
486                    geo::MultiPolygonSerializer::write(self, values, writer, state).await?;
487                }
488                Type::Nullable(_) => {
489                    nullable::NullableSerializer::write(self, values, writer, state).await?;
490                }
491                Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?,
492                Type::LowCardinality(_) => {
493                    low_cardinality::LowCardinalitySerializer::write(self, values, writer, state)
494                        .await?;
495                }
496                Type::Object => {
497                    object::ObjectSerializer::write(self, values, writer, state).await?;
498                }
499            }
500            Ok(())
501        }
502        .boxed()
503    }
504
505    pub(crate) fn serialize_column_sync(
506        &self,
507        values: Vec<Value>,
508        writer: &mut impl ClickHouseBytesWrite,
509        state: &mut SerializerState,
510    ) -> Result<()> {
511        use serialize::*;
512        match self {
513            Type::Int8
514            | Type::Int16
515            | Type::Int32
516            | Type::Int64
517            | Type::Int128
518            | Type::Int256
519            | Type::UInt8
520            | Type::UInt16
521            | Type::UInt32
522            | Type::UInt64
523            | Type::UInt128
524            | Type::UInt256
525            | Type::Float32
526            | Type::Float64
527            | Type::Decimal32(_)
528            | Type::Decimal64(_)
529            | Type::Decimal128(_)
530            | Type::Decimal256(_)
531            | Type::Uuid
532            | Type::Date
533            | Type::Date32
534            | Type::DateTime(_)
535            | Type::DateTime64(_, _)
536            | Type::Ipv4
537            | Type::Ipv6
538            | Type::Enum8(_)
539            | Type::Enum16(_) => {
540                sized::SizedSerializer::write_sync(self, values, writer, state)?;
541            }
542
543            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
544                string::StringSerializer::write_sync(self, values, writer, state)?;
545            }
546
547            Type::Array(_) => {
548                array::ArraySerializer::write_sync(self, values, writer, state)?;
549            }
550            Type::Tuple(_) => {
551                tuple::TupleSerializer::write_sync(self, values, writer, state)?;
552            }
553            Type::Point => geo::PointSerializer::write_sync(self, values, writer, state)?,
554            Type::Ring => geo::RingSerializer::write_sync(self, values, writer, state)?,
555            Type::Polygon => geo::PolygonSerializer::write_sync(self, values, writer, state)?,
556            Type::MultiPolygon => {
557                geo::MultiPolygonSerializer::write_sync(self, values, writer, state)?;
558            }
559            Type::Nullable(_) => {
560                nullable::NullableSerializer::write_sync(self, values, writer, state)?;
561            }
562            Type::Map(_, _) => map::MapSerializer::write_sync(self, values, writer, state)?,
563            Type::LowCardinality(_) => {
564                low_cardinality::LowCardinalitySerializer::write_sync(self, values, writer, state)?;
565            }
566            Type::Object => {
567                object::ObjectSerializer::write_sync(self, values, writer, state)?;
568            }
569        }
570        Ok(())
571    }
572
573    #[expect(clippy::too_many_lines)]
574    pub(crate) fn validate(&self) -> Result<()> {
575        match self {
576            Type::Decimal32(scale) => {
577                if *scale == 0 || *scale > 9 {
578                    return Err(Error::TypeParseError(format!(
579                        "scale out of bounds for Decimal32({}) must be in range (1..=9)",
580                        *scale
581                    )));
582                }
583            }
584
585            Type::Decimal128(scale) => {
586                if *scale == 0 || *scale > 38 {
587                    return Err(Error::TypeParseError(format!(
588                        "scale out of bounds for Decimal128({}) must be in range (1..=38)",
589                        *scale
590                    )));
591                }
592            }
593            Type::Decimal256(scale) => {
594                if *scale == 0 || *scale > 76 {
595                    return Err(Error::TypeParseError(format!(
596                        "scale out of bounds for Decimal256({}) must be in range (1..=76)",
597                        *scale
598                    )));
599                }
600            }
601            Type::DateTime64(precision, _) | Type::Decimal64(precision) => {
602                if *precision == 0 || *precision > 18 {
603                    return Err(Error::TypeParseError(format!(
604                        "precision out of bounds for Decimal64/DateTime64({}) must be in range \
605                         (1..=18)",
606                        *precision
607                    )));
608                }
609            }
610            Type::LowCardinality(inner) => match inner.strip_null() {
611                Type::String
612                | Type::FixedSizedString(_)
613                | Type::Binary
614                | Type::FixedSizedBinary(_)
615                | Type::Date
616                | Type::Date32
617                | Type::DateTime(_)
618                | Type::Ipv4
619                | Type::Ipv6
620                | Type::Int8
621                | Type::Int16
622                | Type::Int32
623                | Type::Int64
624                | Type::Int128
625                | Type::Int256
626                | Type::UInt8
627                | Type::UInt16
628                | Type::UInt32
629                | Type::UInt64
630                | Type::UInt128
631                | Type::UInt256 => inner.validate()?,
632                _ => {
633                    return Err(Error::TypeParseError(format!(
634                        "illegal type '{inner:?}' in LowCardinality, not allowed"
635                    )));
636                }
637            },
638            Type::Array(inner) => {
639                inner.validate()?;
640            }
641            Type::Tuple(inner) => {
642                for inner in inner {
643                    inner.validate()?;
644                }
645            }
646            Type::Nullable(inner) => match &**inner {
647                Type::Array(_)
648                | Type::Map(_, _)
649                | Type::LowCardinality(_)
650                | Type::Tuple(_)
651                | Type::Nullable(_) => {
652                    return Err(Error::TypeParseError(format!(
653                        "nullable cannot contain composite type '{inner:?}'"
654                    )));
655                }
656                _ => inner.validate()?,
657            },
658            Type::Map(key, value) => {
659                if !matches!(
660                    &**key,
661                    Type::String
662                        | Type::FixedSizedString(_)
663                        | Type::Int8
664                        | Type::Int16
665                        | Type::Int32
666                        | Type::Int64
667                        | Type::Int128
668                        | Type::Int256
669                        | Type::UInt8
670                        | Type::UInt16
671                        | Type::UInt32
672                        | Type::UInt64
673                        | Type::UInt128
674                        | Type::UInt256
675                        | Type::LowCardinality(_)
676                        | Type::Uuid
677                        | Type::Date
678                        | Type::DateTime(_)
679                        | Type::DateTime64(_, _)
680                        | Type::Enum8(_)
681                        | Type::Enum16(_)
682                ) {
683                    return Err(Error::TypeParseError(
684                        "key in map must be String, Integer, LowCardinality, FixedString, UUID, \
685                         Date, DateTime, Date32, Enum"
686                            .to_string(),
687                    ));
688                }
689                key.validate()?;
690                value.validate()?;
691            }
692            // TODO: Add Object
693            _ => {}
694        }
695        Ok(())
696    }
697
698    pub(crate) fn validate_value(&self, value: &Value) -> Result<()> {
699        self.validate()?;
700        if !self.inner_validate_value(value) {
701            return Err(Error::TypeParseError(format!(
702                "could not assign value '{value:?}' to type '{self:?}'"
703            )));
704        }
705        Ok(())
706    }
707
708    fn inner_validate_value(&self, value: &Value) -> bool {
709        match (self, value) {
710            (Type::Int8, Value::Int8(_))
711            | (Type::Int16, Value::Int16(_))
712            | (Type::Int32, Value::Int32(_))
713            | (Type::Int64, Value::Int64(_))
714            | (Type::Int128, Value::Int128(_))
715            | (Type::Int256, Value::Int256(_))
716            | (Type::UInt8, Value::UInt8(_))
717            | (Type::UInt16, Value::UInt16(_))
718            | (Type::UInt32, Value::UInt32(_))
719            | (Type::UInt64, Value::UInt64(_))
720            | (Type::UInt128, Value::UInt128(_))
721            | (Type::UInt256, Value::UInt256(_))
722            | (Type::Float32, Value::Float32(_))
723            | (Type::Float64, Value::Float64(_))
724            | (Type::String | Type::FixedSizedString(_), Value::String(_))
725            | (Type::Uuid, Value::Uuid(_))
726            | (Type::Date, Value::Date(_))
727            | (Type::Date32, Value::Date32(_))
728            | (Type::Ipv4, Value::Ipv4(_))
729            | (Type::Ipv6, Value::Ipv6(_))
730            | (Type::Point, Value::Point(_))
731            | (Type::Ring, Value::Ring(_))
732            | (Type::Polygon, Value::Polygon(_))
733            | (Type::MultiPolygon, Value::MultiPolygon(_)) => true,
734            (Type::DateTime(tz1), Value::DateTime(date)) => tz1 == &date.0,
735            (Type::DateTime64(precision1, tz1), Value::DateTime64(tz2)) => {
736                tz1 == &tz2.0 && precision1 == &tz2.2
737            }
738            (Type::Decimal32(scale1), Value::Decimal32(scale2, _))
739            | (Type::Decimal64(scale1), Value::Decimal64(scale2, _))
740            | (Type::Decimal128(scale1), Value::Decimal128(scale2, _))
741            | (Type::Decimal256(scale1), Value::Decimal256(scale2, _)) => scale1 >= scale2,
742            (Type::FixedSizedString(_) | Type::String, Value::Array(items))
743                if items.iter().all(|item| matches!(item, Value::UInt8(_) | Value::Int8(_))) =>
744            {
745                true
746            }
747            (Type::Enum8(entries), Value::Enum8(_, index)) => entries.iter().any(|x| x.1 == *index),
748            (Type::Enum16(entries), Value::Enum16(_, index)) => {
749                entries.iter().any(|x| x.1 == *index)
750            }
751            (Type::LowCardinality(x), value) => x.inner_validate_value(value),
752            (Type::Array(inner_type), Value::Array(values)) => {
753                values.iter().all(|x| inner_type.inner_validate_value(x))
754            }
755            (Type::Tuple(inner_types), Value::Tuple(values)) => {
756                inner_types.len() == values.len()
757                    && inner_types
758                        .iter()
759                        .zip(values.iter())
760                        .all(|(type_, value)| type_.inner_validate_value(value))
761            }
762            (Type::Nullable(inner), value) => {
763                value == &Value::Null || inner.inner_validate_value(value)
764            }
765            (Type::Map(key, value), Value::Map(keys, values)) => {
766                keys.len() == values.len()
767                    && keys.iter().all(|x| key.inner_validate_value(x))
768                    && values.iter().all(|x| value.inner_validate_value(x))
769            }
770            _ => false,
771        }
772    }
773
774    /// Helper type to estimate capacity of a type
775    pub(crate) fn estimate_capacity(&self) -> usize {
776        match self {
777            Type::Int8 | Type::UInt8 => 1,
778            Type::Int16 | Type::UInt16 | Type::Date => 2,
779            Type::Int32
780            | Type::UInt32
781            | Type::Float32
782            | Type::Date32
783            | Type::DateTime(_)
784            | Type::Ipv4 => 4,
785            Type::Int64 | Type::UInt64 | Type::Float64 | Type::DateTime64(_, _) => 8,
786            Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Point => 16,
787            Type::Int256 | Type::UInt256 | Type::String | Type::Binary => 32,
788            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => *n,
789
790            // Complex types
791            Type::Array(inner) => {
792                let inner_data = inner.estimate_capacity();
793                (4 + inner_data) * 8 // 4 bytes for offsets estimate 8 items per array
794            }
795            Type::Nullable(inner) => inner.estimate_capacity(),
796            Type::Tuple(types) => types.iter().map(Type::estimate_capacity).sum(),
797            Type::Map(key, value) => {
798                let key_data = key.estimate_capacity();
799                let value_data = value.estimate_capacity();
800                4 + key_data + value_data // 4 bytes for offsets
801            }
802
803            // Placeholder for unsupported types
804            _ => 64, // Default to 8 bytes as a safe estimate
805        }
806    }
807}
808
809impl Type {
810    /// Write a single default value based on 'non-null' type. This is useful in scenarios where
811    /// `ClickHouse` expects a default value written for a null value of a type.
812    ///
813    /// # Errors
814    /// Returns `SerializeError` if the `Type` is not handled.
815    /// Returns `Io` error if the write fails.
816    pub(crate) async fn write_default<W: ClickHouseWrite>(&self, writer: &mut W) -> Result<()> {
817        match self.strip_null() {
818            Type::String | Type::Binary => {
819                writer.write_string("").await?;
820            }
821            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
822                writer.write_all(&vec![0u8; *n]).await?;
823            }
824            Type::Int8 => writer.write_i8(0).await?,
825            Type::Int16 => writer.write_i16_le(0).await?,
826            Type::Int32 => writer.write_i32_le(0).await?,
827            Type::Int64 => writer.write_i64_le(0).await?,
828            Type::Int128 => writer.write_all(&[0; 16]).await?,
829            Type::Int256 => writer.write_all(&[0; 32]).await?,
830            Type::UInt8 => writer.write_u8(0).await?,
831            Type::UInt16 => writer.write_u16_le(0).await?,
832            Type::UInt32 => writer.write_u32_le(0).await?,
833            Type::UInt64 => writer.write_u64_le(0).await?,
834            Type::UInt128 => writer.write_all(&[0; 16]).await?,
835            Type::UInt256 => writer.write_all(&[0; 32]).await?,
836            Type::Float32 => writer.write_f32_le(0.0).await?,
837            Type::Float64 => writer.write_f64_le(0.0).await?,
838            Type::Uuid => writer.write_all(&[0; 16]).await?,
839            Type::Ipv4 => writer.write_u32_le(0).await?,
840            Type::Ipv6 => writer.write_all(&[0; 16]).await?,
841            Type::Date => writer.write_u16_le(0).await?,
842            Type::Date32 => writer.write_i32_le(0).await?,
843            Type::DateTime(_) => writer.write_u32_le(0).await?,
844            Type::DateTime64(precision, _) => {
845                let bytes = (0_i64).to_le_bytes();
846                writer.write_all(&bytes[..*precision]).await?;
847            }
848            Type::Decimal32(_) => writer.write_i32_le(0).await?,
849            Type::Decimal64(_) => writer.write_i64_le(0).await?,
850            Type::Decimal128(_) => writer.write_all(&[0; 16]).await?,
851            Type::Decimal256(_) => writer.write_all(&[0; 32]).await?,
852            Type::Array(_) => writer.write_var_uint(0).await?, // Empty array
853            Type::Map(_, _) => writer.write_var_uint(0).await?, // Empty map
854            Type::Enum8(_) => writer.write_i8(0).await?,
855            Type::Enum16(_) => writer.write_i16(0).await?,
856            // Recursive
857            Type::LowCardinality(inner) => Box::pin(inner.write_default(writer)).await?,
858            Type::Tuple(inner) => {
859                for t in inner {
860                    Box::pin(t.write_default(writer)).await?;
861                }
862            }
863            _ => {
864                return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
865            }
866        }
867        Ok(())
868    }
869
870    pub(crate) fn put_default<W: ClickHouseBytesWrite>(&self, writer: &mut W) -> Result<()> {
871        match self.strip_null() {
872            Type::String | Type::Binary => {
873                writer.put_string("")?;
874            }
875            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
876                writer.put_slice(&vec![0u8; *n]);
877            }
878            Type::Int8 | Type::Enum8(_) => writer.put_i8(0),
879            Type::Int16 => writer.put_i16_le(0),
880            Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.put_i32_le(0),
881            Type::Int64 | Type::Decimal64(_) => writer.put_i64_le(0),
882            Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
883                writer.put_slice(&[0; 16]);
884            }
885            Type::Int256 | Type::UInt256 | Type::Decimal256(_) => writer.put_slice(&[0; 32]),
886            Type::UInt8 => writer.put_u8(0),
887            Type::UInt16 | Type::Date => writer.put_u16_le(0),
888            Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.put_u32_le(0),
889            Type::UInt64 => writer.put_u64_le(0),
890            Type::Float32 => writer.put_f32_le(0.0),
891            Type::Float64 => writer.put_f64_le(0.0),
892            Type::DateTime64(precision, _) => {
893                let bytes = (0_i64).to_le_bytes();
894                writer.put_slice(&bytes[..*precision]);
895            }
896            Type::Array(_) => writer.put_var_uint(0)?, // Empty array
897            Type::Map(_, _) => writer.put_var_uint(0)?, // Empty map
898            Type::Enum16(_) => writer.put_i16(0),
899            // Recursive
900            Type::LowCardinality(inner) => inner.put_default(writer)?,
901            Type::Tuple(inner) => {
902                for t in inner {
903                    t.put_default(writer)?;
904                }
905            }
906            _ => {
907                return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
908            }
909        }
910        Ok(())
911    }
912}
913
914pub(crate) trait Deserializer {
915    // TODO:
916    // Add custom serialization here. Will need to pass in state or via arg.
917    // Example from python:
918    // ```
919    //  def read_state_prefix(self, buf):
920    //     if self.has_custom_serialization:
921    //         use_custom_serialization = read_varint(buf)
922    //         if use_custom_serialization:
923    //             self.serialization = SparseSerialization(self)
924    // ```
925    fn read_prefix<R: ClickHouseRead>(
926        _type_: &Type,
927        _reader: &mut R,
928        _state: &mut DeserializerState,
929    ) -> impl Future<Output = Result<()>> {
930        async { Ok(()) }
931    }
932
933    fn read<R: ClickHouseRead>(
934        type_: &Type,
935        reader: &mut R,
936        rows: usize,
937        state: &mut DeserializerState,
938    ) -> impl Future<Output = Result<Vec<Value>>>;
939
940    fn read_sync(
941        type_: &Type,
942        reader: &mut impl ClickHouseBytesRead,
943        rows: usize,
944        state: &mut DeserializerState,
945    ) -> Result<Vec<Value>>;
946}
947
948pub(crate) trait Serializer {
949    fn write_prefix<W: ClickHouseWrite>(
950        _type_: &Type,
951        _writer: &mut W,
952        _state: &mut SerializerState,
953    ) -> impl Future<Output = Result<()>> {
954        async { Ok(()) }
955    }
956
957    fn write<W: ClickHouseWrite>(
958        type_: &Type,
959        values: Vec<Value>,
960        writer: &mut W,
961        state: &mut SerializerState,
962    ) -> impl Future<Output = Result<()>>;
963
964    fn write_sync(
965        type_: &Type,
966        values: Vec<Value>,
967        writer: &mut impl ClickHouseBytesWrite,
968        state: &mut SerializerState,
969    ) -> Result<()>;
970}