Skip to main content

clickhouse_driver/protocol/
value.rs

1use super::column::as_bytes_bufer;
2use super::column::AsOutColumn;
3use super::encoder::Encoder;
4use crate::errors::{ConversionError, Result};
5#[cfg(feature = "int128")]
6use crate::types::Decimal128;
7use crate::types::{Decimal, Decimal32, Decimal64, DecimalBits, Field, FieldMeta, SqlType, SCALE};
8use byteorder::{LittleEndian, WriteBytesExt};
9use chrono::{Date, DateTime, NaiveDate, NaiveDateTime, Utc};
10use std::io;
11use std::io::Write;
12use std::net::{Ipv4Addr, Ipv6Addr};
13use uuid::Uuid;
14
/// Conversion of an owned vector of values into a boxed output column.
///
/// The lifetime `'b` bounds the returned trait object, which allows
/// implementations for borrowed element types such as `&'b str`.
pub trait IntoColumn<'b>: Sized {
    /// Wraps `this` into a boxed [`AsOutColumn`] implementation.
    fn to_column(this: Vec<Self>) -> Box<dyn AsOutColumn + 'b>;
}
18
19lazy_static! {
20    /// EPOCH is January 1, 1970 0:00:00 UTC (zero point for "UNIX timestamp").
21    static ref EPOCH: chrono::NaiveDate  = {
22         chrono::NaiveDate::from_ymd(1970,1,1)
23    };
24}
25
/// Serialization of a single value into the column data stream.
pub(crate) trait WriteColumn {
    /// Writes `self` to `writer`; `field` describes the target column and is
    /// consulted by implementations whose encoding depends on the SQL type.
    fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()>;
}
29
30struct SimpleOutputColumn<T, F: Fn(&Field) -> bool> {
31    data: Vec<T>,
32    f: F,
33}
34
35impl<T, F> AsOutColumn for SimpleOutputColumn<T, F>
36where
37    T: WriteColumn,
38    F: Fn(&Field) -> bool,
39{
40    fn len(&self) -> usize {
41        self.data.len()
42    }
43
44    fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
45        for item in self.data.iter() {
46            <T as WriteColumn>::write_column(item, field, writer)?;
47        }
48        Ok(())
49    }
50
51    fn is_compatible(&self, field: &Field) -> bool {
52        (&self.f)(field)
53    }
54}
/// Supplies the placeholder value that stands in for NULL in the data stream.
pub trait NullValue {
    /// Returns the placeholder value.
    fn null() -> Self;
}

/// Implements [`NullValue`] for a type, returning the given expression.
macro_rules! impl_null {
    ($target:ty, $placeholder:expr) => {
        impl NullValue for $target {
            #[inline]
            fn null() -> Self {
                $placeholder
            }
        }
    };
}
70
71// Null value placeholder
72impl_null!(u8, 0u8);
73impl_null!(i8, 0i8);
74impl_null!(u16, 0u16);
75impl_null!(i16, 0i16);
76impl_null!(u32, 0u32);
77impl_null!(i32, 0i32);
78impl_null!(u64, 0u64);
79impl_null!(i64, 0i64);
80impl_null!(f32, 0f32);
81impl_null!(f64, 0f64);
82
83impl_null!(Ipv4Addr, Ipv4Addr::UNSPECIFIED);
84impl_null!(Ipv6Addr, Ipv6Addr::UNSPECIFIED);
85impl_null!(Uuid, Default::default());
86impl_null!(Decimal32, Default::default());
87impl_null!(Decimal64, Default::default());
88#[cfg(feature = "int128")]
89impl_null!(Decimal128, Default::default());
90impl_null!(Date<Utc>, chrono::MIN_DATE);
91impl_null!(DateTime<Utc>, chrono::MIN_DATE.and_hms(0, 0, 0));
92impl_null!(&str, Default::default());
93impl_null!(String, Default::default());
94
95impl<T, F> AsOutColumn for SimpleOutputColumn<Option<T>, F>
96where
97    T: WriteColumn + NullValue,
98    F: Fn(&Field) -> bool,
99{
100    fn len(&self) -> usize {
101        self.data.len()
102    }
103    /// Encodes null flags then encode data
104    /// Null values are encoded as ordinary ones received by calling NullValue::null()
105    fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
106        // TODO: Compare its performance with single pass encoder.
107        // Here we iterate over data twice. We have to do it because nulls are serialized first.
108        // However we can serialize values in second buffer and then append it to the output stream
109        for item in self
110            .data
111            .iter()
112            .map(|item| if item.is_some() { 0u8 } else { 1u8 })
113        {
114            writer.write_u8(item)?;
115        }
116        let def: T = NullValue::null();
117        for item in self.data.iter() {
118            <T as WriteColumn>::write_column(item.as_ref().unwrap_or(&def), field, writer)?;
119        }
120        Ok(())
121    }
122
123    fn is_compatible(&self, field: &Field) -> bool {
124        (&self.f)(field)
125    }
126}
127
128/// Default string encoder implementation
129/// It's used by nullable string
130impl WriteColumn for &str {
131    #[inline]
132    fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
133        let slice = std::slice::from_ref(self);
134        match field.sql_type {
135            SqlType::String => encode_string(slice, writer),
136            SqlType::FixedString(val) => encode_fixedstring(slice, val, writer),
137            _ => unreachable!(),
138        }
139    }
140}
141impl WriteColumn for String {
142    #[inline]
143    fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
144        let slice = std::slice::from_ref(self);
145        match field.sql_type {
146            SqlType::String => encode_string(slice, writer),
147            SqlType::FixedString(val) => encode_fixedstring(slice, val, writer),
148            _ => unreachable!(),
149        }
150    }
151}
152
153/// Encode string array. Column `String` in `Native Format`
154/// |StringLength as VarInt (0..9 bytes)|String byte array  | ... next item
155fn encode_string<T: AsRef<[u8]>>(data: &[T], writer: &mut dyn Write) -> Result<()> {
156    for s in data {
157        let s = s.as_ref();
158        (s.len() as u64).encode(writer)?;
159        s.encode(writer)?;
160    }
161    Ok(())
162}
163/// Encode fixed length string array. Column `FixedString` in `Native Format`
164/// |String byte array|...next item
165fn encode_fixedstring<T: AsRef<[u8]>>(data: &[T], size: u32, writer: &mut dyn Write) -> Result<()> {
166    for s in data {
167        let s = s.as_ref();
168        //empty or default string workaround
169        if s.is_empty() {
170            for _ in 0..size {
171                writer.write_u8(0)?;
172            }
173        } else if s.len() != size as usize {
174            return Err(ConversionError::FixedStringLengthNotMatch(size).into());
175        } else {
176            writer.write_all(s)?;
177        }
178    }
179
180    Ok(())
181}
182
183fn encode_enum8<T: AsRef<[u8]>>(
184    data: &[T],
185    meta: &FieldMeta,
186    writer: &mut dyn Write,
187) -> Result<()> {
188    for s in data {
189        let val = meta.str2val(s.as_ref())?;
190        writer.write_i8(val as i8)?;
191    }
192    Ok(())
193}
194
195fn encode_enum16<T: AsRef<[u8]>>(
196    data: &[T],
197    meta: &FieldMeta,
198    writer: &mut dyn Write,
199) -> Result<()> {
200    for s in data {
201        let val: i16 = meta.str2val(s.as_ref())?;
202        writer.write_i16::<LittleEndian>(val)?;
203    }
204    Ok(())
205}
206/// Bespoke String as well as FixedString, Enum output column implementation
207struct StringOutputColumn<T> {
208    data: Vec<T>,
209}
210
211impl<'a, T> AsOutColumn for StringOutputColumn<T>
212where
213    T: AsRef<[u8]>,
214{
215    fn len(&self) -> usize {
216        self.data.len()
217    }
218
219    fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
220        match field.sql_type {
221            SqlType::String => encode_string(self.data.as_ref(), writer),
222            SqlType::FixedString(v) => encode_fixedstring(self.data.as_ref(), v, writer),
223            SqlType::Enum8 => encode_enum8(
224                self.data.as_ref(),
225                field.get_meta().expect("enum index corrupted"),
226                writer,
227            ),
228            SqlType::Enum16 => encode_enum16(
229                self.data.as_ref(),
230                field.get_meta().expect("enum index corrupted"),
231                writer,
232            ),
233            _ => unreachable!(),
234        }
235    }
236
237    fn is_compatible(&self, field: &Field) -> bool {
238        matches!(field.sql_type, SqlType::String | SqlType::FixedString(_) | SqlType::Enum8 | SqlType::Enum16)
239    }
240}
241/// IPv4 output column
242impl WriteColumn for Ipv4Addr {
243    fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
244        let mut b = self.octets();
245        b.reverse();
246
247        writer.write_all(&b[..]).map_err(Into::into)
248    }
249}
250/// IPv6 output column
251impl WriteColumn for Ipv6Addr {
252    fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
253        let b = self.octets();
254        writer.write_all(&b[..]).map_err(Into::into)
255    }
256}
257/// UUID output column
258impl WriteColumn for Uuid {
259    fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
260        let i = self.as_u128();
261        writer.write_u64::<LittleEndian>((i >> 64) as u64)?;
262        writer
263            .write_u64::<LittleEndian>(i as u64)
264            .map_err(Into::into)
265    }
266}
267/// Data output column
268impl WriteColumn for Date<Utc> {
269    fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
270        let days = (self.naive_utc() - *EPOCH).num_days();
271
272        if days < 0 || days > u16::MAX as i64 {
273            return Err(ConversionError::UnsupportedConversion.into());
274        }
275
276        let days = (days as u16).to_le_bytes();
277        writer.write_all(&days[..]).map_err(Into::into)
278    }
279}
280/// DataTime and DateTime64 output column
281impl WriteColumn for DateTime<Utc> {
282    fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
283        let mut timestamp = self.timestamp();
284
285        match field.sql_type {
286            SqlType::DateTime => {
287                if timestamp < 0 || timestamp > u32::MAX as i64 {
288                    return Err(ConversionError::UnsupportedConversion.into());
289                }
290                let timestamp = (timestamp as u32).to_le_bytes();
291                writer.write_all(&timestamp[..]).map_err(Into::into)
292            }
293            SqlType::DateTime64(p, _) => {
294                debug_assert!(p < 9);
295                // TODO: refine getting value. DateTime(self) has higher precision than timestamp
296                timestamp *= SCALE[p as usize];
297                writer
298                    .write_i64::<LittleEndian>(timestamp)
299                    .map_err(Into::into)
300            }
301            _ => unreachable!(),
302        }
303    }
304}
305
306macro_rules! to_column_numeric {
307    ($t:ty, $f: ident, $endian: ty) => {
308        impl WriteColumn for $t {
309            #[inline]
310            fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
311                writer.$f::<$endian>(*self).map_err(Into::into)
312            }
313        }
314    };
315    ($t:ty, $f: ident) => {
316        impl WriteColumn for $t {
317            #[inline]
318            fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
319                writer.$f(*self).map_err(Into::into)
320            }
321        }
322    };
323}
324
325to_column_numeric!(i8, write_i8);
326to_column_numeric!(u8, write_u8);
327to_column_numeric!(i16, write_i16, LittleEndian);
328to_column_numeric!(u16, write_u16, LittleEndian);
329to_column_numeric!(i32, write_i32, LittleEndian);
330to_column_numeric!(u32, write_u32, LittleEndian);
331to_column_numeric!(i64, write_i64, LittleEndian);
332to_column_numeric!(u64, write_u64, LittleEndian);
333
334#[cfg(feature = "int128")]
335to_column_numeric!(i128, write_i128, LittleEndian);
336#[cfg(feature = "int128")]
337to_column_numeric!(u128, write_u128, LittleEndian);
338
339to_column_numeric!(f32, write_f32, LittleEndian);
340to_column_numeric!(f64, write_f64, LittleEndian);
341
342/// Decimal output column
343impl<T: WriteColumn + DecimalBits> WriteColumn for Decimal<T> {
344    fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
345        if let SqlType::Decimal(p, s) = field.sql_type {
346            debug_assert!(T::fit(p));
347            if s != self.scale {
348                return Err(ConversionError::UnsupportedConversion.into());
349            }
350        } else {
351            unreachable!()
352        }
353        self.underlying.write_column(field, writer)
354    }
355}
356
/// Some data types u(i)8,16,32,64 f32, f64 have the same
/// representation in memory and in Clickhouse columnar data format
/// so they can be easily encoded all at once
// NOTE(review): the "same representation" premise presumably holds only on
// little-endian hosts - confirm whether big-endian targets are supported.
struct BinaryCompatibleOutColumn<T: Sized> {
    /// SQL type this column may be written to; compared in `is_compatible`.
    sql_type: SqlType,
    /// Values whose in-memory bytes are written to the stream as-is.
    data: Vec<T>,
}
364
/// Writes an already-encoded byte buffer to `writer` in full.
fn encode_data_bc(data: &[u8], writer: &mut dyn Write) -> io::Result<()> {
    Write::write_all(writer, data)
}
368
369impl<'a, T: Sized + Send + Sync> AsOutColumn for BinaryCompatibleOutColumn<T> {
370    fn len(&self) -> usize {
371        self.data.len()
372    }
373    fn encode(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
374        encode_data_bc(unsafe { as_bytes_bufer(self.data.as_ref()) }, writer).map_err(Into::into)
375    }
376    fn is_compatible(&self, field: &Field) -> bool {
377        self.sql_type == field.sql_type
378    }
379}
380
/// Implements [`IntoColumn`] for a type that is written from memory as-is,
/// tagging the resulting column with the given SQL type.
macro_rules! impl_intocolumn_bc {
    ($elem:ty, $tag:path) => {
        impl<'b> IntoColumn<'b> for $elem {
            fn to_column(this: Vec<Self>) -> Box<dyn AsOutColumn + 'b> {
                let column = BinaryCompatibleOutColumn {
                    sql_type: $tag,
                    data: this,
                };
                Box::new(column)
            }
        }
    };
}
393
/// Implements [`IntoColumn`] for a type encoded item by item.
/// The second argument is the predicate deciding which fields are compatible.
macro_rules! impl_intocolumn_simple {
    ($elem:ty, $accepts:expr) => {
        impl<'b> IntoColumn<'b> for $elem
        where
            $elem: 'b,
        {
            fn to_column(this: Vec<Self>) -> Box<dyn AsOutColumn + 'b> {
                let column = SimpleOutputColumn {
                    data: this,
                    f: $accepts,
                };
                Box::new(column)
            }
        }
    };
}
409
/// Implements [`IntoColumn`] for a string-like type via `StringOutputColumn`.
macro_rules! impl_intocolumn_string {
    ($elem:ty) => {
        impl<'b> IntoColumn<'b> for $elem
        where
            $elem: 'b,
        {
            fn to_column(this: Vec<Self>) -> Box<dyn AsOutColumn + 'b> {
                let column = StringOutputColumn { data: this };
                Box::new(column)
            }
        }
    };
}
422
// Types written straight from memory; each is compatible only with the
// single SQL type named here.
impl_intocolumn_bc!(u8, SqlType::UInt8);
impl_intocolumn_bc!(i8, SqlType::Int8);
impl_intocolumn_bc!(u16, SqlType::UInt16);
impl_intocolumn_bc!(i16, SqlType::Int16);
impl_intocolumn_bc!(u32, SqlType::UInt32);
impl_intocolumn_bc!(i32, SqlType::Int32);
impl_intocolumn_bc!(u64, SqlType::UInt64);
impl_intocolumn_bc!(i64, SqlType::Int64);
impl_intocolumn_bc!(f64, SqlType::Float64);
impl_intocolumn_bc!(f32, SqlType::Float32);
// Raw byte-array newtypes for dates and datetimes.
impl_intocolumn_bc!(ValueDate, SqlType::Date);
impl_intocolumn_bc!(ValueDateTime, SqlType::DateTime);
435
// Types encoded value by value; the closure is the compatibility predicate.
impl_intocolumn_simple!(Ipv4Addr, |f| f.sql_type == SqlType::Ipv4);
impl_intocolumn_simple!(Ipv6Addr, |f| f.sql_type == SqlType::Ipv6);
impl_intocolumn_simple!(Date<Utc>, |f| f.sql_type == SqlType::Date);

// Decimals are compatible when the column precision fits the backing integer.
impl_intocolumn_simple!(Decimal32, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i32::fit(p),
        _ => false,
    }
});

impl_intocolumn_simple!(Decimal64, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i64::fit(p),
        _ => false,
    }
});

#[cfg(feature = "int128")]
impl_intocolumn_simple!(Decimal128, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i128::fit(p),
        _ => false,
    }
});

// A chrono datetime can target both second- and sub-second-resolution columns.
impl_intocolumn_simple!(DateTime<Utc>, |f| matches!(
    f.sql_type,
    SqlType::DateTime | SqlType::DateTime64(..)
));

impl_intocolumn_simple!(Uuid, |f| f.sql_type == SqlType::Uuid);
// String-like types go through the dedicated string column.
impl_intocolumn_string!(&'b str);
impl_intocolumn_string!(String);
470
// Nullable types: `Option<T>` columns, encoded as null flags followed by values.
impl_intocolumn_simple!(Option<u8>, |f| f.sql_type == SqlType::UInt8);
impl_intocolumn_simple!(Option<i8>, |f| f.sql_type == SqlType::Int8);
impl_intocolumn_simple!(Option<u16>, |f| f.sql_type == SqlType::UInt16);
impl_intocolumn_simple!(Option<i16>, |f| f.sql_type == SqlType::Int16);
impl_intocolumn_simple!(Option<u32>, |f| f.sql_type == SqlType::UInt32);
impl_intocolumn_simple!(Option<i32>, |f| f.sql_type == SqlType::Int32);
impl_intocolumn_simple!(Option<u64>, |f| f.sql_type == SqlType::UInt64);
impl_intocolumn_simple!(Option<i64>, |f| f.sql_type == SqlType::Int64);
impl_intocolumn_simple!(Option<f32>, |f| f.sql_type == SqlType::Float32);
impl_intocolumn_simple!(Option<f64>, |f| f.sql_type == SqlType::Float64);
impl_intocolumn_simple!(Option<Uuid>, |f| f.sql_type == SqlType::Uuid);
impl_intocolumn_simple!(Option<Ipv4Addr>, |f| f.sql_type == SqlType::Ipv4);
impl_intocolumn_simple!(Option<Ipv6Addr>, |f| f.sql_type == SqlType::Ipv6);
impl_intocolumn_simple!(Option<Date<Utc>>, |f| f.sql_type == SqlType::Date);
// NOTE(review): the non-nullable `DateTime<Utc>` column also accepts
// `DateTime64(..)`; the nullable form accepts `DateTime` only - confirm intent.
impl_intocolumn_simple!(Option<DateTime<Utc>>, |f| f.sql_type == SqlType::DateTime);

// NOTE(review): nullable strings accept `String` only, although the per-value
// string encoders also handle `FixedString` - confirm intent.
impl_intocolumn_simple!(Option<&'b str>, |f| f.sql_type == SqlType::String);
impl_intocolumn_simple!(Option<String>, |f| f.sql_type == SqlType::String);

// Nullable decimals: compatible when the column precision fits the backing integer.
impl_intocolumn_simple!(Option<Decimal32>, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i32::fit(p),
        _ => false,
    }
});

impl_intocolumn_simple!(Option<Decimal64>, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i64::fit(p),
        _ => false,
    }
});

#[cfg(feature = "int128")]
impl_intocolumn_simple!(Option<Decimal128>, |f| {
    match f.sql_type {
        SqlType::Decimal(p, _) => i128::fit(p),
        _ => false,
    }
});
512
/// Raw IPv4 value: four octets stored in reversed (little-endian) order.
#[derive(Copy, Clone, Debug)]
pub struct ValueIp4([u8; 4]);

/// Implemented as `From` so that both `Ipv4Addr::from(v)` and `v.into()`
/// work; the latter comes from the standard blanket `Into` impl.
impl From<ValueIp4> for Ipv4Addr {
    /// Restores network octet order and builds the address.
    fn from(value: ValueIp4) -> Ipv4Addr {
        let mut octets = value.0;
        octets.reverse();
        Ipv4Addr::from(octets)
    }
}
522
/// Raw IPv6 value: sixteen octets in network order.
#[derive(Copy, Clone, Debug)]
pub struct ValueIp6([u8; 16]);

/// Implemented as `From` so that both `Ipv6Addr::from(v)` and `v.into()`
/// work; the latter comes from the standard blanket `Into` impl.
impl From<ValueIp6> for Ipv6Addr {
    /// Builds the address from the stored octets without reordering.
    ///
    /// The IPv6 encoder in this file writes `Ipv6Addr::octets()` unchanged,
    /// so decoding must not reverse the bytes or a round trip would
    /// scramble the address.
    fn from(value: ValueIp6) -> Ipv6Addr {
        Ipv6Addr::from(value.0)
    }
}
532
533#[derive(Copy, Clone, Debug)]
534pub struct ValueUuid([u8; 16]);
535
536impl Into<Uuid> for ValueUuid {
537    fn into(mut self) -> Uuid {
538        self.0[0..8].reverse();
539        self.0[8..16].reverse();
540        Uuid::from_bytes(self.0)
541    }
542}
543
/// Raw `Date` value: two little-endian bytes (see `ValueDate::to_date`).
#[derive(Copy, Clone, Debug)]
pub struct ValueDate(pub [u8; 2]);

/// Raw `DateTime` value: four little-endian bytes (see `ValueDateTime::to_datetime`).
#[derive(Copy, Clone, Debug)]
pub struct ValueDateTime(pub [u8; 4]);

/// Raw `DateTime64` value: a tick count whose unit depends on the column precision.
#[derive(Copy, Clone, Debug)]
pub struct ValueDateTime64(pub i64);

/// Raw 32-bit decimal value.
// NOTE(review): presumably the unscaled integer; the scale is not stored here.
#[derive(Copy, Clone, Debug)]
pub struct ValueDecimal32(pub i32);

/// Raw 64-bit decimal value.
#[derive(Copy, Clone, Debug)]
pub struct ValueDecimal64(pub i64);

/// Raw 128-bit decimal value.
#[cfg(feature = "int128")]
#[derive(Copy, Clone, Debug)]
pub struct ValueDecimal128(pub i128);
562
563impl ValueDate {
564    pub(super) fn to_date(&self) -> chrono::Date<chrono::offset::Utc> {
565        ValueDate::date_inner(i16::from_le_bytes(self.0))
566    }
567
568    pub(super) fn date_inner(dates: i16) -> chrono::Date<chrono::offset::Utc> {
569        let ce: i32 = 719163_i32 + dates as i32;
570        let nt = NaiveDate::from_num_days_from_ce(ce);
571        chrono::Date::from_utc(nt, Utc)
572    }
573}
574
575impl ValueDateTime {
576    pub(super) fn to_datetime(&self) -> DateTime<chrono::offset::Utc> {
577        ValueDateTime::datetime_inner(i32::from_le_bytes(self.0))
578    }
579
580    pub(super) fn datetime_inner(sec: i32) -> DateTime<chrono::offset::Utc> {
581        let nt = NaiveDateTime::from_timestamp(sec as i64, 0);
582        DateTime::from_utc(nt, Utc)
583    }
584}
585
586impl ValueDateTime64 {
587    pub(super) fn to_datetime(self, precision: u8) -> DateTime<chrono::offset::Utc> {
588        let magnitude = SCALE[precision as usize];
589        let sec = self.0.wrapping_div(magnitude);
590        // TODO: check whether it is correct for dates below 1970-01-01, that is negative self.0 value
591        let nsec = self.0.wrapping_rem(magnitude) * SCALE[(9 - precision) as usize];
592
593        let nt = NaiveDateTime::from_timestamp(sec, nsec as u32);
594        DateTime::from_utc(nt, Utc)
595    }
596
597    pub fn from_raw(base: i64) -> ValueDateTime64 {
598        ValueDateTime64(base)
599    }
600}