opensrv_clickhouse/types/column/
decimal.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono_tz::Tz;
18
19use crate::binary::Encoder;
20use crate::binary::ReadEx;
21use crate::errors::Result;
22use crate::types::column::column_data::BoxColumnData;
23use crate::types::column::column_data::ColumnData;
24use crate::types::column::list::List;
25use crate::types::column::nullable::NullableColumnData;
26use crate::types::column::BoxColumnWrapper;
27use crate::types::column::ColumnFrom;
28use crate::types::column::ColumnWrapper;
29use crate::types::column::Either;
30use crate::types::column::VectorColumnData;
31use crate::types::decimal::Decimal;
32use crate::types::decimal::NoBits;
33use crate::types::from_sql::FromSql;
34use crate::types::Column;
35use crate::types::ColumnType;
36use crate::types::SqlType;
37use crate::types::Value;
38use crate::types::ValueRef;
39
40pub(crate) struct DecimalColumnData {
41    pub(crate) inner: Box<dyn ColumnData + Send + Sync>,
42    pub(crate) precision: u8,
43    pub(crate) scale: u8,
44    pub(crate) nobits: NoBits,
45}
46
47pub(crate) struct DecimalAdapter<K: ColumnType> {
48    pub(crate) column: Column<K>,
49    pub(crate) precision: u8,
50    pub(crate) scale: u8,
51    pub(crate) nobits: NoBits,
52}
53
54pub(crate) struct NullableDecimalAdapter<K: ColumnType> {
55    pub(crate) column: Column<K>,
56    pub(crate) precision: u8,
57    pub(crate) scale: u8,
58    pub(crate) nobits: NoBits,
59}
60
61impl DecimalColumnData {
62    pub(crate) fn load<T: ReadEx>(
63        reader: &mut T,
64        precision: u8,
65        scale: u8,
66        nobits: NoBits,
67        size: usize,
68        tz: Tz,
69    ) -> Result<Self> {
70        let type_name = match nobits {
71            NoBits::N32 => "Int32",
72            NoBits::N64 => "Int64",
73        };
74        let inner =
75            <dyn ColumnData>::load_data::<BoxColumnWrapper, _>(reader, type_name, size, tz)?;
76
77        Ok(DecimalColumnData {
78            inner,
79            precision,
80            scale,
81            nobits,
82        })
83    }
84}
85
86impl ColumnFrom for Vec<Decimal> {
87    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
88        let mut data = List::<i64>::with_capacity(source.len());
89        let mut precision = 18;
90        let mut opt_scale = None;
91        for s in source {
92            precision = std::cmp::max(precision, s.precision);
93            if let Some(old_scale) = opt_scale {
94                if old_scale != s.scale {
95                    panic!("scale != scale")
96                }
97            } else {
98                opt_scale = Some(s.scale);
99            }
100
101            data.push(s.internal());
102        }
103        let scale = opt_scale.unwrap_or(4);
104        let inner = Box::new(VectorColumnData { data });
105
106        let column = DecimalColumnData {
107            inner,
108            precision,
109            scale,
110            nobits: NoBits::N64,
111        };
112
113        W::wrap(column)
114    }
115}
116
117impl ColumnFrom for Vec<Option<Decimal>> {
118    fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
119        let mut nulls: Vec<u8> = Vec::with_capacity(source.len());
120        let mut data = List::<i64>::with_capacity(source.len());
121        let mut precision = 18;
122        let mut opt_scale = None;
123        for os in source {
124            if let Some(s) = os {
125                precision = std::cmp::max(precision, s.precision);
126                if let Some(old_scale) = opt_scale {
127                    if old_scale != s.scale {
128                        panic!("scale != scale")
129                    }
130                } else {
131                    opt_scale = Some(s.scale);
132                }
133
134                data.push(s.internal());
135                nulls.push(0);
136            } else {
137                data.push(0);
138                nulls.push(1);
139            }
140        }
141        let scale = opt_scale.unwrap_or(4);
142        let inner = Box::new(VectorColumnData { data });
143
144        let inner = DecimalColumnData {
145            inner,
146            precision,
147            scale,
148            nobits: NoBits::N64,
149        };
150
151        W::wrap(NullableColumnData {
152            nulls,
153            inner: Arc::new(inner),
154        })
155    }
156}
157
158impl ColumnData for DecimalColumnData {
159    fn sql_type(&self) -> SqlType {
160        SqlType::Decimal(self.precision, self.scale)
161    }
162
163    fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
164        self.inner.save(encoder, start, end)
165    }
166
167    fn len(&self) -> usize {
168        self.inner.len()
169    }
170
171    fn push(&mut self, value: Value) {
172        if let Value::Decimal(decimal) = value {
173            match self.nobits {
174                NoBits::N32 => {
175                    let internal: i32 = decimal.internal();
176                    self.inner.push(internal.into())
177                }
178                NoBits::N64 => {
179                    let internal: i64 = decimal.internal();
180                    self.inner.push(internal.into())
181                }
182            }
183        } else {
184            panic!("value should be decimal ({:?})", value);
185        }
186    }
187
188    fn at(&self, index: usize) -> ValueRef {
189        let underlying: i64 = match self.nobits {
190            NoBits::N32 => i64::from(i32::from(self.inner.at(index))),
191            NoBits::N64 => i64::from(self.inner.at(index)),
192        };
193
194        ValueRef::Decimal(Decimal {
195            underlying,
196            precision: self.precision,
197            scale: self.scale,
198            nobits: self.nobits,
199        })
200    }
201
202    fn clone_instance(&self) -> BoxColumnData {
203        Box::new(Self {
204            inner: self.inner.clone_instance(),
205            precision: self.precision,
206            scale: self.scale,
207            nobits: self.nobits,
208        })
209    }
210
211    unsafe fn get_internal(&self, pointers: &[*mut *const u8], level: u8) -> Result<()> {
212        assert_eq!(level, 0);
213        self.inner.get_internal(pointers, 0)?;
214        *(pointers[2] as *mut NoBits) = self.nobits;
215        Ok(())
216    }
217}
218
219impl<K: ColumnType> ColumnData for DecimalAdapter<K> {
220    fn sql_type(&self) -> SqlType {
221        SqlType::Decimal(self.precision, self.scale)
222    }
223
224    fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
225        for i in start..end {
226            if let ValueRef::Decimal(decimal) = self.at(i) {
227                match self.nobits {
228                    NoBits::N32 => {
229                        let internal: i32 = decimal.internal();
230                        encoder.write(internal);
231                    }
232                    NoBits::N64 => {
233                        let internal: i64 = decimal.internal();
234                        encoder.write(internal);
235                    }
236                }
237            } else {
238                panic!("should be decimal");
239            }
240        }
241    }
242
243    fn len(&self) -> usize {
244        self.column.len()
245    }
246
247    fn push(&mut self, _: Value) {
248        unimplemented!()
249    }
250
251    fn at(&self, index: usize) -> ValueRef {
252        if let ValueRef::Decimal(decimal) = self.column.at(index) {
253            let mut d = decimal.set_scale(self.scale);
254            d.precision = self.precision;
255            d.nobits = self.nobits;
256            ValueRef::Decimal(d)
257        } else {
258            panic!("should be decimal");
259        }
260    }
261
262    fn clone_instance(&self) -> BoxColumnData {
263        unimplemented!()
264    }
265}
266
267impl<K: ColumnType> ColumnData for NullableDecimalAdapter<K> {
268    fn sql_type(&self) -> SqlType {
269        SqlType::Nullable(SqlType::Decimal(self.precision, self.scale).into())
270    }
271
272    fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
273        let size = end - start;
274        let mut nulls = vec![0; size];
275        let mut values: Vec<Option<Decimal>> = vec![None; size];
276
277        for (i, index) in (start..end).enumerate() {
278            values[i] = Option::from_sql(self.at(index)).unwrap();
279            if values[i].is_none() {
280                nulls[i] = 1;
281            }
282        }
283
284        encoder.write_bytes(nulls.as_ref());
285
286        for value in values {
287            let underlying = if let Some(v) = value { v.underlying } else { 0 };
288
289            match self.nobits {
290                NoBits::N32 => {
291                    encoder.write(underlying as i32);
292                }
293                NoBits::N64 => {
294                    encoder.write(underlying);
295                }
296            }
297        }
298    }
299
300    fn len(&self) -> usize {
301        self.column.len()
302    }
303
304    fn push(&mut self, _: Value) {
305        unimplemented!()
306    }
307
308    fn at(&self, index: usize) -> ValueRef {
309        let value: Option<Decimal> = Option::from_sql(self.column.at(index)).unwrap();
310        match value {
311            None => ValueRef::Nullable(Either::Left(self.sql_type().into())),
312            Some(mut v) => {
313                v = v.set_scale(self.scale);
314                v.precision = self.precision;
315                v.nobits = self.nobits;
316                let inner = ValueRef::Decimal(v);
317                ValueRef::Nullable(Either::Right(Box::new(inner)))
318            }
319        }
320    }
321
322    fn clone_instance(&self) -> BoxColumnData {
323        unimplemented!()
324    }
325}