1pub(crate) mod deserialize;
2pub mod geo;
3pub(crate) mod low_cardinality;
4pub mod map;
5pub(crate) mod serialize;
6#[cfg(test)]
7mod tests;
8
9use std::fmt::Display;
10use std::str::FromStr;
11
12use chrono_tz::Tz;
13use futures_util::FutureExt;
14use tokio::io::AsyncWriteExt;
15use uuid::Uuid;
16
17use super::protocol::MAX_STRING_SIZE;
18use super::values::{
19 Date, DateTime, DynDateTime64, Ipv4, Ipv6, MultiPolygon, Point, Polygon, Ring, Value, i256,
20 u256,
21};
22use crate::formats::{DeserializerState, SerializerState};
23use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
24use crate::{Date32, Error, Result};
25
/// A column data type.
///
/// The `Display` implementation for this enum renders every variant as a type-name string;
/// where the rendered name differs from the Rust variant name, the variant's comment says so.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Type {
    Int8,
    Int16,
    Int32,
    Int64,
    Int128,
    Int256,

    UInt8,
    UInt16,
    UInt32,
    UInt64,
    UInt128,
    UInt256,

    Float32,
    Float64,

    /// Rendered as `Decimal32(S)`; `Type::validate` accepts `S` in `1..=9`.
    Decimal32(usize),
    /// Rendered as `Decimal64(S)`; `Type::validate` accepts `S` in `1..=18`.
    Decimal64(usize),
    /// Rendered as `Decimal128(S)`; `Type::validate` accepts `S` in `1..=38`.
    Decimal128(usize),
    /// Rendered as `Decimal256(S)`; `Type::validate` accepts `S` in `1..=76`.
    Decimal256(usize),

    String,
    /// Rendered as `FixedString(N)`, where `N` is the payload.
    FixedSizedString(usize),
    /// Rendered as `String` (same name as [`Type::String`]).
    Binary,
    /// Rendered as `FixedString(N)` (same name as [`Type::FixedSizedString`]).
    FixedSizedBinary(usize),

    /// Rendered as `UUID`.
    Uuid,

    Date,
    Date32,
    /// Rendered as `DateTime('<tz>')`.
    DateTime(Tz),
    /// Precision followed by time zone; rendered as `DateTime64(<precision>,'<tz>')`.
    /// `Type::validate` accepts a precision in `1..=18`.
    DateTime64(usize, Tz),

    /// Rendered as `IPv4`.
    Ipv4,
    /// Rendered as `IPv6`.
    Ipv6,

    Point,
    Ring,
    Polygon,
    MultiPolygon,

    /// `Type::validate` rejects `Array`, `Map`, `LowCardinality`, `Tuple` and `Nullable` as the
    /// wrapped type.
    Nullable(Box<Type>),

    /// `(name, value)` entries.
    Enum8(Vec<(String, i8)>),
    /// `(name, value)` entries.
    Enum16(Vec<(String, i16)>),
    /// `Type::validate` only accepts string, binary, `Date`, `Date32`, `DateTime`, IP and integer
    /// types (optionally wrapped in `Nullable`) as the wrapped type.
    LowCardinality(Box<Type>),
    Array(Box<Type>),
    Tuple(Vec<Type>),
    /// Key type followed by value type.
    Map(Box<Type>, Box<Type>),

    /// Rendered as `JSON`.
    Object,
}
87
88impl Type {
89 pub fn unwrap_array(&self) -> Result<&Type> {
93 match self {
94 Type::Array(x) => Ok(x),
95 _ => Err(Error::UnexpectedType(self.clone())),
96 }
97 }
98
99 pub fn unarray(&self) -> Option<&Type> {
100 match self {
101 Type::Array(x) => Some(&**x),
102 _ => None,
103 }
104 }
105
106 pub fn unwrap_map(&self) -> Result<(&Type, &Type)> {
110 match self {
111 Type::Map(key, value) => Ok((&**key, &**value)),
112 _ => Err(Error::UnexpectedType(self.clone())),
113 }
114 }
115
116 pub fn unmap(&self) -> Option<(&Type, &Type)> {
117 match self {
118 Type::Map(key, value) => Some((&**key, &**value)),
119 _ => None,
120 }
121 }
122
123 pub fn unwrap_tuple(&self) -> Result<&[Type]> {
127 match self {
128 Type::Tuple(x) => Ok(&x[..]),
129 _ => Err(Error::UnexpectedType(self.clone())),
130 }
131 }
132
133 pub fn untuple(&self) -> Option<&[Type]> {
134 match self {
135 Type::Tuple(x) => Some(&x[..]),
136 _ => None,
137 }
138 }
139
140 pub fn unnull(&self) -> Option<&Type> {
141 match self {
142 Type::Nullable(x) => Some(&**x),
143 _ => None,
144 }
145 }
146
147 pub fn strip_null(&self) -> &Type {
148 match self {
149 Type::Nullable(x) => x,
150 _ => self,
151 }
152 }
153
154 pub fn is_nullable(&self) -> bool { matches!(self, Type::Nullable(_)) }
155
156 pub fn strip_low_cardinality(&self) -> &Type {
157 match self {
158 Type::LowCardinality(x) => x,
159 _ => self,
160 }
161 }
162
163 #[must_use]
164 pub fn into_nullable(self) -> Type {
165 match self {
166 t @ Type::Nullable(_) => t,
167 Type::LowCardinality(inner) => Type::LowCardinality(Box::new(inner.into_nullable())),
169 t => Type::Nullable(Box::new(t)),
170 }
171 }
172
173 pub fn default_value(&self) -> Value {
174 match self {
175 Type::Int8 => Value::Int8(0),
176 Type::Int16 => Value::Int16(0),
177 Type::Int32 => Value::Int32(0),
178 Type::Int64 => Value::Int64(0),
179 Type::Int128 => Value::Int128(0),
180 Type::Int256 => Value::Int256(i256::default()),
181 Type::UInt8 => Value::UInt8(0),
182 Type::UInt16 => Value::UInt16(0),
183 Type::UInt32 => Value::UInt32(0),
184 Type::UInt64 => Value::UInt64(0),
185 Type::UInt128 => Value::UInt128(0),
186 Type::UInt256 => Value::UInt256(u256::default()),
187 Type::Float32 => Value::Float32(0.0),
188 Type::Float64 => Value::Float64(0.0),
189 Type::Decimal32(s) => Value::Decimal32(*s, 0),
190 Type::Decimal64(s) => Value::Decimal64(*s, 0),
191 Type::Decimal128(s) => Value::Decimal128(*s, 0),
192 Type::Decimal256(s) => Value::Decimal256(*s, i256::default()),
193 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
194 Value::String(vec![])
195 }
196 Type::Date => Value::Date(Date(0)),
197 Type::Date32 => Value::Date32(Date32(0)),
198 Type::DateTime(tz) => Value::DateTime(DateTime(*tz, 0)),
199 Type::DateTime64(precision, tz) => Value::DateTime64(DynDateTime64(*tz, 0, *precision)),
200 Type::Ipv4 => Value::Ipv4(Ipv4::default()),
201 Type::Ipv6 => Value::Ipv6(Ipv6::default()),
202 Type::Enum8(_) => Value::Enum8(String::new(), 0),
203 Type::Enum16(_) => Value::Enum16(String::new(), 0),
204 Type::LowCardinality(x) => x.default_value(),
205 Type::Array(_) => Value::Array(vec![]),
206 Type::Tuple(types) => Value::Tuple(types.iter().map(Type::default_value).collect()),
207 Type::Nullable(_) => Value::Null,
208 Type::Map(_, _) => Value::Map(vec![], vec![]),
209 Type::Point => Value::Point(Point::default()),
210 Type::Ring => Value::Ring(Ring::default()),
211 Type::Polygon => Value::Polygon(Polygon::default()),
212 Type::MultiPolygon => Value::MultiPolygon(MultiPolygon::default()),
213 Type::Uuid => Value::Uuid(Uuid::from_u128(0)),
214 Type::Object => Value::Object("{}".as_bytes().to_vec()),
215 }
216 }
217}
218
219impl Display for Type {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 match self {
222 Type::Int8 => write!(f, "Int8"),
223 Type::Int16 => write!(f, "Int16"),
224 Type::Int32 => write!(f, "Int32"),
225 Type::Int64 => write!(f, "Int64"),
226 Type::Int128 => write!(f, "Int128"),
227 Type::Int256 => write!(f, "Int256"),
228 Type::UInt8 => write!(f, "UInt8"),
229 Type::UInt16 => write!(f, "UInt16"),
230 Type::UInt32 => write!(f, "UInt32"),
231 Type::UInt64 => write!(f, "UInt64"),
232 Type::UInt128 => write!(f, "UInt128"),
233 Type::UInt256 => write!(f, "UInt256"),
234 Type::Float32 => write!(f, "Float32"),
235 Type::Float64 => write!(f, "Float64"),
236 Type::Decimal32(s) => write!(f, "Decimal32({s})"),
237 Type::Decimal64(s) => write!(f, "Decimal64({s})"),
238 Type::Decimal128(s) => write!(f, "Decimal128({s})"),
239 Type::Decimal256(s) => write!(f, "Decimal256({s})"),
240 Type::String | Type::Binary => write!(f, "String"),
241 Type::FixedSizedBinary(s) | Type::FixedSizedString(s) => write!(f, "FixedString({s})"),
242 Type::Uuid => write!(f, "UUID"),
243 Type::Date => write!(f, "Date"),
244 Type::Date32 => write!(f, "Date32"),
245 Type::DateTime(tz) => write!(f, "DateTime('{tz}')"),
246 Type::DateTime64(precision, tz) => write!(f, "DateTime64({precision},'{tz}')"),
247 Type::Ipv4 => write!(f, "IPv4"),
248 Type::Ipv6 => write!(f, "IPv6"),
249 Type::Point => write!(f, "Point"),
250 Type::Ring => write!(f, "Ring"),
251 Type::Polygon => write!(f, "Polygon"),
252 Type::MultiPolygon => write!(f, "MultiPolygon"),
253 Type::Enum8(items) => {
254 write!(f, "Enum8(")?;
255 if !items.is_empty() {
256 let last_index = items.len() - 1;
257 for (i, (name, value)) in items.iter().enumerate() {
258 write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
259 if i < last_index {
260 write!(f, ",")?;
261 }
262 }
263 }
264 write!(f, ")")
265 }
266 Type::Enum16(items) => {
267 write!(f, "Enum16(")?;
268 if !items.is_empty() {
269 let last_index = items.len() - 1;
270 for (i, (name, value)) in items.iter().enumerate() {
271 write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
272 if i < last_index {
273 write!(f, ",")?;
274 }
275 }
276 }
277 write!(f, ")")
278 }
279 Type::LowCardinality(inner) => write!(f, "LowCardinality({inner})"),
280 Type::Array(inner) => write!(f, "Array({inner})"),
281 Type::Tuple(items) => write!(
282 f,
283 "Tuple({})",
284 items.iter().map(ToString::to_string).collect::<Vec<_>>().join(",")
285 ),
286 Type::Nullable(inner) => write!(f, "Nullable({inner})"),
287 Type::Map(key, value) => write!(f, "Map({key},{value})"),
288 Type::Object => write!(f, "JSON"),
289 }
290 }
291}
292
293impl Type {
294 pub(crate) fn deserialize_column<'a, R: ClickHouseRead>(
295 &'a self,
296 reader: &'a mut R,
297 rows: usize,
298 state: &'a mut DeserializerState,
299 ) -> impl Future<Output = Result<Vec<Value>>> + Send + 'a {
300 use deserialize::*;
301 async move {
302 if rows > MAX_STRING_SIZE {
303 return Err(Error::Protocol(format!(
304 "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
305 )));
306 }
307
308 Ok(match self {
309 Type::Int8
310 | Type::Int16
311 | Type::Int32
312 | Type::Int64
313 | Type::Int128
314 | Type::Int256
315 | Type::UInt8
316 | Type::UInt16
317 | Type::UInt32
318 | Type::UInt64
319 | Type::UInt128
320 | Type::UInt256
321 | Type::Float32
322 | Type::Float64
323 | Type::Decimal32(_)
324 | Type::Decimal64(_)
325 | Type::Decimal128(_)
326 | Type::Decimal256(_)
327 | Type::Uuid
328 | Type::Date
329 | Type::Date32
330 | Type::DateTime(_)
331 | Type::DateTime64(_, _)
332 | Type::Ipv4
333 | Type::Ipv6
334 | Type::Enum8(_)
335 | Type::Enum16(_) => {
336 sized::SizedDeserializer::read(self, reader, rows, state).await?
337 }
338 Type::String
339 | Type::FixedSizedString(_)
340 | Type::Binary
341 | Type::FixedSizedBinary(_) => {
342 string::StringDeserializer::read(self, reader, rows, state).await?
343 }
344 Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?,
345 Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?,
346 Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?,
347 Type::MultiPolygon => {
348 geo::MultiPolygonDeserializer::read(self, reader, rows, state).await?
349 }
350 Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?,
351 Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?,
352 Type::Nullable(_) => {
353 nullable::NullableDeserializer::read(self, reader, rows, state).await?
354 }
355 Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?,
356 Type::LowCardinality(_) => {
357 low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state)
358 .await?
359 }
360 Type::Object => object::ObjectDeserializer::read(self, reader, rows, state).await?,
361 })
362 }
363 .boxed()
364 }
365
366 #[allow(dead_code)] pub(crate) fn deserialize_column_sync(
368 &self,
369 reader: &mut impl ClickHouseBytesRead,
370 rows: usize,
371 state: &mut DeserializerState,
372 ) -> Result<Vec<Value>> {
373 use deserialize::*;
374
375 if rows > MAX_STRING_SIZE {
376 return Err(Error::Protocol(format!(
377 "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
378 )));
379 }
380
381 Ok(match self {
382 Type::Int8
383 | Type::Int16
384 | Type::Int32
385 | Type::Int64
386 | Type::Int128
387 | Type::Int256
388 | Type::UInt8
389 | Type::UInt16
390 | Type::UInt32
391 | Type::UInt64
392 | Type::UInt128
393 | Type::UInt256
394 | Type::Float32
395 | Type::Float64
396 | Type::Decimal32(_)
397 | Type::Decimal64(_)
398 | Type::Decimal128(_)
399 | Type::Decimal256(_)
400 | Type::Uuid
401 | Type::Date
402 | Type::Date32
403 | Type::DateTime(_)
404 | Type::DateTime64(_, _)
405 | Type::Ipv4
406 | Type::Ipv6
407 | Type::Enum8(_)
408 | Type::Enum16(_) => sized::SizedDeserializer::read_sync(self, reader, rows, state)?,
409 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
410 string::StringDeserializer::read_sync(self, reader, rows, state)?
411 }
412 Type::Array(_) => array::ArrayDeserializer::read_sync(self, reader, rows, state)?,
413 Type::Ring => geo::RingDeserializer::read_sync(self, reader, rows, state)?,
414 Type::Polygon => geo::PolygonDeserializer::read_sync(self, reader, rows, state)?,
415 Type::MultiPolygon => {
416 geo::MultiPolygonDeserializer::read_sync(self, reader, rows, state)?
417 }
418 Type::Tuple(_) => tuple::TupleDeserializer::read_sync(self, reader, rows, state)?,
419 Type::Point => geo::PointDeserializer::read_sync(self, reader, rows, state)?,
420 Type::Nullable(_) => {
421 nullable::NullableDeserializer::read_sync(self, reader, rows, state)?
422 }
423 Type::Map(_, _) => map::MapDeserializer::read_sync(self, reader, rows, state)?,
424 Type::LowCardinality(_) => {
425 low_cardinality::LowCardinalityDeserializer::read_sync(self, reader, rows, state)?
426 }
427 Type::Object => object::ObjectDeserializer::read_sync(self, reader, rows, state)?,
428 })
429 }
430
431 pub(crate) fn serialize_column<'a, W: ClickHouseWrite>(
432 &'a self,
433 values: Vec<Value>,
434 writer: &'a mut W,
435 state: &'a mut SerializerState,
436 ) -> impl Future<Output = Result<()>> + Send + 'a {
437 use serialize::*;
438 async move {
439 match self {
440 Type::Int8
441 | Type::Int16
442 | Type::Int32
443 | Type::Int64
444 | Type::Int128
445 | Type::Int256
446 | Type::UInt8
447 | Type::UInt16
448 | Type::UInt32
449 | Type::UInt64
450 | Type::UInt128
451 | Type::UInt256
452 | Type::Float32
453 | Type::Float64
454 | Type::Decimal32(_)
455 | Type::Decimal64(_)
456 | Type::Decimal128(_)
457 | Type::Decimal256(_)
458 | Type::Uuid
459 | Type::Date
460 | Type::Date32
461 | Type::DateTime(_)
462 | Type::DateTime64(_, _)
463 | Type::Ipv4
464 | Type::Ipv6
465 | Type::Enum8(_)
466 | Type::Enum16(_) => {
467 sized::SizedSerializer::write(self, values, writer, state).await?;
468 }
469
470 Type::String
471 | Type::FixedSizedString(_)
472 | Type::Binary
473 | Type::FixedSizedBinary(_) => {
474 string::StringSerializer::write(self, values, writer, state).await?;
475 }
476
477 Type::Array(_) => {
478 array::ArraySerializer::write(self, values, writer, state).await?;
479 }
480 Type::Tuple(_) => {
481 tuple::TupleSerializer::write(self, values, writer, state).await?;
482 }
483 Type::Point => geo::PointSerializer::write(self, values, writer, state).await?,
484 Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?,
485 Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?,
486 Type::MultiPolygon => {
487 geo::MultiPolygonSerializer::write(self, values, writer, state).await?;
488 }
489 Type::Nullable(_) => {
490 nullable::NullableSerializer::write(self, values, writer, state).await?;
491 }
492 Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?,
493 Type::LowCardinality(_) => {
494 low_cardinality::LowCardinalitySerializer::write(self, values, writer, state)
495 .await?;
496 }
497 Type::Object => {
498 object::ObjectSerializer::write(self, values, writer, state).await?;
499 }
500 }
501 Ok(())
502 }
503 .boxed()
504 }
505
506 pub(crate) fn serialize_column_sync(
507 &self,
508 values: Vec<Value>,
509 writer: &mut impl ClickHouseBytesWrite,
510 state: &mut SerializerState,
511 ) -> Result<()> {
512 use serialize::*;
513 match self {
514 Type::Int8
515 | Type::Int16
516 | Type::Int32
517 | Type::Int64
518 | Type::Int128
519 | Type::Int256
520 | Type::UInt8
521 | Type::UInt16
522 | Type::UInt32
523 | Type::UInt64
524 | Type::UInt128
525 | Type::UInt256
526 | Type::Float32
527 | Type::Float64
528 | Type::Decimal32(_)
529 | Type::Decimal64(_)
530 | Type::Decimal128(_)
531 | Type::Decimal256(_)
532 | Type::Uuid
533 | Type::Date
534 | Type::Date32
535 | Type::DateTime(_)
536 | Type::DateTime64(_, _)
537 | Type::Ipv4
538 | Type::Ipv6
539 | Type::Enum8(_)
540 | Type::Enum16(_) => {
541 sized::SizedSerializer::write_sync(self, values, writer, state)?;
542 }
543
544 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
545 string::StringSerializer::write_sync(self, values, writer, state)?;
546 }
547
548 Type::Array(_) => {
549 array::ArraySerializer::write_sync(self, values, writer, state)?;
550 }
551 Type::Tuple(_) => {
552 tuple::TupleSerializer::write_sync(self, values, writer, state)?;
553 }
554 Type::Point => geo::PointSerializer::write_sync(self, values, writer, state)?,
555 Type::Ring => geo::RingSerializer::write_sync(self, values, writer, state)?,
556 Type::Polygon => geo::PolygonSerializer::write_sync(self, values, writer, state)?,
557 Type::MultiPolygon => {
558 geo::MultiPolygonSerializer::write_sync(self, values, writer, state)?;
559 }
560 Type::Nullable(_) => {
561 nullable::NullableSerializer::write_sync(self, values, writer, state)?;
562 }
563 Type::Map(_, _) => map::MapSerializer::write_sync(self, values, writer, state)?,
564 Type::LowCardinality(_) => {
565 low_cardinality::LowCardinalitySerializer::write_sync(self, values, writer, state)?;
566 }
567 Type::Object => {
568 object::ObjectSerializer::write_sync(self, values, writer, state)?;
569 }
570 }
571 Ok(())
572 }
573
574 #[expect(clippy::too_many_lines)]
575 pub(crate) fn validate(&self) -> Result<()> {
576 match self {
577 Type::Decimal32(scale) => {
578 if *scale == 0 || *scale > 9 {
579 return Err(Error::TypeParseError(format!(
580 "scale out of bounds for Decimal32({}) must be in range (1..=9)",
581 *scale
582 )));
583 }
584 }
585
586 Type::Decimal128(scale) => {
587 if *scale == 0 || *scale > 38 {
588 return Err(Error::TypeParseError(format!(
589 "scale out of bounds for Decimal128({}) must be in range (1..=38)",
590 *scale
591 )));
592 }
593 }
594 Type::Decimal256(scale) => {
595 if *scale == 0 || *scale > 76 {
596 return Err(Error::TypeParseError(format!(
597 "scale out of bounds for Decimal256({}) must be in range (1..=76)",
598 *scale
599 )));
600 }
601 }
602 Type::DateTime64(precision, _) | Type::Decimal64(precision) => {
603 if *precision == 0 || *precision > 18 {
604 return Err(Error::TypeParseError(format!(
605 "precision out of bounds for Decimal64/DateTime64({}) must be in range \
606 (1..=18)",
607 *precision
608 )));
609 }
610 }
611 Type::LowCardinality(inner) => match inner.strip_null() {
612 Type::String
613 | Type::FixedSizedString(_)
614 | Type::Binary
615 | Type::FixedSizedBinary(_)
616 | Type::Date
617 | Type::Date32
618 | Type::DateTime(_)
619 | Type::Ipv4
620 | Type::Ipv6
621 | Type::Int8
622 | Type::Int16
623 | Type::Int32
624 | Type::Int64
625 | Type::Int128
626 | Type::Int256
627 | Type::UInt8
628 | Type::UInt16
629 | Type::UInt32
630 | Type::UInt64
631 | Type::UInt128
632 | Type::UInt256 => inner.validate()?,
633 _ => {
634 return Err(Error::TypeParseError(format!(
635 "illegal type '{inner:?}' in LowCardinality, not allowed"
636 )));
637 }
638 },
639 Type::Array(inner) => {
640 inner.validate()?;
641 }
642 Type::Tuple(inner) => {
643 for inner in inner {
644 inner.validate()?;
645 }
646 }
647 Type::Nullable(inner) => match &**inner {
648 Type::Array(_)
649 | Type::Map(_, _)
650 | Type::LowCardinality(_)
651 | Type::Tuple(_)
652 | Type::Nullable(_) => {
653 return Err(Error::TypeParseError(format!(
654 "nullable cannot contain composite type '{inner:?}'"
655 )));
656 }
657 _ => inner.validate()?,
658 },
659 Type::Map(key, value) => {
660 if !matches!(
661 &**key,
662 Type::String
663 | Type::FixedSizedString(_)
664 | Type::Int8
665 | Type::Int16
666 | Type::Int32
667 | Type::Int64
668 | Type::Int128
669 | Type::Int256
670 | Type::UInt8
671 | Type::UInt16
672 | Type::UInt32
673 | Type::UInt64
674 | Type::UInt128
675 | Type::UInt256
676 | Type::LowCardinality(_)
677 | Type::Uuid
678 | Type::Date
679 | Type::DateTime(_)
680 | Type::DateTime64(_, _)
681 | Type::Enum8(_)
682 | Type::Enum16(_)
683 ) {
684 return Err(Error::TypeParseError(
685 "key in map must be String, Integer, LowCardinality, FixedString, UUID, \
686 Date, DateTime, Date32, Enum"
687 .to_string(),
688 ));
689 }
690 key.validate()?;
691 value.validate()?;
692 }
693 _ => {}
695 }
696 Ok(())
697 }
698
699 pub(crate) fn validate_value(&self, value: &Value) -> Result<()> {
700 self.validate()?;
701 if !self.inner_validate_value(value) {
702 return Err(Error::TypeParseError(format!(
703 "could not assign value '{value:?}' to type '{self:?}'"
704 )));
705 }
706 Ok(())
707 }
708
709 fn inner_validate_value(&self, value: &Value) -> bool {
710 match (self, value) {
711 (Type::Int8, Value::Int8(_))
712 | (Type::Int16, Value::Int16(_))
713 | (Type::Int32, Value::Int32(_))
714 | (Type::Int64, Value::Int64(_))
715 | (Type::Int128, Value::Int128(_))
716 | (Type::Int256, Value::Int256(_))
717 | (Type::UInt8, Value::UInt8(_))
718 | (Type::UInt16, Value::UInt16(_))
719 | (Type::UInt32, Value::UInt32(_))
720 | (Type::UInt64, Value::UInt64(_))
721 | (Type::UInt128, Value::UInt128(_))
722 | (Type::UInt256, Value::UInt256(_))
723 | (Type::Float32, Value::Float32(_))
724 | (Type::Float64, Value::Float64(_))
725 | (Type::String | Type::FixedSizedString(_), Value::String(_))
726 | (Type::Uuid, Value::Uuid(_))
727 | (Type::Date, Value::Date(_))
728 | (Type::Date32, Value::Date32(_))
729 | (Type::Ipv4, Value::Ipv4(_))
730 | (Type::Ipv6, Value::Ipv6(_))
731 | (Type::Point, Value::Point(_))
732 | (Type::Ring, Value::Ring(_))
733 | (Type::Polygon, Value::Polygon(_))
734 | (Type::MultiPolygon, Value::MultiPolygon(_)) => true,
735 (Type::DateTime(tz1), Value::DateTime(date)) => tz1 == &date.0,
736 (Type::DateTime64(precision1, tz1), Value::DateTime64(tz2)) => {
737 tz1 == &tz2.0 && precision1 == &tz2.2
738 }
739 (Type::Decimal32(scale1), Value::Decimal32(scale2, _))
740 | (Type::Decimal64(scale1), Value::Decimal64(scale2, _))
741 | (Type::Decimal128(scale1), Value::Decimal128(scale2, _))
742 | (Type::Decimal256(scale1), Value::Decimal256(scale2, _)) => scale1 >= scale2,
743 (Type::FixedSizedString(_) | Type::String, Value::Array(items))
744 if items.iter().all(|item| matches!(item, Value::UInt8(_) | Value::Int8(_))) =>
745 {
746 true
747 }
748 (Type::Enum8(entries), Value::Enum8(_, index)) => entries.iter().any(|x| x.1 == *index),
749 (Type::Enum16(entries), Value::Enum16(_, index)) => {
750 entries.iter().any(|x| x.1 == *index)
751 }
752 (Type::LowCardinality(x), value) => x.inner_validate_value(value),
753 (Type::Array(inner_type), Value::Array(values)) => {
754 values.iter().all(|x| inner_type.inner_validate_value(x))
755 }
756 (Type::Tuple(inner_types), Value::Tuple(values)) => {
757 inner_types.len() == values.len()
758 && inner_types
759 .iter()
760 .zip(values.iter())
761 .all(|(type_, value)| type_.inner_validate_value(value))
762 }
763 (Type::Nullable(inner), value) => {
764 value == &Value::Null || inner.inner_validate_value(value)
765 }
766 (Type::Map(key, value), Value::Map(keys, values)) => {
767 keys.len() == values.len()
768 && keys.iter().all(|x| key.inner_validate_value(x))
769 && values.iter().all(|x| value.inner_validate_value(x))
770 }
771 _ => false,
772 }
773 }
774
775 pub(crate) fn estimate_capacity(&self) -> usize {
777 match self {
778 Type::Int8 | Type::UInt8 => 1,
779 Type::Int16 | Type::UInt16 | Type::Date => 2,
780 Type::Int32
781 | Type::UInt32
782 | Type::Float32
783 | Type::Date32
784 | Type::DateTime(_)
785 | Type::Ipv4 => 4,
786 Type::Int64 | Type::UInt64 | Type::Float64 | Type::DateTime64(_, _) => 8,
787 Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Point => 16,
788 Type::Int256 | Type::UInt256 | Type::String | Type::Binary => 32,
789 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => *n,
790
791 Type::Array(inner) => {
793 let inner_data = inner.estimate_capacity();
794 (4 + inner_data) * 8 }
796 Type::Nullable(inner) => inner.estimate_capacity(),
797 Type::Tuple(types) => types.iter().map(Type::estimate_capacity).sum(),
798 Type::Map(key, value) => {
799 let key_data = key.estimate_capacity();
800 let value_data = value.estimate_capacity();
801 4 + key_data + value_data }
803
804 _ => 64, }
807 }
808}
809
810impl Type {
811 pub(crate) async fn write_default<W: ClickHouseWrite>(&self, writer: &mut W) -> Result<()> {
818 match self.strip_null() {
819 Type::String | Type::Binary => {
820 writer.write_string("").await?;
821 }
822 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
823 writer.write_all(&vec![0u8; *n]).await?;
824 }
825 Type::Int8 | Type::Enum8(_) => writer.write_i8(0).await?,
826 Type::Int16 | Type::Enum16(_) => writer.write_i16_le(0).await?,
827 Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.write_i32_le(0).await?,
828 Type::Int64 | Type::Decimal64(_) => writer.write_i64_le(0).await?,
829 Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
830 writer.write_all(&[0; 16]).await?;
831 }
832 Type::Int256 | Type::UInt256 | Type::Decimal256(_) => {
833 writer.write_all(&[0; 32]).await?;
834 }
835 Type::UInt8 => writer.write_u8(0).await?,
836 Type::UInt16 | Type::Date => writer.write_u16_le(0).await?,
837 Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.write_u32_le(0).await?,
838 Type::UInt64 => writer.write_u64_le(0).await?,
839 Type::Float32 => writer.write_f32_le(0.0).await?,
840 Type::Float64 => writer.write_f64_le(0.0).await?,
841 Type::DateTime64(precision, _) => {
842 let bytes = (0_i64).to_le_bytes();
843 writer.write_all(&bytes[..*precision]).await?;
844 }
845 Type::Array(_) | Type::Map(_, _) => writer.write_var_uint(0).await?, Type::LowCardinality(inner) => Box::pin(inner.write_default(writer)).await?,
848 Type::Tuple(inner) => {
849 for t in inner {
850 Box::pin(t.write_default(writer)).await?;
851 }
852 }
853 _ => {
854 return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
855 }
856 }
857 Ok(())
858 }
859
860 pub(crate) fn put_default<W: ClickHouseBytesWrite>(&self, writer: &mut W) -> Result<()> {
861 match self.strip_null() {
862 Type::String | Type::Binary => {
863 writer.put_string("")?;
864 }
865 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
866 writer.put_slice(&vec![0u8; *n]);
867 }
868 Type::Int8 | Type::Enum8(_) => writer.put_i8(0),
869 Type::Int16 => writer.put_i16_le(0),
870 Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.put_i32_le(0),
871 Type::Int64 | Type::Decimal64(_) => writer.put_i64_le(0),
872 Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
873 writer.put_slice(&[0; 16]);
874 }
875 Type::Int256 | Type::UInt256 | Type::Decimal256(_) => writer.put_slice(&[0; 32]),
876 Type::UInt8 => writer.put_u8(0),
877 Type::UInt16 | Type::Date => writer.put_u16_le(0),
878 Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.put_u32_le(0),
879 Type::UInt64 => writer.put_u64_le(0),
880 Type::Float32 => writer.put_f32_le(0.0),
881 Type::Float64 => writer.put_f64_le(0.0),
882 Type::DateTime64(precision, _) => {
883 let bytes = (0_i64).to_le_bytes();
884 writer.put_slice(&bytes[..*precision]);
885 }
886 Type::Array(_) | Type::Map(_, _) => writer.put_var_uint(0)?, Type::Enum16(_) => writer.put_i16(0),
888 Type::LowCardinality(inner) => inner.put_default(writer)?,
890 Type::Tuple(inner) => {
891 for t in inner {
892 t.put_default(writer)?;
893 }
894 }
895 _ => {
896 return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
897 }
898 }
899 Ok(())
900 }
901}
902
/// Column deserialization strategy for one family of [`Type`]s.
///
/// `Type::deserialize_column` and `Type::deserialize_column_sync` pick an implementor based on
/// the type's variant and call its `read` / `read_sync`.
pub(crate) trait Deserializer {
    /// Hook for reading data that precedes the column values.
    ///
    /// The default implementation reads nothing and returns `Ok(())`.
    fn read_prefix<R: ClickHouseRead>(
        _type_: &Type,
        _reader: &mut R,
        _state: &mut DeserializerState,
    ) -> impl Future<Output = Result<()>> {
        async { Ok(()) }
    }

    /// Reads `rows` values of `type_` from an async reader.
    fn read<R: ClickHouseRead>(
        type_: &Type,
        reader: &mut R,
        rows: usize,
        state: &mut DeserializerState,
    ) -> impl Future<Output = Result<Vec<Value>>>;

    /// Reads `rows` values of `type_` from a synchronous byte reader.
    #[allow(dead_code)]
    fn read_sync(
        type_: &Type,
        reader: &mut impl ClickHouseBytesRead,
        rows: usize,
        state: &mut DeserializerState,
    ) -> Result<Vec<Value>>;
}
937
/// Column serialization strategy for one family of [`Type`]s.
///
/// `Type::serialize_column` and `Type::serialize_column_sync` pick an implementor based on the
/// type's variant and call its `write` / `write_sync`.
pub(crate) trait Serializer {
    /// Hook for writing data that precedes the column values.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn write_prefix<W: ClickHouseWrite>(
        _type_: &Type,
        _writer: &mut W,
        _state: &mut SerializerState,
    ) -> impl Future<Output = Result<()>> {
        async { Ok(()) }
    }

    /// Writes `values` as a column of `type_` to an async writer.
    fn write<W: ClickHouseWrite>(
        type_: &Type,
        values: Vec<Value>,
        writer: &mut W,
        state: &mut SerializerState,
    ) -> impl Future<Output = Result<()>>;

    /// Writes `values` as a column of `type_` to a synchronous byte writer.
    fn write_sync(
        type_: &Type,
        values: Vec<Value>,
        writer: &mut impl ClickHouseBytesWrite,
        state: &mut SerializerState,
    ) -> Result<()>;
}