1pub(crate) mod deserialize;
2pub mod geo;
3pub(crate) mod low_cardinality;
4pub mod map;
5pub(crate) mod serialize;
6#[cfg(test)]
7mod tests;
8
9use std::fmt::Display;
10use std::str::FromStr;
11
12use chrono_tz::Tz;
13use futures_util::FutureExt;
14use tokio::io::AsyncWriteExt;
15use uuid::Uuid;
16
17use super::protocol::MAX_STRING_SIZE;
18use super::values::{
19 Date, DateTime, DynDateTime64, Ipv4, Ipv6, MultiPolygon, Point, Polygon, Ring, Value, i256,
20 u256,
21};
22use crate::formats::{DeserializerState, SerializerState};
23use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
24use crate::{Date32, Error, Result};
25
26#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
29pub enum Type {
30 Int8,
31 Int16,
32 Int32,
33 Int64,
34 Int128,
35 Int256,
36
37 UInt8,
38 UInt16,
39 UInt32,
40 UInt64,
41 UInt128,
42 UInt256,
43
44 Float32,
45 Float64,
46
47 Decimal32(usize),
49 Decimal64(usize),
50 Decimal128(usize),
51 Decimal256(usize),
52
53 String,
54 FixedSizedString(usize),
55 Binary,
56 FixedSizedBinary(usize),
57
58 Uuid,
59
60 Date,
61 Date32, DateTime(Tz),
63 DateTime64(usize, Tz),
64
65 Ipv4,
66 Ipv6,
67
68 Point,
72 Ring,
73 Polygon,
74 MultiPolygon,
75
76 Nullable(Box<Type>),
77
78 Enum8(Vec<(String, i8)>),
79 Enum16(Vec<(String, i16)>),
80 LowCardinality(Box<Type>),
81 Array(Box<Type>),
82 Tuple(Vec<Type>),
83 Map(Box<Type>, Box<Type>),
84
85 Object,
86}
87
88impl Type {
89 pub fn unwrap_array(&self) -> Result<&Type> {
93 match self {
94 Type::Array(x) => Ok(x),
95 _ => Err(Error::UnexpectedType(self.clone())),
96 }
97 }
98
99 pub fn unarray(&self) -> Option<&Type> {
100 match self {
101 Type::Array(x) => Some(&**x),
102 _ => None,
103 }
104 }
105
106 pub fn unwrap_map(&self) -> Result<(&Type, &Type)> {
110 match self {
111 Type::Map(key, value) => Ok((&**key, &**value)),
112 _ => Err(Error::UnexpectedType(self.clone())),
113 }
114 }
115
116 pub fn unmap(&self) -> Option<(&Type, &Type)> {
117 match self {
118 Type::Map(key, value) => Some((&**key, &**value)),
119 _ => None,
120 }
121 }
122
123 pub fn unwrap_tuple(&self) -> Result<&[Type]> {
127 match self {
128 Type::Tuple(x) => Ok(&x[..]),
129 _ => Err(Error::UnexpectedType(self.clone())),
130 }
131 }
132
133 pub fn untuple(&self) -> Option<&[Type]> {
134 match self {
135 Type::Tuple(x) => Some(&x[..]),
136 _ => None,
137 }
138 }
139
140 pub fn unnull(&self) -> Option<&Type> {
141 match self {
142 Type::Nullable(x) => Some(&**x),
143 _ => None,
144 }
145 }
146
147 pub fn strip_null(&self) -> &Type {
148 match self {
149 Type::Nullable(x) => x,
150 _ => self,
151 }
152 }
153
154 pub fn is_nullable(&self) -> bool { matches!(self, Type::Nullable(_)) }
155
156 pub fn strip_low_cardinality(&self) -> &Type {
157 match self {
158 Type::LowCardinality(x) => x,
159 _ => self,
160 }
161 }
162
163 #[must_use]
164 pub fn into_nullable(self) -> Type {
165 match self {
166 t @ Type::Nullable(_) => t,
167 Type::LowCardinality(inner) => Type::LowCardinality(Box::new(inner.into_nullable())),
169 t => Type::Nullable(Box::new(t)),
170 }
171 }
172
173 pub fn default_value(&self) -> Value {
174 match self {
175 Type::Int8 => Value::Int8(0),
176 Type::Int16 => Value::Int16(0),
177 Type::Int32 => Value::Int32(0),
178 Type::Int64 => Value::Int64(0),
179 Type::Int128 => Value::Int128(0),
180 Type::Int256 => Value::Int256(i256::default()),
181 Type::UInt8 => Value::UInt8(0),
182 Type::UInt16 => Value::UInt16(0),
183 Type::UInt32 => Value::UInt32(0),
184 Type::UInt64 => Value::UInt64(0),
185 Type::UInt128 => Value::UInt128(0),
186 Type::UInt256 => Value::UInt256(u256::default()),
187 Type::Float32 => Value::Float32(0.0),
188 Type::Float64 => Value::Float64(0.0),
189 Type::Decimal32(s) => Value::Decimal32(*s, 0),
190 Type::Decimal64(s) => Value::Decimal64(*s, 0),
191 Type::Decimal128(s) => Value::Decimal128(*s, 0),
192 Type::Decimal256(s) => Value::Decimal256(*s, i256::default()),
193 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
194 Value::String(vec![])
195 }
196 Type::Date => Value::Date(Date(0)),
197 Type::Date32 => Value::Date32(Date32(0)),
198 Type::DateTime(tz) => Value::DateTime(DateTime(*tz, 0)),
199 Type::DateTime64(precision, tz) => Value::DateTime64(DynDateTime64(*tz, 0, *precision)),
200 Type::Ipv4 => Value::Ipv4(Ipv4::default()),
201 Type::Ipv6 => Value::Ipv6(Ipv6::default()),
202 Type::Enum8(_) => Value::Enum8(String::new(), 0),
203 Type::Enum16(_) => Value::Enum16(String::new(), 0),
204 Type::LowCardinality(x) => x.default_value(),
205 Type::Array(_) => Value::Array(vec![]),
206 Type::Tuple(types) => Value::Tuple(types.iter().map(Type::default_value).collect()),
207 Type::Nullable(_) => Value::Null,
208 Type::Map(_, _) => Value::Map(vec![], vec![]),
209 Type::Point => Value::Point(Point::default()),
210 Type::Ring => Value::Ring(Ring::default()),
211 Type::Polygon => Value::Polygon(Polygon::default()),
212 Type::MultiPolygon => Value::MultiPolygon(MultiPolygon::default()),
213 Type::Uuid => Value::Uuid(Uuid::from_u128(0)),
214 Type::Object => Value::Object("{}".as_bytes().to_vec()),
215 }
216 }
217}
218
219impl Display for Type {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 match self {
222 Type::Int8 => write!(f, "Int8"),
223 Type::Int16 => write!(f, "Int16"),
224 Type::Int32 => write!(f, "Int32"),
225 Type::Int64 => write!(f, "Int64"),
226 Type::Int128 => write!(f, "Int128"),
227 Type::Int256 => write!(f, "Int256"),
228 Type::UInt8 => write!(f, "UInt8"),
229 Type::UInt16 => write!(f, "UInt16"),
230 Type::UInt32 => write!(f, "UInt32"),
231 Type::UInt64 => write!(f, "UInt64"),
232 Type::UInt128 => write!(f, "UInt128"),
233 Type::UInt256 => write!(f, "UInt256"),
234 Type::Float32 => write!(f, "Float32"),
235 Type::Float64 => write!(f, "Float64"),
236 Type::Decimal32(s) => write!(f, "Decimal32({s})"),
237 Type::Decimal64(s) => write!(f, "Decimal64({s})"),
238 Type::Decimal128(s) => write!(f, "Decimal128({s})"),
239 Type::Decimal256(s) => write!(f, "Decimal256({s})"),
240 Type::String | Type::Binary => write!(f, "String"),
241 Type::FixedSizedBinary(s) | Type::FixedSizedString(s) => write!(f, "FixedString({s})"),
242 Type::Uuid => write!(f, "UUID"),
243 Type::Date => write!(f, "Date"),
244 Type::Date32 => write!(f, "Date32"),
245 Type::DateTime(tz) => write!(f, "DateTime('{tz}')"),
246 Type::DateTime64(precision, tz) => write!(f, "DateTime64({precision},'{tz}')"),
247 Type::Ipv4 => write!(f, "IPv4"),
248 Type::Ipv6 => write!(f, "IPv6"),
249 Type::Point => write!(f, "Point"),
250 Type::Ring => write!(f, "Ring"),
251 Type::Polygon => write!(f, "Polygon"),
252 Type::MultiPolygon => write!(f, "MultiPolygon"),
253 Type::Enum8(items) => {
254 write!(f, "Enum8(")?;
255 if !items.is_empty() {
256 let last_index = items.len() - 1;
257 for (i, (name, value)) in items.iter().enumerate() {
258 write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
259 if i < last_index {
260 write!(f, ",")?;
261 }
262 }
263 }
264 write!(f, ")")
265 }
266 Type::Enum16(items) => {
267 write!(f, "Enum16(")?;
268 if !items.is_empty() {
269 let last_index = items.len() - 1;
270 for (i, (name, value)) in items.iter().enumerate() {
271 write!(f, "'{}' = {value}", name.replace('\'', "''"))?;
272 if i < last_index {
273 write!(f, ",")?;
274 }
275 }
276 }
277 write!(f, ")")
278 }
279 Type::LowCardinality(inner) => write!(f, "LowCardinality({inner})"),
280 Type::Array(inner) => write!(f, "Array({inner})"),
281 Type::Tuple(items) => write!(
282 f,
283 "Tuple({})",
284 items.iter().map(ToString::to_string).collect::<Vec<_>>().join(",")
285 ),
286 Type::Nullable(inner) => write!(f, "Nullable({inner})"),
287 Type::Map(key, value) => write!(f, "Map({key},{value})"),
288 Type::Object => write!(f, "JSON"),
289 }
290 }
291}
292
293impl Type {
294 pub(crate) fn deserialize_column<'a, R: ClickHouseRead>(
295 &'a self,
296 reader: &'a mut R,
297 rows: usize,
298 state: &'a mut DeserializerState,
299 ) -> impl Future<Output = Result<Vec<Value>>> + Send + 'a {
300 use deserialize::*;
301 async move {
302 if rows > MAX_STRING_SIZE {
303 return Err(Error::Protocol(format!(
304 "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
305 )));
306 }
307
308 Ok(match self {
309 Type::Int8
310 | Type::Int16
311 | Type::Int32
312 | Type::Int64
313 | Type::Int128
314 | Type::Int256
315 | Type::UInt8
316 | Type::UInt16
317 | Type::UInt32
318 | Type::UInt64
319 | Type::UInt128
320 | Type::UInt256
321 | Type::Float32
322 | Type::Float64
323 | Type::Decimal32(_)
324 | Type::Decimal64(_)
325 | Type::Decimal128(_)
326 | Type::Decimal256(_)
327 | Type::Uuid
328 | Type::Date
329 | Type::Date32
330 | Type::DateTime(_)
331 | Type::DateTime64(_, _)
332 | Type::Ipv4
333 | Type::Ipv6
334 | Type::Enum8(_)
335 | Type::Enum16(_) => {
336 sized::SizedDeserializer::read(self, reader, rows, state).await?
337 }
338 Type::String
339 | Type::FixedSizedString(_)
340 | Type::Binary
341 | Type::FixedSizedBinary(_) => {
342 string::StringDeserializer::read(self, reader, rows, state).await?
343 }
344 Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?,
345 Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?,
346 Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?,
347 Type::MultiPolygon => {
348 geo::MultiPolygonDeserializer::read(self, reader, rows, state).await?
349 }
350 Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?,
351 Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?,
352 Type::Nullable(_) => {
353 nullable::NullableDeserializer::read(self, reader, rows, state).await?
354 }
355 Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?,
356 Type::LowCardinality(_) => {
357 low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state)
358 .await?
359 }
360 Type::Object => object::ObjectDeserializer::read(self, reader, rows, state).await?,
361 })
362 }
363 .boxed()
364 }
365
366 pub(crate) fn deserialize_column_sync(
367 &self,
368 reader: &mut impl ClickHouseBytesRead,
369 rows: usize,
370 state: &mut DeserializerState,
371 ) -> Result<Vec<Value>> {
372 use deserialize::*;
373
374 if rows > MAX_STRING_SIZE {
375 return Err(Error::Protocol(format!(
376 "deserialize response size too large. {rows} > {MAX_STRING_SIZE}"
377 )));
378 }
379
380 Ok(match self {
381 Type::Int8
382 | Type::Int16
383 | Type::Int32
384 | Type::Int64
385 | Type::Int128
386 | Type::Int256
387 | Type::UInt8
388 | Type::UInt16
389 | Type::UInt32
390 | Type::UInt64
391 | Type::UInt128
392 | Type::UInt256
393 | Type::Float32
394 | Type::Float64
395 | Type::Decimal32(_)
396 | Type::Decimal64(_)
397 | Type::Decimal128(_)
398 | Type::Decimal256(_)
399 | Type::Uuid
400 | Type::Date
401 | Type::Date32
402 | Type::DateTime(_)
403 | Type::DateTime64(_, _)
404 | Type::Ipv4
405 | Type::Ipv6
406 | Type::Enum8(_)
407 | Type::Enum16(_) => sized::SizedDeserializer::read_sync(self, reader, rows, state)?,
408 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
409 string::StringDeserializer::read_sync(self, reader, rows, state)?
410 }
411 Type::Array(_) => array::ArrayDeserializer::read_sync(self, reader, rows, state)?,
412 Type::Ring => geo::RingDeserializer::read_sync(self, reader, rows, state)?,
413 Type::Polygon => geo::PolygonDeserializer::read_sync(self, reader, rows, state)?,
414 Type::MultiPolygon => {
415 geo::MultiPolygonDeserializer::read_sync(self, reader, rows, state)?
416 }
417 Type::Tuple(_) => tuple::TupleDeserializer::read_sync(self, reader, rows, state)?,
418 Type::Point => geo::PointDeserializer::read_sync(self, reader, rows, state)?,
419 Type::Nullable(_) => {
420 nullable::NullableDeserializer::read_sync(self, reader, rows, state)?
421 }
422 Type::Map(_, _) => map::MapDeserializer::read_sync(self, reader, rows, state)?,
423 Type::LowCardinality(_) => {
424 low_cardinality::LowCardinalityDeserializer::read_sync(self, reader, rows, state)?
425 }
426 Type::Object => object::ObjectDeserializer::read_sync(self, reader, rows, state)?,
427 })
428 }
429
430 pub(crate) fn serialize_column<'a, W: ClickHouseWrite>(
431 &'a self,
432 values: Vec<Value>,
433 writer: &'a mut W,
434 state: &'a mut SerializerState,
435 ) -> impl Future<Output = Result<()>> + Send + 'a {
436 use serialize::*;
437 async move {
438 match self {
439 Type::Int8
440 | Type::Int16
441 | Type::Int32
442 | Type::Int64
443 | Type::Int128
444 | Type::Int256
445 | Type::UInt8
446 | Type::UInt16
447 | Type::UInt32
448 | Type::UInt64
449 | Type::UInt128
450 | Type::UInt256
451 | Type::Float32
452 | Type::Float64
453 | Type::Decimal32(_)
454 | Type::Decimal64(_)
455 | Type::Decimal128(_)
456 | Type::Decimal256(_)
457 | Type::Uuid
458 | Type::Date
459 | Type::Date32
460 | Type::DateTime(_)
461 | Type::DateTime64(_, _)
462 | Type::Ipv4
463 | Type::Ipv6
464 | Type::Enum8(_)
465 | Type::Enum16(_) => {
466 sized::SizedSerializer::write(self, values, writer, state).await?;
467 }
468
469 Type::String
470 | Type::FixedSizedString(_)
471 | Type::Binary
472 | Type::FixedSizedBinary(_) => {
473 string::StringSerializer::write(self, values, writer, state).await?;
474 }
475
476 Type::Array(_) => {
477 array::ArraySerializer::write(self, values, writer, state).await?;
478 }
479 Type::Tuple(_) => {
480 tuple::TupleSerializer::write(self, values, writer, state).await?;
481 }
482 Type::Point => geo::PointSerializer::write(self, values, writer, state).await?,
483 Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?,
484 Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?,
485 Type::MultiPolygon => {
486 geo::MultiPolygonSerializer::write(self, values, writer, state).await?;
487 }
488 Type::Nullable(_) => {
489 nullable::NullableSerializer::write(self, values, writer, state).await?;
490 }
491 Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?,
492 Type::LowCardinality(_) => {
493 low_cardinality::LowCardinalitySerializer::write(self, values, writer, state)
494 .await?;
495 }
496 Type::Object => {
497 object::ObjectSerializer::write(self, values, writer, state).await?;
498 }
499 }
500 Ok(())
501 }
502 .boxed()
503 }
504
505 pub(crate) fn serialize_column_sync(
506 &self,
507 values: Vec<Value>,
508 writer: &mut impl ClickHouseBytesWrite,
509 state: &mut SerializerState,
510 ) -> Result<()> {
511 use serialize::*;
512 match self {
513 Type::Int8
514 | Type::Int16
515 | Type::Int32
516 | Type::Int64
517 | Type::Int128
518 | Type::Int256
519 | Type::UInt8
520 | Type::UInt16
521 | Type::UInt32
522 | Type::UInt64
523 | Type::UInt128
524 | Type::UInt256
525 | Type::Float32
526 | Type::Float64
527 | Type::Decimal32(_)
528 | Type::Decimal64(_)
529 | Type::Decimal128(_)
530 | Type::Decimal256(_)
531 | Type::Uuid
532 | Type::Date
533 | Type::Date32
534 | Type::DateTime(_)
535 | Type::DateTime64(_, _)
536 | Type::Ipv4
537 | Type::Ipv6
538 | Type::Enum8(_)
539 | Type::Enum16(_) => {
540 sized::SizedSerializer::write_sync(self, values, writer, state)?;
541 }
542
543 Type::String | Type::FixedSizedString(_) | Type::Binary | Type::FixedSizedBinary(_) => {
544 string::StringSerializer::write_sync(self, values, writer, state)?;
545 }
546
547 Type::Array(_) => {
548 array::ArraySerializer::write_sync(self, values, writer, state)?;
549 }
550 Type::Tuple(_) => {
551 tuple::TupleSerializer::write_sync(self, values, writer, state)?;
552 }
553 Type::Point => geo::PointSerializer::write_sync(self, values, writer, state)?,
554 Type::Ring => geo::RingSerializer::write_sync(self, values, writer, state)?,
555 Type::Polygon => geo::PolygonSerializer::write_sync(self, values, writer, state)?,
556 Type::MultiPolygon => {
557 geo::MultiPolygonSerializer::write_sync(self, values, writer, state)?;
558 }
559 Type::Nullable(_) => {
560 nullable::NullableSerializer::write_sync(self, values, writer, state)?;
561 }
562 Type::Map(_, _) => map::MapSerializer::write_sync(self, values, writer, state)?,
563 Type::LowCardinality(_) => {
564 low_cardinality::LowCardinalitySerializer::write_sync(self, values, writer, state)?;
565 }
566 Type::Object => {
567 object::ObjectSerializer::write_sync(self, values, writer, state)?;
568 }
569 }
570 Ok(())
571 }
572
573 #[expect(clippy::too_many_lines)]
574 pub(crate) fn validate(&self) -> Result<()> {
575 match self {
576 Type::Decimal32(scale) => {
577 if *scale == 0 || *scale > 9 {
578 return Err(Error::TypeParseError(format!(
579 "scale out of bounds for Decimal32({}) must be in range (1..=9)",
580 *scale
581 )));
582 }
583 }
584
585 Type::Decimal128(scale) => {
586 if *scale == 0 || *scale > 38 {
587 return Err(Error::TypeParseError(format!(
588 "scale out of bounds for Decimal128({}) must be in range (1..=38)",
589 *scale
590 )));
591 }
592 }
593 Type::Decimal256(scale) => {
594 if *scale == 0 || *scale > 76 {
595 return Err(Error::TypeParseError(format!(
596 "scale out of bounds for Decimal256({}) must be in range (1..=76)",
597 *scale
598 )));
599 }
600 }
601 Type::DateTime64(precision, _) | Type::Decimal64(precision) => {
602 if *precision == 0 || *precision > 18 {
603 return Err(Error::TypeParseError(format!(
604 "precision out of bounds for Decimal64/DateTime64({}) must be in range \
605 (1..=18)",
606 *precision
607 )));
608 }
609 }
610 Type::LowCardinality(inner) => match inner.strip_null() {
611 Type::String
612 | Type::FixedSizedString(_)
613 | Type::Binary
614 | Type::FixedSizedBinary(_)
615 | Type::Date
616 | Type::Date32
617 | Type::DateTime(_)
618 | Type::Ipv4
619 | Type::Ipv6
620 | Type::Int8
621 | Type::Int16
622 | Type::Int32
623 | Type::Int64
624 | Type::Int128
625 | Type::Int256
626 | Type::UInt8
627 | Type::UInt16
628 | Type::UInt32
629 | Type::UInt64
630 | Type::UInt128
631 | Type::UInt256 => inner.validate()?,
632 _ => {
633 return Err(Error::TypeParseError(format!(
634 "illegal type '{inner:?}' in LowCardinality, not allowed"
635 )));
636 }
637 },
638 Type::Array(inner) => {
639 inner.validate()?;
640 }
641 Type::Tuple(inner) => {
642 for inner in inner {
643 inner.validate()?;
644 }
645 }
646 Type::Nullable(inner) => match &**inner {
647 Type::Array(_)
648 | Type::Map(_, _)
649 | Type::LowCardinality(_)
650 | Type::Tuple(_)
651 | Type::Nullable(_) => {
652 return Err(Error::TypeParseError(format!(
653 "nullable cannot contain composite type '{inner:?}'"
654 )));
655 }
656 _ => inner.validate()?,
657 },
658 Type::Map(key, value) => {
659 if !matches!(
660 &**key,
661 Type::String
662 | Type::FixedSizedString(_)
663 | Type::Int8
664 | Type::Int16
665 | Type::Int32
666 | Type::Int64
667 | Type::Int128
668 | Type::Int256
669 | Type::UInt8
670 | Type::UInt16
671 | Type::UInt32
672 | Type::UInt64
673 | Type::UInt128
674 | Type::UInt256
675 | Type::LowCardinality(_)
676 | Type::Uuid
677 | Type::Date
678 | Type::DateTime(_)
679 | Type::DateTime64(_, _)
680 | Type::Enum8(_)
681 | Type::Enum16(_)
682 ) {
683 return Err(Error::TypeParseError(
684 "key in map must be String, Integer, LowCardinality, FixedString, UUID, \
685 Date, DateTime, Date32, Enum"
686 .to_string(),
687 ));
688 }
689 key.validate()?;
690 value.validate()?;
691 }
692 _ => {}
694 }
695 Ok(())
696 }
697
698 pub(crate) fn validate_value(&self, value: &Value) -> Result<()> {
699 self.validate()?;
700 if !self.inner_validate_value(value) {
701 return Err(Error::TypeParseError(format!(
702 "could not assign value '{value:?}' to type '{self:?}'"
703 )));
704 }
705 Ok(())
706 }
707
708 fn inner_validate_value(&self, value: &Value) -> bool {
709 match (self, value) {
710 (Type::Int8, Value::Int8(_))
711 | (Type::Int16, Value::Int16(_))
712 | (Type::Int32, Value::Int32(_))
713 | (Type::Int64, Value::Int64(_))
714 | (Type::Int128, Value::Int128(_))
715 | (Type::Int256, Value::Int256(_))
716 | (Type::UInt8, Value::UInt8(_))
717 | (Type::UInt16, Value::UInt16(_))
718 | (Type::UInt32, Value::UInt32(_))
719 | (Type::UInt64, Value::UInt64(_))
720 | (Type::UInt128, Value::UInt128(_))
721 | (Type::UInt256, Value::UInt256(_))
722 | (Type::Float32, Value::Float32(_))
723 | (Type::Float64, Value::Float64(_))
724 | (Type::String | Type::FixedSizedString(_), Value::String(_))
725 | (Type::Uuid, Value::Uuid(_))
726 | (Type::Date, Value::Date(_))
727 | (Type::Date32, Value::Date32(_))
728 | (Type::Ipv4, Value::Ipv4(_))
729 | (Type::Ipv6, Value::Ipv6(_))
730 | (Type::Point, Value::Point(_))
731 | (Type::Ring, Value::Ring(_))
732 | (Type::Polygon, Value::Polygon(_))
733 | (Type::MultiPolygon, Value::MultiPolygon(_)) => true,
734 (Type::DateTime(tz1), Value::DateTime(date)) => tz1 == &date.0,
735 (Type::DateTime64(precision1, tz1), Value::DateTime64(tz2)) => {
736 tz1 == &tz2.0 && precision1 == &tz2.2
737 }
738 (Type::Decimal32(scale1), Value::Decimal32(scale2, _))
739 | (Type::Decimal64(scale1), Value::Decimal64(scale2, _))
740 | (Type::Decimal128(scale1), Value::Decimal128(scale2, _))
741 | (Type::Decimal256(scale1), Value::Decimal256(scale2, _)) => scale1 >= scale2,
742 (Type::FixedSizedString(_) | Type::String, Value::Array(items))
743 if items.iter().all(|item| matches!(item, Value::UInt8(_) | Value::Int8(_))) =>
744 {
745 true
746 }
747 (Type::Enum8(entries), Value::Enum8(_, index)) => entries.iter().any(|x| x.1 == *index),
748 (Type::Enum16(entries), Value::Enum16(_, index)) => {
749 entries.iter().any(|x| x.1 == *index)
750 }
751 (Type::LowCardinality(x), value) => x.inner_validate_value(value),
752 (Type::Array(inner_type), Value::Array(values)) => {
753 values.iter().all(|x| inner_type.inner_validate_value(x))
754 }
755 (Type::Tuple(inner_types), Value::Tuple(values)) => {
756 inner_types.len() == values.len()
757 && inner_types
758 .iter()
759 .zip(values.iter())
760 .all(|(type_, value)| type_.inner_validate_value(value))
761 }
762 (Type::Nullable(inner), value) => {
763 value == &Value::Null || inner.inner_validate_value(value)
764 }
765 (Type::Map(key, value), Value::Map(keys, values)) => {
766 keys.len() == values.len()
767 && keys.iter().all(|x| key.inner_validate_value(x))
768 && values.iter().all(|x| value.inner_validate_value(x))
769 }
770 _ => false,
771 }
772 }
773
774 pub(crate) fn estimate_capacity(&self) -> usize {
776 match self {
777 Type::Int8 | Type::UInt8 => 1,
778 Type::Int16 | Type::UInt16 | Type::Date => 2,
779 Type::Int32
780 | Type::UInt32
781 | Type::Float32
782 | Type::Date32
783 | Type::DateTime(_)
784 | Type::Ipv4 => 4,
785 Type::Int64 | Type::UInt64 | Type::Float64 | Type::DateTime64(_, _) => 8,
786 Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Point => 16,
787 Type::Int256 | Type::UInt256 | Type::String | Type::Binary => 32,
788 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => *n,
789
790 Type::Array(inner) => {
792 let inner_data = inner.estimate_capacity();
793 (4 + inner_data) * 8 }
795 Type::Nullable(inner) => inner.estimate_capacity(),
796 Type::Tuple(types) => types.iter().map(Type::estimate_capacity).sum(),
797 Type::Map(key, value) => {
798 let key_data = key.estimate_capacity();
799 let value_data = value.estimate_capacity();
800 4 + key_data + value_data }
802
803 _ => 64, }
806 }
807}
808
809impl Type {
810 pub(crate) async fn write_default<W: ClickHouseWrite>(&self, writer: &mut W) -> Result<()> {
817 match self.strip_null() {
818 Type::String | Type::Binary => {
819 writer.write_string("").await?;
820 }
821 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
822 writer.write_all(&vec![0u8; *n]).await?;
823 }
824 Type::Int8 => writer.write_i8(0).await?,
825 Type::Int16 => writer.write_i16_le(0).await?,
826 Type::Int32 => writer.write_i32_le(0).await?,
827 Type::Int64 => writer.write_i64_le(0).await?,
828 Type::Int128 => writer.write_all(&[0; 16]).await?,
829 Type::Int256 => writer.write_all(&[0; 32]).await?,
830 Type::UInt8 => writer.write_u8(0).await?,
831 Type::UInt16 => writer.write_u16_le(0).await?,
832 Type::UInt32 => writer.write_u32_le(0).await?,
833 Type::UInt64 => writer.write_u64_le(0).await?,
834 Type::UInt128 => writer.write_all(&[0; 16]).await?,
835 Type::UInt256 => writer.write_all(&[0; 32]).await?,
836 Type::Float32 => writer.write_f32_le(0.0).await?,
837 Type::Float64 => writer.write_f64_le(0.0).await?,
838 Type::Uuid => writer.write_all(&[0; 16]).await?,
839 Type::Ipv4 => writer.write_u32_le(0).await?,
840 Type::Ipv6 => writer.write_all(&[0; 16]).await?,
841 Type::Date => writer.write_u16_le(0).await?,
842 Type::Date32 => writer.write_i32_le(0).await?,
843 Type::DateTime(_) => writer.write_u32_le(0).await?,
844 Type::DateTime64(precision, _) => {
845 let bytes = (0_i64).to_le_bytes();
846 writer.write_all(&bytes[..*precision]).await?;
847 }
848 Type::Decimal32(_) => writer.write_i32_le(0).await?,
849 Type::Decimal64(_) => writer.write_i64_le(0).await?,
850 Type::Decimal128(_) => writer.write_all(&[0; 16]).await?,
851 Type::Decimal256(_) => writer.write_all(&[0; 32]).await?,
852 Type::Array(_) => writer.write_var_uint(0).await?, Type::Map(_, _) => writer.write_var_uint(0).await?, Type::Enum8(_) => writer.write_i8(0).await?,
855 Type::Enum16(_) => writer.write_i16(0).await?,
856 Type::LowCardinality(inner) => Box::pin(inner.write_default(writer)).await?,
858 Type::Tuple(inner) => {
859 for t in inner {
860 Box::pin(t.write_default(writer)).await?;
861 }
862 }
863 _ => {
864 return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
865 }
866 }
867 Ok(())
868 }
869
870 pub(crate) fn put_default<W: ClickHouseBytesWrite>(&self, writer: &mut W) -> Result<()> {
871 match self.strip_null() {
872 Type::String | Type::Binary => {
873 writer.put_string("")?;
874 }
875 Type::FixedSizedString(n) | Type::FixedSizedBinary(n) => {
876 writer.put_slice(&vec![0u8; *n]);
877 }
878 Type::Int8 | Type::Enum8(_) => writer.put_i8(0),
879 Type::Int16 => writer.put_i16_le(0),
880 Type::Int32 | Type::Date32 | Type::Decimal32(_) => writer.put_i32_le(0),
881 Type::Int64 | Type::Decimal64(_) => writer.put_i64_le(0),
882 Type::Int128 | Type::UInt128 | Type::Uuid | Type::Ipv6 | Type::Decimal128(_) => {
883 writer.put_slice(&[0; 16]);
884 }
885 Type::Int256 | Type::UInt256 | Type::Decimal256(_) => writer.put_slice(&[0; 32]),
886 Type::UInt8 => writer.put_u8(0),
887 Type::UInt16 | Type::Date => writer.put_u16_le(0),
888 Type::UInt32 | Type::Ipv4 | Type::DateTime(_) => writer.put_u32_le(0),
889 Type::UInt64 => writer.put_u64_le(0),
890 Type::Float32 => writer.put_f32_le(0.0),
891 Type::Float64 => writer.put_f64_le(0.0),
892 Type::DateTime64(precision, _) => {
893 let bytes = (0_i64).to_le_bytes();
894 writer.put_slice(&bytes[..*precision]);
895 }
896 Type::Array(_) => writer.put_var_uint(0)?, Type::Map(_, _) => writer.put_var_uint(0)?, Type::Enum16(_) => writer.put_i16(0),
899 Type::LowCardinality(inner) => inner.put_default(writer)?,
901 Type::Tuple(inner) => {
902 for t in inner {
903 t.put_default(writer)?;
904 }
905 }
906 _ => {
907 return Err(Error::SerializeError(format!("No default value for type: {self:?}")));
908 }
909 }
910 Ok(())
911 }
912}
913
914pub(crate) trait Deserializer {
915 fn read_prefix<R: ClickHouseRead>(
926 _type_: &Type,
927 _reader: &mut R,
928 _state: &mut DeserializerState,
929 ) -> impl Future<Output = Result<()>> {
930 async { Ok(()) }
931 }
932
933 fn read<R: ClickHouseRead>(
934 type_: &Type,
935 reader: &mut R,
936 rows: usize,
937 state: &mut DeserializerState,
938 ) -> impl Future<Output = Result<Vec<Value>>>;
939
940 fn read_sync(
941 type_: &Type,
942 reader: &mut impl ClickHouseBytesRead,
943 rows: usize,
944 state: &mut DeserializerState,
945 ) -> Result<Vec<Value>>;
946}
947
948pub(crate) trait Serializer {
949 fn write_prefix<W: ClickHouseWrite>(
950 _type_: &Type,
951 _writer: &mut W,
952 _state: &mut SerializerState,
953 ) -> impl Future<Output = Result<()>> {
954 async { Ok(()) }
955 }
956
957 fn write<W: ClickHouseWrite>(
958 type_: &Type,
959 values: Vec<Value>,
960 writer: &mut W,
961 state: &mut SerializerState,
962 ) -> impl Future<Output = Result<()>>;
963
964 fn write_sync(
965 type_: &Type,
966 values: Vec<Value>,
967 writer: &mut impl ClickHouseBytesWrite,
968 state: &mut SerializerState,
969 ) -> Result<()>;
970}