clickhouse_arrow/native/
types.rs

1pub(crate) mod deserialize;
2pub mod geo;
3pub(crate) mod low_cardinality;
4pub mod map;
5pub(crate) mod serialize;
6#[cfg(test)]
7mod tests;
8
9use std::fmt::Display;
10use std::str::FromStr;
11
12use chrono_tz::Tz;
13use futures_util::FutureExt;
14use tokio::io::AsyncWriteExt;
15use uuid::Uuid;
16
17use super::protocol::MAX_STRING_SIZE;
18use super::values::{
19    Date, DateTime, DynDateTime64, Ipv4, Ipv6, MultiPolygon, Point, Polygon, Ring, Value, i256,
20    u256,
21};
22use crate::formats::{DeserializerState, SerializerState};
23use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
24use crate::{Date32, Error, Result};
25
/// A raw `ClickHouse` type.
///
/// Parameterless variants name a type directly; parameterised and nested variants carry their
/// arguments. The `Display` implementation in this file renders each variant as a `ClickHouse`
/// type name (for example `Type::Uuid` is rendered as `UUID`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Type {
    // Signed integers
    Int8,
    Int16,
    Int32,
    Int64,
    Int128,
    Int256,

    // Unsigned integers
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    UInt128,
    UInt256,

    // Floating point
    Float32,
    Float64,

    // Inner value is SCALE
    Decimal32(usize),
    Decimal64(usize),
    Decimal128(usize),
    Decimal256(usize),

    String,
    /// Displayed as `FixedString(n)`; the inner value is `n`.
    FixedSizedString(usize),
    /// Displayed as `String`, exactly like [`Type::String`].
    Binary,
    /// Displayed as `FixedString(n)`, exactly like [`Type::FixedSizedString`].
    FixedSizedBinary(usize),

    Uuid,

    Date,
    Date32, // NOTE: This is i32 days since 1900-01-01
    /// Displayed as `DateTime('<tz>')`.
    DateTime(Tz),
    /// Precision followed by time zone; displayed as `DateTime64(<precision>,'<tz>')`.
    DateTime64(usize, Tz),

    Ipv4,
    Ipv6,

    // Geo types, see
    // https://clickhouse.com/docs/en/sql-reference/data-types/geo
    // These are just aliases of primitive types.
    Point,
    Ring,
    Polygon,
    MultiPolygon,

    /// A nullable wrapper. `validate` rejects a wrapped `Array`, `Map`, `LowCardinality`,
    /// `Tuple` or `Nullable`.
    Nullable(Box<Type>),

    /// `(name, value)` entries of the enum.
    Enum8(Vec<(String, i8)>),
    /// `(name, value)` entries of the enum.
    Enum16(Vec<(String, i16)>),
    /// A low-cardinality wrapper; `into_nullable` pushes nullability into the inner type.
    LowCardinality(Box<Type>),
    /// An array of the boxed element type.
    Array(Box<Type>),
    /// A tuple of the listed element types, in order.
    Tuple(Vec<Type>),
    /// A map: key type followed by value type.
    Map(Box<Type>, Box<Type>),

    /// Displayed as `JSON`.
    Object,
}
87
88impl Type {
89    /// # Errors
90    ///
91    /// Errors if the type is not an array
92    pub fn unwrap_array(&self) -> Result<&Type> {
93        match self {
94            Type::Array(x) => Ok(x),
95            _ => Err(Error::UnexpectedType(self.clone())),
96        }
97    }
98
99    pub fn unarray(&self) -> Option<&Type> {
100        match self {
101            Type::Array(x) => Some(&**x),
102            _ => None,
103        }
104    }
105
106    /// # Errors
107    ///
108    /// Errors if the type is not a map
109    pub fn unwrap_map(&self) -> Result<(&Type, &Type)> {
110        match self {
111            Type::Map(key, value) => Ok((&**key, &**value)),
112            _ => Err(Error::UnexpectedType(self.clone())),
113        }
114    }
115
116    pub fn unmap(&self) -> Option<(&Type, &Type)> {
117        match self {
118            Type::Map(key, value) => Some((&**key, &**value)),
119            _ => None,
120        }
121    }
122
123    /// # Errors
124    ///
125    /// Errors if the type is not a tuple
126    pub fn unwrap_tuple(&self) -> Result<&[Type]> {
127        match self {
128            Type::Tuple(x) => Ok(&x[..]),
129            _ => Err(Error::UnexpectedType(self.clone())),
130        }
131    }
132
133    pub fn untuple(&self) -> Option<&[Type]> {
134        match self {
135            Type::Tuple(x) => Some(&x[..]),
136            _ => None,
137        }
138    }
139
140    pub fn unnull(&self) -> Option<&Type> {
141        match self {
142            Type::Nullable(x) => Some(&**x),
143            _ => None,
144        }
145    }
146
147    pub fn strip_null(&self) -> &Type {
148        match self {
149            Type::Nullable(x) => x,
150            _ => self,
151        }
152    }
153
154    pub fn is_nullable(&self) -> bool { matches!(self, Type::Nullable(_)) }
155
156    pub fn strip_low_cardinality(&self) -> &Type {
157        match self {
158            Type::LowCardinality(x) => x,
159            _ => self,
160        }
161    }
162
163    #[must_use]
164    pub fn into_nullable(self) -> Type {
165        match self {
166            t @ Type::Nullable(_) => t,
167            // LowCardinality pushes nullability down
168            Type::LowCardinality(inner) => Type::LowCardinality(Box::new(inner.into_nullable())),
169            t => Type::Nullable(Box::new(t)),
170        }
171    }
172
173    pub fn default_value(&self) -> Value {
174        match self {
175            Type::Int8 => Value::Int8(0),
176            Type::Int16 => Value::Int16(0),
177            Type::Int32 => Value::Int32(0),
178            Type::Int64 => Value::Int64(0),
179            Type::Int128 => Value::Int128(0),
180            Type::Int256 => Value::Int256(i256::default()),
181            Type::UInt8 => Value::UInt8(0),
182            Type::UInt16 => Value::UInt16(0),
183            Type::UInt32 => Value::UInt32(0),
184            Type::UInt64 => Value::UInt64(0),
185            Type::UInt128 => Value::UInt128(0),
186            Type::UInt256 => Value::UInt256(u256::default()),
187            Type::Float32 => Value::Float32(0.0),
188            Type::Float64 => Value::Float64(0.0),
189            Type::Decimal32(s) => Value::Decimal32(*s, 0),
190            Type::Decimal64(s) => Value::Decimal64(*s, 0),
191            Type::Decimal128(s) => Value::Decimal128(*s, 0),
192            Type::Decimal256(s) => Value::Decimal256(*s, i256::default()),
193            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
194                Value::String(vec![])
195            }
196            Type::Date => Value::Date(Date(0)),
197            Type::Date32 => Value::Date32(Date32(0)),
198            Type::DateTime(tz) => Value::DateTime(DateTime(*tz, 0)),
199            Type::DateTime64(precision, tz) => Value::DateTime64(DynDateTime64(*tz, 0, *precision)),
200            Type::Ipv4 => Value::Ipv4(Ipv4::default()),
201            Type::Ipv6 => Value::Ipv6(Ipv6::default()),
202            Type::Enum8(_) => Value::Enum8(String::new(), 0),
203            Type::Enum16(_) => Value::Enum16(String::new(), 0),
204            Type::LowCardinality(x) => x.default_value(),
205            Type::Array(_) => Value::Array(vec![]),
206            Type::Tuple(types) => Value::Tuple(types.iter().map(Type::default_value).collect()),
207            Type::Nullable(_) => Value::Null,
208            Type::Map(_, _) => Value::Map(vec![], vec![]),
209            Type::Point => Value::Point(Point::default()),
210            Type::Ring => Value::Ring(Ring::default()),
211            Type::Polygon => Value::Polygon(Polygon::default()),
212            Type::MultiPolygon => Value::MultiPolygon(MultiPolygon::default()),
213            Type::Uuid => Value::Uuid(Uuid::from_u128(0)),
214            Type::Object => Value::Object("{}".as_bytes().to_vec()),
215        }
216    }
217}
218
219impl Display for Type {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        match self {
222            Type::Int8 => write!(f, "Int8"),
223            Type::Int16 => write!(f, "Int16"),
224            Type::Int32 => write!(f, "Int32"),
225            Type::Int64 => write!(f, "Int64"),
226            Type::Int128 => write!(f, "Int128"),
227            Type::Int256 => write!(f, "Int256"),
228            Type::UInt8 => write!(f, "UInt8"),
229            Type::UInt16 => write!(f, "UInt16"),
230            Type::UInt32 => write!(f, "UInt32"),
231            Type::UInt64 => write!(f, "UInt64"),
232            Type::UInt128 => write!(f, "UInt128"),
233            Type::UInt256 => write!(f, "UInt256"),
234            Type::Float32 => write!(f, "Float32"),
235            Type::Float64 => write!(f, "Float64"),
236            Type::Decimal32(s) => write!(f, "Decimal32({s})"),
237            Type::Decimal64(s) => write!(f, "Decimal64({s})"),
238            Type::Decimal128(s) => write!(f, "Decimal128({s})"),
239            Type::Decimal256(s) => write!(f, "Decimal256({s})"),
240            Type::String | Type::Binary => write!(f, "String"),
241            Type::FixedSizedBinary(s) | Type::FixedSizedString(s) => write!(f, "FixedString({s})"),
242            Type::Uuid => write!(f, "UUID"),
243            Type::Date => write!(f, "Date"),
244            Type::Date32 => write!(f, "Date32"),
245            Type::DateTime(tz) => write!(f, "DateTime('{tz}')"),
246            Type::DateTime64(precision, tz) => write!(f, "DateTime64({precision},'{tz}')"),
247            Type::Ipv4 => write!(f, "IPv4"),
248            Type::Ipv6 => write!(f, "IPv6"),
249            Type::Point => write!(f, "Point"),
250            Type::Ring => write!(f, "Ring"),
251            Type::Polygon => write!(f, "Polygon"),
252            Type::MultiPolygon => write!(f, "MultiPolygon"),
253            Type::Enum8(items) => {
254                write!(f, "Enum8(")?;
255                if !items.is_empty() {
256                    let last_index = items.len() - 1;
257                    for (i, (name, value)) in items.iter().enumerate() {
258                        write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
259                        if i < last_index {
260                            write!(f, ",")?;
261                        }
262                    }
263                }
264                write!(f, ")")
265            }
266            Type::Enum16(items) => {
267                write!(f, "Enum16(")?;
268                if !items.is_empty() {
269                    let last_index = items.len() - 1;
270                    for (i, (name, value)) in items.iter().enumerate() {
271                        write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
272                        if i < last_index {
273                            write!(f, ",")?;
274                        }
275                    }
276                }
277                write!(f, ")")
278            }
279            Type::LowCardinality(inner) => write!(f, "LowCardinality({inner})"),
280            Type::Array(inner) => write!(f, "Array({inner})"),
281            Type::Tuple(items) => write!(
282                f,
283                "Tuple({})",
284                items.iter().map(ToString::to_string).collect::<Vec<_>>().join(",")
285            ),
286            Type::Nullable(inner) => write!(f, "Nullable({inner})"),
287            Type::Map(key, value) => write!(f, "Map({key},{value})"),
288            Type::Object => write!(f, "JSON"),
289        }
290    }
291}
292
293impl Type {
294    pub(crate) fn deserialize_column<'a, R: ClickHouseRead>(
295        &'a self,
296        reader: &'a mut R,
297        rows: usize,
298        state: &'a mut DeserializerState,
299    ) -> impl Future<Output = Result<Vec<Value>>> + Send + 'a {
300        use deserialize::*;
301        async move {
302            if rows > MAX_STRING_SIZE {
303                return Err(Error::Protocol(format!(
304                    "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
305                )));
306            }
307
308            Ok(match self {
309                Type::Int8
310                | Type::Int16
311                | Type::Int32
312                | Type::Int64
313                | Type::Int128
314                | Type::Int256
315                | Type::UInt8
316                | Type::UInt16
317                | Type::UInt32
318                | Type::UInt64
319                | Type::UInt128
320                | Type::UInt256
321                | Type::Float32
322                | Type::Float64
323                | Type::Decimal32(_)
324                | Type::Decimal64(_)
325                | Type::Decimal128(_)
326                | Type::Decimal256(_)
327                | Type::Uuid
328                | Type::Date
329                | Type::Date32
330                | Type::DateTime(_)
331                | Type::DateTime64(_, _)
332                | Type::Ipv4
333                | Type::Ipv6
334                | Type::Enum8(_)
335                | Type::Enum16(_) => {
336                    sized::SizedDeserializer::read(self, reader, rows, state).await?
337                }
338                Type::String
339                | Type::FixedSizedString(_)
340                | Type::Binary
341                | Type::FixedSizedBinary(_) => {
342                    string::StringDeserializer::read(self, reader, rows, state).await?
343                }
344                Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?,
345                Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?,
346                Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?,
347                Type::MultiPolygon => {
348                    geo::MultiPolygonDeserializer::read(self, reader, rows, state).await?
349                }
350                Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?,
351                Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?,
352                Type::Nullable(_) => {
353                    nullable::NullableDeserializer::read(self, reader, rows, state).await?
354                }
355                Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?,
356                Type::LowCardinality(_) => {
357                    low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state)
358                        .await?
359                }
360                Type::Object => object::ObjectDeserializer::read(self, reader, rows, state).await?,
361            })
362        }
363        .boxed()
364    }
365
366    #[allow(dead_code)] // TODO: remove once synchronous native path is fully retired
367    pub(crate) fn deserialize_column_sync(
368        &self,
369        reader: &mut impl ClickHouseBytesRead,
370        rows: usize,
371        state: &mut DeserializerState,
372    ) -> Result<Vec<Value>> {
373        use deserialize::*;
374
375        if rows > MAX_STRING_SIZE {
376            return Err(Error::Protocol(format!(
377                "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
378            )));
379        }
380
381        Ok(match self {
382            Type::Int8
383            | Type::Int16
384            | Type::Int32
385            | Type::Int64
386            | Type::Int128
387            | Type::Int256
388            | Type::UInt8
389            | Type::UInt16
390            | Type::UInt32
391            | Type::UInt64
392            | Type::UInt128
393            | Type::UInt256
394            | Type::Float32
395            | Type::Float64
396            | Type::Decimal32(_)
397            | Type::Decimal64(_)
398            | Type::Decimal128(_)
399            | Type::Decimal256(_)
400            | Type::Uuid
401            | Type::Date
402            | Type::Date32
403            | Type::DateTime(_)
404            | Type::DateTime64(_, _)
405            | Type::Ipv4
406            | Type::Ipv6
407            | Type::Enum8(_)
408            | Type::Enum16(_) => sized::SizedDeserializer::read_sync(self, reader, rows, state)?,
409            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
410                string::StringDeserializer::read_sync(self, reader, rows, state)?
411            }
412            Type::Array(_) => array::ArrayDeserializer::read_sync(self, reader, rows, state)?,
413            Type::Ring => geo::RingDeserializer::read_sync(self, reader, rows, state)?,
414            Type::Polygon => geo::PolygonDeserializer::read_sync(self, reader, rows, state)?,
415            Type::MultiPolygon => {
416                geo::MultiPolygonDeserializer::read_sync(self, reader, rows, state)?
417            }
418            Type::Tuple(_) => tuple::TupleDeserializer::read_sync(self, reader, rows, state)?,
419            Type::Point => geo::PointDeserializer::read_sync(self, reader, rows, state)?,
420            Type::Nullable(_) => {
421                nullable::NullableDeserializer::read_sync(self, reader, rows, state)?
422            }
423            Type::Map(_, _) => map::MapDeserializer::read_sync(self, reader, rows, state)?,
424            Type::LowCardinality(_) => {
425                low_cardinality::LowCardinalityDeserializer::read_sync(self, reader, rows, state)?
426            }
427            Type::Object => object::ObjectDeserializer::read_sync(self, reader, rows, state)?,
428        })
429    }
430
431    pub(crate) fn serialize_column<'a, W: ClickHouseWrite>(
432        &'a self,
433        values: Vec<Value>,
434        writer: &'a mut W,
435        state: &'a mut SerializerState,
436    ) -> impl Future<Output = Result<()>> + Send + 'a {
437        use serialize::*;
438        async move {
439            match self {
440                Type::Int8
441                | Type::Int16
442                | Type::Int32
443                | Type::Int64
444                | Type::Int128
445                | Type::Int256
446                | Type::UInt8
447                | Type::UInt16
448                | Type::UInt32
449                | Type::UInt64
450                | Type::UInt128
451                | Type::UInt256
452                | Type::Float32
453                | Type::Float64
454                | Type::Decimal32(_)
455                | Type::Decimal64(_)
456                | Type::Decimal128(_)
457                | Type::Decimal256(_)
458                | Type::Uuid
459                | Type::Date
460                | Type::Date32
461                | Type::DateTime(_)
462                | Type::DateTime64(_, _)
463                | Type::Ipv4
464                | Type::Ipv6
465                | Type::Enum8(_)
466                | Type::Enum16(_) => {
467                    sized::SizedSerializer::write(self, values, writer, state).await?;
468                }
469
470                Type::String
471                | Type::FixedSizedString(_)
472                | Type::Binary
473                | Type::FixedSizedBinary(_) => {
474                    string::StringSerializer::write(self, values, writer, state).await?;
475                }
476
477                Type::Array(_) => {
478                    array::ArraySerializer::write(self, values, writer, state).await?;
479                }
480                Type::Tuple(_) => {
481                    tuple::TupleSerializer::write(self, values, writer, state).await?;
482                }
483                Type::Point => geo::PointSerializer::write(self, values, writer, state).await?,
484                Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?,
485                Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?,
486                Type::MultiPolygon => {
487                    geo::MultiPolygonSerializer::write(self, values, writer, state).await?;
488                }
489                Type::Nullable(_) => {
490                    nullable::NullableSerializer::write(self, values, writer, state).await?;
491                }
492                Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?,
493                Type::LowCardinality(_) => {
494                    low_cardinality::LowCardinalitySerializer::write(self, values, writer, state)
495                        .await?;
496                }
497                Type::Object => {
498                    object::ObjectSerializer::write(self, values, writer, state).await?;
499                }
500            }
501            Ok(())
502        }
503        .boxed()
504    }
505
506    pub(crate) fn serialize_column_sync(
507        &self,
508        values: Vec<Value>,
509        writer: &mut impl ClickHouseBytesWrite,
510        state: &mut SerializerState,
511    ) -> Result<()> {
512        use serialize::*;
513        match self {
514            Type::Int8
515            | Type::Int16
516            | Type::Int32
517            | Type::Int64
518            | Type::Int128
519            | Type::Int256
520            | Type::UInt8
521            | Type::UInt16
522            | Type::UInt32
523            | Type::UInt64
524            | Type::UInt128
525            | Type::UInt256
526            | Type::Float32
527            | Type::Float64
528            | Type::Decimal32(_)
529            | Type::Decimal64(_)
530            | Type::Decimal128(_)
531            | Type::Decimal256(_)
532            | Type::Uuid
533            | Type::Date
534            | Type::Date32
535            | Type::DateTime(_)
536            | Type::DateTime64(_, _)
537            | Type::Ipv4
538            | Type::Ipv6
539            | Type::Enum8(_)
540            | Type::Enum16(_) => {
541                sized::SizedSerializer::write_sync(self, values, writer, state)?;
542            }
543
544            Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
545                string::StringSerializer::write_sync(self, values, writer, state)?;
546            }
547
548            Type::Array(_) => {
549                array::ArraySerializer::write_sync(self, values, writer, state)?;
550            }
551            Type::Tuple(_) => {
552                tuple::TupleSerializer::write_sync(self, values, writer, state)?;
553            }
554            Type::Point => geo::PointSerializer::write_sync(self, values, writer, state)?,
555            Type::Ring => geo::RingSerializer::write_sync(self, values, writer, state)?,
556            Type::Polygon => geo::PolygonSerializer::write_sync(self, values, writer, state)?,
557            Type::MultiPolygon => {
558                geo::MultiPolygonSerializer::write_sync(self, values, writer, state)?;
559            }
560            Type::Nullable(_) => {
561                nullable::NullableSerializer::write_sync(self, values, writer, state)?;
562            }
563            Type::Map(_, _) => map::MapSerializer::write_sync(self, values, writer, state)?,
564            Type::LowCardinality(_) => {
565                low_cardinality::LowCardinalitySerializer::write_sync(self, values, writer, state)?;
566            }
567            Type::Object => {
568                object::ObjectSerializer::write_sync(self, values, writer, state)?;
569            }
570        }
571        Ok(())
572    }
573
574    #[expect(clippy::too_many_lines)]
575    pub(crate) fn validate(&self) -> Result<()> {
576        match self {
577            Type::Decimal32(scale) => {
578                if *scale == 0 || *scale > 9 {
579                    return Err(Error::TypeParseError(format!(
580                        "scale out of bounds for Decimal32({}) must be in range (1..=9)",
581                        *scale
582                    )));
583                }
584            }
585
586            Type::Decimal128(scale) => {
587                if *scale == 0 || *scale > 38 {
588                    return Err(Error::TypeParseError(format!(
589                        "scale out of bounds for Decimal128({}) must be in range (1..=38)",
590                        *scale
591                    )));
592                }
593            }
594            Type::Decimal256(scale) => {
595                if *scale == 0 || *scale > 76 {
596                    return Err(Error::TypeParseError(format!(
597                        "scale out of bounds for Decimal256({}) must be in range (1..=76)",
598                        *scale
599                    )));
600                }
601            }
602            Type::DateTime64(precision, _) | Type::Decimal64(precision) => {
603                if *precision == 0 || *precision > 18 {
604                    return Err(Error::TypeParseError(format!(
605                        "precision out of bounds for Decimal64/DateTime64({}) must be in range \
606                         (1..=18)",
607                        *precision
608                    )));
609                }
610            }
611            Type::LowCardinality(inner) => match inner.strip_null() {
612                Type::String
613                | Type::FixedSizedString(_)
614                | Type::Binary
615                | Type::FixedSizedBinary(_)
616                | Type::Date
617                | Type::Date32
618                | Type::DateTime(_)
619                | Type::Ipv4
620                | Type::Ipv6
621                | Type::Int8
622                | Type::Int16
623                | Type::Int32
624                | Type::Int64
625                | Type::Int128
626                | Type::Int256
627                | Type::UInt8
628                | Type::UInt16
629                | Type::UInt32
630                | Type::UInt64
631                | Type::UInt128
632                | Type::UInt256 => inner.validate()?,
633                _ => {
634                    return Err(Error::TypeParseError(format!(
635                        "illegal type '{inner:?}' in LowCardinality, not allowed"
636                    )));
637                }
638            },
639            Type::Array(inner) => {
640                inner.validate()?;
641            }
642            Type::Tuple(inner) => {
643                for inner in inner {
644                    inner.validate()?;
645                }
646            }
647            Type::Nullable(inner) => match &**inner {
648                Type::Array(_)
649                | Type::Map(_, _)
650                | Type::LowCardinality(_)
651                | Type::Tuple(_)
652                | Type::Nullable(_) => {
653                    return Err(Error::TypeParseError(format!(
654                        "nullable cannot contain composite type '{inner:?}'"
655                    )));
656                }
657                _ => inner.validate()?,
658            },
659            Type::Map(key, value) => {
660                if !matches!(
661                    &**key,
662                    Type::String
663                        | Type::FixedSizedString(_)
664                        | Type::Int8
665                        | Type::Int16
666                        | Type::Int32
667                        | Type::Int64
668                        | Type::Int128
669                        | Type::Int256
670                        | Type::UInt8
671                        | Type::UInt16
672                        | Type::UInt32
673                        | Type::UInt64
674                        | Type::UInt128
675                        | Type::UInt256
676                        | Type::LowCardinality(_)
677                        | Type::Uuid
678                        | Type::Date
679                        | Type::DateTime(_)
680                        | Type::DateTime64(_, _)
681                        | Type::Enum8(_)
682                        | Type::Enum16(_)
683                ) {
684                    return Err(Error::TypeParseError(
685                        "key in map must be String, Integer, LowCardinality, FixedString, UUID, \
686                         Date, DateTime, Date32, Enum"
687                            .to_string(),
688                    ));
689                }
690                key.validate()?;
691                value.validate()?;
692            }
693            // TODO: Add Object
694            _ => {}
695        }
696        Ok(())
697    }
698
699    pub(crate) fn validate_value(&self, value: &Value) -> Result<()> {
700        self.validate()?;
701        if !self.inner_validate_value(value) {
702            return Err(Error::TypeParseError(format!(
703                "could not assign value '{value:?}' to type '{self:?}'"
704            )));
705        }
706        Ok(())
707    }
708
709    fn inner_validate_value(&self, value: &Value) -> bool {
710        match (self, value) {
711            (Type::Int8, Value::Int8(_))
712            | (Type::Int16, Value::Int16(_))
713            | (Type::Int32, Value::Int32(_))
714            | (Type::Int64, Value::Int64(_))
715            | (Type::Int128, Value::Int128(_))
716            | (Type::Int256, Value::Int256(_))
717            | (Type::UInt8, Value::UInt8(_))
718            | (Type::UInt16, Value::UInt16(_))
719            | (Type::UInt32, Value::UInt32(_))
720            | (Type::UInt64, Value::UInt64(_))
721            | (Type::UInt128, Value::UInt128(_))
722            | (Type::UInt256, Value::UInt256(_))
723            | (Type::Float32, Value::Float32(_))
724            | (Type::Float64, Value::Float64(_))
725            | (Type::String | Type::FixedSizedString(_), Value::String(_))
726            | (Type::Uuid, Value::Uuid(_))
727            | (Type::Date, Value::Date(_))
728            | (Type::Date32, Value::Date32(_))
729            | (Type::Ipv4, Value::Ipv4(_))
730            | (Type::Ipv6, Value::Ipv6(_))
731            | (Type::Point, Value::Point(_))
732            | (Type::Ring, Value::Ring(_))
733            | (Type::Polygon, Value::Polygon(_))
734            | (Type::MultiPolygon, Value::MultiPolygon(_)) => true,
735            (Type::DateTime(tz1), Value::DateTime(date)) => tz1 == &date.0,
736            (Type::DateTime64(precision1, tz1), Value::DateTime64(tz2)) => {
737                tz1 == &tz2.0 && precision1 == &tz2.2
738            }
739            (Type::Decimal32(scale1), Value::Decimal32(scale2, _))
740            | (Type::Decimal64(scale1), Value::Decimal64(scale2, _))
741            | (Type::Decimal128(scale1), Value::Decimal128(scale2, _))
742            | (Type::Decimal256(scale1), Value::Decimal256(scale2, _)) => scale1 >= scale2,
743            (Type::FixedSizedString(_) | Type::String, Value::Array(items))
744                if items.iter().all(|item| matches!(item, Value::UInt8(_) | Value::Int8(_))) =>
745            {
746                true
747            }
748            (Type::Enum8(entries), Value::Enum8(_, index)) => entries.iter().any(|x| x.1 == *index),
749            (Type::Enum16(entries), Value::Enum16(_, index)) => {
750                entries.iter().any(|x| x.1 == *index)
751            }
752            (Type::LowCardinality(x), value) => x.inner_validate_value(value),
753            (Type::Array(inner_type), Value::Array(values)) => {
754                values.iter().all(|x| inner_type.inner_validate_value(x))
755            }
756            (Type::Tuple(inner_types), Value::Tuple(values)) => {
757                inner_types.len() == values.len()
758                    && inner_types
759                        .iter()
760                        .zip(values.iter())
761                        .all(|(type_, value)| type_.inner_validate_value(value))
762            }
763            (Type::Nullable(inner), value) => {
764                value == &Value::Null || inner.inner_validate_value(value)
765            }
766            (Type::Map(key, value), Value::Map(keys, values)) => {
767                keys.len() == values.len()
768                    && keys.iter().all(|x| key.inner_validate_value(x))
769                    && values.iter().all(|x| value.inner_validate_value(x))
770            }
771            _ => false,
772        }
773    }
774
775    /// Helper type to estimate capacity of a type
776    pub(crate) fn estimate_capacity(&self) -> usize {
777        match self {
778            Type::Int8 | Type::UInt8 => 1,
779            Type::Int16 | Type::UInt16 | Type::Date => 2,
780            Type::Int32
781            | Type::UInt32
782            | Type::Float32
783            | Type::Date32
784            | Type::DateTime(_)
785            | Type::Ipv4 => 4,
786            Type::Int64 | Type::UInt64 | Type::Float64 | Type::DateTime64(_, _) => 8,
787            Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Point => 16,
788            Type::Int256 | Type::UInt256 | Type::String | Type::Binary => 32,
789            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => *n,
790
791            // Complex types
792            Type::Array(inner) => {
793                let inner_data = inner.estimate_capacity();
794                (4 + inner_data) * 8 // 4 bytes for offsets estimate 8 items per array
795            }
796            Type::Nullable(inner) => inner.estimate_capacity(),
797            Type::Tuple(types) => types.iter().map(Type::estimate_capacity).sum(),
798            Type::Map(key, value) => {
799                let key_data = key.estimate_capacity();
800                let value_data = value.estimate_capacity();
801                4 + key_data + value_data // 4 bytes for offsets
802            }
803
804            // Placeholder for unsupported types
805            _ => 64, // Default to 8 bytes as a safe estimate
806        }
807    }
808}
809
810impl Type {
811    /// Write a single default value based on 'non-null' type. This is useful in scenarios where
812    /// `ClickHouse` expects a default value written for a null value of a type.
813    ///
814    /// # Errors
815    /// Returns `SerializeError` if the `Type` is not handled.
816    /// Returns `Io` error if the write fails.
817    pub(crate) async fn write_default<W: ClickHouseWrite>(&self, writer: &mut W) -> Result<()> {
818        match self.strip_null() {
819            Type::String | Type::Binary => {
820                writer.write_string("").await?;
821            }
822            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
823                writer.write_all(&vec![0u8; *n]).await?;
824            }
825            Type::Int8 | Type::Enum8(_) => writer.write_i8(0).await?,
826            Type::Int16 | Type::Enum16(_) => writer.write_i16_le(0).await?,
827            Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.write_i32_le(0).await?,
828            Type::Int64 | Type::Decimal64(_) => writer.write_i64_le(0).await?,
829            Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
830                writer.write_all(&[0; 16]).await?;
831            }
832            Type::Int256 | Type::UInt256 | Type::Decimal256(_) => {
833                writer.write_all(&[0; 32]).await?;
834            }
835            Type::UInt8 => writer.write_u8(0).await?,
836            Type::UInt16 | Type::Date => writer.write_u16_le(0).await?,
837            Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.write_u32_le(0).await?,
838            Type::UInt64 => writer.write_u64_le(0).await?,
839            Type::Float32 => writer.write_f32_le(0.0).await?,
840            Type::Float64 => writer.write_f64_le(0.0).await?,
841            Type::DateTime64(precision, _) => {
842                let bytes = (0_i64).to_le_bytes();
843                writer.write_all(&bytes[..*precision]).await?;
844            }
845            Type::Array(_) | Type::Map(_, _) => writer.write_var_uint(0).await?, // Empty array/map
846            // Recursive
847            Type::LowCardinality(inner) => Box::pin(inner.write_default(writer)).await?,
848            Type::Tuple(inner) => {
849                for t in inner {
850                    Box::pin(t.write_default(writer)).await?;
851                }
852            }
853            _ => {
854                return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
855            }
856        }
857        Ok(())
858    }
859
860    pub(crate) fn put_default<W: ClickHouseBytesWrite>(&self, writer: &mut W) -> Result<()> {
861        match self.strip_null() {
862            Type::String | Type::Binary => {
863                writer.put_string("")?;
864            }
865            Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
866                writer.put_slice(&vec![0u8; *n]);
867            }
868            Type::Int8 | Type::Enum8(_) => writer.put_i8(0),
869            Type::Int16 => writer.put_i16_le(0),
870            Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.put_i32_le(0),
871            Type::Int64 | Type::Decimal64(_) => writer.put_i64_le(0),
872            Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
873                writer.put_slice(&[0; 16]);
874            }
875            Type::Int256 | Type::UInt256 | Type::Decimal256(_) => writer.put_slice(&[0; 32]),
876            Type::UInt8 => writer.put_u8(0),
877            Type::UInt16 | Type::Date => writer.put_u16_le(0),
878            Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.put_u32_le(0),
879            Type::UInt64 => writer.put_u64_le(0),
880            Type::Float32 => writer.put_f32_le(0.0),
881            Type::Float64 => writer.put_f64_le(0.0),
882            Type::DateTime64(precision, _) => {
883                let bytes = (0_i64).to_le_bytes();
884                writer.put_slice(&bytes[..*precision]);
885            }
886            Type::Array(_) | Type::Map(_, _) => writer.put_var_uint(0)?, // Empty array/map
887            Type::Enum16(_) => writer.put_i16(0),
888            // Recursive
889            Type::LowCardinality(inner) => inner.put_default(writer)?,
890            Type::Tuple(inner) => {
891                for t in inner {
892                    t.put_default(writer)?;
893                }
894            }
895            _ => {
896                return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
897            }
898        }
899        Ok(())
900    }
901}
902
903pub(crate) trait Deserializer {
904    // TODO:
905    // Add custom serialization here. Will need to pass in state or via arg.
906    // Example from python:
907    // ```
908    //  def read_state_prefix(self, buf):
909    //     if self.has_custom_serialization:
910    //         use_custom_serialization = read_varint(buf)
911    //         if use_custom_serialization:
912    //             self.serialization = SparseSerialization(self)
913    // ```
914    fn read_prefix<R: ClickHouseRead>(
915        _type_: &Type,
916        _reader: &mut R,
917        _state: &mut DeserializerState,
918    ) -> impl Future<Output = Result<()>> {
919        async { Ok(()) }
920    }
921
922    fn read<R: ClickHouseRead>(
923        type_: &Type,
924        reader: &mut R,
925        rows: usize,
926        state: &mut DeserializerState,
927    ) -> impl Future<Output = Result<Vec<Value>>>;
928
929    #[allow(dead_code)] // TODO: remove once synchronous native path is fully retired
930    fn read_sync(
931        type_: &Type,
932        reader: &mut impl ClickHouseBytesRead,
933        rows: usize,
934        state: &mut DeserializerState,
935    ) -> Result<Vec<Value>>;
936}
937
938pub(crate) trait Serializer {
939    fn write_prefix<W: ClickHouseWrite>(
940        _type_: &Type,
941        _writer: &mut W,
942        _state: &mut SerializerState,
943    ) -> impl Future<Output = Result<()>> {
944        async { Ok(()) }
945    }
946
947    fn write<W: ClickHouseWrite>(
948        type_: &Type,
949        values: Vec<Value>,
950        writer: &mut W,
951        state: &mut SerializerState,
952    ) -> impl Future<Output = Result<()>>;
953
954    fn write_sync(
955        type_: &Type,
956        values: Vec<Value>,
957        writer: &mut impl ClickHouseBytesWrite,
958        state: &mut SerializerState,
959    ) -> Result<()>;
960}