1use std::borrow::{Borrow, BorrowMut};
2use std::fmt;
3use std::io::Write;
4use std::mem::{align_of, size_of};
5use std::net::{Ipv4Addr, Ipv6Addr};
6use std::slice::Iter;
7use std::str::from_utf8;
8
9use chrono::{Date, DateTime, Utc};
10use tokio::io::{AsyncBufRead, AsyncReadExt};
11use uuid::Uuid;
12
13use super::block::{BlockColumn, ServerBlock};
14use super::decoder::ValueReader;
15#[cfg(feature = "int128")]
16use super::value::ValueDecimal128;
17use super::value::{
18 ValueDate, ValueDateTime, ValueDateTime64, ValueDecimal32, ValueDecimal64, ValueIp4, ValueIp6,
19 ValueUuid,
20};
21
22use super::{Value, ValueRefEnum};
23use crate::errors::{ConversionError, DriverError, Result};
24#[cfg(feature = "int128")]
25use crate::types::Decimal128;
26use crate::types::{Decimal, Decimal32, Decimal64, Field, FieldMeta, SqlType};
27
28use std::cmp::Ordering;
29use std::fmt::Formatter;
30
/// Wraps an error value in `Err`, converting it with `.into()` into whatever
/// error type the surrounding expression expects.
macro_rules! err {
    ($err: expr) => {
        Err($err.into())
    };
}
36
/// Interface for column data that is written out through a `Write` sink.
pub trait AsOutColumn {
    /// Returns the length of the column.
    /// NOTE(review): presumably the number of rows — confirm with implementors.
    fn len(&self) -> usize;
    /// Encodes the column described by `field` into `writer`.
    fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()>;
    /// Reports whether this data can be used with `field`.
    fn is_compatible(&self, field: &Field) -> bool;
}
46
/// Read-side column interface: produces a [`ValueRef`] for a row index.
pub trait AsInColumn: Send {
    /// Returns a reference to the value stored at row `index`.
    ///
    /// # Safety
    /// Several implementations in this file use `get_unchecked` and verify
    /// `index` only in debug builds, so the caller must pass an index that is
    /// in range for the column.
    unsafe fn get_at(&self, index: u64) -> ValueRef<'_>;
}
/// The unit type acts as a column that holds no data: every index yields an
/// empty (`inner: None`) value reference.
impl AsInColumn for () {
    /// Ignores the index and returns an empty `ValueRef`.
    unsafe fn get_at(&self, _: u64) -> ValueRef<'_> {
        ValueRef { inner: None }
    }
}
57
/// Bundles a column name and a flag byte with the boxed column data that
/// will be written out.
pub struct ColumnDataAdapter<'b> {
    /// Column name.
    pub(crate) name: &'b str,
    /// Flag byte. NOTE(review): meaning is not visible in this file — confirm
    /// against the code that builds and serializes the adapter.
    pub(crate) flag: u8,
    /// The column payload, accessed through the `AsOutColumn` interface.
    pub(crate) data: Box<dyn AsOutColumn + 'b>,
}
66
/// A reference to a single cell of a column.
///
/// The typed `Value<'a, T>` implementations below convert it into concrete
/// Rust types.
#[derive(Debug)]
pub struct ValueRef<'a> {
    /// `None` is what the null-aware columns in this file return for a cell
    /// without a value; `Some` carries the raw value.
    inner: Option<ValueRefEnum<'a>>,
}
74impl<'a> Value<'a, DateTime<Utc>> for ValueRef<'a> {
76 fn get(&'a self, field: &'a Field) -> Result<Option<DateTime<Utc>>> {
77 match self.inner {
78 Some(ValueRefEnum::DateTime(v)) => Ok(Some(v.to_datetime())),
79 Some(ValueRefEnum::DateTime64(v)) => {
80 if let SqlType::DateTime64(p, _) = field.sql_type {
81 Ok(Some(v.to_datetime(p)))
82 } else {
83 err!(ConversionError::UnsupportedConversion)
85 }
86 }
87 Some(ValueRefEnum::Date(v)) => {
88 let d = v.to_date();
89 Ok(Some(d.and_hms(0, 0, 0)))
90 }
91 _ => err!(ConversionError::UnsupportedConversion),
92 }
93 }
94}
95impl<'a> Value<'a, &'a str> for ValueRef<'a> {
97 fn get(&'a self, field: &'a Field) -> Result<Option<&'_ str>> {
98 match self.inner {
99 Some(ValueRefEnum::String(v)) => Ok(Some(from_utf8(v)?)),
100 Some(ValueRefEnum::Enum(v)) => {
101 let meta = field.get_meta().expect("corrupted enum index");
103 let title = v.transcode(meta);
104 Ok(Some(from_utf8(title)?))
105 }
106 None => Ok(None),
107 _ => err!(ConversionError::UnsupportedConversion),
108 }
109 }
110}
111
112impl<'a> Value<'a, &'a [u8]> for ValueRef<'a> {
114 fn get(&'a self, field: &'a Field) -> Result<Option<&'_ [u8]>> {
115 match self.inner {
116 Some(ValueRefEnum::String(v)) => Ok(Some(v)),
117 Some(ValueRefEnum::Enum(v)) => {
118 let meta = field.get_meta().expect("corrupted enum index");
119 let title = v.transcode(meta);
120 Ok(Some(title))
121 }
122 Some(ValueRefEnum::Array8(v)) => Ok(Some(v)),
123 _ => err!(ConversionError::UnsupportedConversion),
124 }
125 }
126}
127
128impl<'a> Value<'a, i16> for ValueRef<'a> {
129 #[allow(clippy::unnested_or_patterns)]
130 fn get(&'a self, _: &'a Field) -> Result<Option<i16>> {
131 match self.inner {
132 Some(ValueRefEnum::Int16(v)) | Some(ValueRefEnum::Enum(v)) => Ok(Some(v)),
133 _ => err!(ConversionError::UnsupportedConversion),
134 }
135 }
136}
137
138#[inline]
139fn decimal_scale_from_field(field: &Field) -> u8 {
140 match field.sql_type {
141 SqlType::Decimal(_, s) => s,
142 _ => 0, }
144}
145
/// Generates `Value<'a, $f> for ValueRef<'a>` for a single `ValueRefEnum`
/// variant `$vr`.
///
/// * `impl_value!(T, Variant)` converts the payload with `.into()`.
/// * `impl_value!(T, Variant, decimal)` builds a `Decimal` from the payload's
///   first tuple field and the scale taken from the field description.
///
/// In both forms an empty (NULL) cell yields `Ok(None)` and a cell of any
/// other variant yields `ConversionError::UnsupportedConversion`.
macro_rules! impl_value {
    ($f:ty, $vr:path) => {
        impl<'a> Value<'a, $f> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<$f>> {
                match self.inner {
                    Some($vr(v)) => Ok(Some(v.into())),
                    None => Ok(None),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
    ($f:ty, $vr:path, decimal) => {
        impl<'a> Value<'a, $f> for ValueRef<'a> {
            fn get(&'a self, field: &'a Field) -> Result<Option<$f>> {
                match self.inner {
                    Some($vr(v)) => {
                        let scale = decimal_scale_from_field(field);
                        Ok(Some(Decimal::from(v.0, scale)))
                    }
                    // Keep NULL handling identical to the plain arm above.
                    None => Ok(None),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
}
// Scalar conversions generated by `impl_value!`: each Rust type is readable
// from exactly one `ValueRefEnum` variant.

// Floating point, network address and UUID cells.
impl_value!(f32, ValueRefEnum::Float32);
impl_value!(f64, ValueRefEnum::Float64);
impl_value!(Ipv4Addr, ValueRefEnum::Ip4);
impl_value!(Ipv6Addr, ValueRefEnum::Ip6);
impl_value!(Uuid, ValueRefEnum::Uuid);

// Integer cells (`i16` has a hand-written impl that also accepts enums).
impl_value!(u64, ValueRefEnum::UInt64);
impl_value!(i64, ValueRefEnum::Int64);
impl_value!(u32, ValueRefEnum::UInt32);
impl_value!(i32, ValueRefEnum::Int32);
impl_value!(u16, ValueRefEnum::UInt16);
impl_value!(u8, ValueRefEnum::UInt8);
impl_value!(i8, ValueRefEnum::Int8);
// Decimal cells take their scale from the field description.
impl_value!(Decimal32, ValueRefEnum::Decimal32, decimal);
impl_value!(Decimal64, ValueRefEnum::Decimal64, decimal);
// 128-bit decimals are only available with the `int128` feature.
#[cfg(feature = "int128")]
impl_value!(Decimal128, ValueRefEnum::Decimal128, decimal);
200
/// Reinterprets a slice of `T` as a slice of `U` without copying.
///
/// Intended for same-width numeric pairs (for example `u32` ↔ `i32`/`f32`),
/// which is how the array conversions in this file use it.
///
/// # Panics
/// Panics when `T` and `U` differ in size or alignment. The checks compare
/// compile-time constants, so they are enforced in release builds as well;
/// with debug-only checks a mismatch would be undefined behavior reachable
/// from a safe function.
///
/// NOTE(review): equal layout does not guarantee that every bit pattern of
/// `T` is a valid `U`; callers must only pair plain numeric types.
#[inline]
fn transmute_slice<T, U>(r: &[T]) -> &[U] {
    assert_eq!(std::mem::size_of::<T>(), std::mem::size_of::<U>());
    assert_eq!(std::mem::align_of::<T>(), std::mem::align_of::<U>());
    // SAFETY: size and alignment of `T` and `U` are equal (checked above), so
    // the pointer is suitably aligned and `r.len()` elements of `U` cover
    // exactly the memory borrowed by `r`; the result shares `r`'s lifetime.
    unsafe { std::slice::from_raw_parts(r.as_ptr() as *const U, r.len()) }
}
210
/// Generates `Value<'a, &'a [$f]> for ValueRef<'a>` for array cells stored in
/// the `$vr` variant.
///
/// * `impl_array_value_ref!(T, Variant)` returns the stored slice unchanged.
/// * `impl_array_value_ref!(T, Variant, transmute)` reinterprets the stored
///   slice as `&[T]` through `transmute_slice`.
///
/// Any cell that is not the `$vr` variant (including an empty one) yields
/// `ConversionError::UnsupportedConversion`.
macro_rules! impl_array_value_ref {
    ($f:ty, $vr:path) => {
        impl<'a> Value<'a, &'a [$f]> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<&'a [$f]>> {
                match self.inner {
                    Some($vr(v)) => Ok(Some(v)),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
    ($f:ty, $vr:path, transmute) => {
        impl<'a> Value<'a, &'a [$f]> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<&'a [$f]>> {
                match self.inner {
                    // Same-width reinterpretation; no data is copied.
                    Some($vr(v)) => Ok(Some(transmute_slice(v))),
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
}
// Borrowed-slice conversions for array cells. The plain form returns the
// stored slice; the `transmute` form reinterprets it as another type of the
// same width. (`&[u8]` has a hand-written impl above.)
impl_array_value_ref!(i8, ValueRefEnum::Array8, transmute);
impl_array_value_ref!(u16, ValueRefEnum::Array16);
impl_array_value_ref!(i16, ValueRefEnum::Array16, transmute);
impl_array_value_ref!(u32, ValueRefEnum::Array32);
impl_array_value_ref!(i32, ValueRefEnum::Array32, transmute);
impl_array_value_ref!(u64, ValueRefEnum::Array64);
impl_array_value_ref!(i64, ValueRefEnum::Array64, transmute);
impl_array_value_ref!(f32, ValueRefEnum::Array32, transmute);
impl_array_value_ref!(f64, ValueRefEnum::Array64, transmute);
245
/// Generates `Value<'a, Vec<$t>> for ValueRef<'a>` for array cells stored in
/// the `$refenum` variant, converting every element into an owned `$t`.
///
/// * `impl_array_to_vec!(T, Variant, f)` maps each `&item` through `f`.
/// * `impl_array_to_vec!(T, Variant, f, decimal)` calls `f(item, scale)` with
///   the scale taken from the field description.
///
/// Any cell that is not the `$refenum` variant (including an empty one)
/// yields `ConversionError::UnsupportedConversion`.
macro_rules! impl_array_to_vec {
    ($t:ty, $refenum:path,$f:expr) => {
        impl<'a> Value<'a, Vec<$t>> for ValueRef<'a> {
            fn get(&'a self, _: &'a Field) -> Result<Option<Vec<$t>>> {
                match self.inner {
                    Some($refenum(v)) => {
                        let vec = v.iter().map($f).collect();
                        Ok(Some(vec))
                    }
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
    ($t:ty, $refenum:path,$f:expr,decimal) => {
        impl<'a> Value<'a, Vec<$t>> for ValueRef<'a> {
            fn get(&'a self, field: &'a Field) -> Result<Option<Vec<$t>>> {
                match self.inner {
                    Some($refenum(v)) => {
                        // The scale is a property of the column, not of the item.
                        let scale = decimal_scale_from_field(field);

                        let decimal = v.iter().map(|item| $f(*item, scale)).collect();

                        Ok(Some(decimal))
                    }
                    _ => err!(ConversionError::UnsupportedConversion),
                }
            }
        }
    };
}
// Owned-vector conversions for array cells.

// Array of dates, built from 16-bit items.
// NOTE(review): the `as i16` cast reinterprets the stored item; confirm that
// `date_inner` expects this for large values.
impl_array_to_vec!(Date<Utc>, ValueRefEnum::Array16, |item| {
    ValueDate::date_inner(*item as i16)
});
// Array of timestamps, built from 32-bit items.
impl_array_to_vec!(DateTime<Utc>, ValueRefEnum::Array32, |item| {
    ValueDateTime::datetime_inner(*item as i32)
});
// Array of IPv4 addresses, built from 32-bit items.
impl_array_to_vec!(Ipv4Addr, ValueRefEnum::Array32, |item| Ipv4Addr::from(
    *item
));
// Array of IPv6 addresses: the 128-bit item is byte-swapped before conversion.
impl_array_to_vec!(Ipv6Addr, ValueRefEnum::Array128, |item| Ipv6Addr::from(
    (*item).swap_bytes()
));
// Arrays of decimals: the raw item is cast to the signed underlying integer
// and combined with the column scale.
impl_array_to_vec!(
    Decimal32,
    ValueRefEnum::Array32,
    |item, scale| Decimal::from(item as i32, scale),
    decimal
);
impl_array_to_vec!(
    Decimal64,
    ValueRefEnum::Array64,
    |item, scale| Decimal::from(item as i64, scale),
    decimal
);
/// Types that can be built from a single [`Row`].
pub trait Deserialize: Sized {
    /// Consumes `row` and builds `Self` from it, or reports why it cannot.
    fn deserialize(row: Row) -> Result<Self>;
}
/// One row of a server block: a value reference per column plus the block
/// the values were taken from.
#[derive(Debug)]
pub struct Row<'a> {
    /// One entry per column of `block`, in column order.
    col: Vec<ValueRef<'a>>,
    /// The block that owns the column data and the column headers.
    block: &'a ServerBlock,
}
326
327impl<'a> Row<'a> {
328 pub unsafe fn create(block: &'a ServerBlock, row_index: u64) -> Row<'a> {
333 let col: Vec<_> = block
334 .columns
335 .iter()
336 .map(|c| c.data.get_at(row_index))
337 .collect();
338 Row { col, block }
339 }
340 #[inline]
343 pub fn len(&self) -> usize {
344 self.col.len()
345 }
346 #[inline]
349 pub fn is_empty(&self) -> bool {
350 self.col.is_empty()
351 }
352
353 pub fn iter_columns(&self) -> Iter<BlockColumn> {
354 self.block.columns.iter()
355 }
356
357 pub fn iter_values(&self) -> Iter<ValueRef<'_>> {
358 self.col.iter()
359 }
360 pub fn value<T>(&'a self, index: usize) -> Result<Option<T>>
366 where
367 T: 'a,
368 ValueRef<'a>: Value<'a, T>,
369 {
370 let value_ref = self.col.get(index).ok_or(DriverError::IndexOutOfRange)?;
371 let field = self
372 .block
373 .columns
374 .get(index)
375 .expect("column index out of range")
376 .header
377 .field
378 .borrow();
379
380 value_ref.get(field)
381 }
382 pub unsafe fn value_unchecked<T>(&'a self, index: usize) -> Option<T>
390 where
391 T: 'a,
392 ValueRef<'a>: Value<'a, T>,
393 {
394 assert!(self.col.len() > index);
395 let value_ref = self.col.get_unchecked(index);
396 let field = self
397 .block
398 .columns
399 .get_unchecked(index)
400 .header
401 .field
402 .borrow();
403
404 value_ref.get(field).expect("Conversion error")
405 }
406
407 #[inline]
408 pub fn column_descr(&self, index: usize) -> Option<&BlockColumn> {
409 self.block.columns.get(index)
410 }
411 pub fn column_index(&self, name: &str) -> usize {
413 let item = self
414 .block
415 .columns
416 .iter()
417 .enumerate()
418 .find(|(_i, c)| c.header.name.eq(name))
419 .unwrap();
420 item.0
421 }
422 pub fn deserialize<D: Deserialize>(self) -> Result<D> {
425 <D as Deserialize>::deserialize(self)
426 }
427}
428
429impl<'a, C: AsInColumn + ?Sized + 'a> AsInColumn for Box<C> {
430 #[inline]
431 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
432 self.as_ref().get_at(index)
433 }
434}
/// Owned byte string used for string cells and enum titles.
pub(crate) type BoxString = Box<[u8]>;

/// One enum entry: the numeric code (`.0`) and its title bytes (`.1`).
#[derive(Clone)]
pub struct EnumIndex<T>(pub T, pub BoxString);

impl<T> EnumIndex<T> {
    /// Views the title as `&str` without validating it.
    ///
    /// # Safety
    /// The title bytes must be valid UTF-8.
    #[inline]
    pub(crate) unsafe fn as_str(&self) -> &str {
        std::str::from_utf8_unchecked(self.1.as_ref())
    }
}

impl<T: Ord + Copy> EnumIndex<T> {
    /// Sort key: the numeric code of the entry.
    #[inline]
    pub(crate) fn fn_sort_val(item1: &EnumIndex<T>) -> T {
        item1.0
    }

    /// Comparator: orders entries by their title bytes.
    #[inline]
    pub(crate) fn fn_sort_str(item1: &EnumIndex<T>, item2: &EnumIndex<T>) -> Ordering {
        item1.1.cmp(&item2.1)
    }
}

/// Two entries are equal when their numeric codes are equal; titles are not
/// compared.
impl<T: PartialEq> PartialEq for EnumIndex<T> {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

/// Formats the entry as `'<escaped title>' = <code>`.
impl<T: Copy + fmt::Display> fmt::Display for EnumIndex<T> {
    /// Writes the entry, propagating every formatter error.
    ///
    /// The title is decoded lossily: both tuple fields are public, so the
    /// bytes are not guaranteed to be UTF-8 and must not be passed through
    /// the unchecked `as_str` from safe code. Valid UTF-8 titles are rendered
    /// exactly as before.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let title = String::from_utf8_lossy(&self.1);
        write!(f, "'{}' = {}", title.escape_default(), self.0)
    }
}
488
/// Translation of a raw enum code into its title using field metadata.
pub trait EnumTranscode: Copy {
    /// Returns the title bytes that `meta` associates with this code.
    fn transcode(self, meta: &FieldMeta) -> &[u8];
}
493
/// Enum codes are handled as `i16`; the lookup is delegated to the metadata.
impl EnumTranscode for i16 {
    /// Looks the code up through `FieldMeta::val2str`.
    #[inline]
    fn transcode(self, meta: &FieldMeta) -> &[u8] {
        meta.val2str(self)
    }
}
503
/// Column of raw enum codes, one `T` per row.
pub(crate) struct EnumColumn<T> {
    /// Codes in row order, filled directly from the input stream.
    data: Vec<T>,
}
508
509impl<T: Send> EnumColumn<T> {
511 pub(crate) async fn load_column<R: AsyncBufRead + Unpin>(
514 mut reader: R,
515 rows: u64,
516 field: &Field,
517 ) -> Result<EnumColumn<T>> {
518 debug_assert!(field.get_meta().is_some());
519
520 let mut data: Vec<T> = Vec::with_capacity(rows as usize);
521 unsafe {
522 data.set_len(rows as usize);
523 reader.read_exact(as_bytes_bufer_mut(&mut data)).await?;
524 }
525 Ok(EnumColumn { data })
526 }
527 pub(crate) fn set_nulls(self, _nulls: Option<Vec<u8>>) -> Box<EnumColumn<T>> {
528 Box::new(self)
530 }
531}
532
533impl<'a, T: Copy + Send + Into<i16>> AsInColumn for EnumColumn<T> {
534 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
538 assert!((index as usize) < self.data.len());
539 let enum_value: i16 = (*self.data.get_unchecked(index as usize)).into();
540
541 ValueRef {
542 inner: Some(ValueRefEnum::Enum(enum_value)),
543 }
544 }
545}
546
/// Column of fixed-size items, one `T` per row.
pub(crate) struct FixedColumn<T: Sized> {
    /// Items in row order.
    data: Vec<T>,
}

impl<T: Sized> FixedColumn<T> {
    /// Reports whether the items are in non-decreasing order.
    ///
    /// Only compiled with the `is_sorted` feature. The `PartialOrd` bound is
    /// what `slice::is_sorted` itself requires; without it this method cannot
    /// compile for an unconstrained `T`.
    #[cfg(feature = "is_sorted")]
    pub fn is_sorted(&self) -> bool
    where
        T: PartialOrd,
    {
        self.data.is_sorted()
    }

    /// Fallback used without the `is_sorted` feature: no check is performed
    /// and the column is always reported as sorted.
    #[cfg(not(feature = "is_sorted"))]
    #[inline]
    pub fn is_sorted(&self) -> bool {
        true
    }
}
567
/// A `FixedColumn` paired with a per-row null map.
pub(crate) struct FixedNullColumn<T: Sized> {
    /// The underlying values.
    inner: FixedColumn<T>,
    /// One byte per row; a value of `1` marks the row as null.
    nulls: Vec<u8>,
}
581
582impl<'a, T> AsInColumn for FixedColumn<T>
583where
584 T: Send + Sized + ValueIndex + 'static,
585{
586 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
587 debug_assert!((index as usize) < self.data.len());
588 ValueRef {
589 inner: Some(ValueIndex::valueref_at(self.data.as_slice(), index)),
590 }
591 }
592}
/// Views a mutable slice of `T` as the mutable bytes that back it.
///
/// # Safety
/// The caller may write arbitrary bytes through the result, so every bit
/// pattern must be a valid `T`.
#[inline]
pub(crate) unsafe fn as_bytes_bufer_mut<T>(v: &mut [T]) -> &mut [u8] {
    // Byte length of the whole slice: element count times element size.
    let byte_len = std::mem::size_of_val(v);
    let start = v.as_mut_ptr().cast::<u8>();
    std::slice::from_raw_parts_mut(start, byte_len)
}
603
/// Views a slice of `T` as the bytes that back it.
///
/// # Safety
/// NOTE(review): `T` should be a plain-data type whose bytes are all
/// initialized (no padding).
#[inline]
pub(crate) unsafe fn as_bytes_bufer<T>(v: &[T]) -> &[u8] {
    // Byte length of the whole slice: element count times element size.
    let byte_len = std::mem::size_of_val(v);
    let start = v.as_ptr().cast::<u8>();
    std::slice::from_raw_parts(start, byte_len)
}
608
609pub(crate) type StringColumn = FixedColumn<BoxString>;
611
612impl FixedColumn<BoxString> {
613 pub(crate) async fn load_string_column<R: AsyncBufRead + Unpin>(
614 reader: R,
615 rows: u64,
616 ) -> Result<StringColumn> {
617 let mut data: Vec<BoxString> = Vec::with_capacity(rows as usize);
618
619 let mut rdr = ValueReader::new(reader);
620 let mut l: u64;
621 for _ in 0..rows {
622 l = rdr.read_vint().await?;
623 let s: Vec<u8> = rdr.read_string(l).await?;
624 data.push(s.into_boxed_slice());
625 }
626
627 Ok(StringColumn { data })
628 }
629
630 pub(crate) async fn load_fixed_string_column<R: AsyncBufRead + Unpin>(
631 mut reader: R,
632 rows: u64,
633 width: u32,
634 ) -> Result<StringColumn> {
635 let mut data: Vec<BoxString> = Vec::with_capacity(rows as usize);
636
637 for _ in 0..rows {
638 let mut s: Vec<u8> = Vec::with_capacity(width as usize);
639 unsafe {
640 s.set_len(width as usize);
641 }
642 reader.read_exact(s.as_mut_slice()).await?;
643 data.push(s.into_boxed_slice());
644 }
645
646 Ok(StringColumn { data })
647 }
648}
649
650impl<T: Sized> FixedColumn<T> {
662 pub(crate) async fn load_column<R: AsyncBufRead + Unpin>(
664 mut reader: R,
665 rows: u64,
666 ) -> Result<FixedColumn<T>> {
667 let mut data: Vec<T> = Vec::with_capacity(rows as usize);
668
669 unsafe {
670 data.set_len(rows as usize);
671 reader.read_exact(as_bytes_bufer_mut(&mut data)).await?;
673 }
674
675 Ok(FixedColumn { data })
676 }
677 pub(crate) fn cast<U: Sized>(self: FixedColumn<T>) -> FixedColumn<U> {
683 assert_eq!(size_of::<T>(), size_of::<U>());
684 assert!(align_of::<T>() >= align_of::<U>());
685
686 unsafe {
687 let mut clone = std::mem::ManuallyDrop::new(self);
688 FixedColumn {
689 data: Vec::from_raw_parts(
690 clone.data.as_mut_ptr() as *mut U,
691 clone.data.len(),
692 clone.data.capacity(),
693 ),
694 }
695 }
696 }
697}
698
699impl<T> FixedColumn<T>
700where
701 T: Sized + Send + 'static,
702 FixedNullColumn<T>: AsInColumn,
703 FixedColumn<T>: AsInColumn,
704{
705 #[inline]
708 pub(crate) fn set_nulls(self: Self, nulls: Option<Vec<u8>>) -> Box<dyn AsInColumn> {
709 if let Some(nulls) = nulls {
710 Box::new(FixedNullColumn { inner: self, nulls })
711 } else {
712 Box::new(self)
713 }
714 }
715}
716impl<T: Sized> AsInColumn for FixedNullColumn<T>
718where
719 FixedColumn<T>: AsInColumn,
720{
721 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
722 debug_assert!((index as usize) < self.nulls.len());
723 if self.nulls[index as usize] == 1 {
724 ValueRef { inner: None }
725 } else {
726 self.inner.get_at(index)
727 }
728 }
729}
/// Item types that know how to present an element of a slice of themselves
/// as a `ValueRefEnum`.
pub(crate) trait ValueIndex: Sized {
    /// Returns the element at `index` of the given slice as a value
    /// reference.
    ///
    /// # Safety
    /// `index` must be in range; the implementations in this file use
    /// `get_unchecked`.
    unsafe fn valueref_at(_: &[Self], index: u64) -> ValueRefEnum<'_>;
}
735
736impl ValueIndex for BoxString {
737 #[inline]
738 unsafe fn valueref_at(this: &[BoxString], index: u64) -> ValueRefEnum<'_> {
739 let vr = this.get_unchecked(index as usize);
740 ValueRefEnum::String(vr.as_ref())
741 }
742}
743
/// Generates `ValueIndex for $f`: the element at `index` is copied out of
/// the slice and wrapped with the constructor `$vr`.
///
/// The index is verified in debug builds only.
macro_rules! impl_vre_at {
    ($f:ty,$vr:expr) => {
        impl ValueIndex for $f {
            #[inline]
            unsafe fn valueref_at(this: &[$f], index: u64) -> ValueRefEnum<'_> {
                debug_assert!((index as usize) < this.len());
                let vr = this.get_unchecked(index as usize);
                $vr(*vr)
            }
        }
    };
}
756
// `ValueIndex` implementations: each item type maps to the `ValueRefEnum`
// variant of the same name.

// Integers.
impl_vre_at!(u8, ValueRefEnum::UInt8);
impl_vre_at!(i8, ValueRefEnum::Int8);
impl_vre_at!(u16, ValueRefEnum::UInt16);
impl_vre_at!(i16, ValueRefEnum::Int16);
impl_vre_at!(u32, ValueRefEnum::UInt32);
impl_vre_at!(i32, ValueRefEnum::Int32);
impl_vre_at!(u64, ValueRefEnum::UInt64);
impl_vre_at!(i64, ValueRefEnum::Int64);
// 128-bit integers require the `int128` feature.
#[cfg(feature = "int128")]
impl_vre_at!(u128, ValueRefEnum::UInt128);

// Floating point.
impl_vre_at!(f32, ValueRefEnum::Float32);
impl_vre_at!(f64, ValueRefEnum::Float64);

// UUID and network addresses.
impl_vre_at!(ValueUuid, ValueRefEnum::Uuid);
impl_vre_at!(ValueIp4, ValueRefEnum::Ip4);
impl_vre_at!(ValueIp6, ValueRefEnum::Ip6);

// Decimals; the 128-bit variant requires the `int128` feature.
impl_vre_at!(ValueDecimal32, ValueRefEnum::Decimal32);
impl_vre_at!(ValueDecimal64, ValueRefEnum::Decimal64);
#[cfg(feature = "int128")]
impl_vre_at!(ValueDecimal128, ValueRefEnum::Decimal128);

// Dates and timestamps.
impl_vre_at!(ValueDate, ValueRefEnum::Date);
impl_vre_at!(ValueDateTime, ValueRefEnum::DateTime);
impl_vre_at!(ValueDateTime64, ValueRefEnum::DateTime64);
783
/// Generates a direct `AsInColumn for FixedColumn<$f>` that wraps the item at
/// `index` with the constructor `$vr`.
///
/// Not invoked anywhere in the visible part of this file (hence the
/// `unused_macros` allowance); the generic `ValueIndex`-based impl is used
/// instead.
#[allow(unused_macros)]
macro_rules! impl_fixed_column {
    ($f:ty,$vr:expr) => {
        impl AsInColumn for FixedColumn<$f> {
            unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
                debug_assert!((index as usize) < self.data.len());
                let vr = self.data.get_unchecked(index as usize);
                ValueRef {
                    inner: Some($vr(*vr)),
                }
            }
        }
    };
}
/// Column of arrays of fixed-size items, stored flattened.
#[allow(dead_code)]
pub(crate) struct FixedArrayColumn<T> {
    /// All items of all rows, concatenated in row order.
    data: Vec<T>,
    /// Per-row end offsets into `data`: row `i` spans
    /// `index[i - 1]..index[i]`, with row 0 starting at 0.
    index: Vec<u64>,
}
806
807impl<T: Send + IntoArray + 'static> FixedArrayColumn<T> {
808 pub(crate) async fn load_column<R>(mut reader: R, rows: u64) -> Result<Box<dyn AsInColumn>>
809 where
810 R: AsyncBufRead + Unpin,
811 {
812 let index: FixedColumn<u64> = FixedColumn::load_column(reader.borrow_mut(), rows).await?;
813
814 debug_assert!(index.is_sorted());
815
816 let rows = *index.data.last().expect("null size array");
817 let data: FixedColumn<T> = FixedColumn::load_column(reader.borrow_mut(), rows).await?;
818
819 Ok(Box::new(FixedArrayColumn {
820 data: data.data,
821 index: index.data,
822 }))
823 }
824}
825
/// Item types whose slices can be presented as an array `ValueRefEnum`.
pub trait IntoArray: Sized {
    /// Wraps the slice in the array variant that matches the item type.
    fn into_array(this: &[Self]) -> ValueRefEnum<'_>;
}
829
/// Generates `IntoArray for $t` by wrapping the slice with the constructor
/// `$array`.
macro_rules! impl_intoarray {
    ($t:ty, $array: expr) => {
        impl IntoArray for $t {
            #[inline]
            fn into_array(this: &[$t]) -> ValueRefEnum<'_> {
                $array(this)
            }
        }
    };
}
840
// Array variants are keyed by item width; each unsigned integer type maps to
// the variant of its bit size.
impl_intoarray!(u8, ValueRefEnum::Array8);
impl_intoarray!(u16, ValueRefEnum::Array16);
impl_intoarray!(u32, ValueRefEnum::Array32);
impl_intoarray!(u64, ValueRefEnum::Array64);
impl_intoarray!(u128, ValueRefEnum::Array128);
846
847impl<T: Send + IntoArray> AsInColumn for FixedArrayColumn<T> {
848 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
849 let size1 = if index == 0 {
850 0_u64
851 } else {
852 *self.index.get_unchecked((index - 1) as usize)
853 };
854
855 let size2 = *self.index.get_unchecked(index as usize);
856 debug_assert!(size1 <= size2);
857 let size1 = size1 as usize;
858 let size2 = size2 as usize;
859 debug_assert!(size2 <= self.data.len());
860
861 ValueRef {
862 inner: Some(IntoArray::into_array(&self.data[size1..size2])),
863 }
864 }
865}
866
/// Dictionary-encoded string column: every row stores a key into a shared
/// list of values.
pub(crate) struct LowCardinalityColumn<T: Sized + Send> {
    /// The dictionary. Key `0` is treated as "no value" by `get_at`.
    values: Vec<BoxString>,
    /// One dictionary key per row.
    data: Vec<T>,
}
875impl<T> LowCardinalityColumn<T>
881where
882 T: Sized + Ord + Copy + Send + Into<u64> + 'static,
883{
884 pub(crate) async fn load_column<R>(
885 reader: R,
886 rows: u64,
887 values: FixedColumn<BoxString>,
888 ) -> Result<Box<dyn AsInColumn>>
889 where
890 R: AsyncBufRead + Unpin,
891 {
892 debug_assert!(rows > 0);
893 let data: FixedColumn<T> = FixedColumn::load_column(reader, rows).await?;
894
895 let m = data
896 .data
897 .iter()
898 .max()
899 .expect("corrupted lowcardinality column");
900
901 if (*m).into() >= values.data.len() as u64 {
902 return err!(DriverError::IntegrityError);
903 }
904
905 Ok(Box::new(LowCardinalityColumn {
906 data: data.data,
907 values: values.data,
908 }))
909 }
910}
911
912impl<T: Copy + Send + Sized + Into<u64>> AsInColumn for LowCardinalityColumn<T> {
913 unsafe fn get_at(&self, index: u64) -> ValueRef<'_> {
914 debug_assert!((index as usize) < self.data.len());
915 let index = self.data.get_unchecked(index as usize);
916
917 let index: u64 = (*index).into();
918 if index == 0 {
919 debug_assert!(self.values[0].len() == 0);
921 ValueRef { inner: None }
922 } else {
923 ValueRef {
924 inner: Some(ValueIndex::valueref_at(self.values.as_slice(), index)),
925 }
926 }
927 }
928}