Skip to main content

clickhouse_driver/protocol/
column.rs

1use std::borrow::{Borrow, BorrowMut};
2use std::fmt;
3use std::io::Write;
4use std::mem::{align_of, size_of};
5use std::net::{Ipv4Addr, Ipv6Addr};
6use std::slice::Iter;
7use std::str::from_utf8;
8
9use chrono::{Date, DateTime, Utc};
10use tokio::io::{AsyncBufRead, AsyncReadExt};
11use uuid::Uuid;
12
13use super::block::{BlockColumn, ServerBlock};
14use super::decoder::ValueReader;
15#[cfg(feature = "int128")]
16use super::value::ValueDecimal128;
17use super::value::{
18    ValueDate, ValueDateTime, ValueDateTime64, ValueDecimal32, ValueDecimal64, ValueIp4, ValueIp6,
19    ValueUuid,
20};
21
22use super::{Value, ValueRefEnum};
23use crate::errors::{ConversionError, DriverError, Result};
24#[cfg(feature = "int128")]
25use crate::types::Decimal128;
26use crate::types::{Decimal, Decimal32, Decimal64, Field, FieldMeta, SqlType};
27
28use std::cmp::Ordering;
29use std::fmt::Formatter;
30
/// Shorthand for an early `Err` return: wraps `$error` in `Err`, converting it
/// into the surrounding function's error type through `Into`.
macro_rules! err {
    ($error:expr) => {
        Err(::std::convert::Into::into($error))
    };
}
36
37pub trait AsOutColumn {
38    /// Returns the number of rows
39    fn len(&self) -> usize;
40    /// Encode column into Clickhouse native format
41    fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()>;
42    /// Check before transmission if the Clickhouse column data
43    /// type compatible with provided and can be conceived by the server
44    fn is_compatible(&self, field: &Field) -> bool;
45}
46
47/// Convert received from Clickhouse server data to rust type
48pub trait AsInColumn: Send {
49    unsafe fn get_at(&self, index: u64) -> ValueRef<'_>;
50}
51/// Default implementation returns `Null` data
52impl AsInColumn for () {
53    unsafe fn get_at(&self, _: u64) -> ValueRef<'_> {
54        ValueRef { inner: None }
55    }
56}
57
58/// Output block column
59pub struct ColumnDataAdapter<'b> {
60    /// Clickhouse column name
61    pub(crate) name: &'b str,
62    /// Nullable, Array, LowCardinality
63    pub(crate) flag: u8,
64    pub(crate) data: Box<dyn AsOutColumn + 'b>,
65}
66
67/// Hold reference (or value for discreet data types ) to Clickhouse row values
68/// Option::None value indicate null
69#[derive(Debug)]
70pub struct ValueRef<'a> {
71    // TODO: remove one level of indirection. Combine ValueRef and ValueRefEnum into single struct
72    inner: Option<ValueRefEnum<'a>>,
73}
74/// Implement SqlType::DateTime,SqlType::DateTime64-> chrono::DateTime<Utc> data conversion
75impl<'a> Value<'a, DateTime<Utc>> for ValueRef<'a> {
76    fn get(&'a self, field: &'a Field) -> Result<Option<DateTime<Utc>>> {
77        match self.inner {
78            Some(ValueRefEnum::DateTime(v)) => Ok(Some(v.to_datetime())),
79            Some(ValueRefEnum::DateTime64(v)) => {
80                if let SqlType::DateTime64(p, _) = field.sql_type {
81                    Ok(Some(v.to_datetime(p)))
82                } else {
83                    // TODO: Apparently not reachable. Replace it with notreachable!
84                    err!(ConversionError::UnsupportedConversion)
85                }
86            }
87            Some(ValueRefEnum::Date(v)) => {
88                let d = v.to_date();
89                Ok(Some(d.and_hms(0, 0, 0)))
90            }
91            _ => err!(ConversionError::UnsupportedConversion),
92        }
93    }
94}
95/// Implement SqlType::Enum(x)|String|FixedSring(size)->&str data conversion
96impl<'a> Value<'a, &'a str> for ValueRef<'a> {
97    fn get(&'a self, field: &'a Field) -> Result<Option<&'_ str>> {
98        match self.inner {
99            Some(ValueRefEnum::String(v)) => Ok(Some(from_utf8(v)?)),
100            Some(ValueRefEnum::Enum(v)) => {
101                // Convert Enum value to Enum title using metadata
102                let meta = field.get_meta().expect("corrupted enum index");
103                let title = v.transcode(meta);
104                Ok(Some(from_utf8(title)?))
105            }
106            None => Ok(None),
107            _ => err!(ConversionError::UnsupportedConversion),
108        }
109    }
110}
111
112/// Implement SqlType::Enum(x)|String|FixedSring(size) - > &[u8]
113impl<'a> Value<'a, &'a [u8]> for ValueRef<'a> {
114    fn get(&'a self, field: &'a Field) -> Result<Option<&'_ [u8]>> {
115        match self.inner {
116            Some(ValueRefEnum::String(v)) => Ok(Some(v)),
117            Some(ValueRefEnum::Enum(v)) => {
118                let meta = field.get_meta().expect("corrupted enum index");
119                let title = v.transcode(meta);
120                Ok(Some(title))
121            }
122            Some(ValueRefEnum::Array8(v)) => Ok(Some(v)),
123            _ => err!(ConversionError::UnsupportedConversion),
124        }
125    }
126}
127
128impl<'a> Value<'a, i16> for ValueRef<'a> {
129    #[allow(clippy::unnested_or_patterns)]
130    fn get(&'a self, _: &'a Field) -> Result<Option<i16>> {
131        match self.inner {
132            Some(ValueRefEnum::Int16(v)) | Some(ValueRefEnum::Enum(v)) => Ok(Some(v)),
133            _ => err!(ConversionError::UnsupportedConversion),
134        }
135    }
136}
137
138#[inline]
139fn decimal_scale_from_field(field: &Field) -> u8 {
140    match field.sql_type {
141        SqlType::Decimal(_, s) => s,
142        _ => 0, //unreachable
143    }
144}
145
/// Generates a `Value<'a, $f>` conversion for `ValueRef`.
///
/// * `impl_value!(T, Variant)` converts the variant payload with `Into`.
/// * `impl_value!(T, Variant, decimal)` builds a `Decimal` from the payload's
///   raw integer (`.0`) and the scale taken from the field's `Decimal` type.
///
/// In both forms a SQL `NULL` yields `Ok(None)`. Any other variant is a
/// `ConversionError::UnsupportedConversion`.
macro_rules! impl_value {
    ($f:ty, $vr:path) => {
        impl<'a> Value<'a, $f> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<$f>> {
                match self.inner {
                    Some($vr(v)) => Ok(Some(v.into())),
                    None => Ok(None),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
    ($f:ty, $vr:path, decimal) => {
        impl<'a> Value<'a, $f> for ValueRef<'a> {
            fn get(&'a self, field: &'a Field) -> Result<Option<$f>> {
                match self.inner {
                    Some($vr(v)) => {
                        let scale = decimal_scale_from_field(field);
                        Ok(Some(Decimal::from(v.0, scale)))
                    }
                    // Nullable(Decimal) columns: NULL is a value, not an error.
                    None => Ok(None),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
}
172// Implement common types data conversion
173// SqlType::X -> rust data conversion
174impl_value!(f32, ValueRefEnum::Float32);
175impl_value!(f64, ValueRefEnum::Float64);
176// SqlType::Ipv4 - > Ipv4Addr
177impl_value!(Ipv4Addr, ValueRefEnum::Ip4);
178// SqlType::Ipv6 - > Ipv6Addr
179impl_value!(Ipv6Addr, ValueRefEnum::Ip6);
180// SqlType::UUID - > Uuid
181impl_value!(Uuid, ValueRefEnum::Uuid);
182
183// SqlType::X t-> X
184impl_value!(u64, ValueRefEnum::UInt64);
185impl_value!(i64, ValueRefEnum::Int64);
186impl_value!(u32, ValueRefEnum::UInt32);
187impl_value!(i32, ValueRefEnum::Int32);
188impl_value!(u16, ValueRefEnum::UInt16);
189// @note! particular case for i16. it can be retrieved from Enum as well
190// impl_value!(i16, ValueRefEnum::Int16);
191impl_value!(u8, ValueRefEnum::UInt8);
192impl_value!(i8, ValueRefEnum::Int8);
193// Implement SqlType::Decimal32 -> Decimal<i32> data conversion
194impl_value!(Decimal32, ValueRefEnum::Decimal32, decimal);
195// Implement SqlType::Decimal64 -> Decimal<i64> data conversion
196impl_value!(Decimal64, ValueRefEnum::Decimal64, decimal);
197// Implement SqlType::Decimal128 -> Decimal<i128> data conversion
198#[cfg(feature = "int128")]
199impl_value!(Decimal128, ValueRefEnum::Decimal128, decimal);
200
// Array values

/// Reinterprets a slice of `T` as a slice of `U` without copying.
///
/// Used for signed/unsigned and integer/float views of array column storage
/// (for example `&[u32]` as `&[i32]` or `&[f32]`).
///
/// Only plain-old-data types for which every bit pattern of `T` is a valid `U`
/// (integers, floats) may be used.
///
/// # Panics
/// If `T` and `U` differ in size or alignment.
#[inline]
fn transmute_slice<T, U>(r: &[T]) -> &[U] {
    // Checked in release builds too. Both sides are compile-time constants, so
    // the checks cost nothing when they hold. Without them, a size mismatch
    // would produce a slice reaching past the end of `r` from safe code.
    assert_eq!(std::mem::size_of::<T>(), std::mem::size_of::<U>());
    assert_eq!(std::mem::align_of::<T>(), std::mem::align_of::<U>());
    // SAFETY: `U` has the same size and alignment as `T` (asserted above), so
    // `r.len()` elements of `U` cover exactly the memory of `r`. Callers only
    // instantiate this with POD numeric types.
    unsafe { std::slice::from_raw_parts(r.as_ptr() as *const U, r.len()) }
}
210
/// Generates `Value<'a, &'a [$elem]>` conversions for array cells.
///
/// * `impl_array_value_ref!(T, Variant)` returns the variant's slice as is.
/// * `impl_array_value_ref!(T, Variant, transmute)` reinterprets the stored
///   slice as `&[T]` (same-width signed/float view of unsigned storage).
///
/// Any other variant is a `ConversionError::UnsupportedConversion`.
macro_rules! impl_array_value_ref {
    ($elem:ty, $variant:path) => {
        impl<'a> Value<'a, &'a [$elem]> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<&'a [$elem]>> {
                if let Some($variant(items)) = self.inner {
                    Ok(Some(items))
                } else {
                    err!(ConversionError::UnsupportedConversion)
                }
            }
        }
    };
    ($elem:ty, $variant:path, transmute) => {
        impl<'a> Value<'a, &'a [$elem]> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<&'a [$elem]>> {
                if let Some($variant(items)) = self.inner {
                    Ok(Some(transmute_slice(items)))
                } else {
                    err!(ConversionError::UnsupportedConversion)
                }
            }
        }
    };
}
233// @note! &[u8] type array clashes with string type column. So
234// this type implemented together with  string column
235// impl_array_value!(u8,ValueRefEnum::Array8);
236impl_array_value_ref!(i8, ValueRefEnum::Array8, transmute);
237impl_array_value_ref!(u16, ValueRefEnum::Array16);
238impl_array_value_ref!(i16, ValueRefEnum::Array16, transmute);
239impl_array_value_ref!(u32, ValueRefEnum::Array32);
240impl_array_value_ref!(i32, ValueRefEnum::Array32, transmute);
241impl_array_value_ref!(u64, ValueRefEnum::Array64);
242impl_array_value_ref!(i64, ValueRefEnum::Array64, transmute);
243impl_array_value_ref!(f32, ValueRefEnum::Array32, transmute);
244impl_array_value_ref!(f64, ValueRefEnum::Array64, transmute);
245
// Element types that cannot be exposed as a reference to an integral slice,
// but can be produced as a freshly built vector.
//
// * `impl_array_to_vec!(T, Variant, |item| ...)` maps every stored element
//   (passed by reference) through the closure.
// * `impl_array_to_vec!(T, Variant, |item, scale| ..., decimal)` additionally
//   passes the decimal scale taken from the field metadata.
macro_rules! impl_array_to_vec {
    ($target:ty, $variant:path, $convert:expr) => {
        impl<'a> Value<'a, Vec<$target>> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<Vec<$target>>> {
                if let Some($variant(items)) = self.inner {
                    Ok(Some(items.iter().map($convert).collect()))
                } else {
                    err!(ConversionError::UnsupportedConversion)
                }
            }
        }
    };
    ($target:ty, $variant:path, $convert:expr, decimal) => {
        impl<'a> Value<'a, Vec<$target>> for ValueRef<'a> {
            fn get(&'a self, field: &'a Field) -> Result<Option<Vec<$target>>> {
                if let Some($variant(items)) = self.inner {
                    let scale = decimal_scale_from_field(field);
                    Ok(Some(
                        items.iter().map(|item| $convert(*item, scale)).collect(),
                    ))
                } else {
                    err!(ConversionError::UnsupportedConversion)
                }
            }
        }
    };
}
279// TODO: remove Array128 as redundant.
280// Make `Ipv6` and `UUID` data types based on [u8;16] rather u128
281impl_array_to_vec!(Date<Utc>, ValueRefEnum::Array16, |item| {
282    ValueDate::date_inner(*item as i16)
283});
284impl_array_to_vec!(DateTime<Utc>, ValueRefEnum::Array32, |item| {
285    ValueDateTime::datetime_inner(*item as i32)
286});
287impl_array_to_vec!(Ipv4Addr, ValueRefEnum::Array32, |item| Ipv4Addr::from(
288    *item
289));
290impl_array_to_vec!(Ipv6Addr, ValueRefEnum::Array128, |item| Ipv6Addr::from(
291    (*item).swap_bytes()
292));
293// The side effect of this design is the ability to interpret any array of u32/i32 as an array
294// of Decimal32 with zero fractional part.
295// The same is true for u64/i64 and Decimal64
296impl_array_to_vec!(
297    Decimal32,
298    ValueRefEnum::Array32,
299    |item, scale| Decimal::from(item as i32, scale),
300    decimal
301);
302impl_array_to_vec!(
303    Decimal64,
304    ValueRefEnum::Array64,
305    |item, scale| Decimal::from(item as i64, scale),
306    decimal
307);
308// TODO: UUID arrays
309
310/// An implementation provides Row-to-Object deserialization interface
311/// It's used internally by block iterator
312///
313/// # Example
314/// Some(object) = block.iter()
315pub trait Deserialize: Sized {
316    fn deserialize(row: Row) -> Result<Self>;
317}
318/// Input Block data row
319#[derive(Debug)]
320pub struct Row<'a> {
321    /// vector of data references
322    col: Vec<ValueRef<'a>>,
323    /// data store
324    block: &'a ServerBlock,
325}
326
327impl<'a> Row<'a> {
328    /// # Safety
329    /// This function should be called after
330    /// `row_index` parameter was checked against row array boundary
331    /// Block Iterators check it
332    pub unsafe fn create(block: &'a ServerBlock, row_index: u64) -> Row<'a> {
333        let col: Vec<_> = block
334            .columns
335            .iter()
336            .map(|c| c.data.get_at(row_index))
337            .collect();
338        Row { col, block }
339    }
340    /// Returns the number of columns
341    /// This number must correspond to the number of fields in the SELECT statement
342    #[inline]
343    pub fn len(&self) -> usize {
344        self.col.len()
345    }
346    /// Empty server Data block is the special type of message.
347    /// It's used internally and usually cannot be returned to user
348    #[inline]
349    pub fn is_empty(&self) -> bool {
350        self.col.is_empty()
351    }
352
353    pub fn iter_columns(&self) -> Iter<BlockColumn> {
354        self.block.columns.iter()
355    }
356
357    pub fn iter_values(&self) -> Iter<ValueRef<'_>> {
358        self.col.iter()
359    }
360    /// Returns row field value converting underlying Sql type
361    /// to rust data type if the specific conversion is available.
362    /// Otherwise it returns ConversionError
363    /// For nullable Sql types if the field contains null value this method
364    /// returns Ok(None)
365    pub fn value<T>(&'a self, index: usize) -> Result<Option<T>>
366    where
367        T: 'a,
368        ValueRef<'a>: Value<'a, T>,
369    {
370        let value_ref = self.col.get(index).ok_or(DriverError::IndexOutOfRange)?;
371        let field = self
372            .block
373            .columns
374            .get(index)
375            .expect("column index out of range")
376            .header
377            .field
378            .borrow();
379
380        value_ref.get(field)
381    }
382    /// The same as `value` method but without performing any checking.
383    /// # Safety
384    /// Calling this method with an out of bound 'index' value is UB.
385    /// Panic if this method is called with unsupported data conversion
386    /// At the moment the driver provides limited number of data conversions.
387    /// This method should be used only if you know that table data structure
388    /// will nether change and you know exactly data types every column of the query.
389    pub unsafe fn value_unchecked<T>(&'a self, index: usize) -> Option<T>
390    where
391        T: 'a,
392        ValueRef<'a>: Value<'a, T>,
393    {
394        assert!(self.col.len() > index);
395        let value_ref = self.col.get_unchecked(index);
396        let field = self
397            .block
398            .columns
399            .get_unchecked(index)
400            .header
401            .field
402            .borrow();
403
404        value_ref.get(field).expect("Conversion error")
405    }
406
407    #[inline]
408    pub fn column_descr(&self, index: usize) -> Option<&BlockColumn> {
409        self.block.columns.get(index)
410    }
411    /// Returns column index by its name
412    pub fn column_index(&self, name: &str) -> usize {
413        let item = self
414            .block
415            .columns
416            .iter()
417            .enumerate()
418            .find(|(_i, c)| c.header.name.eq(name))
419            .unwrap();
420        item.0
421    }
422    /// Perform transformation Row to Plain object.
423    /// Requires that object type implements Deserialize trait
424    pub fn deserialize<D: Deserialize>(self) -> Result<D> {
425        <D as Deserialize>::deserialize(self)
426    }
427}
428
429impl<'a, C: AsInColumn + ?Sized + 'a> AsInColumn for Box<C> {
430    #[inline]
431    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
432        self.as_ref().get_at(index)
433    }
434}
/// String data is stored in ClickHouse as an arbitrary byte sequence.
/// It cannot always be safely converted to a UTF-8 Rust string, so it is
/// kept as a boxed byte slice.
pub(crate) type BoxString = Box<[u8]>;

/// Enum metadata entry: a (value, title) pair.
///
/// * `.0`: the numeric ClickHouse enum value
/// * `.1`: the enum title bytes
#[derive(Clone)]
pub struct EnumIndex<T>(pub T, pub BoxString);

impl<T> EnumIndex<T> {
    /// Views the title as `&str` without UTF-8 validation.
    ///
    /// # Safety
    /// The title bytes (`.1`) must be valid UTF-8.
    #[inline]
    pub(crate) unsafe fn as_str(&self) -> &str {
        std::str::from_utf8_unchecked(self.1.as_ref())
    }
}

impl<T: Ord + Copy> EnumIndex<T> {
    /// Sort key: the enum value.
    #[inline]
    pub(crate) fn fn_sort_val(item1: &EnumIndex<T>) -> T {
        item1.0
    }
    /// Comparator: orders entries by their title bytes.
    #[inline]
    pub(crate) fn fn_sort_str(item1: &EnumIndex<T>, item2: &EnumIndex<T>) -> Ordering {
        item1.1.cmp(&item2.1)
    }
}

/// Entries are equal when their enum values are equal; titles are ignored.
impl<T: PartialEq> PartialEq for EnumIndex<T> {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl<T: Copy + fmt::Display> fmt::Display for EnumIndex<T> {
    /// Formats the entry in enum metadata syntax: `'title' = value`, with the
    /// title escaped by `str::escape_default`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // The fields are public, so the title is not guaranteed to be UTF-8
        // here. Decode lossily instead of calling the unchecked `as_str`. For
        // valid UTF-8 (every title received from the server) this borrows
        // without allocating and yields the same text.
        let title = String::from_utf8_lossy(&self.1);
        // Formatter errors are propagated rather than silently dropped.
        write!(f, "'{}' = {}", title.escape_default(), self.0)
    }
}
488
489pub trait EnumTranscode: Copy {
490    /// Perform Enum value to Enum title conversion
491    fn transcode(self, meta: &FieldMeta) -> &[u8];
492}
493
494// interpret it as *T (i8, u8) (i16, u16) in relation to input type
495// TODO: return empty slice instead of panic
496/// transform enum value to index in title array
497impl EnumTranscode for i16 {
498    #[inline]
499    fn transcode(self, meta: &FieldMeta) -> &[u8] {
500        meta.val2str(self)
501    }
502}
503
504/// Array of Enum8 or Enum16 values
505pub(crate) struct EnumColumn<T> {
506    data: Vec<T>,
507}
508
509/// T can be 'u8'(Enum8) or 'u16'(Enum16)
510impl<T: Send> EnumColumn<T> {
511    /// Read server stream as a sequence of u8(u16) bytes
512    /// and store them in internal buffer
513    pub(crate) async fn load_column<R: AsyncBufRead + Unpin>(
514        mut reader: R,
515        rows: u64,
516        field: &Field,
517    ) -> Result<EnumColumn<T>> {
518        debug_assert!(field.get_meta().is_some());
519
520        let mut data: Vec<T> = Vec::with_capacity(rows as usize);
521        unsafe {
522            data.set_len(rows as usize);
523            reader.read_exact(as_bytes_bufer_mut(&mut data)).await?;
524        }
525        Ok(EnumColumn { data })
526    }
527    pub(crate) fn set_nulls(self, _nulls: Option<Vec<u8>>) -> Box<EnumColumn<T>> {
528        // TODO: Check nullable enum. If Enum can be nullable how it stores nulls?
529        Box::new(self)
530    }
531}
532
533impl<'a, T: Copy + Send + Into<i16>> AsInColumn for EnumColumn<T> {
534    /// Store the enum value  in ValueRef struct
535    /// ValueRef performs conversion of Enum value to corresponding reference to title
536    /// when a  caller requires it.
537    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
538        assert!((index as usize) < self.data.len());
539        let enum_value: i16 = (*self.data.get_unchecked(index as usize)).into();
540
541        ValueRef {
542            inner: Some(ValueRefEnum::Enum(enum_value)),
543        }
544    }
545}
546
/// Column of sized values, stored in memory as a contiguous array of
/// fixed-size elements.
pub(crate) struct FixedColumn<T: Sized> {
    data: Vec<T>,
}

impl<T: Sized> FixedColumn<T> {
    /// Returns `true` if the values are in non-decreasing order.
    ///
    /// Only used by debug assertions. `slice::is_sorted` needs
    /// `T: PartialOrd`, so the bound is on this method rather than on the impl.
    #[cfg(feature = "is_sorted")]
    pub fn is_sorted(&self) -> bool
    where
        T: PartialOrd,
    {
        self.data.is_sorted()
    }

    /// Fallback when the `is_sorted` feature is disabled: always `true`, which
    /// effectively skips the debug assertions that use it.
    // FIXME: drop the feature gate once slice `is_sorted` is available on the
    // supported toolchain.
    #[cfg(not(feature = "is_sorted"))]
    #[inline]
    pub fn is_sorted(&self) -> bool {
        true
    }
}
567
568/// Column of fixed-size types with `is-null` status indicator
569pub(crate) struct FixedNullColumn<T: Sized> {
570    inner: FixedColumn<T>,
571    // TODO: All values are '1' - null, '0' - no null.
572    // For memory usage it might be better  to compress byte array of nulls to bitwise.
573    // And yet it can hit performance. Here it is a matter for study
574    // AVX2 simd instructions for optimized bit compression:
575    // __m256i x0 <- load 32 bytes
576    // __m256i x1 = _mm256_set1_epi8 (0x01)
577    // __m256i x0 = _mm256_sub_epi8 (x0, x1)
578    // int pack = _mm256_movemask_epi8 (x0)
579    nulls: Vec<u8>,
580}
581
582impl<'a, T> AsInColumn for FixedColumn<T>
583where
584    T: Send + Sized + ValueIndex + 'static,
585{
586    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
587        debug_assert!((index as usize) < self.data.len());
588        ValueRef {
589            inner: Some(ValueIndex::valueref_at(self.data.as_slice(), index)),
590        }
591    }
592}
/// Views a mutable slice of `T` as its raw bytes.
///
/// Used internally to read arrays of integers straight from the socket stream.
///
/// # Safety
/// Every byte of `v` must be initialized, and any byte pattern written
/// through the returned slice must be a valid `T` (true for plain integers).
#[inline]
pub(crate) unsafe fn as_bytes_bufer_mut<T>(v: &mut [T]) -> &mut [u8] {
    let byte_len = std::mem::size_of_val(&*v);
    std::slice::from_raw_parts_mut(v.as_mut_ptr().cast::<u8>(), byte_len)
}

/// Views a slice of `T` as its raw bytes.
///
/// # Safety
/// `T` must contain no padding bytes (true for plain integers); otherwise the
/// returned slice would expose uninitialized memory.
#[inline]
pub(crate) unsafe fn as_bytes_bufer<T>(v: &[T]) -> &[u8] {
    let byte_len = std::mem::size_of_val(v);
    // A read-only view only needs a `*const` pointer; the former `*mut` cast
    // was needless.
    std::slice::from_raw_parts(v.as_ptr().cast::<u8>(), byte_len)
}
608
609/// Column of String represented as an array of Boxed byte arrays
610pub(crate) type StringColumn = FixedColumn<BoxString>;
611
612impl FixedColumn<BoxString> {
613    pub(crate) async fn load_string_column<R: AsyncBufRead + Unpin>(
614        reader: R,
615        rows: u64,
616    ) -> Result<StringColumn> {
617        let mut data: Vec<BoxString> = Vec::with_capacity(rows as usize);
618
619        let mut rdr = ValueReader::new(reader);
620        let mut l: u64;
621        for _ in 0..rows {
622            l = rdr.read_vint().await?;
623            let s: Vec<u8> = rdr.read_string(l).await?;
624            data.push(s.into_boxed_slice());
625        }
626
627        Ok(StringColumn { data })
628    }
629
630    pub(crate) async fn load_fixed_string_column<R: AsyncBufRead + Unpin>(
631        mut reader: R,
632        rows: u64,
633        width: u32,
634    ) -> Result<StringColumn> {
635        let mut data: Vec<BoxString> = Vec::with_capacity(rows as usize);
636
637        for _ in 0..rows {
638            let mut s: Vec<u8> = Vec::with_capacity(width as usize);
639            unsafe {
640                s.set_len(width as usize);
641            }
642            reader.read_exact(s.as_mut_slice()).await?;
643            data.push(s.into_boxed_slice());
644        }
645
646        Ok(StringColumn { data })
647    }
648}
649
650// impl AsInColumn for FixedColumn<BoxString> {
651//     unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
652//         assert!((index as usize) < self.data.len());
653//         let vr = self.data.get_unchecked(index as usize);
654//
655//         ValueRef {
656//             inner: Some(ValueRefEnum::String(vr)),
657//         }
658//     }
659// }
660
661impl<T: Sized> FixedColumn<T> {
662    /// Load Column of integer data types from the socket buffer
663    pub(crate) async fn load_column<R: AsyncBufRead + Unpin>(
664        mut reader: R,
665        rows: u64,
666    ) -> Result<FixedColumn<T>> {
667        let mut data: Vec<T> = Vec::with_capacity(rows as usize);
668
669        unsafe {
670            data.set_len(rows as usize);
671            // Big-endian? Never heard
672            reader.read_exact(as_bytes_bufer_mut(&mut data)).await?;
673        }
674
675        Ok(FixedColumn { data })
676    }
677    /// Cast Column of one data type to another.
678    /// Many types (UUID, Date, DateTime, Ip..) have the same memory representations
679    /// as basic integer types (u16, u32, u64,...). Hence we can load data as an array of
680    /// integers and cast it to desired type. This is true for little endian platforms.
681    /// A bit-endian require byte swap, that can be done in ValueRef getter interface.
682    pub(crate) fn cast<U: Sized>(self: FixedColumn<T>) -> FixedColumn<U> {
683        assert_eq!(size_of::<T>(), size_of::<U>());
684        assert!(align_of::<T>() >= align_of::<U>());
685
686        unsafe {
687            let mut clone = std::mem::ManuallyDrop::new(self);
688            FixedColumn {
689                data: Vec::from_raw_parts(
690                    clone.data.as_mut_ptr() as *mut U,
691                    clone.data.len(),
692                    clone.data.capacity(),
693                ),
694            }
695        }
696    }
697}
698
699impl<T> FixedColumn<T>
700where
701    T: Sized + Send + 'static,
702    FixedNullColumn<T>: AsInColumn,
703    FixedColumn<T>: AsInColumn,
704{
705    /// Wrap the Column in FixedNullColumn adapter if nulls array is provided.
706    /// Return the unchanged Column if nulls is not provided.
707    #[inline]
708    pub(crate) fn set_nulls(self: Self, nulls: Option<Vec<u8>>) -> Box<dyn AsInColumn> {
709        if let Some(nulls) = nulls {
710            Box::new(FixedNullColumn { inner: self, nulls })
711        } else {
712            Box::new(self)
713        }
714    }
715}
716/// Row getter interface implementation for nullable column of fixed data types
717impl<T: Sized> AsInColumn for FixedNullColumn<T>
718where
719    FixedColumn<T>: AsInColumn,
720{
721    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
722        debug_assert!((index as usize) < self.nulls.len());
723        if self.nulls[index as usize] == 1 {
724            ValueRef { inner: None }
725        } else {
726            self.inner.get_at(index)
727        }
728    }
729}
730/// Any type that implement ValueIndex trait can be loaded into FixedColumn
731pub(crate) trait ValueIndex: Sized {
732    // TODO: redesign trait method to 'valueref(&self)->ValueRefEnum<'_>
733    unsafe fn valueref_at(_: &[Self], index: u64) -> ValueRefEnum<'_>;
734}
735
736impl ValueIndex for BoxString {
737    #[inline]
738    unsafe fn valueref_at(this: &[BoxString], index: u64) -> ValueRefEnum<'_> {
739        let vr = this.get_unchecked(index as usize);
740        ValueRefEnum::String(vr.as_ref())
741    }
742}
743
744macro_rules! impl_vre_at {
745    ($f:ty,$vr:expr) => {
746        impl ValueIndex for $f {
747            #[inline]
748            unsafe fn valueref_at(this: &[$f], index: u64) -> ValueRefEnum<'_> {
749                debug_assert!((index as usize) < this.len());
750                let vr = this.get_unchecked(index as usize);
751                $vr(*vr)
752            }
753        }
754    };
755}
756
757impl_vre_at!(u8, ValueRefEnum::UInt8);
758impl_vre_at!(i8, ValueRefEnum::Int8);
759impl_vre_at!(u16, ValueRefEnum::UInt16);
760impl_vre_at!(i16, ValueRefEnum::Int16);
761impl_vre_at!(u32, ValueRefEnum::UInt32);
762impl_vre_at!(i32, ValueRefEnum::Int32);
763impl_vre_at!(u64, ValueRefEnum::UInt64);
764impl_vre_at!(i64, ValueRefEnum::Int64);
765#[cfg(feature = "int128")]
766impl_vre_at!(u128, ValueRefEnum::UInt128);
767
768impl_vre_at!(f32, ValueRefEnum::Float32);
769impl_vre_at!(f64, ValueRefEnum::Float64);
770
771impl_vre_at!(ValueUuid, ValueRefEnum::Uuid);
772impl_vre_at!(ValueIp4, ValueRefEnum::Ip4);
773impl_vre_at!(ValueIp6, ValueRefEnum::Ip6);
774
775impl_vre_at!(ValueDecimal32, ValueRefEnum::Decimal32);
776impl_vre_at!(ValueDecimal64, ValueRefEnum::Decimal64);
777#[cfg(feature = "int128")]
778impl_vre_at!(ValueDecimal128, ValueRefEnum::Decimal128);
779
780impl_vre_at!(ValueDate, ValueRefEnum::Date);
781impl_vre_at!(ValueDateTime, ValueRefEnum::DateTime);
782impl_vre_at!(ValueDateTime64, ValueRefEnum::DateTime64);
783
// Generates a direct, non-generic `AsInColumn` impl for `FixedColumn<$f>` that
// copies the element into the `$vr` variant. Not invoked in this file: the
// generic `AsInColumn for FixedColumn<T: ValueIndex>` impl covers these types.
#[allow(unused_macros)]
macro_rules! impl_fixed_column {
    ($elem:ty, $variant:expr) => {
        impl AsInColumn for FixedColumn<$elem> {
            unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
                let position = index as usize;
                debug_assert!(position < self.data.len());
                let item = self.data.get_unchecked(position);
                ValueRef {
                    inner: Some($variant(*item)),
                }
            }
        }
    };
}

/// One-dimensional `Array(T)` column of integral element types.
///
/// The elements of all rows are stored back to back in `data`. `index[i]` is
/// the exclusive end offset of row `i` in `data`, so row `i` covers
/// `data[index[i - 1]..index[i]]`, with row 0 starting at offset 0.
#[allow(dead_code)]
pub(crate) struct FixedArrayColumn<T> {
    data: Vec<T>,
    index: Vec<u64>,
}
806
807impl<T: Send + IntoArray + 'static> FixedArrayColumn<T> {
808    pub(crate) async fn load_column<R>(mut reader: R, rows: u64) -> Result<Box<dyn AsInColumn>>
809    where
810        R: AsyncBufRead + Unpin,
811    {
812        let index: FixedColumn<u64> = FixedColumn::load_column(reader.borrow_mut(), rows).await?;
813
814        debug_assert!(index.is_sorted());
815
816        let rows = *index.data.last().expect("null size array");
817        let data: FixedColumn<T> = FixedColumn::load_column(reader.borrow_mut(), rows).await?;
818
819        Ok(Box::new(FixedArrayColumn {
820            data: data.data,
821            index: index.data,
822        }))
823    }
824}
825
826pub trait IntoArray: Sized {
827    fn into_array(this: &[Self]) -> ValueRefEnum<'_>;
828}
829
830macro_rules! impl_intoarray {
831    ($t:ty, $array: expr) => {
832        impl IntoArray for $t {
833            #[inline]
834            fn into_array(this: &[$t]) -> ValueRefEnum<'_> {
835                $array(this)
836            }
837        }
838    };
839}
840
841impl_intoarray!(u8, ValueRefEnum::Array8);
842impl_intoarray!(u16, ValueRefEnum::Array16);
843impl_intoarray!(u32, ValueRefEnum::Array32);
844impl_intoarray!(u64, ValueRefEnum::Array64);
845impl_intoarray!(u128, ValueRefEnum::Array128);
846
847impl<T: Send + IntoArray> AsInColumn for FixedArrayColumn<T> {
848    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
849        let size1 = if index == 0 {
850            0_u64
851        } else {
852            *self.index.get_unchecked((index - 1) as usize)
853        };
854
855        let size2 = *self.index.get_unchecked(index as usize);
856        debug_assert!(size1 <= size2);
857        let size1 = size1 as usize;
858        let size2 = size2 as usize;
859        debug_assert!(size2 <= self.data.len());
860
861        ValueRef {
862            inner: Some(IntoArray::into_array(&self.data[size1..size2])),
863        }
864    }
865}
866
867/// LowCardinality data type is used to reduce the storage requirements
868/// and significantly improve query performance for String and some other data.
869/// Internally it's encoded as a dictionary with numeric keys (u8, u16, u32, or u64 type)
870/// T - key type. The current implementation supports only String data type.
871pub(crate) struct LowCardinalityColumn<T: Sized + Send> {
872    values: Vec<BoxString>,
873    data: Vec<T>,
874}
875// TODO: redesign bearing in mind 32-bit system limitations.
876// In 32-bit platforms 64-bit indexes cannot be fit in memory.
877// Probably it is not practical to  create and send LowCardinality column
878// with more than 2^32 keys. It is worth setting a reasonable limit on
879// the number or keys sending in one block.
880impl<T> LowCardinalityColumn<T>
881where
882    T: Sized + Ord + Copy + Send + Into<u64> + 'static,
883{
884    pub(crate) async fn load_column<R>(
885        reader: R,
886        rows: u64,
887        values: FixedColumn<BoxString>,
888    ) -> Result<Box<dyn AsInColumn>>
889    where
890        R: AsyncBufRead + Unpin,
891    {
892        debug_assert!(rows > 0);
893        let data: FixedColumn<T> = FixedColumn::load_column(reader, rows).await?;
894
895        let m = data
896            .data
897            .iter()
898            .max()
899            .expect("corrupted lowcardinality column");
900
901        if (*m).into() >= values.data.len() as u64 {
902            return err!(DriverError::IntegrityError);
903        }
904
905        Ok(Box::new(LowCardinalityColumn {
906            data: data.data,
907            values: values.data,
908        }))
909    }
910}
911
912impl<T: Copy + Send + Sized + Into<u64>> AsInColumn for LowCardinalityColumn<T> {
913    unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
914        debug_assert!((index as usize) < self.data.len());
915        let index = self.data.get_unchecked(index as usize);
916
917        let index: u64 = (*index).into();
918        if index == 0 {
919            // Supposed the first item in list  is always NULL value
920            debug_assert!(self.values[0].len() == 0);
921            ValueRef { inner: None }
922        } else {
923            ValueRef {
924                inner: Some(ValueIndex::valueref_at(self.values.as_slice(), index)),
925            }
926        }
927    }
928}