clickhouse_readonly/column/
mod.rs

1use std::{marker, ops, sync::Arc};
2
3use chrono_tz::Tz;
4
5use crate::{
6    binary::{Encoder, ReadEx},
7    column::{column_data::ArcColumnData, iter::Iterable},
8    error::Result,
9    types::SqlType,
10    value::{Value, ValueRef},
11};
12
13use self::chunk::ChunkColumnData;
14pub(crate) use self::{column_data::ColumnData, string_pool::StringPool};
15pub use self::{concat::ConcatColumnData, numeric::VectorColumnData};
16
17mod array;
18mod chunk;
19mod column_data;
20mod concat;
21mod factory;
22pub(crate) mod fixed_string;
23pub(crate) mod iter;
24mod list;
25mod nullable;
26mod numeric;
27mod string;
28mod string_pool;
29
30/// Represents Clickhouse Column
31pub struct Column<K: ColumnType> {
32    pub(crate) name: String,
33    pub(crate) data: ArcColumnData,
34    pub(crate) _marker: marker::PhantomData<K>,
35}
36
37pub trait ColumnFrom {
38    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper;
39}
40
41pub trait ColumnType: Send + Copy + Sync + 'static {}
42
43#[derive(Copy, Clone, Default)]
44pub struct Simple {
45    _private: (),
46}
47
48#[derive(Copy, Clone, Default)]
49pub struct Complex {
50    _private: (),
51}
52
53impl ColumnType for Simple {}
54
55impl ColumnType for Complex {}
56
57impl<K: ColumnType> ColumnFrom for Column<K> {
58    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
59        W::wrap_arc(source.data)
60    }
61}
62
63impl<L: ColumnType, R: ColumnType> PartialEq<Column<R>> for Column<L> {
64    fn eq(&self, other: &Column<R>) -> bool {
65        if self.len() != other.len() {
66            return false;
67        }
68
69        if self.sql_type() != other.sql_type() {
70            return false;
71        }
72
73        for i in 0..self.len() {
74            if self.at(i) != other.at(i) {
75                return false;
76            }
77        }
78
79        true
80    }
81}
82
83impl<K: ColumnType> Clone for Column<K> {
84    fn clone(&self) -> Self {
85        Self {
86            name: self.name.clone(),
87            data: self.data.clone(),
88            _marker: marker::PhantomData,
89        }
90    }
91}
92
93impl Column<Simple> {
94    pub(crate) fn concat<'a, I>(items: I) -> Column<Complex>
95    where
96        I: Iterator<Item = &'a Self>,
97    {
98        let items_vec: Vec<&Self> = items.collect();
99        let chunks: Vec<_> = items_vec.iter().map(|column| column.data.clone()).collect();
100        match items_vec.first() {
101            None => unreachable!(),
102            Some(first_column) => {
103                let name: String = first_column.name().to_string();
104                let data = ConcatColumnData::concat(chunks);
105                Column {
106                    name,
107                    data: Arc::new(data),
108                    _marker: marker::PhantomData,
109                }
110            }
111        }
112    }
113}
114
115impl<K: ColumnType> Column<K> {
116    /// Returns an iterator over the column.
117    ///
118    /// ### Example
119    ///
120    /// ```no_run
121    /// # use std::env;
122    /// # use clickhouse_readonly::{error::{Error,Result}, Pool, PoolConfigBuilder};
123    /// # use futures_util::stream::StreamExt;
124    /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
125    /// # let ret: Result<()> = rt.block_on(async {
126    /// #     let config = PoolConfigBuilder::new(
127    /// #         "127.0.0.1".parse().unwrap(),
128    /// #         "default".to_string(),
129    /// #         "username".to_string(),
130    /// #         "password".to_string(),
131    /// #         true,
132    /// #     )
133    /// #     .build();
134
135    /// #     let pool = Pool::new(config);
136    /// #     let mut client = pool.get_handle().await?;
137    ///       let mut stream = client
138    ///             .query("SELECT number as n1, number as n2, number as n3 FROM numbers(100000000)")
139    ///             .stream_blocks();
140    ///
141    ///       let mut sum = 0;
142    ///       while let Some(block) = stream.next().await {
143    ///           let block = block?;
144    ///
145    ///           let c1 = block.get_column("n1")?.iter::<u64>()?;
146    ///           let c2 = block.get_column("n2")?.iter::<u64>()?;
147    ///           let c3 = block.get_column("n3")?.iter::<u64>()?;
148    ///
149    ///           for ((v1, v2), v3) in c1.zip(c2).zip(c3) {
150    ///               sum = v1 + v2 + v3;
151    ///           }
152    ///       }
153    ///
154    ///       dbg!(sum);
155    /// #     Ok(())
156    /// # });
157    /// # ret.unwrap()
158    /// ```
159    pub fn iter<'a, T: Iterable<'a, K>>(&'a self) -> Result<T::Iter> {
160        <T as Iterable<'a, K>>::iter(self, self.sql_type())
161    }
162}
163
164impl<K: ColumnType> Column<K> {
165    pub(crate) fn read<R: ReadEx>(reader: &mut R, size: usize, tz: Tz) -> Result<Column<K>> {
166        let name = reader.read_string()?;
167        let type_name = reader.read_string()?;
168        let data =
169            <dyn ColumnData>::load_data::<ArcColumnWrapper, _>(reader, &type_name, size, tz)?;
170        let column = Self {
171            name,
172            data,
173            _marker: marker::PhantomData,
174        };
175        Ok(column)
176    }
177
178    #[inline(always)]
179    pub fn name(&self) -> &str {
180        &self.name
181    }
182
183    #[inline(always)]
184    pub fn sql_type(&self) -> SqlType {
185        self.data.sql_type()
186    }
187
188    #[inline(always)]
189    pub(crate) fn at(&self, index: usize) -> ValueRef {
190        self.data.at(index)
191    }
192
193    pub(crate) fn write(&self, encoder: &mut Encoder) {
194        encoder.string(&self.name);
195        encoder.string(self.data.sql_type().to_string().as_ref());
196        let len = self.data.len();
197        self.data.save(encoder, 0, len);
198    }
199
200    #[inline(always)]
201    pub(crate) fn len(&self) -> usize {
202        self.data.len()
203    }
204
205    pub(crate) fn slice(&self, range: ops::Range<usize>) -> Column<Complex> {
206        let data = ChunkColumnData::new(self.data.clone(), range);
207        Column {
208            name: self.name.clone(),
209            data: Arc::new(data),
210            _marker: marker::PhantomData,
211        }
212    }
213
214    // pub(crate) fn cast_to(self, dst_type: SqlType) -> Result<Self> {
215    //     let src_type = self.sql_type();
216
217    //     if dst_type == src_type {
218    //         return Ok(self);
219    //     }
220
221    //     match (dst_type.clone(), src_type.clone()) {
222    //         (SqlType::FixedString(str_len), SqlType::String) => {
223    //             let name = self.name().to_owned();
224    //             let adapter = FixedStringAdapter {
225    //                 column: self,
226    //                 str_len,
227    //             };
228    //             Ok(Column {
229    //                 name,
230    //                 data: Arc::new(adapter),
231    //                 _marker: marker::PhantomData,
232    //             })
233    //         }
234    //         (
235    //             SqlType::Nullable(SqlType::FixedString(str_len)),
236    //             SqlType::Nullable(SqlType::String),
237    //         ) => {
238    //             let name = self.name().to_owned();
239    //             let adapter = NullableFixedStringAdapter {
240    //                 column: self,
241    //                 str_len: *str_len,
242    //             };
243    //             Ok(Column {
244    //                 name,
245    //                 data: Arc::new(adapter),
246    //                 _marker: marker::PhantomData,
247    //             })
248    //         }
249    //         (SqlType::String, SqlType::Array(SqlType::UInt8)) => {
250    //             let name = self.name().to_owned();
251    //             let adapter = StringAdapter { column: self };
252    //             Ok(Column {
253    //                 name,
254    //                 data: Arc::new(adapter),
255    //                 _marker: marker::PhantomData,
256    //             })
257    //         }
258    //         (SqlType::FixedString(n), SqlType::Array(SqlType::UInt8)) => {
259    //             let string_column = self.cast_to(SqlType::String)?;
260    //             string_column.cast_to(SqlType::FixedString(n))
261    //         }
262    //         _ => {
263    //             if let Some(data) = self.data.cast_to(&self.data, &dst_type) {
264    //                 let name = self.name().to_owned();
265    //                 Ok(Column {
266    //                     name,
267    //                     data,
268    //                     _marker: marker::PhantomData,
269    //                 })
270    //             } else {
271    //                 anyhow::bail!(
272    //                     format!(
273    //                         "InvalidType {} -> {}",
274    //                         src_type,
275    //                         dst_type
276    //                     )
277    //                 )
278    //             }
279    //         }
280    //     }
281    // }
282
283    pub(crate) fn push(&mut self, value: Value) {
284        loop {
285            match Arc::get_mut(&mut self.data) {
286                None => {
287                    self.data = Arc::from(self.data.clone_instance());
288                }
289                Some(data) => {
290                    data.push(value);
291                    break;
292                }
293            }
294        }
295    }
296
297    pub(crate) unsafe fn get_internal(
298        &self,
299        pointers: &[*mut *const u8],
300        level: u8,
301        props: u32,
302    ) -> Result<()> {
303        self.data.get_internal(pointers, level, props)
304    }
305}
306
307pub(crate) fn new_column<K: ColumnType>(
308    name: &str,
309    data: Arc<(dyn ColumnData + Sync + Send + 'static)>,
310) -> Column<K> {
311    Column {
312        name: name.to_string(),
313        data,
314        _marker: marker::PhantomData,
315    }
316}
317
318pub trait ColumnWrapper {
319    type Wrapper;
320    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper;
321
322    fn wrap_arc(data: ArcColumnData) -> Self::Wrapper;
323}
324
325pub(crate) struct ArcColumnWrapper {
326    _private: (),
327}
328
329impl ColumnWrapper for ArcColumnWrapper {
330    type Wrapper = Arc<dyn ColumnData + Send + Sync>;
331
332    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
333        Arc::new(column)
334    }
335
336    fn wrap_arc(data: ArcColumnData) -> Self::Wrapper {
337        data
338    }
339}
340
341pub(crate) struct BoxColumnWrapper {
342    _private: (),
343}
344
345impl ColumnWrapper for BoxColumnWrapper {
346    type Wrapper = Box<dyn ColumnData + Send + Sync>;
347
348    fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
349        Box::new(column)
350    }
351
352    fn wrap_arc(_: ArcColumnData) -> Self::Wrapper {
353        unimplemented!()
354    }
355}