1use std::{
35 cmp::Ordering,
36 hash::{Hash, Hasher},
37 sync::Arc,
38};
39
40use crate::{
41 array::{Array, BinaryArray, BooleanArray, DictionaryArray, PrimitiveArray, Utf8Array},
42 datatypes::PhysicalType,
43 error::*,
44};
45use crate::{compute::sort::SortOptions, datatypes::DataType};
46
47use self::{
48 dictionary::{compute_dictionary_mapping, encode_dictionary},
49 interner::OrderPreservingInterner,
50};
51
52mod dictionary;
53mod fixed;
54mod interner;
55mod variable;
56
57#[derive(Debug)]
140pub struct RowConverter {
141 fields: Arc<[SortField]>,
143 interners: Vec<Option<Box<OrderPreservingInterner>>>,
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct SortField {
150 options: SortOptions,
152 data_type: DataType,
154}
155
156impl SortField {
157 pub fn new(data_type: DataType) -> Self {
159 Self::new_with_options(data_type, SortOptions::default())
160 }
161
162 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
164 Self { options, data_type }
165 }
166}
167
168impl RowConverter {
169 pub fn new(fields: Vec<SortField>) -> Self {
171 let interners = vec![None; fields.len()];
172 Self {
173 fields: fields.into(),
174 interners,
175 }
176 }
177
178 pub fn convert_columns(&mut self, columns: &[Box<dyn Array>]) -> Result<Rows> {
186 if columns.len() != self.fields.len() {
187 return Err(Error::InvalidArgumentError(format!(
188 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
189 self.fields.len(),
190 columns.len()
191 )));
192 }
193
194 let dictionaries = columns
195 .iter()
196 .zip(&mut self.interners)
197 .zip(self.fields.iter())
198 .map(|((column, interner), field)| {
199 if column.data_type() != &field.data_type {
200 return Err(Error::InvalidArgumentError(format!(
201 "RowConverter column schema mismatch, expected {:?} got {:?}",
202 field.data_type,
203 column.data_type()
204 )));
205 }
206
207 let values = match column.data_type().to_logical_type() {
208 DataType::Dictionary(k, _, _) => match_integer_type!(k, |$T| {
209 let column = column
210 .as_any()
211 .downcast_ref::<DictionaryArray<$T>>()
212 .unwrap();
213 column.values()
214 }),
215 _ => return Ok(None),
216 };
217
218 let interner = interner.get_or_insert_with(Default::default);
219
220 let mapping = compute_dictionary_mapping(interner, values)?
221 .into_iter()
222 .map(|maybe_interned| {
223 maybe_interned.map(|interned| interner.normalized_key(interned))
224 })
225 .collect::<Vec<_>>();
226
227 Ok(Some(mapping))
228 })
229 .collect::<Result<Vec<_>>>()?;
230
231 let mut rows = new_empty_rows(columns, &dictionaries)?;
232
233 for ((column, field), dictionary) in
240 columns.iter().zip(self.fields.iter()).zip(dictionaries)
241 {
242 encode_column(&mut rows, column, field.options, dictionary.as_deref())
244 }
245
246 Ok(rows)
247 }
248}
249
/// A collection of encoded rows stored back to back in one byte buffer.
///
/// Row `i` occupies `buffer[offsets[i]..offsets[i + 1]]`, so `offsets` holds one
/// more entry than there are rows.
#[derive(Debug)]
pub struct Rows {
    /// Concatenated bytes of every row.
    buffer: Box<[u8]>,
    /// Boundaries of the rows inside `buffer`.
    offsets: Box<[usize]>,
}
260
261impl Rows {
262 pub fn row(&self, row: usize) -> Row<'_> {
264 let end = self.offsets[row + 1];
265 let start = self.offsets[row];
266 Row {
267 data: unsafe { self.buffer.get_unchecked(start..end) },
268 }
269 }
270
271 pub fn row_unchecked(&self, row: usize) -> Row<'_> {
273 let data = unsafe {
274 let end = *self.offsets.get_unchecked(row + 1);
275 let start = *self.offsets.get_unchecked(row);
276 self.buffer.get_unchecked(start..end)
277 };
278 Row { data }
279 }
280
281 #[inline]
283 pub fn len(&self) -> usize {
284 self.offsets.len() - 1
285 }
286
287 #[inline]
288 pub fn iter(&self) -> RowsIter<'_> {
290 self.into_iter()
291 }
292}
293
294impl<'a> IntoIterator for &'a Rows {
295 type Item = Row<'a>;
296 type IntoIter = RowsIter<'a>;
297
298 #[inline]
299 fn into_iter(self) -> Self::IntoIter {
300 RowsIter {
301 rows: self,
302 start: 0,
303 end: self.len(),
304 }
305 }
306}
307
308#[derive(Debug)]
310pub struct RowsIter<'a> {
311 rows: &'a Rows,
312 start: usize,
313 end: usize,
314}
315
316impl<'a> Iterator for RowsIter<'a> {
317 type Item = Row<'a>;
318
319 #[inline]
320 fn next(&mut self) -> Option<Self::Item> {
321 if self.start < self.end {
322 let row = self.rows.row_unchecked(self.start);
323 self.start += 1;
324 Some(row)
325 } else {
326 None
327 }
328 }
329
330 #[inline]
331 fn size_hint(&self) -> (usize, Option<usize>) {
332 let len = self.len();
333 (len, Some(len))
334 }
335}
336
337impl<'a> ExactSizeIterator for RowsIter<'a> {
338 #[inline]
339 fn len(&self) -> usize {
340 self.end - self.start
341 }
342}
343
344impl<'a> DoubleEndedIterator for RowsIter<'a> {
345 fn next_back(&mut self) -> Option<Self::Item> {
346 if self.end == self.start {
347 return None;
348 }
349 let row = self.rows.row(self.end);
350 self.end -= 1;
351 Some(row)
352 }
353}
354
355unsafe impl<'a> crate::trusted_len::TrustedLen for RowsIter<'a> {}
356
/// A borrowed view of the bytes of one encoded row.
///
/// Equality, ordering and hashing of rows are all defined on these bytes.
#[derive(Clone, Copy, Debug)]
pub struct Row<'a> {
    /// The encoded bytes of the row.
    data: &'a [u8],
}
367
368impl<'a> PartialEq for Row<'a> {
371 #[inline]
372 fn eq(&self, other: &Self) -> bool {
373 self.data.eq(other.data)
374 }
375}
376
377impl<'a> Eq for Row<'a> {}
378
379impl<'a> PartialOrd for Row<'a> {
380 #[inline]
381 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
382 self.data.partial_cmp(other.data)
383 }
384}
385
386impl<'a> Ord for Row<'a> {
387 #[inline]
388 fn cmp(&self, other: &Self) -> Ordering {
389 self.data.cmp(other.data)
390 }
391}
392
393impl<'a> Hash for Row<'a> {
394 #[inline]
395 fn hash<H: Hasher>(&self, state: &mut H) {
396 self.data.hash(state)
397 }
398}
399
400impl<'a> AsRef<[u8]> for Row<'a> {
401 #[inline]
402 fn as_ref(&self) -> &[u8] {
403 self.data
404 }
405}
406
407#[inline]
409fn null_sentinel(options: SortOptions) -> u8 {
410 match options.nulls_first {
411 true => 0,
412 false => 0xFF,
413 }
414}
415
/// Dispatches on a `PrimitiveType` value and expands `$body` once with `$T`
/// bound to the matching native Rust type.
///
/// Invoke it as `with_match_primitive_without_interval_type!(value, |$T| { ... })`.
/// Integer and floating-point variants are covered; any other variant hits
/// `unimplemented!` at runtime.
#[macro_export]
macro_rules! with_match_primitive_without_interval_type {
    ($key_type:expr, | $_:tt $T:ident | $($body:tt)*) => {{
        // Helper macro that substitutes the chosen native type for `$T` in the body.
        macro_rules! __with_ty__ {
            ($_ $T:ident) => {
                $($body)*
            };
        }
        use $crate::datatypes::PrimitiveType::*;
        use $crate::types::{f16, i256};
        match $key_type {
            // Signed integers.
            Int8 => __with_ty__! { i8 },
            Int16 => __with_ty__! { i16 },
            Int32 => __with_ty__! { i32 },
            Int64 => __with_ty__! { i64 },
            Int128 => __with_ty__! { i128 },
            Int256 => __with_ty__! { i256 },
            // Unsigned integers.
            UInt8 => __with_ty__! { u8 },
            UInt16 => __with_ty__! { u16 },
            UInt32 => __with_ty__! { u32 },
            UInt64 => __with_ty__! { u64 },
            // Floating point.
            Float16 => __with_ty__! { f16 },
            Float32 => __with_ty__! { f32 },
            Float64 => __with_ty__! { f64 },
            _ => unimplemented!("Unsupported type: {:?}", $key_type),
        }
    }};
}
441
442fn new_empty_rows(
444 cols: &[Box<dyn Array>],
445 dictionaries: &[Option<Vec<Option<&[u8]>>>],
446) -> Result<Rows> {
447 use fixed::FixedLengthEncoding;
448
449 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
450 let mut lengths = vec![0; num_rows];
451
452 for (array, dict) in cols.iter().zip(dictionaries) {
453 match array.data_type().to_physical_type() {
454 PhysicalType::Primitive(primitive) => {
455 with_match_primitive_without_interval_type!(primitive, |$T| {
456 let array = array
457 .as_any()
458 .downcast_ref::<PrimitiveArray<$T>>()
459 .unwrap();
460 lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array))
461 })
462 }
463 PhysicalType::Null => {}
464 PhysicalType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
465 PhysicalType::Binary => array
466 .as_any()
467 .downcast_ref::<BinaryArray<i32>>()
468 .unwrap()
469 .iter()
470 .zip(lengths.iter_mut())
471 .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
472 PhysicalType::LargeBinary => array
473 .as_any()
474 .downcast_ref::<BinaryArray<i64>>()
475 .unwrap()
476 .iter()
477 .zip(lengths.iter_mut())
478 .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
479 PhysicalType::Utf8 => array
480 .as_any()
481 .downcast_ref::<Utf8Array<i32>>()
482 .unwrap()
483 .iter()
484 .zip(lengths.iter_mut())
485 .for_each(|(slice, length)| {
486 *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
487 }),
488 PhysicalType::LargeUtf8 => array
489 .as_any()
490 .downcast_ref::<Utf8Array<i64>>()
491 .unwrap()
492 .iter()
493 .zip(lengths.iter_mut())
494 .for_each(|(slice, length)| {
495 *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
496 }),
497 PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
498 let array = array
499 .as_any()
500 .downcast_ref::<DictionaryArray<$T>>()
501 .unwrap();
502 let dict = dict.as_ref().unwrap();
503 for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
504 match v.and_then(|v| dict[*v as usize]) {
505 Some(k) => *length += k.len() + 1,
506 None => *length += 1,
507 }
508 }
509 }),
510 t => {
511 return Err(Error::NotYetImplemented(format!(
512 "not yet implemented: {t:?}"
513 )))
514 }
515 }
516 }
517
518 let mut offsets = Vec::with_capacity(num_rows + 1);
519 offsets.push(0);
520
521 let mut cur_offset = 0_usize;
537 for l in lengths {
538 offsets.push(cur_offset);
539 cur_offset = cur_offset.checked_add(l).expect("overflow");
540 }
541
542 let buffer = vec![0_u8; cur_offset];
543
544 Ok(Rows {
545 buffer: buffer.into(),
546 offsets: offsets.into(),
547 })
548}
549
550fn encode_column(
552 out: &mut Rows,
553 column: &Box<dyn Array>,
554 opts: SortOptions,
555 dictionary: Option<&[Option<&[u8]>]>,
556) {
557 match column.data_type().to_physical_type() {
558 PhysicalType::Primitive(primitive) => {
559 with_match_primitive_without_interval_type!(primitive, |$T| {
560 let column = column
561 .as_any()
562 .downcast_ref::<PrimitiveArray<$T>>()
563 .unwrap()
564 .iter()
565 .map(|v| v.map(|v| *v));
566 fixed::encode(out, column, opts);
567 })
568 }
569 PhysicalType::Null => {}
570 PhysicalType::Boolean => fixed::encode(
571 out,
572 column.as_any().downcast_ref::<BooleanArray>().unwrap(),
573 opts,
574 ),
575 PhysicalType::Binary => {
576 variable::encode(
577 out,
578 column
579 .as_any()
580 .downcast_ref::<BinaryArray<i32>>()
581 .unwrap()
582 .iter(),
583 opts,
584 );
585 }
586 PhysicalType::LargeBinary => {
587 variable::encode(
588 out,
589 column
590 .as_any()
591 .downcast_ref::<BinaryArray<i64>>()
592 .unwrap()
593 .iter(),
594 opts,
595 );
596 }
597 PhysicalType::Utf8 => variable::encode(
598 out,
599 column
600 .as_any()
601 .downcast_ref::<Utf8Array<i32>>()
602 .unwrap()
603 .iter()
604 .map(|x| x.map(|x| x.as_bytes())),
605 opts,
606 ),
607 PhysicalType::LargeUtf8 => variable::encode(
608 out,
609 column
610 .as_any()
611 .downcast_ref::<Utf8Array<i64>>()
612 .unwrap()
613 .iter()
614 .map(|x| x.map(|x| x.as_bytes())),
615 opts,
616 ),
617 PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
618 let column = column
619 .as_any()
620 .downcast_ref::<DictionaryArray<$T>>()
621 .unwrap();
622 encode_dictionary(out, column, dictionary.unwrap(), opts);
623 }),
624 t => unimplemented!("not yet implemented: {:?}", t),
625 }
626}
627
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use rand::{
        distributions::{uniform::SampleUniform, Distribution, Standard},
        thread_rng, Rng,
    };

    use super::*;
    use crate::{
        array::{Array, DictionaryKey, Float32Array, Int16Array, NullArray},
        compute::sort::build_compare,
        datatypes::DataType,
        offset::Offset,
        types::NativeType,
    };

    /// Checks the exact offsets and bytes produced for an `Int16` + `Float32`
    /// pair of columns, plus a few ordering relations between rows.
    #[test]
    fn test_fixed_width() {
        let ints = Int16Array::from([Some(1), Some(2), None, Some(-5), Some(2), Some(2), Some(0)])
            .to_boxed();
        let floats = Float32Array::from([
            Some(1.3),
            Some(2.5),
            None,
            Some(4.),
            Some(0.1),
            Some(-4.),
            Some(-0.),
        ])
        .to_boxed();
        let cols = [ints, floats];

        let fields = vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ];
        let mut converter = RowConverter::new(fields);
        let rows = converter.convert_columns(&cols).unwrap();

        // Seven rows of eight bytes each.
        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer.as_ref(),
            &[
                1, 128, 1, 1, 191, 166, 102, 102, // row 0
                1, 128, 2, 1, 192, 32, 0, 0, // row 1
                0, 0, 0, 0, 0, 0, 0, 0, // row 2
                1, 127, 251, 1, 192, 128, 0, 0, // row 3
                1, 128, 2, 1, 189, 204, 204, 205, // row 4
                1, 128, 2, 1, 63, 127, 255, 255, // row 5
                1, 128, 0, 1, 127, 255, 255, 255, // row 6
            ]
        );

        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));
    }

    /// A null column yields the right number of rows, each of them empty.
    #[test]
    fn test_null_encoding() {
        let col = NullArray::new(DataType::Null, 10).to_boxed();
        let fields = vec![SortField::new(DataType::Null)];
        let mut converter = RowConverter::new(fields);
        let rows = converter.convert_columns(&[col]).unwrap();
        assert_eq!(rows.len(), 10);
        assert_eq!(rows.row(1).data.len(), 0);
    }

    /// Random primitive array of `len` slots, each valid with probability
    /// `valid_percent`.
    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
    where
        K: NativeType,
        Standard: Distribution<K>,
    {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                let is_valid = rng.gen_bool(valid_percent);
                is_valid.then(|| rng.gen())
            })
            .collect()
    }

    /// Random array of ASCII strings (under 100 bytes each); each slot is valid
    /// with probability `valid_percent`.
    fn generate_strings<O: Offset>(len: usize, valid_percent: f64) -> Utf8Array<O> {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                let is_valid = rng.gen_bool(valid_percent);
                is_valid.then(|| {
                    let n_bytes = rng.gen_range(0..100);
                    let bytes = (0..n_bytes).map(|_| rng.gen_range(0..128)).collect();
                    String::from_utf8(bytes).unwrap()
                })
            })
            .collect()
    }

    /// Random dictionary array of `len` keys pointing into `values`; each key is
    /// valid with probability `valid_percent`.
    fn generate_dictionary<K>(
        values: Box<dyn Array>,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: DictionaryKey + Ord + SampleUniform,
        <K as TryFrom<usize>>::Error: Debug,
    {
        let mut rng = thread_rng();
        let min_key = 0_usize.try_into().unwrap();
        let max_key = values.len().try_into().unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                let is_valid = rng.gen_bool(valid_percent);
                is_valid.then(|| rng.gen_range(min_key..max_key))
            })
            .collect();

        DictionaryArray::try_from_keys(keys, values).unwrap()
    }

    /// Picks one of nine column kinds at random and generates `len` slots of it.
    fn generate_column(len: usize) -> Box<dyn Array> {
        let mut rng = thread_rng();
        match rng.gen_range(0..9) {
            0 => Box::new(generate_primitive_array::<i32>(len, 0.8)),
            1 => Box::new(generate_primitive_array::<u32>(len, 0.8)),
            2 => Box::new(generate_primitive_array::<i64>(len, 0.8)),
            3 => Box::new(generate_primitive_array::<u64>(len, 0.8)),
            4 => Box::new(generate_primitive_array::<f32>(len, 0.8)),
            5 => Box::new(generate_primitive_array::<f64>(len, 0.8)),
            6 => Box::new(generate_strings::<i32>(len, 0.8)),
            7 => {
                let values: Box<dyn Array> =
                    Box::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0));
                Box::new(generate_dictionary::<i64>(values, len, 0.8))
            }
            8 => {
                let values: Box<dyn Array> =
                    Box::new(generate_primitive_array::<i64>(rng.gen_range(1..len), 1.0));
                Box::new(generate_dictionary::<i64>(values, len, 0.8))
            }
            _ => unreachable!(),
        }
    }

    /// Compares row ordering against a lexicographic comparison built from the
    /// per-column comparators, for random columns and random sort options.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = thread_rng();
            let num_columns = rng.gen_range(1..5);
            let len = rng.gen_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.gen_bool(0.5),
                    nulls_first: rng.gen_bool(0.5),
                })
                .collect();

            let comparators = arrays
                .iter()
                .zip(options.iter())
                .map(|(a, o)| build_compare(&**a, *o).unwrap())
                .collect::<Vec<_>>();

            let columns = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let mut converter = RowConverter::new(columns);
            let rows = converter.convert_columns(&arrays).unwrap();

            // Reference ordering: the first column that differs decides.
            let lexicographic = |i: usize, j: usize| {
                comparators
                    .iter()
                    .map(|compare| compare(i, j))
                    .find(|ordering| *ordering != Ordering::Equal)
                    .unwrap_or(Ordering::Equal)
            };

            for i in 0..len {
                for j in 0..len {
                    let row_cmp = rows.row(i).cmp(&rows.row(j));
                    let lex_cmp = lexicographic(i, j);
                    assert_eq!(row_cmp, lex_cmp);
                }
            }
        }
    }
}