1#![doc(
157 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
/// Converts columns of arrow arrays into a row-oriented, byte-comparable
/// format: the resulting [`Row`]s are compared (`Eq`/`Ord`/`Hash`) purely on
/// their raw bytes, with the configured [`SortField`]s baked into the encoding
#[derive(Debug)]
pub struct RowConverter {
    /// The schema (data type + sort options) of the columns to convert
    fields: Arc<[SortField]>,
    /// Per-field encoding state; parallel to `fields`
    codecs: Vec<Codec>,
}
487
/// Per-field encoding state, created once per [`SortField`] when the
/// [`RowConverter`] is constructed
#[derive(Debug)]
enum Codec {
    /// No additional state is required to encode this column
    Stateless,
    /// A converter for the dictionary values, plus the pre-encoded row for a
    /// null value
    Dictionary(RowConverter, OwnedRow),
    /// A converter for the struct's child fields, plus the pre-encoded row of
    /// all-null children used when the struct itself is null
    Struct(RowConverter, OwnedRow),
    /// A converter for the list's child values
    List(RowConverter),
    /// A converter for the run values
    RunEndEncoded(RowConverter),
    /// One converter and pre-encoded null row per union field, stored in
    /// declaration order (elsewhere indexed by type id — see NOTE in `new`)
    Union(Vec<RowConverter>, Vec<OwnedRow>),
}
506
impl Codec {
    /// Builds the codec for `sort_field`, recursively creating child
    /// [`RowConverter`]s (and pre-encoded null rows) for nested and
    /// dictionary types
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are encoded by row-encoding their values and
                // substituting each key with the bytes of its value's row
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the encoding of a null value so null keys can
                // be emitted without consulting the dictionary values
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Child values are always encoded ascending; any descending
                // ordering is applied by the parent encoding. `nulls_first`
                // is flipped when the parent is descending so nulls keep
                // their configured position after that inversion.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All non-nested types are encoded without additional state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // Same ascending-child scheme as run-end encoding above
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Fixed-size lists encode their child with the parent's
                // options unchanged
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Each child column inherits the parent's sort options; a
                // pre-encoded all-null row handles null structs
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            DataType::Union(fields, _mode) => {
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                // One converter and pre-encoded null row per union field.
                // NOTE(review): these are stored positionally, but `encoder`
                // and `row_lengths` index them by type id — this assumes
                // union type ids are dense starting at 0; confirm behaviour
                // for unions with sparse type ids.
                let mut converters = Vec::with_capacity(fields.len());
                let mut null_rows = Vec::with_capacity(fields.len());

                for (_type_id, field) in fields.iter() {
                    let sort_field =
                        SortField::new_with_options(field.data_type().clone(), options);
                    let converter = RowConverter::new(vec![sort_field])?;

                    let null_array = new_null_array(field.data_type(), 1);
                    let nulls = converter.convert_columns(&[null_array])?;
                    let owned = OwnedRow {
                        data: nulls.buffer.into(),
                        config: nulls.config,
                    };

                    converters.push(converter);
                    null_rows.push(owned);
                }

                Ok(Self::Union(converters, null_rows))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Creates the per-array [`Encoder`] for `array`, row-encoding any child
    /// data (dictionary values, struct columns, list values, ...) up front
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                // Only the values actually referenced by the offsets are
                // encoded, so sliced list arrays skip unreferenced values
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
            Codec::Union(converters, _) => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected Union array");

                let type_ids = union_array.type_ids().clone();
                let offsets = union_array.offsets().cloned();

                // NOTE(review): the positional index is used as the type id
                // here — assumes dense type ids starting at 0 (see `new`)
                let mut child_rows = Vec::with_capacity(converters.len());
                for (type_id, converter) in converters.iter().enumerate() {
                    let child_array = union_array.child(type_id as i8);
                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                    child_rows.push(rows);
                }

                Ok(Encoder::Union {
                    child_rows,
                    type_ids,
                    offsets,
                })
            }
        }
    }

    /// Estimated memory size of this codec's state in bytes
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
            Codec::Union(converters, null_rows) => {
                converters.iter().map(|c| c.size()).sum::<usize>()
                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
            }
        }
    }
}
707
/// Per-array encoding state produced by [`Codec::encoder`], holding any
/// child data that was row-encoded ahead of the parent
#[derive(Debug)]
enum Encoder<'a> {
    /// No state required to encode
    Stateless,
    /// The row-encoded dictionary values and the pre-encoded null row
    Dictionary(Rows, Row<'a>),
    /// The row-encoded child columns and the pre-encoded null row
    Struct(Rows, Row<'a>),
    /// The row-encoded child values
    List(Rows),
    /// The row-encoded run values
    RunEndEncoded(Rows),
    /// State for a union column
    Union {
        /// Row-encoded children, one [`Rows`] per union field in positional
        /// order (looked up by type id — assumes dense ids; see `Codec::new`)
        child_rows: Vec<Rows>,
        /// Type id of each logical row
        type_ids: ScalarBuffer<i8>,
        /// Value offsets for dense unions; `None` for sparse unions
        offsets: Option<ScalarBuffer<i32>>,
    },
}
731
/// Configuration for a column of the row format: its [`DataType`] and the
/// [`SortOptions`] governing sort direction and null placement
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (`descending`, `nulls_first`)
    options: SortOptions,
    /// Data type of the column
    data_type: DataType,
}
740
741impl SortField {
742 pub fn new(data_type: DataType) -> Self {
744 Self::new_with_options(data_type, Default::default())
745 }
746
747 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
749 Self { options, data_type }
750 }
751
752 pub fn size(&self) -> usize {
756 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
757 }
758}
759
impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    ///
    /// # Errors
    /// Returns [`ArrowError::NotYetImplemented`] if any field has a data
    /// type not supported by the row format (see [`Self::supports_fields`])
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Check whether all the given fields are supported by the row format
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    // Non-nested types are always supported; nested types are supported
    // when all of their children are
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            DataType::Union(fs, _mode) => fs
                .iter()
                .all(|(_, f)| Self::supports_datatype(f.data_type())),
            _ => false,
        }
    }

    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// See [`Self::append`] for the panic and error conditions
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Convert [`ArrayRef`] columns, appending to an existing [`Rows`]
    ///
    /// # Panics
    /// Panics if `rows` was not created by this [`RowConverter`]
    ///
    /// # Errors
    /// Errors if the number, lengths or data types of `columns` do not
    /// match this converter's fields
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }
        for colum in columns.iter().skip(1) {
            if colum.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    colum.len()
                )));
            }
        }

        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Compute the per-row lengths, append the new rows' start offsets,
        // and size the buffer once before encoding in place
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        // Each column advances offsets[i + 1] past the bytes it writes for
        // row i, so after all columns the offsets hold each row's end
        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Row`]s back into [`ArrayRef`] columns
    ///
    /// # Panics
    /// Panics if any row was not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                // UTF-8 validation is needed if any input row requires it
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: the rows were produced by a converter with these fields
        // (asserted above)
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // In debug builds, verify the decoders consumed every encoded byte
        if cfg!(debug_assertions) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        // Start offset of the first row
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Create a [`Rows`] from a [`BinaryArray`] of encoded row bytes (e.g.
    /// produced by [`Rows::try_into_binary`]), reusing the buffer when it is
    /// uniquely owned
    ///
    /// # Panics
    /// Panics if the array contains nulls
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        let (offsets, values, _) = array.into_parts();
        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
        // Take ownership of the backing allocation if possible, else copy
        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
        Rows {
            buffer,
            offsets,
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                // The bytes came from outside this converter, so decoded
                // strings must be re-validated as UTF-8
                validate_utf8: true,
            },
        }
    }

    // Decodes each field's column in turn; every decoder advances the row
    // slices past the bytes it consumed.
    //
    // SAFETY: each row must contain bytes produced by a converter with
    // identical fields (or `validate_utf8` must be true for string data)
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }

    /// Returns a [`RowParser`] that can parse [`Row`]s from raw bytes
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Estimated memory size of this converter in bytes, including child
    /// converters and pre-encoded null rows
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
1070
/// Reinterprets raw bytes (e.g. from [`Row::data`]) as a [`Row`] for a
/// particular converter's schema
#[derive(Debug)]
pub struct RowParser {
    // Always has validate_utf8 = true, since parsed bytes are untrusted
    config: RowConfig,
}
1076
1077impl RowParser {
1078 fn new(fields: Arc<[SortField]>) -> Self {
1079 Self {
1080 config: RowConfig {
1081 fields,
1082 validate_utf8: true,
1083 },
1084 }
1085 }
1086
1087 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1092 Row {
1093 data: bytes,
1094 config: &self.config,
1095 }
1096 }
1097}
1098
/// Shared configuration describing how a set of rows was encoded
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema the rows were converted from
    fields: Arc<[SortField]>,
    /// Whether string data must be validated as UTF-8 when decoding; set
    /// when rows may have been built from untrusted bytes
    validate_utf8: bool,
}
1107
/// A row-oriented representation of arrow data, produced by [`RowConverter`]
#[derive(Debug)]
pub struct Rows {
    /// Underlying binary row data
    buffer: Vec<u8>,
    /// Byte offset of each row's start within `buffer`; always holds one
    /// more entry than the number of rows (the last is `buffer.len()`)
    offsets: Vec<usize>,
    /// The configuration the rows were encoded with
    config: RowConfig,
}
1120
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    /// Panics if `row` was not produced by a converter with the same fields
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Propagate the UTF-8 validation requirement from the source row
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    /// Panics if `row` is out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        // offsets has num_rows + 1 entries, so this bounds row < num_rows
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// The caller must ensure `index` is less than [`Self::num_rows`]
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Removes all rows, retaining the allocated capacity
    pub fn clear(&mut self) {
        // Keep the leading 0 that marks the start of the first row
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of rows
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the rows
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the total memory size of this [`Rows`] in bytes, including
    /// unused capacity
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.capacity()
            + self.offsets.capacity() * std::mem::size_of::<usize>()
    }

    /// Converts the row data into a [`BinaryArray`] without copying the
    /// underlying buffer
    ///
    /// # Errors
    /// Errors if the buffer exceeds `i32::MAX` bytes and therefore cannot
    /// be represented with i32 offsets
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // Every offset is <= buffer.len() <= i32::MAX, so the narrowing
        // cast cannot overflow
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets were built by appending rows and are therefore
        // monotonically increasing; arbitrary bytes are valid Binary values
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1228
1229impl<'a> IntoIterator for &'a Rows {
1230 type Item = Row<'a>;
1231 type IntoIter = RowsIter<'a>;
1232
1233 fn into_iter(self) -> Self::IntoIter {
1234 RowsIter {
1235 rows: self,
1236 start: 0,
1237 end: self.num_rows(),
1238 }
1239 }
1240}
1241
/// A double-ended iterator over the rows of a [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    /// Index of the next row to yield from the front (inclusive)
    start: usize,
    /// One past the last row to yield from the back (exclusive)
    end: usize,
}
1249
1250impl<'a> Iterator for RowsIter<'a> {
1251 type Item = Row<'a>;
1252
1253 fn next(&mut self) -> Option<Self::Item> {
1254 if self.end == self.start {
1255 return None;
1256 }
1257
1258 let row = unsafe { self.rows.row_unchecked(self.start) };
1260 self.start += 1;
1261 Some(row)
1262 }
1263
1264 fn size_hint(&self) -> (usize, Option<usize>) {
1265 let len = self.len();
1266 (len, Some(len))
1267 }
1268}
1269
1270impl ExactSizeIterator for RowsIter<'_> {
1271 fn len(&self) -> usize {
1272 self.end - self.start
1273 }
1274}
1275
1276impl DoubleEndedIterator for RowsIter<'_> {
1277 fn next_back(&mut self) -> Option<Self::Item> {
1278 if self.end == self.start {
1279 return None;
1280 }
1281 let row = unsafe { self.rows.row_unchecked(self.end) };
1283 self.end -= 1;
1284 Some(row)
1285 }
1286}
1287
/// A borrowed view of a single encoded row; comparison and hashing operate
/// directly on the raw bytes
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The configuration the row was encoded with
    config: &'a RowConfig,
}
1301
1302impl<'a> Row<'a> {
1303 pub fn owned(&self) -> OwnedRow {
1305 OwnedRow {
1306 data: self.data.into(),
1307 config: self.config.clone(),
1308 }
1309 }
1310
1311 pub fn data(&self) -> &'a [u8] {
1313 self.data
1314 }
1315}
1316
// Equality, ordering and hashing for `Row` delegate to the raw encoded
// bytes: the row encoding is built so that byte-wise comparison agrees
// with comparing the original values under the configured SortOptions.
impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        // Lexicographic byte comparison of the encoded data
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
1355
/// An owned version of [`Row`] that can outlive the [`Rows`] it came from
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// Owned copy of the encoded row bytes
    data: Box<[u8]>,
    /// The configuration the row was encoded with
    config: RowConfig,
}
1364
impl OwnedRow {
    /// Returns a borrowed [`Row`] view of this [`OwnedRow`], e.g. for
    /// comparison against rows still held by a [`Rows`]
    pub fn row(&self) -> Row<'_> {
        Row {
            data: &self.data,
            config: &self.config,
        }
    }
}
1376
// Equality, ordering and hashing for `OwnedRow` delegate to the borrowed
// `Row` view, and therefore to the raw encoded bytes.
impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
1415
1416#[inline]
1418fn null_sentinel(options: SortOptions) -> u8 {
1419 match options.nulls_first {
1420 true => 0,
1421 false => 0xFF,
1422 }
1423}
1424
/// Accumulates the encoded length of each row, staying in the compact
/// `Fixed` form until a variable-length column is encountered
enum LengthTracker {
    /// Every row seen so far has the same encoded length
    Fixed { length: usize, num_rows: usize },
    /// Rows have differing encoded lengths
    Variable {
        /// Combined length of all fixed-length columns
        fixed_length: usize,
        /// Per-row length contributed by variable-length columns
        lengths: Vec<usize>,
    },
}
1435
impl LengthTracker {
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column whose encoded length is the same for every row
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column with per-row encoded lengths, switching to the
    /// `Variable` representation on first use
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns the per-row variable lengths as a mutable slice, converting
    /// a `Fixed` tracker into `Variable` (with zeroed lengths) if needed
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            // Any Fixed tracker was converted to Variable just above
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Appends the *start* offset of each tracked row to `offsets`,
    /// beginning at `initial_offset`, and returns the end offset of the
    /// final row (i.e. the total encoded size). The appended entries are
    /// later advanced in place by `encode_column` to become end offsets.
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    // Each row's total = its variable part + shared fixed part
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1527
/// Computes the encoded length of every row in `cols`, accumulating each
/// column's contribution into a [`LengthTracker`]
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        // One sentinel byte plus the fixed-size payload
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its value's encoded row
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // One sentinel byte plus the encoded child row (or null row)
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");

                // One type-id byte plus the selected child's encoded row;
                // dense unions indirect through `offsets`, sparse unions use
                // the row index directly
                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_i);

                    1 + child_row.as_ref().len()
                });

                tracker.push_variable(lengths);
            }
        }
    }

    tracker
}
1660
/// Encodes a single column into `data`
///
/// `offsets` holds one entry per row plus a leading one: entry `i + 1` is
/// the current write position for row `i`, and is advanced past the bytes
/// this column writes for that row
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        // Faster path when the column has no nulls
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Each struct row is a validity sentinel byte (0x01 for valid,
            // null_sentinel otherwise) followed by the row-encoded children
            // (or the pre-encoded null row)
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        Encoder::Union {
            child_rows,
            type_ids,
            offsets: offsets_buf,
        } => {
            // Each union row is the type-id byte (bit-inverted when sorting
            // descending) followed by the selected child's encoded row
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];

                    // Dense unions indirect through the offsets buffer;
                    // sparse unions use the row index directly
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
1799
1800pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1802 data: &mut [u8],
1803 offsets: &mut [usize],
1804 column: &DictionaryArray<K>,
1805 values: &Rows,
1806 null: &Row<'_>,
1807) {
1808 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1809 let row = match k {
1810 Some(k) => values.row(k.as_usize()).data,
1811 None => null.data,
1812 };
1813 let end_offset = *offset + row.len();
1814 data[*offset..end_offset].copy_from_slice(row);
1815 *offset = end_offset;
1816 }
1817}
1818
/// Helper invoked by `downcast_primitive!` in `decode_column` to decode a
/// primitive column and wrap it in an `Arc`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1824
/// Decodes a single column for `field` from `rows`, advancing each row slice
/// past the bytes consumed by this column.
///
/// # Safety
///
/// `rows` must contain data produced by a [`RowConverter`] whose field matches
/// `field`/`codec`. If `validate_utf8` is false, string data is assumed to be
/// valid UTF-8 without checking.
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Types whose decoding needs no extra state: dispatch on the data type
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionaries are encoded as their values, so decoding yields the
        // value type rather than the original dictionary type
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // The leading byte of each row encodes struct-level validity;
            // strip it before decoding the children
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Decoding can change child types (e.g. dictionary children decode
            // to their value type), so rebuild the struct fields from the
            // actual decoded child types
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: the child data was decoded above for these exact types
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        // Dispatch on the run-ends index type declared in the data type
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, null_rows) => {
            let len = rows.len();

            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };

            // First pass: read each row's leading type-id byte (stored
            // inverted when the column sorts descending) and bucket the
            // remaining child bytes by union field
            let mut type_ids = Vec::with_capacity(len);
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];

            for (idx, row) in rows.iter_mut().enumerate() {
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };

                let type_id = type_id_byte as i8;
                type_ids.push(type_id);

                // NOTE(review): assumes type ids map directly to field
                // indices 0..converters.len() — a negative id would panic below
                let field_idx = type_id as usize;

                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));
            }

            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
            // Only dense unions carry a per-row offset into the child arrays
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));

            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];

                match &mode {
                    UnionMode::Dense => {
                        // Dense children only hold the rows of their own type;
                        // an unused field becomes an empty child array
                        if field_rows.is_empty() {
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }

                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();

                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;

                        // convert_raw advanced each slice in child_data; use
                        // the consumed length (+1 for the type-id byte) to
                        // advance the corresponding top-level row slices
                        for ((row_idx, original_bytes), remaining_bytes) in
                            field_rows.iter().zip(child_data)
                        {
                            let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // Sparse children are full length: fill the positions
                        // belonging to other fields with this field's
                        // pre-encoded null row
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;

                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);

                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }

                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;

                        // Advance only the rows that actually belong to this
                        // field (+1 for the type-id byte)
                        for (row_idx, child_row) in field_rows.iter() {
                            let remaining_len = sparse_data[*row_idx].len();
                            let consumed_length = 1 + child_row.len() - remaining_len;
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }

            // Dense offsets: the i-th value of type T is element
            // count-so-far(T) of child T
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    let field_idx = *type_id as usize;
                    offsets_vec.push(count[field_idx]);

                    count[field_idx] += 1;
                }
            }

            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);

            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;

            Arc::new(union_array)
        }
    };
    Ok(array)
}
2039
2040#[cfg(test)]
2041mod tests {
2042 use rand::distr::uniform::SampleUniform;
2043 use rand::distr::{Distribution, StandardUniform};
2044 use rand::{Rng, rng};
2045
2046 use arrow_array::builder::*;
2047 use arrow_array::types::*;
2048 use arrow_array::*;
2049 use arrow_buffer::{Buffer, OffsetBuffer};
2050 use arrow_buffer::{NullBuffer, i256};
2051 use arrow_cast::display::{ArrayFormatter, FormatOptions};
2052 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2053
2054 use super::*;
2055
    /// Round-trips two fixed-width columns (Int16, Float32) through the row
    /// format, checking the exact encoded bytes and the resulting ordering.
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // 8 bytes per row: the offsets show fixed-width rows
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // One group of 8 bytes per input row
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102, // row 0
                1, 128, 2, 1, 192, 32, 0, 0, // row 1
                0, 0, 0, 0, 0, 0, 0, 0, // row 2 (both columns null)
                1, 127, 251, 1, 192, 128, 0, 0, // row 3
                1, 128, 2, 1, 189, 204, 204, 205, // row 4
                1, 128, 2, 1, 63, 127, 255, 255, // row 5
                1, 128, 0, 1, 127, 255, 255, 255 // row 6
            ]
        );

        // Byte-wise comparison must match lexicographic column order
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
2118
2119 #[test]
2120 fn test_decimal32() {
2121 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2122 DECIMAL32_MAX_PRECISION,
2123 7,
2124 ))])
2125 .unwrap();
2126 let col = Arc::new(
2127 Decimal32Array::from_iter([
2128 None,
2129 Some(i32::MIN),
2130 Some(-13),
2131 Some(46_i32),
2132 Some(5456_i32),
2133 Some(i32::MAX),
2134 ])
2135 .with_precision_and_scale(9, 7)
2136 .unwrap(),
2137 ) as ArrayRef;
2138
2139 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2140 for i in 0..rows.num_rows() - 1 {
2141 assert!(rows.row(i) < rows.row(i + 1));
2142 }
2143
2144 let back = converter.convert_rows(&rows).unwrap();
2145 assert_eq!(back.len(), 1);
2146 assert_eq!(col.as_ref(), back[0].as_ref())
2147 }
2148
2149 #[test]
2150 fn test_decimal64() {
2151 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2152 DECIMAL64_MAX_PRECISION,
2153 7,
2154 ))])
2155 .unwrap();
2156 let col = Arc::new(
2157 Decimal64Array::from_iter([
2158 None,
2159 Some(i64::MIN),
2160 Some(-13),
2161 Some(46_i64),
2162 Some(5456_i64),
2163 Some(i64::MAX),
2164 ])
2165 .with_precision_and_scale(18, 7)
2166 .unwrap(),
2167 ) as ArrayRef;
2168
2169 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2170 for i in 0..rows.num_rows() - 1 {
2171 assert!(rows.row(i) < rows.row(i + 1));
2172 }
2173
2174 let back = converter.convert_rows(&rows).unwrap();
2175 assert_eq!(back.len(), 1);
2176 assert_eq!(col.as_ref(), back[0].as_ref())
2177 }
2178
2179 #[test]
2180 fn test_decimal128() {
2181 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2182 DECIMAL128_MAX_PRECISION,
2183 7,
2184 ))])
2185 .unwrap();
2186 let col = Arc::new(
2187 Decimal128Array::from_iter([
2188 None,
2189 Some(i128::MIN),
2190 Some(-13),
2191 Some(46_i128),
2192 Some(5456_i128),
2193 Some(i128::MAX),
2194 ])
2195 .with_precision_and_scale(38, 7)
2196 .unwrap(),
2197 ) as ArrayRef;
2198
2199 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2200 for i in 0..rows.num_rows() - 1 {
2201 assert!(rows.row(i) < rows.row(i + 1));
2202 }
2203
2204 let back = converter.convert_rows(&rows).unwrap();
2205 assert_eq!(back.len(), 1);
2206 assert_eq!(col.as_ref(), back[0].as_ref())
2207 }
2208
2209 #[test]
2210 fn test_decimal256() {
2211 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2212 DECIMAL256_MAX_PRECISION,
2213 7,
2214 ))])
2215 .unwrap();
2216 let col = Arc::new(
2217 Decimal256Array::from_iter([
2218 None,
2219 Some(i256::MIN),
2220 Some(i256::from_parts(0, -1)),
2221 Some(i256::from_parts(u128::MAX, -1)),
2222 Some(i256::from_parts(u128::MAX, 0)),
2223 Some(i256::from_parts(0, 46_i128)),
2224 Some(i256::from_parts(5, 46_i128)),
2225 Some(i256::MAX),
2226 ])
2227 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2228 .unwrap(),
2229 ) as ArrayRef;
2230
2231 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2232 for i in 0..rows.num_rows() - 1 {
2233 assert!(rows.row(i) < rows.row(i + 1));
2234 }
2235
2236 let back = converter.convert_rows(&rows).unwrap();
2237 assert_eq!(back.len(), 1);
2238 assert_eq!(col.as_ref(), back[0].as_ref())
2239 }
2240
2241 #[test]
2242 fn test_bool() {
2243 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2244
2245 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2246
2247 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2248 assert!(rows.row(2) > rows.row(1));
2249 assert!(rows.row(2) > rows.row(0));
2250 assert!(rows.row(1) > rows.row(0));
2251
2252 let cols = converter.convert_rows(&rows).unwrap();
2253 assert_eq!(&cols[0], &col);
2254
2255 let converter = RowConverter::new(vec![SortField::new_with_options(
2256 DataType::Boolean,
2257 SortOptions::default().desc().with_nulls_first(false),
2258 )])
2259 .unwrap();
2260
2261 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2262 assert!(rows.row(2) < rows.row(1));
2263 assert!(rows.row(2) < rows.row(0));
2264 assert!(rows.row(1) < rows.row(0));
2265 let cols = converter.convert_rows(&rows).unwrap();
2266 assert_eq!(&cols[0], &col);
2267 }
2268
    /// Timezone metadata must survive a round trip, both for plain timestamp
    /// arrays and for dictionary-encoded timestamps (which decode to their
    /// value type).
    #[test]
    fn test_timezone() {
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Dictionary-encoded timestamps: attach the timezone to the values
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // Dictionaries decode to their value type, keeping the timezone
        assert_eq!(back[0].data_type(), &v);
    }
2303
2304 #[test]
2305 fn test_null_encoding() {
2306 let col = Arc::new(NullArray::new(10));
2307 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2308 let rows = converter.convert_columns(&[col]).unwrap();
2309 assert_eq!(rows.num_rows(), 10);
2310 assert_eq!(rows.row(1).data.len(), 0);
2311 }
2312
    /// Round-trips variable-width columns (Utf8, Binary), including lengths
    /// chosen to straddle the mini-block and block boundaries of the
    /// variable-length encoding, in both sort directions.
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Prefixes sort before their extensions; null sorts before ""
        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Strictly ascending binary values around the encoding's block sizes
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Every pair must preserve the strictly ascending input order
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last inverts every pairwise comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2396
2397 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2399 match b.data_type() {
2400 DataType::Dictionary(_, v) => {
2401 assert_eq!(a.data_type(), v.as_ref());
2402 let b = arrow_cast::cast(b, v).unwrap();
2403 assert_eq!(a, b.as_ref())
2404 }
2405 _ => assert_eq!(a, b),
2406 }
2407 }
2408
    /// Dictionary-encoded strings must encode by value: equal values compare
    /// equal even across different dictionaries, ordering follows the values,
    /// and decoding yields the value type (checked via `dictionary_eq`).
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Value ordering, independent of key order
        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        // Identical values encode to identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // A different dictionary with overlapping values must encode equal
        // values identically
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending, nulls last: all comparisons invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending, nulls first: values invert but nulls stay smallest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2482
2483 #[test]
2484 fn test_struct() {
2485 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2487 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2488 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2489 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2490 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2491
2492 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2493 let converter = RowConverter::new(sort_fields).unwrap();
2494 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2495
2496 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2497 assert!(a < b);
2498 }
2499
2500 let back = converter.convert_rows(&r1).unwrap();
2501 assert_eq!(back.len(), 1);
2502 assert_eq!(&back[0], &s1);
2503
2504 let data = s1
2506 .to_data()
2507 .into_builder()
2508 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2509 .null_count(2)
2510 .build()
2511 .unwrap();
2512
2513 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2514 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2515 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
2521 assert_eq!(back.len(), 1);
2522 assert_eq!(&back[0], &s2);
2523
2524 back[0].to_data().validate_full().unwrap();
2525 }
2526
    /// A dictionary-encoded field nested inside a struct decodes to its value
    /// type (Utf8), preserving null keys and all referenced values.
    #[test]
    fn test_dictionary_in_struct() {
        let builder = StringDictionaryBuilder::<Int32Type>::new();
        let mut struct_builder = StructBuilder::new(
            vec![Field::new_dictionary(
                "foo",
                DataType::Int32,
                DataType::Utf8,
                true,
            )],
            vec![Box::new(builder)],
        );

        let dict_builder = struct_builder
            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
            .unwrap();

        dict_builder.append_value("a");
        dict_builder.append_null();
        dict_builder.append_value("a");
        dict_builder.append_value("b");

        // All four struct rows valid; only the dictionary child has a null
        for _ in 0..4 {
            struct_builder.append(true);
        }

        let s = Arc::new(struct_builder.finish()) as ArrayRef;
        let sort_fields = vec![SortField::new(s.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();

        let back = converter.convert_rows(&r).unwrap();
        let [s2] = back.try_into().unwrap();

        // The dictionary child decodes to Utf8, so the struct type changes
        assert_ne!(&s.data_type(), &s2.data_type());
        s2.to_data().validate_full().unwrap();

        // Compare original dictionary entries against the decoded strings
        let s1_struct = s.as_struct();
        let s1_0 = s1_struct.column(0);
        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
        let keys = s1_idx_0.keys();
        let values = s1_idx_0.values().as_string::<i32>();
        let s2_struct = s2.as_struct();
        let s2_0 = s2_struct.column(0);
        let s2_idx_0 = s2_0.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(s2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
            }
        }
    }
2589
2590 #[test]
2591 fn test_dictionary_in_struct_empty() {
2592 let ty = DataType::Struct(
2593 vec![Field::new_dictionary(
2594 "foo",
2595 DataType::Int32,
2596 DataType::Int32,
2597 false,
2598 )]
2599 .into(),
2600 );
2601 let s = arrow_array::new_empty_array(&ty);
2602
2603 let sort_fields = vec![SortField::new(s.data_type().clone())];
2604 let converter = RowConverter::new(sort_fields).unwrap();
2605 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2606
2607 let back = converter.convert_rows(&r).unwrap();
2608 let [s2] = back.try_into().unwrap();
2609
2610 assert_ne!(&s.data_type(), &s2.data_type());
2613 s2.to_data().validate_full().unwrap();
2614 assert_eq!(s.len(), 0);
2615 assert_eq!(s2.len(), 0);
2616 }
2617
    /// A list whose values are dictionary-encoded strings decodes to a list
    /// of Utf8, preserving element values, element nulls and list nulls.
    #[test]
    fn test_list_of_string_dictionary() {
        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
        // List 0: seven elements including a null element
        builder.values().append("a").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append_null();
        builder.values().append("c").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("d").unwrap();
        builder.append(true);
        // List 1: null list
        builder.append(false);
        // List 2: three elements
        builder.values().append("e").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append("a").unwrap();
        builder.append(true);

        let a = Arc::new(builder.finish()) as ArrayRef;
        let data_type = a.data_type().clone();

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        let [a2] = back.try_into().unwrap();

        // The dictionary values decode to Utf8, so the list type changes
        assert_ne!(&a.data_type(), &a2.data_type());

        a2.to_data().validate_full().unwrap();

        let a2_list = a2.as_list::<i32>();
        let a1_list = a.as_list::<i32>();

        // List 0: compare dictionary entries against decoded strings
        let a1_0 = a1_list.value(0);
        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
        let keys = a1_idx_0.keys();
        let values = a1_idx_0.values().as_string::<i32>();
        let a2_0 = a2_list.value(0);
        let a2_idx_0 = a2_0.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
            }
        }

        // List 1: the null list is preserved
        assert!(a1_list.is_null(1));
        assert!(a2_list.is_null(1));

        // List 2: compare dictionary entries against decoded strings
        let a1_2 = a1_list.value(2);
        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
        let keys = a1_idx_2.keys();
        let values = a1_idx_2.values().as_string::<i32>();
        let a2_2 = a2_list.value(2);
        let a2_idx_2 = a2_2.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_2.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
            }
        }
    }
2697
2698 #[test]
2699 fn test_primitive_dictionary() {
2700 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2701 builder.append(2).unwrap();
2702 builder.append(3).unwrap();
2703 builder.append(0).unwrap();
2704 builder.append_null();
2705 builder.append(5).unwrap();
2706 builder.append(3).unwrap();
2707 builder.append(-1).unwrap();
2708
2709 let a = builder.finish();
2710 let data_type = a.data_type().clone();
2711 let columns = [Arc::new(a) as ArrayRef];
2712
2713 let field = SortField::new(data_type.clone());
2714 let converter = RowConverter::new(vec![field]).unwrap();
2715 let rows = converter.convert_columns(&columns).unwrap();
2716 assert!(rows.row(0) < rows.row(1));
2717 assert!(rows.row(2) < rows.row(0));
2718 assert!(rows.row(3) < rows.row(2));
2719 assert!(rows.row(6) < rows.row(2));
2720 assert!(rows.row(3) < rows.row(6));
2721
2722 let back = converter.convert_rows(&rows).unwrap();
2723 assert_eq!(back.len(), 1);
2724 back[0].to_data().validate_full().unwrap();
2725 }
2726
2727 #[test]
2728 fn test_dictionary_nulls() {
2729 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2730 let keys =
2731 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2732
2733 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2734 let data = keys
2735 .into_builder()
2736 .data_type(data_type.clone())
2737 .child_data(vec![values])
2738 .build()
2739 .unwrap();
2740
2741 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2742 let field = SortField::new(data_type.clone());
2743 let converter = RowConverter::new(vec![field]).unwrap();
2744 let rows = converter.convert_columns(&columns).unwrap();
2745
2746 assert_eq!(rows.row(0), rows.row(1));
2747 assert_eq!(rows.row(3), rows.row(4));
2748 assert_eq!(rows.row(4), rows.row(5));
2749 assert!(rows.row(3) < rows.row(0));
2750 }
2751
2752 #[test]
2753 fn test_from_binary_shared_buffer() {
2754 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2755 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2756 let rows = converter.convert_columns(&[array]).unwrap();
2757 let binary_rows = rows.try_into_binary().expect("known-small rows");
2758 let _binary_rows_shared_buffer = binary_rows.clone();
2759
2760 let parsed = converter.from_binary(binary_rows);
2761
2762 converter.convert_rows(parsed.iter()).unwrap();
2763 }
2764
2765 #[test]
2766 #[should_panic(expected = "Encountered non UTF-8 data")]
2767 fn test_invalid_utf8() {
2768 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2769 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2770 let rows = converter.convert_columns(&[array]).unwrap();
2771 let binary_row = rows.row(0);
2772
2773 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2774 let parser = converter.parser();
2775 let utf8_row = parser.parse(binary_row.as_ref());
2776
2777 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2778 }
2779
2780 #[test]
2781 #[should_panic(expected = "Encountered non UTF-8 data")]
2782 fn test_invalid_utf8_array() {
2783 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2784 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2785 let rows = converter.convert_columns(&[array]).unwrap();
2786 let binary_rows = rows.try_into_binary().expect("known-small rows");
2787
2788 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2789 let parsed = converter.from_binary(binary_rows);
2790
2791 converter.convert_rows(parsed.iter()).unwrap();
2792 }
2793
2794 #[test]
2795 #[should_panic(expected = "index out of bounds")]
2796 fn test_invalid_empty() {
2797 let binary_row: &[u8] = &[];
2798
2799 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2800 let parser = converter.parser();
2801 let utf8_row = parser.parse(binary_row.as_ref());
2802
2803 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2804 }
2805
2806 #[test]
2807 #[should_panic(expected = "index out of bounds")]
2808 fn test_invalid_empty_array() {
2809 let row: &[u8] = &[];
2810 let binary_rows = BinaryArray::from(vec![row]);
2811
2812 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2813 let parsed = converter.from_binary(binary_rows);
2814
2815 converter.convert_rows(parsed.iter()).unwrap();
2816 }
2817
2818 #[test]
2819 #[should_panic(expected = "index out of bounds")]
2820 fn test_invalid_truncated() {
2821 let binary_row: &[u8] = &[0x02];
2822
2823 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2824 let parser = converter.parser();
2825 let utf8_row = parser.parse(binary_row.as_ref());
2826
2827 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2828 }
2829
2830 #[test]
2831 #[should_panic(expected = "index out of bounds")]
2832 fn test_invalid_truncated_array() {
2833 let row: &[u8] = &[0x02];
2834 let binary_rows = BinaryArray::from(vec![row]);
2835
2836 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2837 let parsed = converter.from_binary(binary_rows);
2838
2839 converter.convert_rows(parsed.iter()).unwrap();
2840 }
2841
2842 #[test]
2843 #[should_panic(expected = "rows were not produced by this RowConverter")]
2844 fn test_different_converter() {
2845 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2846 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2847 let rows = converter.convert_columns(&[values]).unwrap();
2848
2849 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2850 let _ = converter.convert_rows(&rows);
2851 }
2852
    /// Exercises row-format encoding of a `GenericListArray<O>` of `Int32` for
    /// every combination of ascending/descending and nulls-first/last sort
    /// options, including a sliced input, and verifies lossless round-trips.
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        // Row 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // Row 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // Row 2: [32, 52]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        // Row 3: NULL (child values are masked by the null list slot)
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
        // Row 4: [32, NULL]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        // Row 5: [] (empty but valid list)
        builder.append(true);
        // Row 6: NULL (again with masked child values)
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sort options: ascending, nulls first.
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32,52,32] > [32,52,12]
        assert!(rows.row(2) < rows.row(1)); // a strict prefix sorts first
        assert!(rows.row(3) < rows.row(2)); // NULL list before non-null
        assert!(rows.row(4) < rows.row(2)); // NULL element before 52
        assert!(rows.row(5) < rows.row(2)); // [] before [32, 52]
        assert!(rows.row(3) < rows.row(5)); // NULL list before empty list
        assert_eq!(rows.row(3), rows.row(6)); // both NULL lists encode identically
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last.
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2)); // NULLs now sort after values
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing must not change per-row encodings: original rows 1..=5
        // become slice rows 0..=4 (still using the desc/nulls-first converter).
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2966
    /// Exercises row-format encoding of nested `List<List<Int32>>` arrays
    /// under several sort-option combinations, including a sliced input.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, NULL]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, NULL], [1, NULL]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, NULL], NULL]; Row 3: NULL
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Ascending, nulls first.
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slice covering original rows 1..=3; encodings must be unaffected by
        // the non-zero offset.
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3070
3071 #[test]
3072 fn test_list() {
3073 test_single_list::<i32>();
3074 test_nested_list::<i32>();
3075 }
3076
3077 #[test]
3078 fn test_large_list() {
3079 test_single_list::<i64>();
3080 test_nested_list::<i64>();
3081 }
3082
    /// Exercises row-format encoding of `FixedSizeList<Int32>[3]` for every
    /// combination of sort options, including a sliced input.
    #[test]
    fn test_fixed_size_list() {
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        // Row 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // Row 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // Row 2: [32, 52, NULL]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        // Row 3: NULL (child values masked by the null slot)
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(13);
        builder.append(false);
        // Row 4: [32, NULL, NULL]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // Row 5: [NULL, NULL, NULL]
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // Row 6: NULL (again with masked child values)
        builder.values().append_value(17);
        builder.values().append_null();
        builder.values().append_value(77);
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sort options: ascending, nulls first.
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32,52,32] > [32,52,12]
        assert!(rows.row(2) < rows.row(1)); // NULL element sorts first
        assert!(rows.row(3) < rows.row(2)); // NULL list before non-null
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5)); // NULL list before all-NULL elements
        assert_eq!(rows.row(3), rows.row(6)); // both NULL lists encode identically
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last.
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) > rows.row(1)); // NULLs now sort last
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slice covering original rows 1..=5; encodings must survive the
        // non-zero offset (still using the desc/nulls-first converter).
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3205
    /// Round-trips two single-element `FixedSizeList<UInt8>[1]` columns
    /// together through the row format and back.
    #[test]
    fn test_two_fixed_size_lists() {
        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
        first.values().append_value(100); // Row 0: [100]
        first.append(true);
        first.values().append_value(101); // Row 1: [101]
        first.append(true);
        first.values().append_value(102); // Row 2: [102]
        first.append(true);
        first.values().append_null(); // Row 3: [NULL]
        first.append(true);
        first.values().append_null(); // Row 4: NULL (masked child value)
        first.append(false);
        let first = Arc::new(first.finish()) as ArrayRef;
        let first_type = first.data_type().clone();

        // Second column mirrors the first with different payload values.
        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
        second.values().append_value(200);
        second.append(true);
        second.values().append_value(201);
        second.append(true);
        second.values().append_value(202);
        second.append(true);
        second.values().append_null();
        second.append(true);
        second.values().append_null();
        second.append(false);
        let second = Arc::new(second.finish()) as ArrayRef;
        let second_type = second.data_type().clone();

        let converter = RowConverter::new(vec![
            SortField::new(first_type.clone()),
            SortField::new(second_type.clone()),
        ])
        .unwrap();

        let rows = converter
            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
            .unwrap();

        // Both columns must decode back to their original contents.
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 2);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &first);
        back[1].to_data().validate_full().unwrap();
        assert_eq!(&back[1], &second);
    }
3263
    /// Round-trips a `FixedSizeList[1]` of structs whose fields include a
    /// variable-width Utf8 child, alongside a plain Utf8 column.
    #[test]
    fn test_fixed_size_list_with_variable_width_content() {
        let mut first = FixedSizeListBuilder::new(
            StructBuilder::from_fields(
                vec![
                    Field::new(
                        "timestamp",
                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
                        false,
                    ),
                    Field::new("offset_minutes", DataType::Int16, false),
                    Field::new("time_zone", DataType::Utf8, false),
                ],
                1,
            ),
            1,
        );
        // Row 0: NULL list (all-null struct fields, struct slot and list slot
        // both null).
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_null();
        first.values().append(false);
        first.append(false);
        // Row 1: [NULL] (null struct inside a valid list slot).
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_null();
        first.values().append(false);
        first.append(true);
        // Row 2: [{0, 0, "UTC"}].
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_value(0);
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_value(0);
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_value("UTC");
        first.values().append(true);
        first.append(true);
        // Row 3: [{1126351800123456, 120, "Europe/Warsaw"}].
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_value(1126351800123456);
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_value(120);
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_value("Europe/Warsaw");
        first.values().append(true);
        first.append(true);
        let first = Arc::new(first.finish()) as ArrayRef;
        let first_type = first.data_type().clone();

        // Second column: plain Utf8 with a NULL, to interleave variable-width
        // data after the fixed-size-list column.
        let mut second = StringBuilder::new();
        second.append_value("somewhere near");
        second.append_null();
        second.append_value("Greenwich");
        second.append_value("Warsaw");
        let second = Arc::new(second.finish()) as ArrayRef;
        let second_type = second.data_type().clone();

        let converter = RowConverter::new(vec![
            SortField::new(first_type.clone()),
            SortField::new(second_type.clone()),
        ])
        .unwrap();

        let rows = converter
            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
            .unwrap();

        // Both columns must round-trip losslessly.
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 2);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &first);
        back[1].to_data().validate_full().unwrap();
        assert_eq!(&back[1], &second);
    }
3381
3382 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3383 where
3384 K: ArrowPrimitiveType,
3385 StandardUniform: Distribution<K::Native>,
3386 {
3387 let mut rng = rng();
3388 (0..len)
3389 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3390 .collect()
3391 }
3392
3393 fn generate_strings<O: OffsetSizeTrait>(
3394 len: usize,
3395 valid_percent: f64,
3396 ) -> GenericStringArray<O> {
3397 let mut rng = rng();
3398 (0..len)
3399 .map(|_| {
3400 rng.random_bool(valid_percent).then(|| {
3401 let len = rng.random_range(0..100);
3402 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3403 String::from_utf8(bytes).unwrap()
3404 })
3405 })
3406 .collect()
3407 }
3408
3409 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3410 let mut rng = rng();
3411 (0..len)
3412 .map(|_| {
3413 rng.random_bool(valid_percent).then(|| {
3414 let len = rng.random_range(0..100);
3415 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3416 String::from_utf8(bytes).unwrap()
3417 })
3418 })
3419 .collect()
3420 }
3421
3422 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3423 let mut rng = rng();
3424 (0..len)
3425 .map(|_| {
3426 rng.random_bool(valid_percent).then(|| {
3427 let len = rng.random_range(0..100);
3428 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3429 bytes
3430 })
3431 })
3432 .collect()
3433 }
3434
3435 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3436 let edge_cases = vec![
3437 Some("bar".to_string()),
3438 Some("bar\0".to_string()),
3439 Some("LongerThan12Bytes".to_string()),
3440 Some("LongerThan12Bytez".to_string()),
3441 Some("LongerThan12Bytes\0".to_string()),
3442 Some("LongerThan12Byt".to_string()),
3443 Some("backend one".to_string()),
3444 Some("backend two".to_string()),
3445 Some("a".repeat(257)),
3446 Some("a".repeat(300)),
3447 ];
3448
3449 let mut values = Vec::with_capacity(len);
3451 for i in 0..len {
3452 values.push(
3453 edge_cases
3454 .get(i % edge_cases.len())
3455 .cloned()
3456 .unwrap_or(None),
3457 );
3458 }
3459
3460 StringViewArray::from(values)
3461 }
3462
3463 fn generate_dictionary<K>(
3464 values: ArrayRef,
3465 len: usize,
3466 valid_percent: f64,
3467 ) -> DictionaryArray<K>
3468 where
3469 K: ArrowDictionaryKeyType,
3470 K::Native: SampleUniform,
3471 {
3472 let mut rng = rng();
3473 let min_key = K::Native::from_usize(0).unwrap();
3474 let max_key = K::Native::from_usize(values.len()).unwrap();
3475 let keys: PrimitiveArray<K> = (0..len)
3476 .map(|_| {
3477 rng.random_bool(valid_percent)
3478 .then(|| rng.random_range(min_key..max_key))
3479 })
3480 .collect();
3481
3482 let data_type =
3483 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3484
3485 let data = keys
3486 .into_data()
3487 .into_builder()
3488 .data_type(data_type)
3489 .add_child_data(values.to_data())
3490 .build()
3491 .unwrap();
3492
3493 DictionaryArray::from(data)
3494 }
3495
3496 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3497 let mut rng = rng();
3498 let width = rng.random_range(0..20);
3499 let mut builder = FixedSizeBinaryBuilder::new(width);
3500
3501 let mut b = vec![0; width as usize];
3502 for _ in 0..len {
3503 match rng.random_bool(valid_percent) {
3504 true => {
3505 b.iter_mut().for_each(|x| *x = rng.random());
3506 builder.append_value(&b).unwrap();
3507 }
3508 false => builder.append_null(),
3509 }
3510 }
3511
3512 builder.finish()
3513 }
3514
3515 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3516 let mut rng = rng();
3517 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3518 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3519 let b = generate_strings::<i32>(len, valid_percent);
3520 let fields = Fields::from(vec![
3521 Field::new("a", DataType::Int32, true),
3522 Field::new("b", DataType::Utf8, true),
3523 ]);
3524 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3525 StructArray::new(fields, values, Some(nulls))
3526 }
3527
3528 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3529 where
3530 F: FnOnce(usize) -> ArrayRef,
3531 {
3532 let mut rng = rng();
3533 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3534 let values_len = offsets.last().unwrap().to_usize().unwrap();
3535 let values = values(values_len);
3536 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3537 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3538 ListArray::new(field, offsets, values, Some(nulls))
3539 }
3540
    /// Generates a random column of `len` rows with a randomly selected data
    /// type; drives `fuzz_test`. Each arm must stay in sync with the set of
    /// types the row format supports.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..18) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            // Dictionary values generated fully valid (valid_percent = 1.0);
            // nullability comes from the keys.
            7 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // Oversized list sliced down to exercise non-zero offsets.
            17 => Arc::new(
                generate_list(len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
3589
3590 fn print_row(cols: &[SortColumn], row: usize) -> String {
3591 let t: Vec<_> = cols
3592 .iter()
3593 .map(|x| match x.values.is_valid(row) {
3594 true => {
3595 let opts = FormatOptions::default().with_null("NULL");
3596 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3597 formatter.value(row).to_string()
3598 }
3599 false => "NULL".to_string(),
3600 })
3601 .collect();
3602 t.join(",")
3603 }
3604
3605 fn print_col_types(cols: &[SortColumn]) -> String {
3606 let t: Vec<_> = cols
3607 .iter()
3608 .map(|x| x.values.data_type().to_string())
3609 .collect();
3610 t.join(",")
3611 }
3612
    /// Randomized round-trip test: for random column types and sort options,
    /// pairwise row comparisons must agree with `LexicographicalComparator`,
    /// and decoding — directly, via parsed binary rows, and via `from_binary`
    /// — must reproduce the inputs (dictionaries compare by decoded values).
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null placement per column.
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pairwise byte comparison of encoded rows must match the
            // lexicographical comparator's verdict.
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Round-trip directly from the Rows.
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip via the binary representation, re-parsing each row.
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip via `from_binary`.
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3696
    /// Verifies `Rows::clear` resets the buffer for reuse: after appending a
    /// second column into cleared rows, the contents must equal a fresh
    /// conversion of that column alone.
    #[test]
    fn test_clear() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let mut rows = converter.empty_rows(3, 128);

        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];

        // Clear + append each array in turn; each time the decoded result
        // must be exactly the last appended array.
        for array in arrays.iter() {
            rows.clear();
            converter
                .append(&mut rows, std::slice::from_ref(array))
                .unwrap();
            let back = converter.convert_rows(&rows).unwrap();
            assert_eq!(&back[0], array);
        }

        // After the loop, `rows` holds only the second array; compare
        // row-by-row against a freshly built encoding of it.
        let mut rows_expected = converter.empty_rows(3, 128);
        converter.append(&mut rows_expected, &arrays[1..]).unwrap();

        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
            assert_eq!(
                actual, expected,
                "For row {i}: expected {expected:?}, actual: {actual:?}",
            );
        }
    }
3725
    /// Verifies `append` through the dictionary codec with binary values;
    /// comparison uses `dictionary_eq` since decoding yields the value type.
    #[test]
    fn test_append_codec_dictionary_binary() {
        use DataType::*;
        // Dictionary<Int32, Binary> sort field.
        let converter = RowConverter::new(vec![SortField::new(Dictionary(
            Box::new(Int32),
            Box::new(Binary),
        ))])
        .unwrap();
        let mut rows = converter.empty_rows(4, 128);

        // Keys 0..=3 referencing values [a, b, c, d].
        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
        let values = BinaryArray::from(vec![
            Some("a".as_bytes()),
            Some(b"b"),
            Some(b"c"),
            Some(b"d"),
        ]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        rows.clear();
        let array = Arc::new(dict_array) as ArrayRef;
        converter
            .append(&mut rows, std::slice::from_ref(&array))
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();

        dictionary_eq(&back[0], &array);
    }
3755
3756 #[test]
3757 fn test_list_prefix() {
3758 let mut a = ListBuilder::new(Int8Builder::new());
3759 a.append_value([None]);
3760 a.append_value([None, None]);
3761 let a = a.finish();
3762
3763 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3764 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3765 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3766 }
3767
3768 #[test]
3769 fn map_should_be_marked_as_unsupported() {
3770 let map_data_type = Field::new_map(
3771 "map",
3772 "entries",
3773 Field::new("key", DataType::Utf8, false),
3774 Field::new("value", DataType::Utf8, true),
3775 false,
3776 true,
3777 )
3778 .data_type()
3779 .clone();
3780
3781 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3782
3783 assert!(!is_supported, "Map should not be supported");
3784 }
3785
3786 #[test]
3787 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3788 let map_data_type = Field::new_map(
3789 "map",
3790 "entries",
3791 Field::new("key", DataType::Utf8, false),
3792 Field::new("value", DataType::Utf8, true),
3793 false,
3794 true,
3795 )
3796 .data_type()
3797 .clone();
3798
3799 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3800
3801 match converter {
3802 Err(ArrowError::NotYetImplemented(message)) => {
3803 assert!(
3804 message.contains("Row format support not yet implemented for"),
3805 "Expected NotYetImplemented error for map data type, got: {message}",
3806 );
3807 }
3808 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3809 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3810 }
3811 }
3812
    /// Compares `StringViewArray` data-buffer sizes between the trusted
    /// decode path (`convert_rows` on `Rows`, no UTF-8 validation) and the
    /// validating path (rows re-parsed from their binary form).
    #[test]
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        // Returns (unchecked_len, checked_len): data-buffer lengths via the
        // trusted and the validating decode path, respectively.
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // All strings fit the 12-byte inline form.
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"),
            None,
            Some("short"),
            Some("tiny"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Trusted path keeps everything inline (empty data buffer);
        // validating path materializes "hello" + "short" + "tiny" = 14 bytes.
        assert_eq!(unchecked_values_len, 0);
        assert_eq!(checked_values_len, 14);

        // All strings exceed 12 bytes: both paths need the data buffer.
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Mixed: only "thisisexact13" (13 bytes) exceeds the inline limit.
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),
            Some("thisisexact13"),
            None,
            Some("short"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Trusted path buffers only the 13-byte string; validating path also
        // copies the short strings, so it must be strictly larger.
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
    }
3872
    /// Round-trips a sparse union of Int32/Utf8 children through the row
    /// format and verifies the type ids survive.
    #[test]
    fn test_sparse_union() {
        // Sparse union: every child has a slot per row; only the slot selected
        // by the type id carries the row's value.
        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
        let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);

        let type_ids = vec![0, 1, 0, 1, 0].into();

        let union_fields = [
            (0, Arc::new(Field::new("int", DataType::Int32, false))),
            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
        ]
        .into_iter()
        .collect();

        let union_array = UnionArray::try_new(
            union_fields,
            type_ids,
            None, // no offsets buffer => sparse union
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
        )
        .unwrap();

        let union_type = union_array.data_type().clone();
        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(union_array.clone())])
            .unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

        // Length and per-row type id must round-trip.
        assert_eq!(union_array.len(), back_union.len());
        for i in 0..union_array.len() {
            assert_eq!(union_array.type_id(i), back_union.type_id(i));
        }
    }
3913
3914 #[test]
3915 fn test_sparse_union_with_nulls() {
3916 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3918 let str_array = StringArray::from(vec![None::<&str>; 5]);
3919
3920 let type_ids = vec![0, 1, 0, 1, 0].into();
3922
3923 let union_fields = [
3924 (0, Arc::new(Field::new("int", DataType::Int32, true))),
3925 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3926 ]
3927 .into_iter()
3928 .collect();
3929
3930 let union_array = UnionArray::try_new(
3931 union_fields,
3932 type_ids,
3933 None,
3934 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3935 )
3936 .unwrap();
3937
3938 let union_type = union_array.data_type().clone();
3939 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3940
3941 let rows = converter
3942 .convert_columns(&[Arc::new(union_array.clone())])
3943 .unwrap();
3944
3945 let back = converter.convert_rows(&rows).unwrap();
3947 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3948
3949 assert_eq!(union_array.len(), back_union.len());
3950 for i in 0..union_array.len() {
3951 let expected_null = union_array.is_null(i);
3952 let actual_null = back_union.is_null(i);
3953 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3954 if !expected_null {
3955 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3956 }
3957 }
3958 }
3959
3960 #[test]
3961 fn test_dense_union() {
3962 let int_array = Int32Array::from(vec![1, 3, 5]);
3964 let str_array = StringArray::from(vec!["a", "b"]);
3965
3966 let type_ids = vec![0, 1, 0, 1, 0].into();
3967
3968 let offsets = vec![0, 0, 1, 1, 2].into();
3970
3971 let union_fields = [
3972 (0, Arc::new(Field::new("int", DataType::Int32, false))),
3973 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3974 ]
3975 .into_iter()
3976 .collect();
3977
3978 let union_array = UnionArray::try_new(
3979 union_fields,
3980 type_ids,
3981 Some(offsets), vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3983 )
3984 .unwrap();
3985
3986 let union_type = union_array.data_type().clone();
3987 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3988
3989 let rows = converter
3990 .convert_columns(&[Arc::new(union_array.clone())])
3991 .unwrap();
3992
3993 let back = converter.convert_rows(&rows).unwrap();
3995 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3996
3997 assert_eq!(union_array.len(), back_union.len());
3998 for i in 0..union_array.len() {
3999 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4000 }
4001 }
4002
4003 #[test]
4004 fn test_dense_union_with_nulls() {
4005 let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
4007 let str_array = StringArray::from(vec![Some("a"), None]);
4008
4009 let type_ids = vec![0, 1, 0, 1, 0].into();
4011 let offsets = vec![0, 0, 1, 1, 2].into();
4012
4013 let union_fields = [
4014 (0, Arc::new(Field::new("int", DataType::Int32, true))),
4015 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
4016 ]
4017 .into_iter()
4018 .collect();
4019
4020 let union_array = UnionArray::try_new(
4021 union_fields,
4022 type_ids,
4023 Some(offsets),
4024 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4025 )
4026 .unwrap();
4027
4028 let union_type = union_array.data_type().clone();
4029 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4030
4031 let rows = converter
4032 .convert_columns(&[Arc::new(union_array.clone())])
4033 .unwrap();
4034
4035 let back = converter.convert_rows(&rows).unwrap();
4037 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4038
4039 assert_eq!(union_array.len(), back_union.len());
4040 for i in 0..union_array.len() {
4041 let expected_null = union_array.is_null(i);
4042 let actual_null = back_union.is_null(i);
4043 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
4044 if !expected_null {
4045 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4046 }
4047 }
4048 }
4049
4050 #[test]
4051 fn test_union_ordering() {
4052 let int_array = Int32Array::from(vec![100, 5, 20]);
4053 let str_array = StringArray::from(vec!["z", "a"]);
4054
4055 let type_ids = vec![0, 1, 0, 1, 0].into();
4057 let offsets = vec![0, 0, 1, 1, 2].into();
4058
4059 let union_fields = [
4060 (0, Arc::new(Field::new("int", DataType::Int32, false))),
4061 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4062 ]
4063 .into_iter()
4064 .collect();
4065
4066 let union_array = UnionArray::try_new(
4067 union_fields,
4068 type_ids,
4069 Some(offsets),
4070 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4071 )
4072 .unwrap();
4073
4074 let union_type = union_array.data_type().clone();
4075 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4076
4077 let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
4078
4079 assert!(rows.row(2) < rows.row(1));
4091
4092 assert!(rows.row(0) < rows.row(3));
4094
4095 assert!(rows.row(2) < rows.row(4));
4098 assert!(rows.row(4) < rows.row(0));
4100
4101 assert!(rows.row(3) < rows.row(1));
4104 }
4105
4106 #[test]
4107 fn test_row_converter_roundtrip_with_many_union_columns() {
4108 let fields1 = UnionFields::try_new(
4110 vec![0, 1],
4111 vec![
4112 Field::new("int", DataType::Int32, true),
4113 Field::new("string", DataType::Utf8, true),
4114 ],
4115 )
4116 .unwrap();
4117
4118 let int_array1 = Int32Array::from(vec![Some(67), None]);
4119 let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
4120 let type_ids1 = vec![0i8, 1].into();
4121
4122 let union_array1 = UnionArray::try_new(
4123 fields1.clone(),
4124 type_ids1,
4125 None,
4126 vec![
4127 Arc::new(int_array1) as ArrayRef,
4128 Arc::new(string_array1) as ArrayRef,
4129 ],
4130 )
4131 .unwrap();
4132
4133 let fields2 = UnionFields::try_new(
4135 vec![0, 1],
4136 vec![
4137 Field::new("int", DataType::Int32, true),
4138 Field::new("string", DataType::Utf8, true),
4139 ],
4140 )
4141 .unwrap();
4142
4143 let int_array2 = Int32Array::from(vec![Some(100), None]);
4144 let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
4145 let type_ids2 = vec![0i8, 1].into();
4146
4147 let union_array2 = UnionArray::try_new(
4148 fields2.clone(),
4149 type_ids2,
4150 None,
4151 vec![
4152 Arc::new(int_array2) as ArrayRef,
4153 Arc::new(string_array2) as ArrayRef,
4154 ],
4155 )
4156 .unwrap();
4157
4158 let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
4160 let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);
4161
4162 let sort_field1 = SortField::new(field1.data_type().clone());
4163 let sort_field2 = SortField::new(field2.data_type().clone());
4164
4165 let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();
4166
4167 let rows = converter
4168 .convert_columns(&[
4169 Arc::new(union_array1.clone()) as ArrayRef,
4170 Arc::new(union_array2.clone()) as ArrayRef,
4171 ])
4172 .unwrap();
4173
4174 let out = converter.convert_rows(&rows).unwrap();
4176
4177 let [col1, col2] = out.as_slice() else {
4178 panic!("expected 2 columns")
4179 };
4180
4181 let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4182 let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
4183
4184 for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
4185 assert_eq!(expected.len(), got.len());
4186 assert_eq!(expected.type_ids(), got.type_ids());
4187
4188 for i in 0..expected.len() {
4189 assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
4190 }
4191 }
4192 }
4193
4194 #[test]
4195 fn test_row_converter_roundtrip_with_one_union_column() {
4196 let fields = UnionFields::try_new(
4197 vec![0, 1],
4198 vec![
4199 Field::new("int", DataType::Int32, true),
4200 Field::new("string", DataType::Utf8, true),
4201 ],
4202 )
4203 .unwrap();
4204
4205 let int_array = Int32Array::from(vec![Some(67), None]);
4206 let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
4207 let type_ids = vec![0i8, 1].into();
4208
4209 let union_array = UnionArray::try_new(
4210 fields.clone(),
4211 type_ids,
4212 None,
4213 vec![
4214 Arc::new(int_array) as ArrayRef,
4215 Arc::new(string_array) as ArrayRef,
4216 ],
4217 )
4218 .unwrap();
4219
4220 let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
4221 let sort_field = SortField::new(field.data_type().clone());
4222 let converter = RowConverter::new(vec![sort_field]).unwrap();
4223
4224 let rows = converter
4225 .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
4226 .unwrap();
4227
4228 let out = converter.convert_rows(&rows).unwrap();
4230
4231 let [col1] = out.as_slice() else {
4232 panic!("expected 1 column")
4233 };
4234
4235 let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4236 assert_eq!(col.len(), union_array.len());
4237 assert_eq!(col.type_ids(), union_array.type_ids());
4238
4239 for i in 0..col.len() {
4240 assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
4241 }
4242 }
4243
4244 #[test]
4245 fn rows_size_should_count_for_capacity() {
4246 let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
4247
4248 let empty_rows_size_with_preallocate_rows_and_data = {
4249 let rows = row_converter.empty_rows(1000, 1000);
4250
4251 rows.size()
4252 };
4253 let empty_rows_size_with_preallocate_rows = {
4254 let rows = row_converter.empty_rows(1000, 0);
4255
4256 rows.size()
4257 };
4258 let empty_rows_size_with_preallocate_data = {
4259 let rows = row_converter.empty_rows(0, 1000);
4260
4261 rows.size()
4262 };
4263 let empty_rows_size_without_preallocate = {
4264 let rows = row_converter.empty_rows(0, 0);
4265
4266 rows.size()
4267 };
4268
4269 assert!(
4270 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
4271 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
4272 );
4273 assert!(
4274 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
4275 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
4276 );
4277 assert!(
4278 empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
4279 "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
4280 );
4281 assert!(
4282 empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
4283 "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
4284 );
4285 }
4286}