1#![doc(
157 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
/// Converts columns of arrays into the row-oriented [`Rows`] representation and back.
///
/// A converter is built for a fixed list of [`SortField`]s; `codecs` holds one
/// [`Codec`] per field, created in [`RowConverter::new`].
#[derive(Debug)]
pub struct RowConverter {
    /// The data type and sort options of every column this converter handles.
    fields: Arc<[SortField]>,
    /// Per-column encoding state, in the same order as `fields`.
    codecs: Vec<Codec>,
}
487
/// Per-column state required to encode a column into rows and decode it again.
#[derive(Debug)]
enum Codec {
    /// No state is required; chosen for types whose `is_nested()` is false and that
    /// are not matched by the dictionary or run-end encoded arms of `Codec::new`.
    Stateless,
    /// Converter for the dictionary's value type, plus the encoding of one null value.
    Dictionary(RowConverter, OwnedRow),
    /// Converter for the struct's child columns, plus the encoding of one row in
    /// which every child is null.
    Struct(RowConverter, OwnedRow),
    /// Converter for the child element type of `List`, `LargeList` and `FixedSizeList`.
    List(RowConverter),
    /// Converter for the values of a run-end encoded array.
    RunEndEncoded(RowConverter),
    /// One converter and one encoded null row per union child, in the order the
    /// union's fields are iterated.
    Union(Vec<RowConverter>, Vec<OwnedRow>),
}
506
507impl Codec {
508 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
509 match &sort_field.data_type {
510 DataType::Dictionary(_, values) => {
511 let sort_field =
512 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
513
514 let converter = RowConverter::new(vec![sort_field])?;
515 let null_array = new_null_array(values.as_ref(), 1);
516 let nulls = converter.convert_columns(&[null_array])?;
517
518 let owned = OwnedRow {
519 data: nulls.buffer.into(),
520 config: nulls.config,
521 };
522 Ok(Self::Dictionary(converter, owned))
523 }
524 DataType::RunEndEncoded(_, values) => {
525 let options = SortOptions {
527 descending: false,
528 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
529 };
530
531 let field = SortField::new_with_options(values.data_type().clone(), options);
532 let converter = RowConverter::new(vec![field])?;
533 Ok(Self::RunEndEncoded(converter))
534 }
535 d if !d.is_nested() => Ok(Self::Stateless),
536 DataType::List(f) | DataType::LargeList(f) => {
537 let options = SortOptions {
541 descending: false,
542 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
543 };
544
545 let field = SortField::new_with_options(f.data_type().clone(), options);
546 let converter = RowConverter::new(vec![field])?;
547 Ok(Self::List(converter))
548 }
549 DataType::FixedSizeList(f, _) => {
550 let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
551 let converter = RowConverter::new(vec![field])?;
552 Ok(Self::List(converter))
553 }
554 DataType::Struct(f) => {
555 let sort_fields = f
556 .iter()
557 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
558 .collect();
559
560 let converter = RowConverter::new(sort_fields)?;
561 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
562
563 let nulls = converter.convert_columns(&nulls)?;
564 let owned = OwnedRow {
565 data: nulls.buffer.into(),
566 config: nulls.config,
567 };
568
569 Ok(Self::Struct(converter, owned))
570 }
571 DataType::Union(fields, _mode) => {
572 let options = SortOptions {
575 descending: false,
576 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
577 };
578
579 let mut converters = Vec::with_capacity(fields.len());
580 let mut null_rows = Vec::with_capacity(fields.len());
581
582 for (_type_id, field) in fields.iter() {
583 let sort_field =
584 SortField::new_with_options(field.data_type().clone(), options);
585 let converter = RowConverter::new(vec![sort_field])?;
586
587 let null_array = new_null_array(field.data_type(), 1);
588 let nulls = converter.convert_columns(&[null_array])?;
589 let owned = OwnedRow {
590 data: nulls.buffer.into(),
591 config: nulls.config,
592 };
593
594 converters.push(converter);
595 null_rows.push(owned);
596 }
597
598 Ok(Self::Union(converters, null_rows))
599 }
600 _ => Err(ArrowError::NotYetImplemented(format!(
601 "not yet implemented: {:?}",
602 sort_field.data_type
603 ))),
604 }
605 }
606
607 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
608 match self {
609 Codec::Stateless => Ok(Encoder::Stateless),
610 Codec::Dictionary(converter, nulls) => {
611 let values = array.as_any_dictionary().values().clone();
612 let rows = converter.convert_columns(&[values])?;
613 Ok(Encoder::Dictionary(rows, nulls.row()))
614 }
615 Codec::Struct(converter, null) => {
616 let v = as_struct_array(array);
617 let rows = converter.convert_columns(v.columns())?;
618 Ok(Encoder::Struct(rows, null.row()))
619 }
620 Codec::List(converter) => {
621 let values = match array.data_type() {
622 DataType::List(_) => {
623 let list_array = as_list_array(array);
624 let first_offset = list_array.offsets()[0] as usize;
625 let last_offset =
626 list_array.offsets()[list_array.offsets().len() - 1] as usize;
627
628 list_array
631 .values()
632 .slice(first_offset, last_offset - first_offset)
633 }
634 DataType::LargeList(_) => {
635 let list_array = as_large_list_array(array);
636
637 let first_offset = list_array.offsets()[0] as usize;
638 let last_offset =
639 list_array.offsets()[list_array.offsets().len() - 1] as usize;
640
641 list_array
644 .values()
645 .slice(first_offset, last_offset - first_offset)
646 }
647 DataType::FixedSizeList(_, _) => {
648 as_fixed_size_list_array(array).values().clone()
649 }
650 _ => unreachable!(),
651 };
652 let rows = converter.convert_columns(&[values])?;
653 Ok(Encoder::List(rows))
654 }
655 Codec::RunEndEncoded(converter) => {
656 let values = match array.data_type() {
657 DataType::RunEndEncoded(r, _) => match r.data_type() {
658 DataType::Int16 => array.as_run::<Int16Type>().values(),
659 DataType::Int32 => array.as_run::<Int32Type>().values(),
660 DataType::Int64 => array.as_run::<Int64Type>().values(),
661 _ => unreachable!("Unsupported run end index type: {r:?}"),
662 },
663 _ => unreachable!(),
664 };
665 let rows = converter.convert_columns(std::slice::from_ref(values))?;
666 Ok(Encoder::RunEndEncoded(rows))
667 }
668 Codec::Union(converters, _) => {
669 let union_array = array
670 .as_any()
671 .downcast_ref::<UnionArray>()
672 .expect("expected Union array");
673
674 let type_ids = union_array.type_ids().clone();
675 let offsets = union_array.offsets().cloned();
676
677 let mut child_rows = Vec::with_capacity(converters.len());
678 for (type_id, converter) in converters.iter().enumerate() {
679 let child_array = union_array.child(type_id as i8);
680 let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
681 child_rows.push(rows);
682 }
683
684 Ok(Encoder::Union {
685 child_rows,
686 type_ids,
687 offsets,
688 })
689 }
690 }
691 }
692
693 fn size(&self) -> usize {
694 match self {
695 Codec::Stateless => 0,
696 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
697 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
698 Codec::List(converter) => converter.size(),
699 Codec::RunEndEncoded(converter) => converter.size(),
700 Codec::Union(converters, null_rows) => {
701 converters.iter().map(|c| c.size()).sum::<usize>()
702 + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
703 }
704 }
705 }
706}
707
/// Per-array encoding state produced by [`Codec::encoder`] for one call to
/// [`RowConverter::append`].
#[derive(Debug)]
enum Encoder<'a> {
    /// The array is encoded directly from its own values.
    Stateless,
    /// The encoded dictionary values, and the encoded null value.
    Dictionary(Rows, Row<'a>),
    /// The encoded child columns (one row per struct row), and the encoded row
    /// used for null structs.
    Struct(Rows, Row<'a>),
    /// The encoded child values of a list array.
    List(Rows),
    /// The encoded values of a run-end encoded array.
    RunEndEncoded(Rows),
    /// State for encoding a union array.
    Union {
        /// The encoded rows of each child array.
        child_rows: Vec<Rows>,
        /// The type id of every union slot.
        type_ids: ScalarBuffer<i8>,
        /// The per-slot index into the selected child; when absent the slot index
        /// itself is used.
        offsets: Option<ScalarBuffer<i32>>,
    },
}
731
/// The data type and sort options of one column handled by a [`RowConverter`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (direction and null placement) for this column.
    options: SortOptions,
    /// Data type of this column.
    data_type: DataType,
}
740
741impl SortField {
742 pub fn new(data_type: DataType) -> Self {
744 Self::new_with_options(data_type, Default::default())
745 }
746
747 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
749 Self { options, data_type }
750 }
751
752 pub fn size(&self) -> usize {
756 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
757 }
758}
759
760impl RowConverter {
761 pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
763 if !Self::supports_fields(&fields) {
764 return Err(ArrowError::NotYetImplemented(format!(
765 "Row format support not yet implemented for: {fields:?}"
766 )));
767 }
768
769 let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
770 Ok(Self {
771 fields: fields.into(),
772 codecs,
773 })
774 }
775
776 pub fn supports_fields(fields: &[SortField]) -> bool {
778 fields.iter().all(|x| Self::supports_datatype(&x.data_type))
779 }
780
781 fn supports_datatype(d: &DataType) -> bool {
782 match d {
783 _ if !d.is_nested() => true,
784 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
785 Self::supports_datatype(f.data_type())
786 }
787 DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
788 DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
789 DataType::Union(fs, _mode) => fs
790 .iter()
791 .all(|(_, f)| Self::supports_datatype(f.data_type())),
792 _ => false,
793 }
794 }
795
796 pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
806 let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
807 let mut rows = self.empty_rows(num_rows, 0);
808 self.append(&mut rows, columns)?;
809 Ok(rows)
810 }
811
812 pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
843 assert!(
844 Arc::ptr_eq(&rows.config.fields, &self.fields),
845 "rows were not produced by this RowConverter"
846 );
847
848 if columns.len() != self.fields.len() {
849 return Err(ArrowError::InvalidArgumentError(format!(
850 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
851 self.fields.len(),
852 columns.len()
853 )));
854 }
855 for colum in columns.iter().skip(1) {
856 if colum.len() != columns[0].len() {
857 return Err(ArrowError::InvalidArgumentError(format!(
858 "RowConverter columns must all have the same length, expected {} got {}",
859 columns[0].len(),
860 colum.len()
861 )));
862 }
863 }
864
865 let encoders = columns
866 .iter()
867 .zip(&self.codecs)
868 .zip(self.fields.iter())
869 .map(|((column, codec), field)| {
870 if !column.data_type().equals_datatype(&field.data_type) {
871 return Err(ArrowError::InvalidArgumentError(format!(
872 "RowConverter column schema mismatch, expected {} got {}",
873 field.data_type,
874 column.data_type()
875 )));
876 }
877 codec.encoder(column.as_ref())
878 })
879 .collect::<Result<Vec<_>, _>>()?;
880
881 let write_offset = rows.num_rows();
882 let lengths = row_lengths(columns, &encoders);
883 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
884 rows.buffer.resize(total, 0);
885
886 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
887 encode_column(
889 &mut rows.buffer,
890 &mut rows.offsets[write_offset..],
891 column.as_ref(),
892 field.options,
893 &encoder,
894 )
895 }
896
897 if cfg!(debug_assertions) {
898 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
899 rows.offsets
900 .windows(2)
901 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
902 }
903
904 Ok(())
905 }
906
907 pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
915 where
916 I: IntoIterator<Item = Row<'a>>,
917 {
918 let mut validate_utf8 = false;
919 let mut rows: Vec<_> = rows
920 .into_iter()
921 .map(|row| {
922 assert!(
923 Arc::ptr_eq(&row.config.fields, &self.fields),
924 "rows were not produced by this RowConverter"
925 );
926 validate_utf8 |= row.config.validate_utf8;
927 row.data
928 })
929 .collect();
930
931 let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
935
936 if cfg!(debug_assertions) {
937 for (i, row) in rows.iter().enumerate() {
938 if !row.is_empty() {
939 return Err(ArrowError::InvalidArgumentError(format!(
940 "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
941 codecs = &self.codecs
942 )));
943 }
944 }
945 }
946
947 Ok(result)
948 }
949
950 pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
979 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
980 offsets.push(0);
981
982 Rows {
983 offsets,
984 buffer: Vec::with_capacity(data_capacity),
985 config: RowConfig {
986 fields: self.fields.clone(),
987 validate_utf8: false,
988 },
989 }
990 }
991
992 pub fn from_binary(&self, array: BinaryArray) -> Rows {
1019 assert_eq!(
1020 array.null_count(),
1021 0,
1022 "can't construct Rows instance from array with nulls"
1023 );
1024 let (offsets, values, _) = array.into_parts();
1025 let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
1026 let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
1028 Rows {
1029 buffer,
1030 offsets,
1031 config: RowConfig {
1032 fields: Arc::clone(&self.fields),
1033 validate_utf8: true,
1034 },
1035 }
1036 }
1037
1038 unsafe fn convert_raw(
1044 &self,
1045 rows: &mut [&[u8]],
1046 validate_utf8: bool,
1047 ) -> Result<Vec<ArrayRef>, ArrowError> {
1048 self.fields
1049 .iter()
1050 .zip(&self.codecs)
1051 .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
1052 .collect()
1053 }
1054
1055 pub fn parser(&self) -> RowParser {
1057 RowParser::new(Arc::clone(&self.fields))
1058 }
1059
1060 pub fn size(&self) -> usize {
1064 std::mem::size_of::<Self>()
1065 + self.fields.iter().map(|x| x.size()).sum::<usize>()
1066 + self.codecs.capacity() * std::mem::size_of::<Codec>()
1067 + self.codecs.iter().map(Codec::size).sum::<usize>()
1068 }
1069}
1070
/// Interprets raw bytes as [`Row`]s belonging to the [`RowConverter`] that created it.
#[derive(Debug)]
pub struct RowParser {
    /// Configuration attached to every row this parser produces.
    config: RowConfig,
}
1076
1077impl RowParser {
1078 fn new(fields: Arc<[SortField]>) -> Self {
1079 Self {
1080 config: RowConfig {
1081 fields,
1082 validate_utf8: true,
1083 },
1084 }
1085 }
1086
1087 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1092 Row {
1093 data: bytes,
1094 config: &self.config,
1095 }
1096 }
1097}
1098
/// Configuration shared by [`Rows`], [`Row`] and [`OwnedRow`].
#[derive(Debug, Clone)]
struct RowConfig {
    /// The field list of the converter the rows belong to; compared by pointer to
    /// detect rows from a different converter.
    fields: Arc<[SortField]>,
    /// Forwarded to the string decoders when the rows are converted back to arrays.
    validate_utf8: bool,
}
1107
/// A collection of encoded rows stored back to back in a single buffer.
#[derive(Debug)]
pub struct Rows {
    /// The bytes of all rows, concatenated.
    buffer: Vec<u8>,
    /// Row boundaries in `buffer`: row `i` spans `offsets[i]..offsets[i + 1]`.
    offsets: Vec<usize>,
    /// The configuration these rows were created with.
    config: RowConfig,
}
1120
1121impl Rows {
1122 pub fn push(&mut self, row: Row<'_>) {
1124 assert!(
1125 Arc::ptr_eq(&row.config.fields, &self.config.fields),
1126 "row was not produced by this RowConverter"
1127 );
1128 self.config.validate_utf8 |= row.config.validate_utf8;
1129 self.buffer.extend_from_slice(row.data);
1130 self.offsets.push(self.buffer.len())
1131 }
1132
1133 pub fn row(&self, row: usize) -> Row<'_> {
1135 self.checked_row_end(row);
1136 unsafe { self.row_unchecked(row) }
1137 }
1138
1139 fn checked_row_end(&self, row: usize) -> usize {
1140 row.checked_add(1)
1141 .filter(|end| *end < self.offsets.len())
1142 .expect("row index out of bounds")
1143 }
1144
1145 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1150 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1151 let start = unsafe { self.offsets.get_unchecked(index) };
1152 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1153 Row {
1154 data,
1155 config: &self.config,
1156 }
1157 }
1158
1159 pub fn clear(&mut self) {
1161 self.offsets.truncate(1);
1162 self.buffer.clear();
1163 }
1164
1165 pub fn num_rows(&self) -> usize {
1167 self.offsets.len() - 1
1168 }
1169
1170 pub fn iter(&self) -> RowsIter<'_> {
1172 self.into_iter()
1173 }
1174
1175 pub fn size(&self) -> usize {
1179 std::mem::size_of::<Self>()
1181 + self.buffer.capacity()
1182 + self.offsets.capacity() * std::mem::size_of::<usize>()
1183 }
1184
1185 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1215 if self.buffer.len() > i32::MAX as usize {
1216 return Err(ArrowError::InvalidArgumentError(format!(
1217 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
1218 self.buffer.len()
1219 )));
1220 }
1221 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1223 let array = unsafe {
1225 BinaryArray::new_unchecked(
1226 OffsetBuffer::new_unchecked(offsets_scalar),
1227 Buffer::from_vec(self.buffer),
1228 None,
1229 )
1230 };
1231 Ok(array)
1232 }
1233}
1234
1235impl<'a> IntoIterator for &'a Rows {
1236 type Item = Row<'a>;
1237 type IntoIter = RowsIter<'a>;
1238
1239 fn into_iter(self) -> Self::IntoIter {
1240 RowsIter {
1241 rows: self,
1242 start: 0,
1243 end: self.num_rows(),
1244 }
1245 }
1246}
1247
/// An iterator over the rows of a [`Rows`].
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated.
    rows: &'a Rows,
    /// Index of the next row yielded from the front.
    start: usize,
    /// Upper bound of the remaining range; initialised to `num_rows()`.
    end: usize,
}
1255
1256impl<'a> Iterator for RowsIter<'a> {
1257 type Item = Row<'a>;
1258
1259 fn next(&mut self) -> Option<Self::Item> {
1260 if self.end == self.start {
1261 return None;
1262 }
1263
1264 let row = unsafe { self.rows.row_unchecked(self.start) };
1266 self.start += 1;
1267 Some(row)
1268 }
1269
1270 fn size_hint(&self) -> (usize, Option<usize>) {
1271 let len = self.len();
1272 (len, Some(len))
1273 }
1274}
1275
1276impl ExactSizeIterator for RowsIter<'_> {
1277 fn len(&self) -> usize {
1278 self.end - self.start
1279 }
1280}
1281
1282impl DoubleEndedIterator for RowsIter<'_> {
1283 fn next_back(&mut self) -> Option<Self::Item> {
1284 if self.end == self.start {
1285 return None;
1286 }
1287 let row = unsafe { self.rows.row_unchecked(self.end) };
1289 self.end -= 1;
1290 Some(row)
1291 }
1292}
1293
/// A borrowed, encoded row: a byte slice plus the configuration it was created with.
///
/// Comparison, ordering and hashing are all defined on the bytes alone.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row.
    data: &'a [u8],
    /// The configuration of the collection or parser this row came from.
    config: &'a RowConfig,
}
1307
1308impl<'a> Row<'a> {
1309 pub fn owned(&self) -> OwnedRow {
1311 OwnedRow {
1312 data: self.data.into(),
1313 config: self.config.clone(),
1314 }
1315 }
1316
1317 pub fn data(&self) -> &'a [u8] {
1319 self.data
1320 }
1321}
1322
1323impl PartialEq for Row<'_> {
1326 #[inline]
1327 fn eq(&self, other: &Self) -> bool {
1328 self.data.eq(other.data)
1329 }
1330}
1331
1332impl Eq for Row<'_> {}
1333
1334impl PartialOrd for Row<'_> {
1335 #[inline]
1336 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1337 Some(self.cmp(other))
1338 }
1339}
1340
1341impl Ord for Row<'_> {
1342 #[inline]
1343 fn cmp(&self, other: &Self) -> Ordering {
1344 self.data.cmp(other.data)
1345 }
1346}
1347
1348impl Hash for Row<'_> {
1349 #[inline]
1350 fn hash<H: Hasher>(&self, state: &mut H) {
1351 self.data.hash(state)
1352 }
1353}
1354
1355impl AsRef<[u8]> for Row<'_> {
1356 #[inline]
1357 fn as_ref(&self) -> &[u8] {
1358 self.data
1359 }
1360}
1361
/// An owned copy of a [`Row`]: the encoded bytes together with their configuration.
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this row.
    data: Box<[u8]>,
    /// The configuration this row was created with.
    config: RowConfig,
}
1370
1371impl OwnedRow {
1372 pub fn row(&self) -> Row<'_> {
1376 Row {
1377 data: &self.data,
1378 config: &self.config,
1379 }
1380 }
1381}
1382
1383impl PartialEq for OwnedRow {
1386 #[inline]
1387 fn eq(&self, other: &Self) -> bool {
1388 self.row().eq(&other.row())
1389 }
1390}
1391
1392impl Eq for OwnedRow {}
1393
1394impl PartialOrd for OwnedRow {
1395 #[inline]
1396 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1397 Some(self.cmp(other))
1398 }
1399}
1400
1401impl Ord for OwnedRow {
1402 #[inline]
1403 fn cmp(&self, other: &Self) -> Ordering {
1404 self.row().cmp(&other.row())
1405 }
1406}
1407
1408impl Hash for OwnedRow {
1409 #[inline]
1410 fn hash<H: Hasher>(&self, state: &mut H) {
1411 self.row().hash(state)
1412 }
1413}
1414
1415impl AsRef<[u8]> for OwnedRow {
1416 #[inline]
1417 fn as_ref(&self) -> &[u8] {
1418 &self.data
1419 }
1420}
1421
1422#[inline]
1424fn null_sentinel(options: SortOptions) -> u8 {
1425 match options.nulls_first {
1426 true => 0,
1427 false => 0xFF,
1428 }
1429}
1430
/// Accumulates the encoded length of every row while the columns are visited.
enum LengthTracker {
    /// All rows seen so far have the same length.
    Fixed { length: usize, num_rows: usize },
    /// Rows differ in length: row `i` is `fixed_length + lengths[i]` bytes long.
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}

impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, each starting at length zero.
    fn new(num_rows: usize) -> Self {
        LengthTracker::Fixed {
            num_rows,
            length: 0,
        }
    }

    /// Adds `new_length` bytes to every row.
    fn push_fixed(&mut self, new_length: usize) {
        let shared = match self {
            LengthTracker::Fixed { length, .. } => length,
            LengthTracker::Variable { fixed_length, .. } => fixed_length,
        };
        *shared += new_length;
    }

    /// Adds a per-row number of bytes, switching to the variable representation
    /// if the tracker was still fixed.
    ///
    /// # Panics
    ///
    /// Panics if the tracker is already variable and `new_lengths` has a different
    /// number of entries than the tracked rows.
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                for (total, extra) in lengths.iter_mut().zip(new_lengths) {
                    *total += extra;
                }
            }
            LengthTracker::Fixed { length, .. } => {
                // Carry the shared length over and adopt the per-row lengths as-is.
                let fixed_length = *length;
                *self = LengthTracker::Variable {
                    fixed_length,
                    lengths: new_lengths.collect(),
                };
            }
        }
    }

    /// Returns the per-row variable lengths for in-place modification, switching
    /// to the variable representation (all zeros) first if needed.
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            let lengths = vec![0; num_rows];
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths,
            };
        }

        match self {
            LengthTracker::Fixed { .. } => unreachable!(),
            LengthTracker::Variable { lengths, .. } => lengths.as_mut_slice(),
        }
    }

    /// Appends the start offset of every row to `offsets`, with the first row
    /// starting at `initial_offset`, and returns the offset just past the last row.
    ///
    /// Note that the *start* of each row is pushed, not its end.
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                let row_length = *length;
                let row_count = *num_rows;
                offsets.extend((0..row_count).map(|row| initial_offset + row * row_length));

                initial_offset + row_count * row_length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut next_start = initial_offset;

                offsets.reserve(lengths.len());
                for length in lengths {
                    offsets.push(next_start);
                    next_start += length + fixed_length;
                }

                next_start
            }
        }
    }
}
1533
/// Computes the encoded length of every row produced from `cols`.
///
/// `encoders` must hold the encoder matching each column, in the same order. The
/// row count is taken from the first column (zero when `cols` is empty).
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                // Primitive, null, boolean and fixed-size binary columns add the same
                // number of bytes to every row; binary and string columns add a
                // per-row length.
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // one extra byte on top of the value bytes — presumably the
                        // null marker; confirm in `fixed::encode_fixed_size_binary`
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of the encoded value it refers to;
                // null keys contribute the length of the encoded null value.
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                // One sentinel byte, followed by either the encoded children or the
                // encoded null row.
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                // Dispatch on the run-end index type.
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");

                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    // With offsets the child row is looked up through them; without,
                    // the slot index is used directly.
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    // NOTE(review): the type id is used as an index into `child_rows`;
                    // this assumes non-negative type ids that match child positions.
                    let child_row = child_rows[type_id as usize].row(child_row_i);

                    // One byte for the type id, followed by the encoded child row.
                    1 + child_row.as_ref().len()
                });

                tracker.push_variable(lengths);
            }
        }
    }

    tracker
}
1666
/// Encodes one column into `data`, appending each row's bytes at that row's
/// current write position.
///
/// `offsets[i + 1]` holds the write position of row `i` (the first entry is
/// skipped); the struct and union arms below advance it past the bytes they write.
/// NOTE(review): the helpers in `fixed`, `variable`, `list` and `run` are assumed
/// to follow the same convention — confirm in those modules.
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Use the null-aware encoder only if there actually are nulls.
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    // Valid structs are marked with 0x01 and followed by their encoded
                    // children; null structs get the null sentinel and the null row.
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            // Dispatch on the run-end index type.
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        Encoder::Union {
            child_rows,
            type_ids,
            offsets: offsets_buf,
        } => {
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];

                    // With offsets the child row is looked up through them; without,
                    // the slot index is used directly.
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    // NOTE(review): the type id is used as an index into `child_rows`;
                    // this assumes non-negative type ids that match child positions.
                    let child_row = child_rows[type_id as usize].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    // The type id byte is bit-inverted for descending order.
                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    // The child row bytes follow the type id unchanged.
                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
1805
1806pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1808 data: &mut [u8],
1809 offsets: &mut [usize],
1810 column: &DictionaryArray<K>,
1811 values: &Rows,
1812 null: &Row<'_>,
1813) {
1814 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1815 let row = match k {
1816 Some(k) => values.row(k.as_usize()).data,
1817 None => null.data,
1818 };
1819 let end_offset = *offset + row.len();
1820 data[*offset..end_offset].copy_from_slice(row);
1821 *offset = end_offset;
1822 }
1823}
1824
// Helper for `downcast_primitive!`: decodes `$rows` into a primitive array of
// type `$t` via `decode_primitive` and wraps the result in an `Arc`.
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1830
1831unsafe fn decode_column(
1837 field: &SortField,
1838 rows: &mut [&[u8]],
1839 codec: &Codec,
1840 validate_utf8: bool,
1841) -> Result<ArrayRef, ArrowError> {
1842 let options = field.options;
1843
1844 let array: ArrayRef = match codec {
1845 Codec::Stateless => {
1846 let data_type = field.data_type.clone();
1847 downcast_primitive! {
1848 data_type => (decode_primitive_helper, rows, data_type, options),
1849 DataType::Null => Arc::new(NullArray::new(rows.len())),
1850 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1851 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1852 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1853 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1854 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1855 DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
1856 DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
1857 DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
1858 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
1859 }
1860 }
1861 Codec::Dictionary(converter, _) => {
1862 let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1863 cols.into_iter().next().unwrap()
1864 }
1865 Codec::Struct(converter, _) => {
1866 let (null_count, nulls) = fixed::decode_nulls(rows);
1867 rows.iter_mut().for_each(|row| *row = &row[1..]);
1868 let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1869
1870 let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
1871 let corrected_fields: Vec<Field> = match &field.data_type {
1874 DataType::Struct(struct_fields) => struct_fields
1875 .iter()
1876 .zip(child_data.iter())
1877 .map(|(orig_field, child_array)| {
1878 orig_field
1879 .as_ref()
1880 .clone()
1881 .with_data_type(child_array.data_type().clone())
1882 })
1883 .collect(),
1884 _ => unreachable!("Only Struct types should be corrected here"),
1885 };
1886 let corrected_struct_type = DataType::Struct(corrected_fields.into());
1887 let builder = ArrayDataBuilder::new(corrected_struct_type)
1888 .len(rows.len())
1889 .null_count(null_count)
1890 .null_bit_buffer(Some(nulls))
1891 .child_data(child_data);
1892
1893 Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
1894 }
1895 Codec::List(converter) => match &field.data_type {
1896 DataType::List(_) => {
1897 Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
1898 }
1899 DataType::LargeList(_) => {
1900 Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
1901 }
1902 DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
1903 list::decode_fixed_size_list(
1904 converter,
1905 rows,
1906 field,
1907 validate_utf8,
1908 value_length.as_usize(),
1909 )
1910 }?),
1911 _ => unreachable!(),
1912 },
1913 Codec::RunEndEncoded(converter) => match &field.data_type {
1914 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
1915 DataType::Int16 => Arc::new(unsafe {
1916 run::decode::<Int16Type>(converter, rows, field, validate_utf8)
1917 }?),
1918 DataType::Int32 => Arc::new(unsafe {
1919 run::decode::<Int32Type>(converter, rows, field, validate_utf8)
1920 }?),
1921 DataType::Int64 => Arc::new(unsafe {
1922 run::decode::<Int64Type>(converter, rows, field, validate_utf8)
1923 }?),
1924 _ => unreachable!(),
1925 },
1926 _ => unreachable!(),
1927 },
1928 Codec::Union(converters, null_rows) => {
1929 let len = rows.len();
1930
1931 let DataType::Union(union_fields, mode) = &field.data_type else {
1932 unreachable!()
1933 };
1934
1935 let mut type_ids = Vec::with_capacity(len);
1936 let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];
1937
1938 for (idx, row) in rows.iter_mut().enumerate() {
1939 let type_id_byte = {
1940 let id = row[0];
1941 if options.descending { !id } else { id }
1942 };
1943
1944 let type_id = type_id_byte as i8;
1945 type_ids.push(type_id);
1946
1947 let field_idx = type_id as usize;
1948
1949 let child_row = &row[1..];
1950 rows_by_field[field_idx].push((idx, child_row));
1951 }
1952
1953 let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
1954 let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));
1955
1956 for (field_idx, converter) in converters.iter().enumerate() {
1957 let field_rows = &rows_by_field[field_idx];
1958
1959 match &mode {
1960 UnionMode::Dense => {
1961 if field_rows.is_empty() {
1962 let (_, field) = union_fields.iter().nth(field_idx).unwrap();
1963 child_arrays.push(arrow_array::new_empty_array(field.data_type()));
1964 continue;
1965 }
1966
1967 let mut child_data = field_rows
1968 .iter()
1969 .map(|(_, bytes)| *bytes)
1970 .collect::<Vec<_>>();
1971
1972 let child_array =
1973 unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;
1974
1975 for ((row_idx, original_bytes), remaining_bytes) in
1977 field_rows.iter().zip(child_data)
1978 {
1979 let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
1980 rows[*row_idx] = &rows[*row_idx][consumed_length..];
1981 }
1982
1983 child_arrays.push(child_array.into_iter().next().unwrap());
1984 }
1985 UnionMode::Sparse => {
1986 let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
1987 let mut field_row_iter = field_rows.iter().peekable();
1988 let null_row_bytes: &[u8] = &null_rows[field_idx].data;
1989
1990 for idx in 0..len {
1991 if let Some((next_idx, bytes)) = field_row_iter.peek() {
1992 if *next_idx == idx {
1993 sparse_data.push(*bytes);
1994
1995 field_row_iter.next();
1996 continue;
1997 }
1998 }
1999 sparse_data.push(null_row_bytes);
2000 }
2001
2002 let child_array =
2003 unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
2004
2005 for (row_idx, child_row) in field_rows.iter() {
2007 let remaining_len = sparse_data[*row_idx].len();
2008 let consumed_length = 1 + child_row.len() - remaining_len;
2009 rows[*row_idx] = &rows[*row_idx][consumed_length..];
2010 }
2011
2012 child_arrays.push(child_array.into_iter().next().unwrap());
2013 }
2014 }
2015 }
2016
2017 if let Some(ref mut offsets_vec) = offsets {
2019 let mut count = vec![0i32; converters.len()];
2020 for type_id in &type_ids {
2021 let field_idx = *type_id as usize;
2022 offsets_vec.push(count[field_idx]);
2023
2024 count[field_idx] += 1;
2025 }
2026 }
2027
2028 let type_ids_buffer = ScalarBuffer::from(type_ids);
2029 let offsets_buffer = offsets.map(ScalarBuffer::from);
2030
2031 let union_array = UnionArray::try_new(
2032 union_fields.clone(),
2033 type_ids_buffer,
2034 offsets_buffer,
2035 child_arrays,
2036 )?;
2037
2038 Arc::new(union_array)
2041 }
2042 };
2043 Ok(array)
2044}
2045
2046#[cfg(test)]
2047mod tests {
2048 use rand::distr::uniform::SampleUniform;
2049 use rand::distr::{Distribution, StandardUniform};
2050 use rand::{Rng, rng};
2051
2052 use arrow_array::builder::*;
2053 use arrow_array::types::*;
2054 use arrow_array::*;
2055 use arrow_buffer::{Buffer, OffsetBuffer};
2056 use arrow_buffer::{NullBuffer, i256};
2057 use arrow_cast::display::{ArrayFormatter, FormatOptions};
2058 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2059
2060 use super::*;
2061
2062 #[test]
2063 fn test_fixed_width() {
2064 let cols = [
2065 Arc::new(Int16Array::from_iter([
2066 Some(1),
2067 Some(2),
2068 None,
2069 Some(-5),
2070 Some(2),
2071 Some(2),
2072 Some(0),
2073 ])) as ArrayRef,
2074 Arc::new(Float32Array::from_iter([
2075 Some(1.3),
2076 Some(2.5),
2077 None,
2078 Some(4.),
2079 Some(0.1),
2080 Some(-4.),
2081 Some(-0.),
2082 ])) as ArrayRef,
2083 ];
2084
2085 let converter = RowConverter::new(vec![
2086 SortField::new(DataType::Int16),
2087 SortField::new(DataType::Float32),
2088 ])
2089 .unwrap();
2090 let rows = converter.convert_columns(&cols).unwrap();
2091
2092 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
2093 assert_eq!(
2094 rows.buffer,
2095 &[
2096 1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
2111 );
2112
2113 assert!(rows.row(3) < rows.row(6));
2114 assert!(rows.row(0) < rows.row(1));
2115 assert!(rows.row(3) < rows.row(0));
2116 assert!(rows.row(4) < rows.row(1));
2117 assert!(rows.row(5) < rows.row(4));
2118
2119 let back = converter.convert_rows(&rows).unwrap();
2120 for (expected, actual) in cols.iter().zip(&back) {
2121 assert_eq!(expected, actual);
2122 }
2123 }
2124
2125 #[test]
2126 fn test_decimal32() {
2127 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2128 DECIMAL32_MAX_PRECISION,
2129 7,
2130 ))])
2131 .unwrap();
2132 let col = Arc::new(
2133 Decimal32Array::from_iter([
2134 None,
2135 Some(i32::MIN),
2136 Some(-13),
2137 Some(46_i32),
2138 Some(5456_i32),
2139 Some(i32::MAX),
2140 ])
2141 .with_precision_and_scale(9, 7)
2142 .unwrap(),
2143 ) as ArrayRef;
2144
2145 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2146 for i in 0..rows.num_rows() - 1 {
2147 assert!(rows.row(i) < rows.row(i + 1));
2148 }
2149
2150 let back = converter.convert_rows(&rows).unwrap();
2151 assert_eq!(back.len(), 1);
2152 assert_eq!(col.as_ref(), back[0].as_ref())
2153 }
2154
2155 #[test]
2156 fn test_decimal64() {
2157 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2158 DECIMAL64_MAX_PRECISION,
2159 7,
2160 ))])
2161 .unwrap();
2162 let col = Arc::new(
2163 Decimal64Array::from_iter([
2164 None,
2165 Some(i64::MIN),
2166 Some(-13),
2167 Some(46_i64),
2168 Some(5456_i64),
2169 Some(i64::MAX),
2170 ])
2171 .with_precision_and_scale(18, 7)
2172 .unwrap(),
2173 ) as ArrayRef;
2174
2175 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2176 for i in 0..rows.num_rows() - 1 {
2177 assert!(rows.row(i) < rows.row(i + 1));
2178 }
2179
2180 let back = converter.convert_rows(&rows).unwrap();
2181 assert_eq!(back.len(), 1);
2182 assert_eq!(col.as_ref(), back[0].as_ref())
2183 }
2184
2185 #[test]
2186 fn test_decimal128() {
2187 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2188 DECIMAL128_MAX_PRECISION,
2189 7,
2190 ))])
2191 .unwrap();
2192 let col = Arc::new(
2193 Decimal128Array::from_iter([
2194 None,
2195 Some(i128::MIN),
2196 Some(-13),
2197 Some(46_i128),
2198 Some(5456_i128),
2199 Some(i128::MAX),
2200 ])
2201 .with_precision_and_scale(38, 7)
2202 .unwrap(),
2203 ) as ArrayRef;
2204
2205 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2206 for i in 0..rows.num_rows() - 1 {
2207 assert!(rows.row(i) < rows.row(i + 1));
2208 }
2209
2210 let back = converter.convert_rows(&rows).unwrap();
2211 assert_eq!(back.len(), 1);
2212 assert_eq!(col.as_ref(), back[0].as_ref())
2213 }
2214
2215 #[test]
2216 fn test_decimal256() {
2217 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2218 DECIMAL256_MAX_PRECISION,
2219 7,
2220 ))])
2221 .unwrap();
2222 let col = Arc::new(
2223 Decimal256Array::from_iter([
2224 None,
2225 Some(i256::MIN),
2226 Some(i256::from_parts(0, -1)),
2227 Some(i256::from_parts(u128::MAX, -1)),
2228 Some(i256::from_parts(u128::MAX, 0)),
2229 Some(i256::from_parts(0, 46_i128)),
2230 Some(i256::from_parts(5, 46_i128)),
2231 Some(i256::MAX),
2232 ])
2233 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2234 .unwrap(),
2235 ) as ArrayRef;
2236
2237 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2238 for i in 0..rows.num_rows() - 1 {
2239 assert!(rows.row(i) < rows.row(i + 1));
2240 }
2241
2242 let back = converter.convert_rows(&rows).unwrap();
2243 assert_eq!(back.len(), 1);
2244 assert_eq!(col.as_ref(), back[0].as_ref())
2245 }
2246
2247 #[test]
2248 fn test_bool() {
2249 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2250
2251 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2252
2253 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2254 assert!(rows.row(2) > rows.row(1));
2255 assert!(rows.row(2) > rows.row(0));
2256 assert!(rows.row(1) > rows.row(0));
2257
2258 let cols = converter.convert_rows(&rows).unwrap();
2259 assert_eq!(&cols[0], &col);
2260
2261 let converter = RowConverter::new(vec![SortField::new_with_options(
2262 DataType::Boolean,
2263 SortOptions::default().desc().with_nulls_first(false),
2264 )])
2265 .unwrap();
2266
2267 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2268 assert!(rows.row(2) < rows.row(1));
2269 assert!(rows.row(2) < rows.row(0));
2270 assert!(rows.row(1) < rows.row(0));
2271 let cols = converter.convert_rows(&rows).unwrap();
2272 assert_eq!(&cols[0], &col);
2273 }
2274
2275 #[test]
2276 fn test_timezone() {
2277 let a =
2278 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
2279 let d = a.data_type().clone();
2280
2281 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2282 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2283 let back = converter.convert_rows(&rows).unwrap();
2284 assert_eq!(back.len(), 1);
2285 assert_eq!(back[0].data_type(), &d);
2286
2287 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2289 a.append(34).unwrap();
2290 a.append_null();
2291 a.append(345).unwrap();
2292
2293 let dict = a.finish();
2295 let values = TimestampNanosecondArray::from(dict.values().to_data());
2296 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2297 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2298 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2299
2300 assert_eq!(dict_with_tz.data_type(), &d);
2301 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2302 let rows = converter
2303 .convert_columns(&[Arc::new(dict_with_tz) as _])
2304 .unwrap();
2305 let back = converter.convert_rows(&rows).unwrap();
2306 assert_eq!(back.len(), 1);
2307 assert_eq!(back[0].data_type(), &v);
2308 }
2309
2310 #[test]
2311 fn test_null_encoding() {
2312 let col = Arc::new(NullArray::new(10));
2313 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2314 let rows = converter.convert_columns(&[col]).unwrap();
2315 assert_eq!(rows.num_rows(), 10);
2316 assert_eq!(rows.row(1).data.len(), 0);
2317 }
2318
2319 #[test]
2320 fn test_variable_width() {
2321 let col = Arc::new(StringArray::from_iter([
2322 Some("hello"),
2323 Some("he"),
2324 None,
2325 Some("foo"),
2326 Some(""),
2327 ])) as ArrayRef;
2328
2329 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2330 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2331
2332 assert!(rows.row(1) < rows.row(0));
2333 assert!(rows.row(2) < rows.row(4));
2334 assert!(rows.row(3) < rows.row(0));
2335 assert!(rows.row(3) < rows.row(1));
2336
2337 let cols = converter.convert_rows(&rows).unwrap();
2338 assert_eq!(&cols[0], &col);
2339
2340 let col = Arc::new(BinaryArray::from_iter([
2341 None,
2342 Some(vec![0_u8; 0]),
2343 Some(vec![0_u8; 6]),
2344 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
2345 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
2346 Some(vec![0_u8; variable::BLOCK_SIZE]),
2347 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
2348 Some(vec![1_u8; 6]),
2349 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
2350 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
2351 Some(vec![1_u8; variable::BLOCK_SIZE]),
2352 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
2353 Some(vec![0xFF_u8; 6]),
2354 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
2355 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
2356 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
2357 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
2358 ])) as ArrayRef;
2359
2360 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2361 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2362
2363 for i in 0..rows.num_rows() {
2364 for j in i + 1..rows.num_rows() {
2365 assert!(
2366 rows.row(i) < rows.row(j),
2367 "{} < {} - {:?} < {:?}",
2368 i,
2369 j,
2370 rows.row(i),
2371 rows.row(j)
2372 );
2373 }
2374 }
2375
2376 let cols = converter.convert_rows(&rows).unwrap();
2377 assert_eq!(&cols[0], &col);
2378
2379 let converter = RowConverter::new(vec![SortField::new_with_options(
2380 DataType::Binary,
2381 SortOptions::default().desc().with_nulls_first(false),
2382 )])
2383 .unwrap();
2384 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2385
2386 for i in 0..rows.num_rows() {
2387 for j in i + 1..rows.num_rows() {
2388 assert!(
2389 rows.row(i) > rows.row(j),
2390 "{} > {} - {:?} > {:?}",
2391 i,
2392 j,
2393 rows.row(i),
2394 rows.row(j)
2395 );
2396 }
2397 }
2398
2399 let cols = converter.convert_rows(&rows).unwrap();
2400 assert_eq!(&cols[0], &col);
2401 }
2402
2403 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2405 match b.data_type() {
2406 DataType::Dictionary(_, v) => {
2407 assert_eq!(a.data_type(), v.as_ref());
2408 let b = arrow_cast::cast(b, v).unwrap();
2409 assert_eq!(a, b.as_ref())
2410 }
2411 _ => assert_eq!(a, b),
2412 }
2413 }
2414
2415 #[test]
2416 fn test_string_dictionary() {
2417 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2418 Some("foo"),
2419 Some("hello"),
2420 Some("he"),
2421 None,
2422 Some("hello"),
2423 Some(""),
2424 Some("hello"),
2425 Some("hello"),
2426 ])) as ArrayRef;
2427
2428 let field = SortField::new(a.data_type().clone());
2429 let converter = RowConverter::new(vec![field]).unwrap();
2430 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2431
2432 assert!(rows_a.row(3) < rows_a.row(5));
2433 assert!(rows_a.row(2) < rows_a.row(1));
2434 assert!(rows_a.row(0) < rows_a.row(1));
2435 assert!(rows_a.row(3) < rows_a.row(0));
2436
2437 assert_eq!(rows_a.row(1), rows_a.row(4));
2438 assert_eq!(rows_a.row(1), rows_a.row(6));
2439 assert_eq!(rows_a.row(1), rows_a.row(7));
2440
2441 let cols = converter.convert_rows(&rows_a).unwrap();
2442 dictionary_eq(&cols[0], &a);
2443
2444 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2445 Some("hello"),
2446 None,
2447 Some("cupcakes"),
2448 ])) as ArrayRef;
2449
2450 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2451 assert_eq!(rows_a.row(1), rows_b.row(0));
2452 assert_eq!(rows_a.row(3), rows_b.row(1));
2453 assert!(rows_b.row(2) < rows_a.row(0));
2454
2455 let cols = converter.convert_rows(&rows_b).unwrap();
2456 dictionary_eq(&cols[0], &b);
2457
2458 let converter = RowConverter::new(vec![SortField::new_with_options(
2459 a.data_type().clone(),
2460 SortOptions::default().desc().with_nulls_first(false),
2461 )])
2462 .unwrap();
2463
2464 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2465 assert!(rows_c.row(3) > rows_c.row(5));
2466 assert!(rows_c.row(2) > rows_c.row(1));
2467 assert!(rows_c.row(0) > rows_c.row(1));
2468 assert!(rows_c.row(3) > rows_c.row(0));
2469
2470 let cols = converter.convert_rows(&rows_c).unwrap();
2471 dictionary_eq(&cols[0], &a);
2472
2473 let converter = RowConverter::new(vec![SortField::new_with_options(
2474 a.data_type().clone(),
2475 SortOptions::default().desc().with_nulls_first(true),
2476 )])
2477 .unwrap();
2478
2479 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2480 assert!(rows_c.row(3) < rows_c.row(5));
2481 assert!(rows_c.row(2) > rows_c.row(1));
2482 assert!(rows_c.row(0) > rows_c.row(1));
2483 assert!(rows_c.row(3) < rows_c.row(0));
2484
2485 let cols = converter.convert_rows(&rows_c).unwrap();
2486 dictionary_eq(&cols[0], &a);
2487 }
2488
2489 #[test]
2490 fn test_struct() {
2491 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2493 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2494 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2495 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2496 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2497
2498 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2499 let converter = RowConverter::new(sort_fields).unwrap();
2500 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2501
2502 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2503 assert!(a < b);
2504 }
2505
2506 let back = converter.convert_rows(&r1).unwrap();
2507 assert_eq!(back.len(), 1);
2508 assert_eq!(&back[0], &s1);
2509
2510 let data = s1
2512 .to_data()
2513 .into_builder()
2514 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2515 .null_count(2)
2516 .build()
2517 .unwrap();
2518
2519 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2520 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2521 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
2527 assert_eq!(back.len(), 1);
2528 assert_eq!(&back[0], &s2);
2529
2530 back[0].to_data().validate_full().unwrap();
2531 }
2532
2533 #[test]
2534 fn test_dictionary_in_struct() {
2535 let builder = StringDictionaryBuilder::<Int32Type>::new();
2536 let mut struct_builder = StructBuilder::new(
2537 vec![Field::new_dictionary(
2538 "foo",
2539 DataType::Int32,
2540 DataType::Utf8,
2541 true,
2542 )],
2543 vec![Box::new(builder)],
2544 );
2545
2546 let dict_builder = struct_builder
2547 .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2548 .unwrap();
2549
2550 dict_builder.append_value("a");
2552 dict_builder.append_null();
2553 dict_builder.append_value("a");
2554 dict_builder.append_value("b");
2555
2556 for _ in 0..4 {
2557 struct_builder.append(true);
2558 }
2559
2560 let s = Arc::new(struct_builder.finish()) as ArrayRef;
2561 let sort_fields = vec![SortField::new(s.data_type().clone())];
2562 let converter = RowConverter::new(sort_fields).unwrap();
2563 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2564
2565 let back = converter.convert_rows(&r).unwrap();
2566 let [s2] = back.try_into().unwrap();
2567
2568 assert_ne!(&s.data_type(), &s2.data_type());
2571 s2.to_data().validate_full().unwrap();
2572
2573 let s1_struct = s.as_struct();
2577 let s1_0 = s1_struct.column(0);
2578 let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2579 let keys = s1_idx_0.keys();
2580 let values = s1_idx_0.values().as_string::<i32>();
2581 let s2_struct = s2.as_struct();
2583 let s2_0 = s2_struct.column(0);
2584 let s2_idx_0 = s2_0.as_string::<i32>();
2585
2586 for i in 0..keys.len() {
2587 if keys.is_null(i) {
2588 assert!(s2_idx_0.is_null(i));
2589 } else {
2590 let dict_index = keys.value(i) as usize;
2591 assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2592 }
2593 }
2594 }
2595
2596 #[test]
2597 fn test_dictionary_in_struct_empty() {
2598 let ty = DataType::Struct(
2599 vec![Field::new_dictionary(
2600 "foo",
2601 DataType::Int32,
2602 DataType::Int32,
2603 false,
2604 )]
2605 .into(),
2606 );
2607 let s = arrow_array::new_empty_array(&ty);
2608
2609 let sort_fields = vec![SortField::new(s.data_type().clone())];
2610 let converter = RowConverter::new(sort_fields).unwrap();
2611 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2612
2613 let back = converter.convert_rows(&r).unwrap();
2614 let [s2] = back.try_into().unwrap();
2615
2616 assert_ne!(&s.data_type(), &s2.data_type());
2619 s2.to_data().validate_full().unwrap();
2620 assert_eq!(s.len(), 0);
2621 assert_eq!(s2.len(), 0);
2622 }
2623
2624 #[test]
2625 fn test_list_of_string_dictionary() {
2626 let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2627 builder.values().append("a").unwrap();
2629 builder.values().append("b").unwrap();
2630 builder.values().append("zero").unwrap();
2631 builder.values().append_null();
2632 builder.values().append("c").unwrap();
2633 builder.values().append("b").unwrap();
2634 builder.values().append("d").unwrap();
2635 builder.append(true);
2636 builder.append(false);
2638 builder.values().append("e").unwrap();
2640 builder.values().append("zero").unwrap();
2641 builder.values().append("a").unwrap();
2642 builder.append(true);
2643
2644 let a = Arc::new(builder.finish()) as ArrayRef;
2645 let data_type = a.data_type().clone();
2646
2647 let field = SortField::new(data_type.clone());
2648 let converter = RowConverter::new(vec![field]).unwrap();
2649 let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2650
2651 let back = converter.convert_rows(&rows).unwrap();
2652 assert_eq!(back.len(), 1);
2653 let [a2] = back.try_into().unwrap();
2654
2655 assert_ne!(&a.data_type(), &a2.data_type());
2658
2659 a2.to_data().validate_full().unwrap();
2660
2661 let a2_list = a2.as_list::<i32>();
2662 let a1_list = a.as_list::<i32>();
2663
2664 let a1_0 = a1_list.value(0);
2667 let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2668 let keys = a1_idx_0.keys();
2669 let values = a1_idx_0.values().as_string::<i32>();
2670 let a2_0 = a2_list.value(0);
2671 let a2_idx_0 = a2_0.as_string::<i32>();
2672
2673 for i in 0..keys.len() {
2674 if keys.is_null(i) {
2675 assert!(a2_idx_0.is_null(i));
2676 } else {
2677 let dict_index = keys.value(i) as usize;
2678 assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2679 }
2680 }
2681
2682 assert!(a1_list.is_null(1));
2684 assert!(a2_list.is_null(1));
2685
2686 let a1_2 = a1_list.value(2);
2688 let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2689 let keys = a1_idx_2.keys();
2690 let values = a1_idx_2.values().as_string::<i32>();
2691 let a2_2 = a2_list.value(2);
2692 let a2_idx_2 = a2_2.as_string::<i32>();
2693
2694 for i in 0..keys.len() {
2695 if keys.is_null(i) {
2696 assert!(a2_idx_2.is_null(i));
2697 } else {
2698 let dict_index = keys.value(i) as usize;
2699 assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2700 }
2701 }
2702 }
2703
2704 #[test]
2705 fn test_primitive_dictionary() {
2706 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2707 builder.append(2).unwrap();
2708 builder.append(3).unwrap();
2709 builder.append(0).unwrap();
2710 builder.append_null();
2711 builder.append(5).unwrap();
2712 builder.append(3).unwrap();
2713 builder.append(-1).unwrap();
2714
2715 let a = builder.finish();
2716 let data_type = a.data_type().clone();
2717 let columns = [Arc::new(a) as ArrayRef];
2718
2719 let field = SortField::new(data_type.clone());
2720 let converter = RowConverter::new(vec![field]).unwrap();
2721 let rows = converter.convert_columns(&columns).unwrap();
2722 assert!(rows.row(0) < rows.row(1));
2723 assert!(rows.row(2) < rows.row(0));
2724 assert!(rows.row(3) < rows.row(2));
2725 assert!(rows.row(6) < rows.row(2));
2726 assert!(rows.row(3) < rows.row(6));
2727
2728 let back = converter.convert_rows(&rows).unwrap();
2729 assert_eq!(back.len(), 1);
2730 back[0].to_data().validate_full().unwrap();
2731 }
2732
2733 #[test]
2734 fn test_dictionary_nulls() {
2735 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2736 let keys =
2737 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2738
2739 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2740 let data = keys
2741 .into_builder()
2742 .data_type(data_type.clone())
2743 .child_data(vec![values])
2744 .build()
2745 .unwrap();
2746
2747 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2748 let field = SortField::new(data_type.clone());
2749 let converter = RowConverter::new(vec![field]).unwrap();
2750 let rows = converter.convert_columns(&columns).unwrap();
2751
2752 assert_eq!(rows.row(0), rows.row(1));
2753 assert_eq!(rows.row(3), rows.row(4));
2754 assert_eq!(rows.row(4), rows.row(5));
2755 assert!(rows.row(3) < rows.row(0));
2756 }
2757
2758 #[test]
2759 fn test_from_binary_shared_buffer() {
2760 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2761 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2762 let rows = converter.convert_columns(&[array]).unwrap();
2763 let binary_rows = rows.try_into_binary().expect("known-small rows");
2764 let _binary_rows_shared_buffer = binary_rows.clone();
2765
2766 let parsed = converter.from_binary(binary_rows);
2767
2768 converter.convert_rows(parsed.iter()).unwrap();
2769 }
2770
2771 #[test]
2772 #[should_panic(expected = "Encountered non UTF-8 data")]
2773 fn test_invalid_utf8() {
2774 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2775 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2776 let rows = converter.convert_columns(&[array]).unwrap();
2777 let binary_row = rows.row(0);
2778
2779 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2780 let parser = converter.parser();
2781 let utf8_row = parser.parse(binary_row.as_ref());
2782
2783 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2784 }
2785
2786 #[test]
2787 #[should_panic(expected = "Encountered non UTF-8 data")]
2788 fn test_invalid_utf8_array() {
2789 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2790 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2791 let rows = converter.convert_columns(&[array]).unwrap();
2792 let binary_rows = rows.try_into_binary().expect("known-small rows");
2793
2794 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2795 let parsed = converter.from_binary(binary_rows);
2796
2797 converter.convert_rows(parsed.iter()).unwrap();
2798 }
2799
2800 #[test]
2801 #[should_panic(expected = "index out of bounds")]
2802 fn test_invalid_empty() {
2803 let binary_row: &[u8] = &[];
2804
2805 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2806 let parser = converter.parser();
2807 let utf8_row = parser.parse(binary_row.as_ref());
2808
2809 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2810 }
2811
2812 #[test]
2813 #[should_panic(expected = "index out of bounds")]
2814 fn test_invalid_empty_array() {
2815 let row: &[u8] = &[];
2816 let binary_rows = BinaryArray::from(vec![row]);
2817
2818 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2819 let parsed = converter.from_binary(binary_rows);
2820
2821 converter.convert_rows(parsed.iter()).unwrap();
2822 }
2823
2824 #[test]
2825 #[should_panic(expected = "index out of bounds")]
2826 fn test_invalid_truncated() {
2827 let binary_row: &[u8] = &[0x02];
2828
2829 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2830 let parser = converter.parser();
2831 let utf8_row = parser.parse(binary_row.as_ref());
2832
2833 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2834 }
2835
2836 #[test]
2837 #[should_panic(expected = "index out of bounds")]
2838 fn test_invalid_truncated_array() {
2839 let row: &[u8] = &[0x02];
2840 let binary_rows = BinaryArray::from(vec![row]);
2841
2842 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2843 let parsed = converter.from_binary(binary_rows);
2844
2845 converter.convert_rows(parsed.iter()).unwrap();
2846 }
2847
2848 #[test]
2849 #[should_panic(expected = "rows were not produced by this RowConverter")]
2850 fn test_different_converter() {
2851 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2852 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2853 let rows = converter.convert_columns(&[values]).unwrap();
2854
2855 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2856 let _ = converter.convert_rows(&rows);
2857 }
2858
2859 fn test_single_list<O: OffsetSizeTrait>() {
2860 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2861 builder.values().append_value(32);
2862 builder.values().append_value(52);
2863 builder.values().append_value(32);
2864 builder.append(true);
2865 builder.values().append_value(32);
2866 builder.values().append_value(52);
2867 builder.values().append_value(12);
2868 builder.append(true);
2869 builder.values().append_value(32);
2870 builder.values().append_value(52);
2871 builder.append(true);
2872 builder.values().append_value(32); builder.values().append_value(52); builder.append(false);
2875 builder.values().append_value(32);
2876 builder.values().append_null();
2877 builder.append(true);
2878 builder.append(true);
2879 builder.values().append_value(17); builder.values().append_null(); builder.append(false);
2882
2883 let list = Arc::new(builder.finish()) as ArrayRef;
2884 let d = list.data_type().clone();
2885
2886 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2887
2888 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2889 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2898 assert_eq!(back.len(), 1);
2899 back[0].to_data().validate_full().unwrap();
2900 assert_eq!(&back[0], &list);
2901
2902 let options = SortOptions::default().asc().with_nulls_first(false);
2903 let field = SortField::new_with_options(d.clone(), options);
2904 let converter = RowConverter::new(vec![field]).unwrap();
2905 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2906
2907 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2916 assert_eq!(back.len(), 1);
2917 back[0].to_data().validate_full().unwrap();
2918 assert_eq!(&back[0], &list);
2919
2920 let options = SortOptions::default().desc().with_nulls_first(false);
2921 let field = SortField::new_with_options(d.clone(), options);
2922 let converter = RowConverter::new(vec![field]).unwrap();
2923 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2924
2925 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2934 assert_eq!(back.len(), 1);
2935 back[0].to_data().validate_full().unwrap();
2936 assert_eq!(&back[0], &list);
2937
2938 let options = SortOptions::default().desc().with_nulls_first(true);
2939 let field = SortField::new_with_options(d, options);
2940 let converter = RowConverter::new(vec![field]).unwrap();
2941 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2942
2943 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2952 assert_eq!(back.len(), 1);
2953 back[0].to_data().validate_full().unwrap();
2954 assert_eq!(&back[0], &list);
2955
2956 let sliced_list = list.slice(1, 5);
2957 let rows_on_sliced_list = converter
2958 .convert_columns(&[Arc::clone(&sliced_list)])
2959 .unwrap();
2960
2961 assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2968 assert_eq!(back.len(), 1);
2969 back[0].to_data().validate_full().unwrap();
2970 assert_eq!(&back[0], &sliced_list);
2971 }
2972
2973 fn test_nested_list<O: OffsetSizeTrait>() {
2974 let mut builder =
2975 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2976
2977 builder.values().values().append_value(1);
2978 builder.values().values().append_value(2);
2979 builder.values().append(true);
2980 builder.values().values().append_value(1);
2981 builder.values().values().append_null();
2982 builder.values().append(true);
2983 builder.append(true);
2984
2985 builder.values().values().append_value(1);
2986 builder.values().values().append_null();
2987 builder.values().append(true);
2988 builder.values().values().append_value(1);
2989 builder.values().values().append_null();
2990 builder.values().append(true);
2991 builder.append(true);
2992
2993 builder.values().values().append_value(1);
2994 builder.values().values().append_null();
2995 builder.values().append(true);
2996 builder.values().append(false);
2997 builder.append(true);
2998 builder.append(false);
2999
3000 builder.values().values().append_value(1);
3001 builder.values().values().append_value(2);
3002 builder.values().append(true);
3003 builder.append(true);
3004
3005 let list = Arc::new(builder.finish()) as ArrayRef;
3006 let d = list.data_type().clone();
3007
3008 let options = SortOptions::default().asc().with_nulls_first(true);
3016 let field = SortField::new_with_options(d.clone(), options);
3017 let converter = RowConverter::new(vec![field]).unwrap();
3018 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3019
3020 assert!(rows.row(0) > rows.row(1));
3021 assert!(rows.row(1) > rows.row(2));
3022 assert!(rows.row(2) > rows.row(3));
3023 assert!(rows.row(4) < rows.row(0));
3024 assert!(rows.row(4) > rows.row(1));
3025
3026 let back = converter.convert_rows(&rows).unwrap();
3027 assert_eq!(back.len(), 1);
3028 back[0].to_data().validate_full().unwrap();
3029 assert_eq!(&back[0], &list);
3030
3031 let options = SortOptions::default().desc().with_nulls_first(true);
3032 let field = SortField::new_with_options(d.clone(), options);
3033 let converter = RowConverter::new(vec![field]).unwrap();
3034 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3035
3036 assert!(rows.row(0) > rows.row(1));
3037 assert!(rows.row(1) > rows.row(2));
3038 assert!(rows.row(2) > rows.row(3));
3039 assert!(rows.row(4) > rows.row(0));
3040 assert!(rows.row(4) > rows.row(1));
3041
3042 let back = converter.convert_rows(&rows).unwrap();
3043 assert_eq!(back.len(), 1);
3044 back[0].to_data().validate_full().unwrap();
3045 assert_eq!(&back[0], &list);
3046
3047 let options = SortOptions::default().desc().with_nulls_first(false);
3048 let field = SortField::new_with_options(d, options);
3049 let converter = RowConverter::new(vec![field]).unwrap();
3050 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3051
3052 assert!(rows.row(0) < rows.row(1));
3053 assert!(rows.row(1) < rows.row(2));
3054 assert!(rows.row(2) < rows.row(3));
3055 assert!(rows.row(4) > rows.row(0));
3056 assert!(rows.row(4) < rows.row(1));
3057
3058 let back = converter.convert_rows(&rows).unwrap();
3059 assert_eq!(back.len(), 1);
3060 back[0].to_data().validate_full().unwrap();
3061 assert_eq!(&back[0], &list);
3062
3063 let sliced_list = list.slice(1, 3);
3064 let rows = converter
3065 .convert_columns(&[Arc::clone(&sliced_list)])
3066 .unwrap();
3067
3068 assert!(rows.row(0) < rows.row(1));
3069 assert!(rows.row(1) < rows.row(2));
3070
3071 let back = converter.convert_rows(&rows).unwrap();
3072 assert_eq!(back.len(), 1);
3073 back[0].to_data().validate_full().unwrap();
3074 assert_eq!(&back[0], &sliced_list);
3075 }
3076
/// Runs the single-level and nested list scenarios with 32-bit offsets.
#[test]
fn test_list() {
    type Offset = i32;

    test_single_list::<Offset>();
    test_nested_list::<Offset>();
}
3082
/// Runs the single-level and nested list scenarios with 64-bit offsets.
#[test]
fn test_large_list() {
    type Offset = i64;

    test_single_list::<Offset>();
    test_nested_list::<Offset>();
}
3088
/// Encodes a seven-row fixed-size (3) list-of-`Int32` column under four sort
/// configurations, checks the relative ordering of the encoded rows, and
/// verifies that the rows (and a slice of the column) decode back to the input.
#[test]
fn test_fixed_size_list() {
    /// Appends one three-element list with the given validity.
    fn push_list(
        builder: &mut FixedSizeListBuilder<Int32Builder>,
        items: [Option<i32>; 3],
        valid: bool,
    ) {
        for item in items {
            match item {
                Some(v) => builder.values().append_value(v),
                None => builder.values().append_null(),
            }
        }
        builder.append(valid);
    }

    /// Decodes `rows` and checks that a single, fully valid column equal to
    /// `expected` comes back.
    fn assert_round_trip(converter: &RowConverter, rows: &Rows, expected: &ArrayRef) {
        let decoded = converter.convert_rows(rows).unwrap();
        assert_eq!(decoded.len(), 1);
        decoded[0].to_data().validate_full().unwrap();
        assert_eq!(&decoded[0], expected);
    }

    let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
    push_list(&mut builder, [Some(32), Some(52), Some(32)], true); // row 0
    push_list(&mut builder, [Some(32), Some(52), Some(12)], true); // row 1
    push_list(&mut builder, [Some(32), Some(52), None], true); // row 2
    push_list(&mut builder, [Some(32), Some(52), Some(13)], false); // row 3: null list
    push_list(&mut builder, [Some(32), None, None], true); // row 4
    push_list(&mut builder, [None, None, None], true); // row 5
    push_list(&mut builder, [Some(17), None, Some(77)], false); // row 6: null list

    let list = Arc::new(builder.finish()) as ArrayRef;
    let list_type = list.data_type().clone();

    // Default sort options.
    {
        let converter = RowConverter::new(vec![SortField::new(list_type.clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        assert_round_trip(&converter, &rows, &list);
    }

    // Ascending, nulls last.
    {
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(list_type.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        assert_round_trip(&converter, &rows, &list);
    }

    // Descending, nulls last.
    {
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(list_type.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        assert_round_trip(&converter, &rows, &list);
    }

    // Descending, nulls first. This converter is reused for the slice below.
    let options = SortOptions::default().desc().with_nulls_first(true);
    let field = SortField::new_with_options(list_type, options);
    let converter = RowConverter::new(vec![field]).unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

    assert!(rows.row(0) < rows.row(1));
    assert!(rows.row(2) < rows.row(1));
    assert!(rows.row(3) < rows.row(2));
    assert!(rows.row(4) < rows.row(2));
    assert!(rows.row(5) < rows.row(2));
    assert!(rows.row(3) < rows.row(5));
    assert_eq!(rows.row(3), rows.row(6));

    assert_round_trip(&converter, &rows, &list);

    // Rows 1..=5 of the original column, encoded through a slice.
    let sliced_list = list.slice(1, 5);
    let sliced_rows = converter
        .convert_columns(&[Arc::clone(&sliced_list)])
        .unwrap();

    assert!(sliced_rows.row(2) < sliced_rows.row(1));
    assert!(sliced_rows.row(3) < sliced_rows.row(1));
    assert!(sliced_rows.row(4) < sliced_rows.row(1));
    assert!(sliced_rows.row(2) < sliced_rows.row(4));

    assert_round_trip(&converter, &sliced_rows, &sliced_list);
}
3211
/// Round-trips two single-element fixed-size list columns through one
/// converter and checks both come back valid and equal to their inputs.
#[test]
fn test_two_fixed_size_lists() {
    /// Builds five single-element `UInt8` lists: `[base]`, `[base + 1]`,
    /// `[base + 2]`, `[null]`, and a null list.
    fn single_element_lists(base: u8) -> ArrayRef {
        let mut builder = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
        for step in 0..3 {
            builder.values().append_value(base + step);
            builder.append(true);
        }
        // A valid list holding a null element.
        builder.values().append_null();
        builder.append(true);
        // A null list; the child slot still has to be filled.
        builder.values().append_null();
        builder.append(false);
        Arc::new(builder.finish()) as ArrayRef
    }

    let first = single_element_lists(100);
    let second = single_element_lists(200);

    let fields = vec![
        SortField::new(first.data_type().clone()),
        SortField::new(second.data_type().clone()),
    ];
    let converter = RowConverter::new(fields).unwrap();

    let rows = converter
        .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
        .unwrap();

    let decoded = converter.convert_rows(&rows).unwrap();
    assert_eq!(decoded.len(), 2);
    decoded[0].to_data().validate_full().unwrap();
    assert_eq!(&decoded[0], &first);
    decoded[1].to_data().validate_full().unwrap();
    assert_eq!(&decoded[1], &second);
}
3269
/// Round-trips a fixed-size (1) list of structs containing a `Utf8` field,
/// paired with a plain string column, and checks both columns come back valid
/// and equal to their inputs.
#[test]
fn test_fixed_size_list_with_variable_width_content() {
    /// Appends one list entry holding a single struct.
    ///
    /// `fields` supplies `(timestamp, offset_minutes, time_zone)`; `None`
    /// appends nulls to all three child builders and marks the struct null.
    /// `list_valid` is the validity of the enclosing list entry.
    fn push_entry(
        list: &mut FixedSizeListBuilder<StructBuilder>,
        fields: Option<(i64, i16, &str)>,
        list_valid: bool,
    ) {
        let entry = list.values();

        let timestamp = entry
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap();
        match fields {
            Some((micros, _, _)) => timestamp.append_value(micros),
            None => timestamp.append_null(),
        }

        let offset_minutes = entry.field_builder::<Int16Builder>(1).unwrap();
        match fields {
            Some((_, minutes, _)) => offset_minutes.append_value(minutes),
            None => offset_minutes.append_null(),
        }

        let time_zone = entry.field_builder::<StringBuilder>(2).unwrap();
        match fields {
            Some((_, _, zone)) => time_zone.append_value(zone),
            None => time_zone.append_null(),
        }

        entry.append(fields.is_some());
        list.append(list_valid);
    }

    let struct_fields = vec![
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
            false,
        ),
        Field::new("offset_minutes", DataType::Int16, false),
        Field::new("time_zone", DataType::Utf8, false),
    ];
    let mut first = FixedSizeListBuilder::new(StructBuilder::from_fields(struct_fields, 1), 1);

    // Null list wrapping a null struct.
    push_entry(&mut first, None, false);
    // Valid list wrapping a null struct.
    push_entry(&mut first, None, true);
    // Valid lists wrapping populated structs.
    push_entry(&mut first, Some((0, 0, "UTC")), true);
    push_entry(&mut first, Some((1126351800123456, 120, "Europe/Warsaw")), true);

    let first = Arc::new(first.finish()) as ArrayRef;

    let mut second = StringBuilder::new();
    second.append_value("somewhere near");
    second.append_null();
    second.append_value("Greenwich");
    second.append_value("Warsaw");
    let second = Arc::new(second.finish()) as ArrayRef;

    let fields = vec![
        SortField::new(first.data_type().clone()),
        SortField::new(second.data_type().clone()),
    ];
    let converter = RowConverter::new(fields).unwrap();

    let rows = converter
        .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
        .unwrap();

    let decoded = converter.convert_rows(&rows).unwrap();
    assert_eq!(decoded.len(), 2);
    decoded[0].to_data().validate_full().unwrap();
    assert_eq!(&decoded[0], &first);
    decoded[1].to_data().validate_full().unwrap();
    assert_eq!(&decoded[1], &second);
}
3387
3388 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3389 where
3390 K: ArrowPrimitiveType,
3391 StandardUniform: Distribution<K::Native>,
3392 {
3393 let mut rng = rng();
3394 (0..len)
3395 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3396 .collect()
3397 }
3398
3399 fn generate_strings<O: OffsetSizeTrait>(
3400 len: usize,
3401 valid_percent: f64,
3402 ) -> GenericStringArray<O> {
3403 let mut rng = rng();
3404 (0..len)
3405 .map(|_| {
3406 rng.random_bool(valid_percent).then(|| {
3407 let len = rng.random_range(0..100);
3408 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3409 String::from_utf8(bytes).unwrap()
3410 })
3411 })
3412 .collect()
3413 }
3414
3415 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3416 let mut rng = rng();
3417 (0..len)
3418 .map(|_| {
3419 rng.random_bool(valid_percent).then(|| {
3420 let len = rng.random_range(0..100);
3421 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3422 String::from_utf8(bytes).unwrap()
3423 })
3424 })
3425 .collect()
3426 }
3427
3428 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3429 let mut rng = rng();
3430 (0..len)
3431 .map(|_| {
3432 rng.random_bool(valid_percent).then(|| {
3433 let len = rng.random_range(0..100);
3434 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3435 bytes
3436 })
3437 })
3438 .collect()
3439 }
3440
3441 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3442 let edge_cases = vec![
3443 Some("bar".to_string()),
3444 Some("bar\0".to_string()),
3445 Some("LongerThan12Bytes".to_string()),
3446 Some("LongerThan12Bytez".to_string()),
3447 Some("LongerThan12Bytes\0".to_string()),
3448 Some("LongerThan12Byt".to_string()),
3449 Some("backend one".to_string()),
3450 Some("backend two".to_string()),
3451 Some("a".repeat(257)),
3452 Some("a".repeat(300)),
3453 ];
3454
3455 let mut values = Vec::with_capacity(len);
3457 for i in 0..len {
3458 values.push(
3459 edge_cases
3460 .get(i % edge_cases.len())
3461 .cloned()
3462 .unwrap_or(None),
3463 );
3464 }
3465
3466 StringViewArray::from(values)
3467 }
3468
3469 fn generate_dictionary<K>(
3470 values: ArrayRef,
3471 len: usize,
3472 valid_percent: f64,
3473 ) -> DictionaryArray<K>
3474 where
3475 K: ArrowDictionaryKeyType,
3476 K::Native: SampleUniform,
3477 {
3478 let mut rng = rng();
3479 let min_key = K::Native::from_usize(0).unwrap();
3480 let max_key = K::Native::from_usize(values.len()).unwrap();
3481 let keys: PrimitiveArray<K> = (0..len)
3482 .map(|_| {
3483 rng.random_bool(valid_percent)
3484 .then(|| rng.random_range(min_key..max_key))
3485 })
3486 .collect();
3487
3488 let data_type =
3489 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3490
3491 let data = keys
3492 .into_data()
3493 .into_builder()
3494 .data_type(data_type)
3495 .add_child_data(values.to_data())
3496 .build()
3497 .unwrap();
3498
3499 DictionaryArray::from(data)
3500 }
3501
3502 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3503 let mut rng = rng();
3504 let width = rng.random_range(0..20);
3505 let mut builder = FixedSizeBinaryBuilder::new(width);
3506
3507 let mut b = vec![0; width as usize];
3508 for _ in 0..len {
3509 match rng.random_bool(valid_percent) {
3510 true => {
3511 b.iter_mut().for_each(|x| *x = rng.random());
3512 builder.append_value(&b).unwrap();
3513 }
3514 false => builder.append_null(),
3515 }
3516 }
3517
3518 builder.finish()
3519 }
3520
3521 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3522 let mut rng = rng();
3523 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3524 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3525 let b = generate_strings::<i32>(len, valid_percent);
3526 let fields = Fields::from(vec![
3527 Field::new("a", DataType::Int32, true),
3528 Field::new("b", DataType::Utf8, true),
3529 ]);
3530 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3531 StructArray::new(fields, values, Some(nulls))
3532 }
3533
3534 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3535 where
3536 F: FnOnce(usize) -> ArrayRef,
3537 {
3538 let mut rng = rng();
3539 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3540 let values_len = offsets.last().unwrap().to_usize().unwrap();
3541 let values = values(values_len);
3542 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3543 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3544 ListArray::new(field, offsets, values, Some(nulls))
3545 }
3546
3547 fn generate_column(len: usize) -> ArrayRef {
3548 let mut rng = rng();
3549 match rng.random_range(0..18) {
3550 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
3551 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
3552 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
3553 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
3554 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
3555 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
3556 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
3557 7 => Arc::new(generate_dictionary::<Int64Type>(
3558 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
3560 len,
3561 0.8,
3562 )),
3563 8 => Arc::new(generate_dictionary::<Int64Type>(
3564 Arc::new(generate_primitive_array::<Int64Type>(
3566 rng.random_range(1..len),
3567 1.0,
3568 )),
3569 len,
3570 0.8,
3571 )),
3572 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3573 10 => Arc::new(generate_struct(len, 0.8)),
3574 11 => Arc::new(generate_list(len, 0.8, |values_len| {
3575 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3576 })),
3577 12 => Arc::new(generate_list(len, 0.8, |values_len| {
3578 Arc::new(generate_strings::<i32>(values_len, 0.8))
3579 })),
3580 13 => Arc::new(generate_list(len, 0.8, |values_len| {
3581 Arc::new(generate_struct(values_len, 0.8))
3582 })),
3583 14 => Arc::new(generate_string_view(len, 0.8)),
3584 15 => Arc::new(generate_byte_view(len, 0.8)),
3585 16 => Arc::new(generate_fixed_stringview_column(len)),
3586 17 => Arc::new(
3587 generate_list(len + 1000, 0.8, |values_len| {
3588 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3589 })
3590 .slice(500, len),
3591 ),
3592 _ => unreachable!(),
3593 }
3594 }
3595
3596 fn print_row(cols: &[SortColumn], row: usize) -> String {
3597 let t: Vec<_> = cols
3598 .iter()
3599 .map(|x| match x.values.is_valid(row) {
3600 true => {
3601 let opts = FormatOptions::default().with_null("NULL");
3602 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3603 formatter.value(row).to_string()
3604 }
3605 false => "NULL".to_string(),
3606 })
3607 .collect();
3608 t.join(",")
3609 }
3610
3611 fn print_col_types(cols: &[SortColumn]) -> String {
3612 let t: Vec<_> = cols
3613 .iter()
3614 .map(|x| x.values.data_type().to_string())
3615 .collect();
3616 t.join(",")
3617 }
3618
/// Randomized check of the row format, repeated 100 times.
///
/// Each round generates 1..5 random columns of 5..100 rows with random sort
/// options, asserts that comparing encoded rows agrees with
/// `LexicographicalComparator` for every pair of rows, and verifies that the
/// rows decode back to the input three ways: directly, via the parser over the
/// binary form, and via `from_binary`.
#[test]
#[cfg_attr(miri, ignore)]
fn fuzz_test() {
    /// Validates every decoded column and compares it with its input column.
    fn assert_decoded(decoded: &[ArrayRef], inputs: &[ArrayRef]) {
        for (actual, expected) in decoded.iter().zip(inputs) {
            actual.to_data().validate_full().unwrap();
            dictionary_eq(actual, expected)
        }
    }

    for _ in 0..100 {
        let mut rng = rng();
        let num_columns = rng.random_range(1..5);
        let len = rng.random_range(5..100);
        let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

        let options: Vec<_> = (0..num_columns)
            .map(|_| SortOptions {
                descending: rng.random_bool(0.5),
                nulls_first: rng.random_bool(0.5),
            })
            .collect();

        // Reference ordering, computed on the original columns.
        let sort_columns: Vec<_> = arrays
            .iter()
            .zip(&options)
            .map(|(column, column_options)| SortColumn {
                values: Arc::clone(column),
                options: Some(*column_options),
            })
            .collect();
        let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

        let fields: Vec<SortField> = arrays
            .iter()
            .zip(options)
            .map(|(column, column_options)| {
                SortField::new_with_options(column.data_type().clone(), column_options)
            })
            .collect();

        let converter = RowConverter::new(fields).unwrap();
        let rows = converter.convert_columns(&arrays).unwrap();

        // Every pair of rows must compare the same way in both representations.
        for i in 0..len {
            for j in 0..len {
                let row_i = rows.row(i);
                let row_j = rows.row(j);
                let row_cmp = row_i.cmp(&row_j);
                let lex_cmp = comparator.compare(i, j);
                assert_eq!(
                    row_cmp,
                    lex_cmp,
                    "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                    print_row(&sort_columns, i),
                    print_row(&sort_columns, j),
                    row_i,
                    row_j,
                    print_col_types(&sort_columns)
                );
            }
        }

        // Decode straight from the rows.
        let decoded = converter.convert_rows(&rows).unwrap();
        assert_decoded(&decoded, &arrays);

        // Decode from the binary form, parsing each row individually.
        let binary = rows.try_into_binary().expect("reasonable size");
        let parser = converter.parser();
        let parsed_rows = binary
            .iter()
            .map(|bytes| parser.parse(bytes.expect("valid bytes")));
        let decoded = converter.convert_rows(parsed_rows).unwrap();
        assert_decoded(&decoded, &arrays);

        // Decode from the binary form converted back into rows wholesale.
        let restored = converter.from_binary(binary);
        let decoded = converter.convert_rows(&restored).unwrap();
        assert_decoded(&decoded, &arrays);
    }
}
3702
/// Reuses one `Rows` buffer across two arrays, clearing it before each append,
/// and checks that each round-trips and that the final contents equal rows
/// built fresh from the second array alone.
#[test]
fn test_clear() {
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let mut rows = converter.empty_rows(3, 128);

    let arrays = [
        Arc::new(Int32Array::from(vec![None, Some(2), Some(4)])) as ArrayRef,
        Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef,
    ];

    for array in &arrays {
        rows.clear();
        converter
            .append(&mut rows, std::slice::from_ref(array))
            .unwrap();
        let decoded = converter.convert_rows(&rows).unwrap();
        assert_eq!(&decoded[0], array);
    }

    // After the loop `rows` should hold only the second array.
    let mut rows_expected = converter.empty_rows(3, 128);
    converter.append(&mut rows_expected, &arrays[1..]).unwrap();

    let pairs = rows.iter().zip(rows_expected.iter());
    for (i, (actual, expected)) in pairs.enumerate() {
        assert_eq!(
            actual, expected,
            "For row {i}: expected {expected:?}, actual: {actual:?}",
        );
    }
}
3731
/// Appends a `Dictionary(Int32, Binary)` array to a cleared `Rows` buffer and
/// checks the decoded column against the input with `dictionary_eq`.
#[test]
fn test_append_codec_dictionary_binary() {
    let dictionary_type = DataType::Dictionary(
        Box::new(DataType::Int32),
        Box::new(DataType::Binary),
    );
    let converter = RowConverter::new(vec![SortField::new(dictionary_type)]).unwrap();
    let mut rows = converter.empty_rows(4, 128);

    let dictionary_values = BinaryArray::from(vec![
        Some("a".as_bytes()),
        Some(b"b"),
        Some(b"c"),
        Some(b"d"),
    ]);
    let dictionary_keys = Int32Array::from_iter_values([0, 1, 2, 3]);
    let array =
        Arc::new(DictionaryArray::new(dictionary_keys, Arc::new(dictionary_values))) as ArrayRef;

    rows.clear();
    converter
        .append(&mut rows, std::slice::from_ref(&array))
        .unwrap();
    let decoded = converter.convert_rows(&rows).unwrap();

    dictionary_eq(&decoded[0], &array);
}
3761
/// A list `[null]` must encode to a row that orders before `[null, null]`.
#[test]
fn test_list_prefix() {
    let mut builder = ListBuilder::new(Int8Builder::new());
    builder.append_value([None]);
    builder.append_value([None, None]);
    let lists = builder.finish();

    let field = SortField::new(lists.data_type().clone());
    let converter = RowConverter::new(vec![field]).unwrap();
    let rows = converter.convert_columns(&[Arc::new(lists) as _]).unwrap();

    let shorter = rows.row(0);
    let longer = rows.row(1);
    assert_eq!(shorter.cmp(&longer), Ordering::Less);
}
3773
/// `RowConverter::supports_fields` must report a map type as unsupported.
#[test]
fn map_should_be_marked_as_unsupported() {
    let map_field = Field::new_map(
        "map",
        "entries",
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
        false,
        true,
    );
    let sort_fields = [SortField::new(map_field.data_type().clone())];

    assert!(
        !RowConverter::supports_fields(&sort_fields),
        "Map should not be supported"
    );
}
3791
/// Constructing a `RowConverter` for a map type must fail with
/// `ArrowError::NotYetImplemented` carrying the row-format message.
#[test]
fn should_fail_to_create_row_converter_for_unsupported_map_type() {
    let map_field = Field::new_map(
        "map",
        "entries",
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
        false,
        true,
    );
    let sort_fields = vec![SortField::new(map_field.data_type().clone())];

    match RowConverter::new(sort_fields) {
        Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
        Err(ArrowError::NotYetImplemented(message)) => {
            assert!(
                message.contains("Row format support not yet implemented for"),
                "Expected NotYetImplemented error for map data type, got: {message}",
            );
        }
        Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
    }
}
3818
/// Compares the size of the first data buffer of a decoded `Utf8View` column
/// on two decode paths: directly from `Rows`, and from rows parsed back out of
/// the binary form.
///
/// NOTE(review): per the test name, the direct path presumably skips UTF-8
/// validation and the parsed path performs it; confirm against the decoder.
#[test]
fn test_values_buffer_smaller_when_utf8_validation_disabled() {
    /// Returns `(direct, parsed)` lengths of the first data buffer after
    /// decoding `col` on each path.
    fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
        let rows = converter.convert_columns(&[col]).unwrap();

        // Path 1: decode the rows as produced by this converter.
        let direct = converter.convert_rows(&rows).unwrap();
        let unchecked_values_len = direct[0].as_string_view().data_buffers()[0].len();

        // Path 2: go through the binary form and re-parse every row.
        let binary = rows.try_into_binary().expect("reasonable size");
        let parser = converter.parser();
        let parsed_rows = binary
            .iter()
            .map(|bytes| parser.parse(bytes.expect("valid bytes")));
        let parsed = converter.convert_rows(parsed_rows).unwrap();
        let checked_values_len = parsed[0].as_string_view().data_buffers()[0].len();

        (unchecked_values_len, checked_values_len)
    }

    // Only values of at most 12 bytes: "hello" + "short" + "tiny" = 14 bytes.
    let short_only = Arc::new(StringViewArray::from_iter([
        Some("hello"),
        None,
        Some("short"),
        Some("tiny"),
    ])) as ArrayRef;

    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(short_only);
    assert_eq!(unchecked_values_len, 0);
    assert_eq!(checked_values_len, 14);

    // Only values longer than 12 bytes: both paths must agree.
    let long_only = Arc::new(StringViewArray::from_iter([
        Some("this is a very long string over 12 bytes"),
        Some("another long string to test the buffer"),
    ])) as ArrayRef;

    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(long_only);
    assert!(unchecked_values_len > 0);
    assert_eq!(unchecked_values_len, checked_values_len);

    // Mixed: a single 13-byte value among short ones.
    let mixed = Arc::new(StringViewArray::from_iter([
        Some("tiny"),
        Some("thisisexact13"),
        None,
        Some("short"),
    ])) as ArrayRef;

    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(mixed);
    assert_eq!(unchecked_values_len, 13);
    assert!(checked_values_len > unchecked_values_len);
}
3878
/// Round-trips a five-row sparse union (`Int32` / `Utf8` children, alternating
/// type ids) and checks that the length and per-row type ids are preserved.
#[test]
fn test_sparse_union() {
    let ints = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let strings = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
    let children = vec![Arc::new(ints) as ArrayRef, Arc::new(strings)];

    let fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();

    // Passing no offsets buffer makes this a sparse union.
    let union_array =
        UnionArray::try_new(fields, vec![0, 1, 0, 1, 0].into(), None, children).unwrap();

    let converter =
        RowConverter::new(vec![SortField::new(union_array.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_array.clone())])
        .unwrap();

    let decoded = converter.convert_rows(&rows).unwrap();
    let decoded_union = decoded[0].as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(union_array.len(), decoded_union.len());
    for i in 0..union_array.len() {
        assert_eq!(union_array.type_id(i), decoded_union.type_id(i));
    }
}
3919
/// Round-trips a sparse union whose selected child slots contain nulls and
/// checks that both the type ids and the logical nullness of every row
/// survive the conversion.
#[test]
fn test_sparse_union_with_nulls() {
    let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let str_array = StringArray::from(vec![None::<&str>; 5]);

    // Logical values: [1, null (str), 3, null (str), 5]
    let type_ids = vec![0, 1, 0, 1, 0].into();

    let union_fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, true))),
        (1, Arc::new(Field::new("str", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();

    let union_array = UnionArray::try_new(
        union_fields,
        type_ids,
        None,
        vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
    )
    .unwrap();

    let union_type = union_array.data_type().clone();
    let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

    let rows = converter
        .convert_columns(&[Arc::new(union_array.clone())])
        .unwrap();

    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(union_array.len(), back_union.len());

    // A `UnionArray` has no validity buffer of its own, so `Array::is_null`
    // reports `false` for every row and cannot detect a lost null. The
    // nullness lives in the selected child slot and is exposed through
    // `logical_nulls`.
    let expected_nulls = union_array.logical_nulls();
    let actual_nulls = back_union.logical_nulls();

    // Make sure the fixture really exercises nulls (rows 1 and 3), so the
    // comparison below is not vacuous.
    assert_eq!(
        expected_nulls.as_ref().map_or(0, |nulls| nulls.null_count()),
        2
    );

    for i in 0..union_array.len() {
        let expected_null = expected_nulls
            .as_ref()
            .is_some_and(|nulls| nulls.is_null(i));
        let actual_null = actual_nulls.as_ref().is_some_and(|nulls| nulls.is_null(i));
        assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
        // The type id is part of the encoded row even when the value is null,
        // so it must round-trip for every row.
        assert_eq!(union_array.type_id(i), back_union.type_id(i));
    }
}
3965
/// Round-trips a dense `Int32` / `Utf8` union through the row format and
/// checks that the length and the per-row type ids come back unchanged.
#[test]
fn test_dense_union() {
    // Dense children only hold the values that are actually referenced.
    let children = vec![
        Arc::new(Int32Array::from(vec![1, 3, 5])) as ArrayRef,
        Arc::new(StringArray::from(vec!["a", "b"])),
    ];

    // Each row picks a child via its type id and a slot in that child via its
    // offset, giving the logical values [1, "a", 3, "b", 5].
    let type_ids = vec![0i8, 1, 0, 1, 0].into();
    let offsets = vec![0i32, 0, 1, 1, 2].into();

    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("int", DataType::Int32, false),
            Field::new("str", DataType::Utf8, false),
        ],
    )
    .unwrap();

    // Supplying offsets makes this a dense union.
    let original = UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();

    let sort_field = SortField::new(original.data_type().clone());
    let converter = RowConverter::new(vec![sort_field]).unwrap();

    let rows = converter
        .convert_columns(&[Arc::new(original.clone())])
        .unwrap();
    let decoded_columns = converter.convert_rows(&rows).unwrap();
    let decoded = decoded_columns[0]
        .as_any()
        .downcast_ref::<UnionArray>()
        .unwrap();

    assert_eq!(original.len(), decoded.len());
    (0..original.len()).for_each(|row| {
        assert_eq!(original.type_id(row), decoded.type_id(row));
    });
}
4008
/// Round-trips a dense union whose referenced child slots contain nulls and
/// checks that both the type ids and the logical nullness of every row
/// survive the conversion.
#[test]
fn test_dense_union_with_nulls() {
    let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
    let str_array = StringArray::from(vec![Some("a"), None]);

    // Logical values: [1, "a", null (int), null (str), 5]
    let type_ids = vec![0, 1, 0, 1, 0].into();
    let offsets = vec![0, 0, 1, 1, 2].into();

    let union_fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, true))),
        (1, Arc::new(Field::new("str", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();

    let union_array = UnionArray::try_new(
        union_fields,
        type_ids,
        Some(offsets),
        vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
    )
    .unwrap();

    let union_type = union_array.data_type().clone();
    let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

    let rows = converter
        .convert_columns(&[Arc::new(union_array.clone())])
        .unwrap();

    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(union_array.len(), back_union.len());

    // A `UnionArray` has no validity buffer of its own, so `Array::is_null`
    // reports `false` for every row and cannot detect a lost null. The
    // nullness lives in the referenced child slot and is exposed through
    // `logical_nulls`.
    let expected_nulls = union_array.logical_nulls();
    let actual_nulls = back_union.logical_nulls();

    // Make sure the fixture really exercises nulls (rows 2 and 3), so the
    // comparison below is not vacuous.
    assert_eq!(
        expected_nulls.as_ref().map_or(0, |nulls| nulls.null_count()),
        2
    );

    for i in 0..union_array.len() {
        let expected_null = expected_nulls
            .as_ref()
            .is_some_and(|nulls| nulls.is_null(i));
        let actual_null = actual_nulls.as_ref().is_some_and(|nulls| nulls.is_null(i));
        assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
        // The type id is part of the encoded row even when the value is null,
        // so it must round-trip for every row.
        assert_eq!(union_array.type_id(i), back_union.type_id(i));
    }
}
4055
/// Checks the relative order of encoded rows for a dense union: rows holding
/// the `int` child (type id 0) compare below rows holding the `str` child
/// (type id 1), and rows of the same child compare by their value.
#[test]
fn test_union_ordering() {
    let children = vec![
        Arc::new(Int32Array::from(vec![100, 5, 20])) as ArrayRef,
        Arc::new(StringArray::from(vec!["z", "a"])),
    ];

    // Logical rows: [int 100, str "z", int 5, str "a", int 20]
    let type_ids = vec![0i8, 1, 0, 1, 0].into();
    let offsets = vec![0i32, 0, 1, 1, 2].into();

    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("int", DataType::Int32, false),
            Field::new("str", DataType::Utf8, false),
        ],
    )
    .unwrap();

    let union_array =
        UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();

    let sort_field = SortField::new(union_array.data_type().clone());
    let converter = RowConverter::new(vec![sort_field]).unwrap();
    let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();

    // Each pair is (index of the smaller row, index of the larger row).
    let expected_less_than = [
        (2, 1), // int 5   < str "z": different children
        (0, 3), // int 100 < str "a": different children
        (2, 4), // int 5   < int 20:  same child, by value
        (4, 0), // int 20  < int 100: same child, by value
        (3, 1), // str "a" < str "z": same child, by value
    ];
    for (smaller, larger) in expected_less_than {
        assert!(rows.row(smaller) < rows.row(larger));
    }
}
4111
/// Converts two sparse union columns with a single `RowConverter` and checks
/// that each column decodes back to the same type ids and the same values.
#[test]
fn test_row_converter_roundtrip_with_many_union_columns() {
    // Both columns use the same (int, string) field layout.
    let make_fields = || {
        UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("string", DataType::Utf8, true),
            ],
        )
        .unwrap()
    };

    // Builds a two-row sparse union: row 0 holds `number`, row 1 holds `text`.
    let make_union = |fields: UnionFields, number: i32, text: &str| {
        let ints = Int32Array::from(vec![Some(number), None]);
        let strings = StringArray::from(vec![None::<&str>, Some(text)]);
        UnionArray::try_new(
            fields,
            vec![0i8, 1].into(),
            None,
            vec![Arc::new(ints) as ArrayRef, Arc::new(strings) as ArrayRef],
        )
        .unwrap()
    };

    let fields1 = make_fields();
    let fields2 = make_fields();
    let union_array1 = make_union(fields1.clone(), 67, "hello");
    let union_array2 = make_union(fields2.clone(), 100, "world");

    let converter = RowConverter::new(vec![
        SortField::new(DataType::Union(fields1, UnionMode::Sparse)),
        SortField::new(DataType::Union(fields2, UnionMode::Sparse)),
    ])
    .unwrap();

    let input_columns = [
        Arc::new(union_array1.clone()) as ArrayRef,
        Arc::new(union_array2.clone()) as ArrayRef,
    ];
    let rows = converter.convert_columns(&input_columns).unwrap();
    let out = converter.convert_rows(&rows).unwrap();

    let [col1, col2] = out.as_slice() else {
        panic!("expected 2 columns")
    };

    let decoded = [col1, col2].map(|col| col.as_any().downcast_ref::<UnionArray>().unwrap());
    let expected = [&union_array1, &union_array2];

    for (want, got) in expected.into_iter().zip(decoded) {
        assert_eq!(want.len(), got.len());
        assert_eq!(want.type_ids(), got.type_ids());

        for row in 0..want.len() {
            assert_eq!(want.value(row).as_ref(), got.value(row).as_ref());
        }
    }
}
4199
/// Converts a single sparse union column to rows and back, and checks that
/// the decoded column has the same type ids and the same values.
#[test]
fn test_row_converter_roundtrip_with_one_union_column() {
    let fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("int", DataType::Int32, true),
            Field::new("string", DataType::Utf8, true),
        ],
    )
    .unwrap();

    // Two-row sparse union: row 0 holds the int 67, row 1 holds "hello".
    let children = vec![
        Arc::new(Int32Array::from(vec![Some(67), None])) as ArrayRef,
        Arc::new(StringArray::from(vec![None::<&str>, Some("hello")])) as ArrayRef,
    ];
    let union_array =
        UnionArray::try_new(fields.clone(), vec![0i8, 1].into(), None, children).unwrap();

    let sort_field = SortField::new(DataType::Union(fields, UnionMode::Sparse));
    let converter = RowConverter::new(vec![sort_field]).unwrap();

    let input_columns = [Arc::new(union_array.clone()) as ArrayRef];
    let rows = converter.convert_columns(&input_columns).unwrap();
    let out = converter.convert_rows(&rows).unwrap();

    let [col1] = out.as_slice() else {
        panic!("expected 1 column")
    };
    let decoded = col1.as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(decoded.len(), union_array.len());
    assert_eq!(decoded.type_ids(), union_array.type_ids());

    (0..decoded.len()).for_each(|row| {
        assert_eq!(decoded.value(row).as_ref(), union_array.value(row).as_ref());
    });
}
4249
/// Checks that `Rows::size` grows with the capacities passed to
/// `RowConverter::empty_rows`, even though no row has been appended.
#[test]
fn rows_size_should_count_for_capacity() {
    let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();

    // Reported size of an empty `Rows` created with the given two capacities.
    let empty_size = |row_capacity: usize, data_capacity: usize| {
        row_converter.empty_rows(row_capacity, data_capacity).size()
    };

    let empty_rows_size_with_preallocate_rows_and_data = empty_size(1000, 1000);
    let empty_rows_size_with_preallocate_rows = empty_size(1000, 0);
    let empty_rows_size_with_preallocate_data = empty_size(0, 1000);
    let empty_rows_size_without_preallocate = empty_size(0, 0);

    // Reserving both capacities must cost more than reserving only one.
    assert!(
        empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
        "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
    );
    assert!(
        empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
        "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
    );

    // Reserving either capacity must cost more than reserving nothing.
    assert!(
        empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
        "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
    );
    assert!(
        empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
        "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
    );
}
4292
/// Asking an empty `Rows` for the row at `usize::MAX` must panic with the
/// "row index out of bounds" message.
#[test]
#[should_panic(expected = "row index out of bounds")]
fn row_should_panic_on_overflowing_index() {
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let rows = converter.empty_rows(0, 0);
    rows.row(usize::MAX);
}
4301}