opensrv_clickhouse/types/column/mod.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::marker;
17use std::net::Ipv4Addr;
18use std::net::Ipv6Addr;
19use std::ops;
20use std::sync::Arc;
21
22use chrono_tz::Tz;
23
24use self::chunk::ChunkColumnData;
25pub use self::column_data::ArcColumnData;
26pub use self::column_data::BoxColumnData;
27pub use self::column_data::ColumnData;
28pub use self::column_data::ColumnDataExt;
29pub use self::concat::ConcatColumnData;
30pub use self::list::List;
31pub use self::nullable::NullableColumnData;
32pub use self::numeric::VectorColumnData;
33pub use self::string::StringColumnData;
34pub use self::string_pool::StringPool;
35pub use self::tuple::TupleColumnData;
36pub use crate::types::column::array::ArrayColumnData;
37
38use crate::binary::Encoder;
39use crate::binary::ReadEx;
40use crate::errors::Error;
41use crate::errors::FromSqlError;
42use crate::errors::Result;
43use crate::types::column::decimal::DecimalAdapter;
44use crate::types::column::decimal::NullableDecimalAdapter;
45use crate::types::column::enums::Enum16Adapter;
46use crate::types::column::enums::Enum8Adapter;
47use crate::types::column::enums::NullableEnum16Adapter;
48use crate::types::column::enums::NullableEnum8Adapter;
49use crate::types::column::fixed_string::FixedStringAdapter;
50use crate::types::column::fixed_string::NullableFixedStringAdapter;
51use crate::types::column::ip::IpColumnData;
52use crate::types::column::ip::Ipv4;
53use crate::types::column::ip::Ipv6;
54use crate::types::column::iter::Iterable;
55use crate::types::column::string::StringAdapter;
56use crate::types::decimal::NoBits;
57use crate::types::SqlType;
58use crate::types::Value;
59use crate::types::ValueRef;
60
61mod array;
62pub(crate) mod chrono_datetime;
63mod chunk;
64mod column_data;
65pub mod concat;
66mod date;
67pub mod datetime64;
68mod decimal;
69mod enums;
70pub mod factory;
71pub(crate) mod fixed_string;
72mod ip;
73pub mod iter;
74mod list;
75mod nullable;
76mod numeric;
77mod string;
78mod string_pool;
79mod tuple;
80
/// Represents a ClickHouse column: a name plus type-erased, reference-counted
/// column data.
///
/// `K` is a marker type ([`Simple`] or [`Complex`]). It is only carried in a
/// `PhantomData`; operations such as `slice` and `Column::<Simple>::concat`
/// produce `Column<Complex>` values.
pub struct Column<K: ColumnType> {
    /// Column name; read from / written to the wire by `read` and `write`.
    pub(crate) name: String,
    /// Shared column values; cloning a `Column` clones this `Arc`, not the values.
    pub(crate) data: ArcColumnData,
    /// Ties the column to its `ColumnType` marker without storing a value of it.
    pub(crate) _marker: marker::PhantomData<K>,
}
87
/// Conversion of a value into column data wrapped by a [`ColumnWrapper`].
pub trait ColumnFrom {
    /// Converts `source` into the pointer type selected by `W`.
    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper;
}
91
/// Marker trait for the `K` type parameter of [`Column`].
///
/// Implemented by the zero-sized markers [`Simple`] and [`Complex`].
pub trait ColumnType: Send + Copy + Sync + 'static {}

/// Column marker; `Column<Simple>` is the receiver type of `Column::concat`.
/// NOTE(review): presumably marks columns that own their decoded data — confirm.
#[derive(Copy, Clone, Default)]
pub struct Simple {
    // Private field: prevents construction via a struct literal outside this module.
    _private: (),
}

/// Column marker produced by `Column::slice` and `Column::<Simple>::concat`.
/// NOTE(review): presumably marks views/compositions over other column data — confirm.
#[derive(Copy, Clone, Default)]
pub struct Complex {
    // Private field: prevents construction via a struct literal outside this module.
    _private: (),
}

impl ColumnType for Simple {}

impl ColumnType for Complex {}
107
108impl<K: ColumnType> ColumnFrom for Column<K> {
109    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
110        W::wrap_arc(source.data)
111    }
112}
113
114impl<L: ColumnType, R: ColumnType> PartialEq<Column<R>> for Column<L> {
115    fn eq(&self, other: &Column<R>) -> bool {
116        if self.len() != other.len() {
117            return false;
118        }
119
120        if self.sql_type() != other.sql_type() {
121            return false;
122        }
123
124        for i in 0..self.len() {
125            if self.at(i) != other.at(i) {
126                return false;
127            }
128        }
129
130        true
131    }
132}
133
134impl<K: ColumnType> Clone for Column<K> {
135    fn clone(&self) -> Self {
136        Self {
137            name: self.name.clone(),
138            data: self.data.clone(),
139            _marker: marker::PhantomData,
140        }
141    }
142}
143
144impl Column<Simple> {
145    pub(crate) fn concat<'a, I>(items: I) -> Column<Complex>
146    where
147        I: Iterator<Item = &'a Self>,
148    {
149        let items_vec: Vec<&Self> = items.collect();
150        let chunks: Vec<_> = items_vec.iter().map(|column| column.data.clone()).collect();
151        match items_vec.first() {
152            None => unreachable!(),
153            Some(first_column) => {
154                let name: String = first_column.name().to_string();
155                let data = ConcatColumnData::concat(chunks);
156                Column {
157                    name,
158                    data: Arc::new(data),
159                    _marker: marker::PhantomData,
160                }
161            }
162        }
163    }
164}
165
166impl<K: ColumnType> Column<K> {
167    /*
168    /// Returns an iterator over the column.
169    ///
170    /// ### Example
171    ///
172    /// ```rust
173    /// # use std::env;
174    /// # use clickhouse_rs::{errors::Error, Pool, errors::Result};
175    /// # use futures_util::stream::StreamExt;
176    /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
177    /// # let ret: Result<()> = rt.block_on(async {
178    /// #     let database_url = "tcp://localhost:9000";
179    /// #     let pool = Pool::new(database_url);
180    /// #     let mut client = pool.get_handle().await?;
181    ///       let mut stream = client
182    ///             .query("SELECT number as n1, number as n2, number as n3 FROM numbers(100000000)")
183    ///             .stream_blocks();
184    ///
185    ///       let mut sum = 0;
186    ///       while let Some(block) = stream.next().await {
187    ///           let block = block?;
188    ///
189    ///           let c1 = block.get_column("n1")?.iter::<u64>()?;
190    ///           let c2 = block.get_column("n2")?.iter::<u64>()?;
191    ///           let c3 = block.get_column("n3")?.iter::<u64>()?;
192    ///
193    ///           for ((v1, v2), v3) in c1.zip(c2).zip(c3) {
194    ///               sum = v1 + v2 + v3;
195    ///           }
196    ///       }
197    ///
198    ///       dbg!(sum);
199    /// #     Ok(())
200    /// # });
201    /// # ret.unwrap()
202    /// ```
203     */
204    pub fn iter<'a, T: Iterable<'a, K>>(&'a self) -> Result<T::Iter> {
205        <T as Iterable<'a, K>>::iter(self, self.sql_type())
206    }
207}
208
209impl<K: ColumnType> Column<K> {
210    pub(crate) fn read<R: ReadEx>(reader: &mut R, size: usize, tz: Tz) -> Result<Column<K>> {
211        let name = reader.read_string()?;
212        let type_name = reader.read_string()?;
213        let data =
214            <dyn ColumnData>::load_data::<ArcColumnWrapper, _>(reader, &type_name, size, tz)?;
215        let column = Self {
216            name,
217            data,
218            _marker: marker::PhantomData,
219        };
220        Ok(column)
221    }
222
223    #[inline(always)]
224    pub fn name(&self) -> &str {
225        &self.name
226    }
227
228    #[inline(always)]
229    pub fn sql_type(&self) -> SqlType {
230        self.data.sql_type()
231    }
232
233    #[inline(always)]
234    pub(crate) fn at(&self, index: usize) -> ValueRef {
235        self.data.at(index)
236    }
237
238    pub(crate) fn write(&self, encoder: &mut Encoder) {
239        encoder.string(&self.name);
240        encoder.string(self.data.sql_type().to_string().as_ref());
241        let len = self.data.len();
242        self.data.save(encoder, 0, len);
243    }
244
245    #[inline(always)]
246    pub(crate) fn len(&self) -> usize {
247        self.data.len()
248    }
249
250    pub(crate) fn slice(&self, range: ops::Range<usize>) -> Column<Complex> {
251        let data = ChunkColumnData::new(self.data.clone(), range);
252        Column {
253            name: self.name.clone(),
254            data: Arc::new(data),
255            _marker: marker::PhantomData,
256        }
257    }
258
259    pub(crate) fn cast_to(self, dst_type: SqlType) -> Result<Self> {
260        let src_type = self.sql_type();
261
262        if dst_type == src_type {
263            return Ok(self);
264        }
265
266        match (dst_type.clone(), src_type.clone()) {
267            (SqlType::FixedString(str_len), SqlType::String) => {
268                let name = self.name().to_owned();
269                let adapter = FixedStringAdapter {
270                    column: self,
271                    str_len,
272                };
273                Ok(Column {
274                    name,
275                    data: Arc::new(adapter),
276                    _marker: marker::PhantomData,
277                })
278            }
279            (
280                SqlType::Nullable(SqlType::FixedString(str_len)),
281                SqlType::Nullable(SqlType::String),
282            ) => {
283                let name = self.name().to_owned();
284                let adapter = NullableFixedStringAdapter {
285                    column: self,
286                    str_len: *str_len,
287                };
288                Ok(Column {
289                    name,
290                    data: Arc::new(adapter),
291                    _marker: marker::PhantomData,
292                })
293            }
294            (SqlType::String, SqlType::Array(SqlType::UInt8)) => {
295                let name = self.name().to_owned();
296                let adapter = StringAdapter { column: self };
297                Ok(Column {
298                    name,
299                    data: Arc::new(adapter),
300                    _marker: marker::PhantomData,
301                })
302            }
303            (SqlType::FixedString(n), SqlType::Array(SqlType::UInt8)) => {
304                let string_column = self.cast_to(SqlType::String)?;
305                string_column.cast_to(SqlType::FixedString(n))
306            }
307            (SqlType::Decimal(dst_p, dst_s), SqlType::Decimal(_, _)) => {
308                let name = self.name().to_owned();
309                let nobits = NoBits::from_precision(dst_p).unwrap();
310                let adapter = DecimalAdapter {
311                    column: self,
312                    precision: dst_p,
313                    scale: dst_s,
314                    nobits,
315                };
316                Ok(Column {
317                    name,
318                    data: Arc::new(adapter),
319                    _marker: marker::PhantomData,
320                })
321            }
322            (SqlType::Enum8(enum_values), SqlType::Enum8(_)) => {
323                let name = self.name().to_owned();
324                let adapter = Enum8Adapter {
325                    column: self,
326                    enum_values,
327                };
328                Ok(Column {
329                    name,
330                    data: Arc::new(adapter),
331                    _marker: marker::PhantomData,
332                })
333            }
334            (SqlType::Enum16(enum_values), SqlType::Enum16(_)) => {
335                let name = self.name().to_owned();
336                let adapter = Enum16Adapter {
337                    column: self,
338                    enum_values,
339                };
340                Ok(Column {
341                    name,
342                    data: Arc::new(adapter),
343                    _marker: marker::PhantomData,
344                })
345            }
346            (
347                SqlType::Nullable(SqlType::Enum8(enum_values)),
348                SqlType::Nullable(SqlType::Enum8(_)),
349            ) => {
350                let name = self.name().to_owned();
351                let enum_values = enum_values.clone();
352                let adapter = NullableEnum8Adapter {
353                    column: self,
354                    enum_values,
355                };
356                Ok(Column {
357                    name,
358                    data: Arc::new(adapter),
359                    _marker: marker::PhantomData,
360                })
361            }
362            (
363                SqlType::Nullable(SqlType::Enum16(enum_values)),
364                SqlType::Nullable(SqlType::Enum16(_)),
365            ) => {
366                let name = self.name().to_owned();
367                let enum_values = enum_values.clone();
368                let adapter = NullableEnum16Adapter {
369                    column: self,
370                    enum_values,
371                };
372                Ok(Column {
373                    name,
374                    data: Arc::new(adapter),
375                    _marker: marker::PhantomData,
376                })
377            }
378            (
379                SqlType::Nullable(SqlType::Decimal(dst_p, dst_s)),
380                SqlType::Nullable(SqlType::Decimal(_, _)),
381            ) => {
382                let name = self.name().to_owned();
383                let nobits = NoBits::from_precision(*dst_p).unwrap();
384                let adapter = NullableDecimalAdapter {
385                    column: self,
386                    precision: *dst_p,
387                    scale: *dst_s,
388                    nobits,
389                };
390                Ok(Column {
391                    name,
392                    data: Arc::new(adapter),
393                    _marker: marker::PhantomData,
394                })
395            }
396            (SqlType::Ipv4, SqlType::String) => {
397                let name = self.name().to_owned();
398
399                let n = self.len();
400                let mut inner = Vec::with_capacity(n);
401                for i in 0..n {
402                    let source = self.at(i).as_str().unwrap();
403                    let ip: Ipv4Addr = source.parse().unwrap();
404                    let mut buffer = [0_u8; 4];
405                    buffer.copy_from_slice(&ip.octets());
406                    buffer.reverse();
407                    inner.extend(buffer);
408                }
409
410                let data = Arc::new(IpColumnData::<Ipv4> {
411                    inner,
412                    phantom: marker::PhantomData,
413                });
414
415                Ok(Column {
416                    name,
417                    data,
418                    _marker: marker::PhantomData,
419                })
420            }
421            (SqlType::Ipv6, SqlType::String) => {
422                let name = self.name().to_owned();
423
424                let n = self.len();
425                let mut inner = Vec::with_capacity(n);
426                for i in 0..n {
427                    let source = self.at(i).as_str().unwrap();
428                    let ip: Ipv6Addr = source.parse().unwrap();
429                    inner.extend(ip.octets());
430                }
431
432                let data = Arc::new(IpColumnData::<Ipv6> {
433                    inner,
434                    phantom: marker::PhantomData,
435                });
436
437                Ok(Column {
438                    name,
439                    data,
440                    _marker: marker::PhantomData,
441                })
442            }
443            _ => {
444                if let Some(data) = self.data.cast_to(&self.data, &dst_type) {
445                    let name = self.name().to_owned();
446                    Ok(Column {
447                        name,
448                        data,
449                        _marker: marker::PhantomData,
450                    })
451                } else {
452                    Err(Error::FromSql(FromSqlError::InvalidType {
453                        src: src_type.to_string(),
454                        dst: dst_type.to_string(),
455                    }))
456                }
457            }
458        }
459    }
460
461    pub(crate) fn push(&mut self, value: Value) {
462        loop {
463            match Arc::get_mut(&mut self.data) {
464                None => {
465                    self.data = Arc::from(self.data.clone_instance());
466                }
467                Some(data) => {
468                    data.push(value);
469                    break;
470                }
471            }
472        }
473    }
474
475    pub(crate) unsafe fn get_internal(&self, pointers: &[*mut *const u8], level: u8) -> Result<()> {
476        self.data.get_internal(pointers, level)
477    }
478}
479
480pub fn new_column<K: ColumnType>(
481    name: &str,
482    data: Arc<(dyn ColumnData + Sync + Send + 'static)>,
483) -> Column<K> {
484    Column {
485        name: name.to_string(),
486        data,
487        _marker: marker::PhantomData,
488    }
489}
490
/// A value holding exactly one of two alternatives.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Either<L, R>
where
    L: fmt::Debug + PartialEq + Clone,
    R: fmt::Debug + PartialEq + Clone,
{
    /// The first alternative.
    Left(L),
    /// The second alternative.
    Right(R),
}
500
/// Strategy for wrapping column data in a particular owning pointer type.
///
/// Implemented in this file by `ArcColumnWrapper` (`Arc`) and
/// `BoxColumnWrapper` (`Box`).
pub trait ColumnWrapper {
    /// The pointer type produced by this wrapper.
    type Wrapper;
    /// Wraps a concrete column data value.
    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper;

    /// Converts already `Arc`-wrapped column data into `Self::Wrapper`.
    fn wrap_arc(data: ArcColumnData) -> Self::Wrapper;
}
507
/// [`ColumnWrapper`] that produces shared `Arc<dyn ColumnData + Send + Sync>` handles.
pub struct ArcColumnWrapper {
    // Private field: prevents construction outside this module.
    _private: (),
}

impl ColumnWrapper for ArcColumnWrapper {
    type Wrapper = Arc<dyn ColumnData + Send + Sync>;

    /// Moves `column` into a freshly allocated `Arc`.
    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
        Arc::new(column)
    }

    /// The input is already in the target representation and is returned unchanged.
    fn wrap_arc(data: ArcColumnData) -> Self::Wrapper {
        data
    }
}
523
/// [`ColumnWrapper`] that produces uniquely owned `Box<dyn ColumnData + Send + Sync>` values.
pub(crate) struct BoxColumnWrapper {
    // Private field: prevents construction outside this module.
    _private: (),
}

impl ColumnWrapper for BoxColumnWrapper {
    type Wrapper = Box<dyn ColumnData + Send + Sync>;

    /// Moves `column` into a freshly allocated `Box`.
    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
        Box::new(column)
    }

    /// Not supported for the boxed wrapper.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn wrap_arc(_: ArcColumnData) -> Self::Wrapper {
        unimplemented!()
    }
}