opensrv_clickhouse/types/column/
decimal.rs1use std::sync::Arc;
16
17use chrono_tz::Tz;
18
19use crate::binary::Encoder;
20use crate::binary::ReadEx;
21use crate::errors::Result;
22use crate::types::column::column_data::BoxColumnData;
23use crate::types::column::column_data::ColumnData;
24use crate::types::column::list::List;
25use crate::types::column::nullable::NullableColumnData;
26use crate::types::column::BoxColumnWrapper;
27use crate::types::column::ColumnFrom;
28use crate::types::column::ColumnWrapper;
29use crate::types::column::Either;
30use crate::types::column::VectorColumnData;
31use crate::types::decimal::Decimal;
32use crate::types::decimal::NoBits;
33use crate::types::from_sql::FromSql;
34use crate::types::Column;
35use crate::types::ColumnType;
36use crate::types::SqlType;
37use crate::types::Value;
38use crate::types::ValueRef;
39
40pub(crate) struct DecimalColumnData {
41 pub(crate) inner: Box<dyn ColumnData + Send + Sync>,
42 pub(crate) precision: u8,
43 pub(crate) scale: u8,
44 pub(crate) nobits: NoBits,
45}
46
47pub(crate) struct DecimalAdapter<K: ColumnType> {
48 pub(crate) column: Column<K>,
49 pub(crate) precision: u8,
50 pub(crate) scale: u8,
51 pub(crate) nobits: NoBits,
52}
53
54pub(crate) struct NullableDecimalAdapter<K: ColumnType> {
55 pub(crate) column: Column<K>,
56 pub(crate) precision: u8,
57 pub(crate) scale: u8,
58 pub(crate) nobits: NoBits,
59}
60
61impl DecimalColumnData {
62 pub(crate) fn load<T: ReadEx>(
63 reader: &mut T,
64 precision: u8,
65 scale: u8,
66 nobits: NoBits,
67 size: usize,
68 tz: Tz,
69 ) -> Result<Self> {
70 let type_name = match nobits {
71 NoBits::N32 => "Int32",
72 NoBits::N64 => "Int64",
73 };
74 let inner =
75 <dyn ColumnData>::load_data::<BoxColumnWrapper, _>(reader, type_name, size, tz)?;
76
77 Ok(DecimalColumnData {
78 inner,
79 precision,
80 scale,
81 nobits,
82 })
83 }
84}
85
86impl ColumnFrom for Vec<Decimal> {
87 fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
88 let mut data = List::<i64>::with_capacity(source.len());
89 let mut precision = 18;
90 let mut opt_scale = None;
91 for s in source {
92 precision = std::cmp::max(precision, s.precision);
93 if let Some(old_scale) = opt_scale {
94 if old_scale != s.scale {
95 panic!("scale != scale")
96 }
97 } else {
98 opt_scale = Some(s.scale);
99 }
100
101 data.push(s.internal());
102 }
103 let scale = opt_scale.unwrap_or(4);
104 let inner = Box::new(VectorColumnData { data });
105
106 let column = DecimalColumnData {
107 inner,
108 precision,
109 scale,
110 nobits: NoBits::N64,
111 };
112
113 W::wrap(column)
114 }
115}
116
117impl ColumnFrom for Vec<Option<Decimal>> {
118 fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
119 let mut nulls: Vec<u8> = Vec::with_capacity(source.len());
120 let mut data = List::<i64>::with_capacity(source.len());
121 let mut precision = 18;
122 let mut opt_scale = None;
123 for os in source {
124 if let Some(s) = os {
125 precision = std::cmp::max(precision, s.precision);
126 if let Some(old_scale) = opt_scale {
127 if old_scale != s.scale {
128 panic!("scale != scale")
129 }
130 } else {
131 opt_scale = Some(s.scale);
132 }
133
134 data.push(s.internal());
135 nulls.push(0);
136 } else {
137 data.push(0);
138 nulls.push(1);
139 }
140 }
141 let scale = opt_scale.unwrap_or(4);
142 let inner = Box::new(VectorColumnData { data });
143
144 let inner = DecimalColumnData {
145 inner,
146 precision,
147 scale,
148 nobits: NoBits::N64,
149 };
150
151 W::wrap(NullableColumnData {
152 nulls,
153 inner: Arc::new(inner),
154 })
155 }
156}
157
158impl ColumnData for DecimalColumnData {
159 fn sql_type(&self) -> SqlType {
160 SqlType::Decimal(self.precision, self.scale)
161 }
162
163 fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
164 self.inner.save(encoder, start, end)
165 }
166
167 fn len(&self) -> usize {
168 self.inner.len()
169 }
170
171 fn push(&mut self, value: Value) {
172 if let Value::Decimal(decimal) = value {
173 match self.nobits {
174 NoBits::N32 => {
175 let internal: i32 = decimal.internal();
176 self.inner.push(internal.into())
177 }
178 NoBits::N64 => {
179 let internal: i64 = decimal.internal();
180 self.inner.push(internal.into())
181 }
182 }
183 } else {
184 panic!("value should be decimal ({:?})", value);
185 }
186 }
187
188 fn at(&self, index: usize) -> ValueRef {
189 let underlying: i64 = match self.nobits {
190 NoBits::N32 => i64::from(i32::from(self.inner.at(index))),
191 NoBits::N64 => i64::from(self.inner.at(index)),
192 };
193
194 ValueRef::Decimal(Decimal {
195 underlying,
196 precision: self.precision,
197 scale: self.scale,
198 nobits: self.nobits,
199 })
200 }
201
202 fn clone_instance(&self) -> BoxColumnData {
203 Box::new(Self {
204 inner: self.inner.clone_instance(),
205 precision: self.precision,
206 scale: self.scale,
207 nobits: self.nobits,
208 })
209 }
210
211 unsafe fn get_internal(&self, pointers: &[*mut *const u8], level: u8) -> Result<()> {
212 assert_eq!(level, 0);
213 self.inner.get_internal(pointers, 0)?;
214 *(pointers[2] as *mut NoBits) = self.nobits;
215 Ok(())
216 }
217}
218
219impl<K: ColumnType> ColumnData for DecimalAdapter<K> {
220 fn sql_type(&self) -> SqlType {
221 SqlType::Decimal(self.precision, self.scale)
222 }
223
224 fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
225 for i in start..end {
226 if let ValueRef::Decimal(decimal) = self.at(i) {
227 match self.nobits {
228 NoBits::N32 => {
229 let internal: i32 = decimal.internal();
230 encoder.write(internal);
231 }
232 NoBits::N64 => {
233 let internal: i64 = decimal.internal();
234 encoder.write(internal);
235 }
236 }
237 } else {
238 panic!("should be decimal");
239 }
240 }
241 }
242
243 fn len(&self) -> usize {
244 self.column.len()
245 }
246
247 fn push(&mut self, _: Value) {
248 unimplemented!()
249 }
250
251 fn at(&self, index: usize) -> ValueRef {
252 if let ValueRef::Decimal(decimal) = self.column.at(index) {
253 let mut d = decimal.set_scale(self.scale);
254 d.precision = self.precision;
255 d.nobits = self.nobits;
256 ValueRef::Decimal(d)
257 } else {
258 panic!("should be decimal");
259 }
260 }
261
262 fn clone_instance(&self) -> BoxColumnData {
263 unimplemented!()
264 }
265}
266
267impl<K: ColumnType> ColumnData for NullableDecimalAdapter<K> {
268 fn sql_type(&self) -> SqlType {
269 SqlType::Nullable(SqlType::Decimal(self.precision, self.scale).into())
270 }
271
272 fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
273 let size = end - start;
274 let mut nulls = vec![0; size];
275 let mut values: Vec<Option<Decimal>> = vec![None; size];
276
277 for (i, index) in (start..end).enumerate() {
278 values[i] = Option::from_sql(self.at(index)).unwrap();
279 if values[i].is_none() {
280 nulls[i] = 1;
281 }
282 }
283
284 encoder.write_bytes(nulls.as_ref());
285
286 for value in values {
287 let underlying = if let Some(v) = value { v.underlying } else { 0 };
288
289 match self.nobits {
290 NoBits::N32 => {
291 encoder.write(underlying as i32);
292 }
293 NoBits::N64 => {
294 encoder.write(underlying);
295 }
296 }
297 }
298 }
299
300 fn len(&self) -> usize {
301 self.column.len()
302 }
303
304 fn push(&mut self, _: Value) {
305 unimplemented!()
306 }
307
308 fn at(&self, index: usize) -> ValueRef {
309 let value: Option<Decimal> = Option::from_sql(self.column.at(index)).unwrap();
310 match value {
311 None => ValueRef::Nullable(Either::Left(self.sql_type().into())),
312 Some(mut v) => {
313 v = v.set_scale(self.scale);
314 v.precision = self.precision;
315 v.nobits = self.nobits;
316 let inner = ValueRef::Decimal(v);
317 ValueRef::Nullable(Either::Right(Box::new(inner)))
318 }
319 }
320 }
321
322 fn clone_instance(&self) -> BoxColumnData {
323 unimplemented!()
324 }
325}