opensrv_clickhouse/types/column/
mod.rs1use std::fmt;
16use std::marker;
17use std::net::Ipv4Addr;
18use std::net::Ipv6Addr;
19use std::ops;
20use std::sync::Arc;
21
22use chrono_tz::Tz;
23
24use self::chunk::ChunkColumnData;
25pub use self::column_data::ArcColumnData;
26pub use self::column_data::BoxColumnData;
27pub use self::column_data::ColumnData;
28pub use self::column_data::ColumnDataExt;
29pub use self::concat::ConcatColumnData;
30pub use self::list::List;
31pub use self::nullable::NullableColumnData;
32pub use self::numeric::VectorColumnData;
33pub use self::string::StringColumnData;
34pub use self::string_pool::StringPool;
35pub use self::tuple::TupleColumnData;
36pub use crate::types::column::array::ArrayColumnData;
37
38use crate::binary::Encoder;
39use crate::binary::ReadEx;
40use crate::errors::Error;
41use crate::errors::FromSqlError;
42use crate::errors::Result;
43use crate::types::column::decimal::DecimalAdapter;
44use crate::types::column::decimal::NullableDecimalAdapter;
45use crate::types::column::enums::Enum16Adapter;
46use crate::types::column::enums::Enum8Adapter;
47use crate::types::column::enums::NullableEnum16Adapter;
48use crate::types::column::enums::NullableEnum8Adapter;
49use crate::types::column::fixed_string::FixedStringAdapter;
50use crate::types::column::fixed_string::NullableFixedStringAdapter;
51use crate::types::column::ip::IpColumnData;
52use crate::types::column::ip::Ipv4;
53use crate::types::column::ip::Ipv6;
54use crate::types::column::iter::Iterable;
55use crate::types::column::string::StringAdapter;
56use crate::types::decimal::NoBits;
57use crate::types::SqlType;
58use crate::types::Value;
59use crate::types::ValueRef;
60
61mod array;
62pub(crate) mod chrono_datetime;
63mod chunk;
64mod column_data;
65pub mod concat;
66mod date;
67pub mod datetime64;
68mod decimal;
69mod enums;
70pub mod factory;
71pub(crate) mod fixed_string;
72mod ip;
73pub mod iter;
74mod list;
75mod nullable;
76mod numeric;
77mod string;
78mod string_pool;
79mod tuple;
80
81pub struct Column<K: ColumnType> {
83 pub(crate) name: String,
84 pub(crate) data: ArcColumnData,
85 pub(crate) _marker: marker::PhantomData<K>,
86}
87
88pub trait ColumnFrom {
89 fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper;
90}
91
/// Marker trait for the kinds a [`Column`] can be parameterised with.
///
/// Implementors must be `Copy`, thread-safe and `'static`; the trait has no methods.
pub trait ColumnType: Copy + Send + Sync + 'static {}
93
/// Zero-sized marker type; see its `ColumnType` implementation.
///
/// The private unit field keeps the struct from being built by literal outside
/// this module; `Default` is the constructor.
#[derive(Clone, Copy, Default)]
pub struct Simple {
    _private: (),
}
98
/// Zero-sized marker type; see its `ColumnType` implementation.
///
/// The private unit field keeps the struct from being built by literal outside
/// this module; `Default` is the constructor.
#[derive(Clone, Copy, Default)]
pub struct Complex {
    _private: (),
}
103
104impl ColumnType for Simple {}
105
106impl ColumnType for Complex {}
107
108impl<K: ColumnType> ColumnFrom for Column<K> {
109 fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
110 W::wrap_arc(source.data)
111 }
112}
113
114impl<L: ColumnType, R: ColumnType> PartialEq<Column<R>> for Column<L> {
115 fn eq(&self, other: &Column<R>) -> bool {
116 if self.len() != other.len() {
117 return false;
118 }
119
120 if self.sql_type() != other.sql_type() {
121 return false;
122 }
123
124 for i in 0..self.len() {
125 if self.at(i) != other.at(i) {
126 return false;
127 }
128 }
129
130 true
131 }
132}
133
134impl<K: ColumnType> Clone for Column<K> {
135 fn clone(&self) -> Self {
136 Self {
137 name: self.name.clone(),
138 data: self.data.clone(),
139 _marker: marker::PhantomData,
140 }
141 }
142}
143
144impl Column<Simple> {
145 pub(crate) fn concat<'a, I>(items: I) -> Column<Complex>
146 where
147 I: Iterator<Item = &'a Self>,
148 {
149 let items_vec: Vec<&Self> = items.collect();
150 let chunks: Vec<_> = items_vec.iter().map(|column| column.data.clone()).collect();
151 match items_vec.first() {
152 None => unreachable!(),
153 Some(first_column) => {
154 let name: String = first_column.name().to_string();
155 let data = ConcatColumnData::concat(chunks);
156 Column {
157 name,
158 data: Arc::new(data),
159 _marker: marker::PhantomData,
160 }
161 }
162 }
163 }
164}
165
166impl<K: ColumnType> Column<K> {
167 pub fn iter<'a, T: Iterable<'a, K>>(&'a self) -> Result<T::Iter> {
205 <T as Iterable<'a, K>>::iter(self, self.sql_type())
206 }
207}
208
209impl<K: ColumnType> Column<K> {
210 pub(crate) fn read<R: ReadEx>(reader: &mut R, size: usize, tz: Tz) -> Result<Column<K>> {
211 let name = reader.read_string()?;
212 let type_name = reader.read_string()?;
213 let data =
214 <dyn ColumnData>::load_data::<ArcColumnWrapper, _>(reader, &type_name, size, tz)?;
215 let column = Self {
216 name,
217 data,
218 _marker: marker::PhantomData,
219 };
220 Ok(column)
221 }
222
223 #[inline(always)]
224 pub fn name(&self) -> &str {
225 &self.name
226 }
227
228 #[inline(always)]
229 pub fn sql_type(&self) -> SqlType {
230 self.data.sql_type()
231 }
232
233 #[inline(always)]
234 pub(crate) fn at(&self, index: usize) -> ValueRef {
235 self.data.at(index)
236 }
237
238 pub(crate) fn write(&self, encoder: &mut Encoder) {
239 encoder.string(&self.name);
240 encoder.string(self.data.sql_type().to_string().as_ref());
241 let len = self.data.len();
242 self.data.save(encoder, 0, len);
243 }
244
245 #[inline(always)]
246 pub(crate) fn len(&self) -> usize {
247 self.data.len()
248 }
249
250 pub(crate) fn slice(&self, range: ops::Range<usize>) -> Column<Complex> {
251 let data = ChunkColumnData::new(self.data.clone(), range);
252 Column {
253 name: self.name.clone(),
254 data: Arc::new(data),
255 _marker: marker::PhantomData,
256 }
257 }
258
259 pub(crate) fn cast_to(self, dst_type: SqlType) -> Result<Self> {
260 let src_type = self.sql_type();
261
262 if dst_type == src_type {
263 return Ok(self);
264 }
265
266 match (dst_type.clone(), src_type.clone()) {
267 (SqlType::FixedString(str_len), SqlType::String) => {
268 let name = self.name().to_owned();
269 let adapter = FixedStringAdapter {
270 column: self,
271 str_len,
272 };
273 Ok(Column {
274 name,
275 data: Arc::new(adapter),
276 _marker: marker::PhantomData,
277 })
278 }
279 (
280 SqlType::Nullable(SqlType::FixedString(str_len)),
281 SqlType::Nullable(SqlType::String),
282 ) => {
283 let name = self.name().to_owned();
284 let adapter = NullableFixedStringAdapter {
285 column: self,
286 str_len: *str_len,
287 };
288 Ok(Column {
289 name,
290 data: Arc::new(adapter),
291 _marker: marker::PhantomData,
292 })
293 }
294 (SqlType::String, SqlType::Array(SqlType::UInt8)) => {
295 let name = self.name().to_owned();
296 let adapter = StringAdapter { column: self };
297 Ok(Column {
298 name,
299 data: Arc::new(adapter),
300 _marker: marker::PhantomData,
301 })
302 }
303 (SqlType::FixedString(n), SqlType::Array(SqlType::UInt8)) => {
304 let string_column = self.cast_to(SqlType::String)?;
305 string_column.cast_to(SqlType::FixedString(n))
306 }
307 (SqlType::Decimal(dst_p, dst_s), SqlType::Decimal(_, _)) => {
308 let name = self.name().to_owned();
309 let nobits = NoBits::from_precision(dst_p).unwrap();
310 let adapter = DecimalAdapter {
311 column: self,
312 precision: dst_p,
313 scale: dst_s,
314 nobits,
315 };
316 Ok(Column {
317 name,
318 data: Arc::new(adapter),
319 _marker: marker::PhantomData,
320 })
321 }
322 (SqlType::Enum8(enum_values), SqlType::Enum8(_)) => {
323 let name = self.name().to_owned();
324 let adapter = Enum8Adapter {
325 column: self,
326 enum_values,
327 };
328 Ok(Column {
329 name,
330 data: Arc::new(adapter),
331 _marker: marker::PhantomData,
332 })
333 }
334 (SqlType::Enum16(enum_values), SqlType::Enum16(_)) => {
335 let name = self.name().to_owned();
336 let adapter = Enum16Adapter {
337 column: self,
338 enum_values,
339 };
340 Ok(Column {
341 name,
342 data: Arc::new(adapter),
343 _marker: marker::PhantomData,
344 })
345 }
346 (
347 SqlType::Nullable(SqlType::Enum8(enum_values)),
348 SqlType::Nullable(SqlType::Enum8(_)),
349 ) => {
350 let name = self.name().to_owned();
351 let enum_values = enum_values.clone();
352 let adapter = NullableEnum8Adapter {
353 column: self,
354 enum_values,
355 };
356 Ok(Column {
357 name,
358 data: Arc::new(adapter),
359 _marker: marker::PhantomData,
360 })
361 }
362 (
363 SqlType::Nullable(SqlType::Enum16(enum_values)),
364 SqlType::Nullable(SqlType::Enum16(_)),
365 ) => {
366 let name = self.name().to_owned();
367 let enum_values = enum_values.clone();
368 let adapter = NullableEnum16Adapter {
369 column: self,
370 enum_values,
371 };
372 Ok(Column {
373 name,
374 data: Arc::new(adapter),
375 _marker: marker::PhantomData,
376 })
377 }
378 (
379 SqlType::Nullable(SqlType::Decimal(dst_p, dst_s)),
380 SqlType::Nullable(SqlType::Decimal(_, _)),
381 ) => {
382 let name = self.name().to_owned();
383 let nobits = NoBits::from_precision(*dst_p).unwrap();
384 let adapter = NullableDecimalAdapter {
385 column: self,
386 precision: *dst_p,
387 scale: *dst_s,
388 nobits,
389 };
390 Ok(Column {
391 name,
392 data: Arc::new(adapter),
393 _marker: marker::PhantomData,
394 })
395 }
396 (SqlType::Ipv4, SqlType::String) => {
397 let name = self.name().to_owned();
398
399 let n = self.len();
400 let mut inner = Vec::with_capacity(n);
401 for i in 0..n {
402 let source = self.at(i).as_str().unwrap();
403 let ip: Ipv4Addr = source.parse().unwrap();
404 let mut buffer = [0_u8; 4];
405 buffer.copy_from_slice(&ip.octets());
406 buffer.reverse();
407 inner.extend(buffer);
408 }
409
410 let data = Arc::new(IpColumnData::<Ipv4> {
411 inner,
412 phantom: marker::PhantomData,
413 });
414
415 Ok(Column {
416 name,
417 data,
418 _marker: marker::PhantomData,
419 })
420 }
421 (SqlType::Ipv6, SqlType::String) => {
422 let name = self.name().to_owned();
423
424 let n = self.len();
425 let mut inner = Vec::with_capacity(n);
426 for i in 0..n {
427 let source = self.at(i).as_str().unwrap();
428 let ip: Ipv6Addr = source.parse().unwrap();
429 inner.extend(ip.octets());
430 }
431
432 let data = Arc::new(IpColumnData::<Ipv6> {
433 inner,
434 phantom: marker::PhantomData,
435 });
436
437 Ok(Column {
438 name,
439 data,
440 _marker: marker::PhantomData,
441 })
442 }
443 _ => {
444 if let Some(data) = self.data.cast_to(&self.data, &dst_type) {
445 let name = self.name().to_owned();
446 Ok(Column {
447 name,
448 data,
449 _marker: marker::PhantomData,
450 })
451 } else {
452 Err(Error::FromSql(FromSqlError::InvalidType {
453 src: src_type.to_string(),
454 dst: dst_type.to_string(),
455 }))
456 }
457 }
458 }
459 }
460
461 pub(crate) fn push(&mut self, value: Value) {
462 loop {
463 match Arc::get_mut(&mut self.data) {
464 None => {
465 self.data = Arc::from(self.data.clone_instance());
466 }
467 Some(data) => {
468 data.push(value);
469 break;
470 }
471 }
472 }
473 }
474
475 pub(crate) unsafe fn get_internal(&self, pointers: &[*mut *const u8], level: u8) -> Result<()> {
476 self.data.get_internal(pointers, level)
477 }
478}
479
480pub fn new_column<K: ColumnType>(
481 name: &str,
482 data: Arc<(dyn ColumnData + Sync + Send + 'static)>,
483) -> Column<K> {
484 Column {
485 name: name.to_string(),
486 data,
487 _marker: marker::PhantomData,
488 }
489}
490
/// A value that is one of two alternatives.
///
/// Both alternatives must be debuggable, comparable and cloneable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<L: fmt::Debug + PartialEq + Clone, R: fmt::Debug + PartialEq + Clone> {
    /// The first alternative.
    Left(L),
    /// The second alternative.
    Right(R),
}
500
501pub trait ColumnWrapper {
502 type Wrapper;
503 fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper;
504
505 fn wrap_arc(data: ArcColumnData) -> Self::Wrapper;
506}
507
/// Type-level tag for a wrapper strategy; it is used through its associated
/// functions only.
///
/// The private unit field prevents construction outside this module.
pub struct ArcColumnWrapper {
    _private: (),
}
511
512impl ColumnWrapper for ArcColumnWrapper {
513 type Wrapper = Arc<dyn ColumnData + Send + Sync>;
514
515 fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
516 Arc::new(column)
517 }
518
519 fn wrap_arc(data: ArcColumnData) -> Self::Wrapper {
520 data
521 }
522}
523
/// Crate-internal type-level tag for a wrapper strategy; it is used through
/// its associated functions only.
///
/// The private unit field prevents construction outside this module.
pub(crate) struct BoxColumnWrapper {
    _private: (),
}
527
528impl ColumnWrapper for BoxColumnWrapper {
529 type Wrapper = Box<dyn ColumnData + Send + Sync>;
530
531 fn wrap<T: ColumnData + Send + Sync + 'static>(column: T) -> Self::Wrapper {
532 Box::new(column)
533 }
534
535 fn wrap_arc(_: ArcColumnData) -> Self::Wrapper {
536 unimplemented!()
537 }
538}